Browse Source

Merge pull request #702 from chrislusf/add_topo_listener

Add volume id location change listener
pull/719/head
Chris Lu 7 years ago
committed by GitHub
parent
commit
452bd0b013
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      weed/filer2/filer.go
  2. 60
      weed/filer2/filer_master.go
  3. 8
      weed/operation/delete_content.go
  4. 167
      weed/pb/master_pb/seaweed.pb.go
  5. 13
      weed/pb/seaweed.proto
  6. 20
      weed/server/filer_grpc_server.go
  7. 5
      weed/server/filer_server_handlers_read.go
  8. 6
      weed/server/filer_server_handlers_write.go
  9. 108
      weed/server/master_grpc_server.go
  10. 6
      weed/server/master_server.go
  11. 11
      weed/topology/data_node.go
  12. 5
      weed/topology/topology.go
  13. 23
      weed/topology/topology_map.go
  14. 1
      weed/util/http_util.go
  15. 95
      weed/wdclient/masterclient.go
  16. 99
      weed/wdclient/vid_map.go
  17. 15
      weed/wdclient/wdclient.go

36
weed/filer2/filer.go

@ -1,30 +1,30 @@
package filer2 package filer2
import ( import (
"context"
"fmt" "fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/karlseguin/ccache" "github.com/karlseguin/ccache"
"os"
"path/filepath"
"strings"
"time"
) )
type Filer struct { type Filer struct {
masters []string
store FilerStore store FilerStore
directoryCache *ccache.Cache directoryCache *ccache.Cache
currentMaster string
MasterClient *wdclient.MasterClient
} }
func NewFiler(masters []string) *Filer { func NewFiler(masters []string) *Filer {
return &Filer{ return &Filer{
masters: masters,
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
} }
} }
@ -36,6 +36,14 @@ func (f *Filer) DisableDirectoryCache() {
f.directoryCache = nil f.directoryCache = nil
} }
func (fs *Filer) GetMaster() string {
return fs.MasterClient.GetMaster()
}
func (fs *Filer) KeepConnectedToMaster() {
fs.MasterClient.KeepConnectedToMaster()
}
func (f *Filer) CreateEntry(entry *Entry) error { func (f *Filer) CreateEntry(entry *Entry) error {
dirParts := strings.Split(string(entry.FullPath), "/") dirParts := strings.Split(string(entry.FullPath), "/")
@ -198,9 +206,17 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
func (f *Filer) deleteChunks(chunks []*filer_pb.FileChunk) { func (f *Filer) deleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks { for _, chunk := range chunks {
if err := operation.DeleteFile(f.GetMaster(), chunk.FileId, ""); err != nil {
glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err)
f.DeleteFileByFileId(chunk.FileId)
}
}
func (f *Filer) DeleteFileByFileId(fileId string) {
fileUrlOnVolume, err := f.MasterClient.LookupFileId(fileId)
if err != nil {
glog.V(0).Infof("can not find file %s: %v", fileId, err)
} }
if err := operation.DeleteFromVolumeServer(fileUrlOnVolume, ""); err != nil {
glog.V(0).Infof("deleting file %s: %v", fileId, err)
} }
} }

60
weed/filer2/filer_master.go

@ -1,60 +0,0 @@
package filer2
import (
"context"
"fmt"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *Filer) GetMaster() string {
return fs.currentMaster
}
func (fs *Filer) KeepConnectedToMaster() {
glog.V(0).Infof("Filer bootstraps with masters %v", fs.masters)
for _, master := range fs.masters {
glog.V(0).Infof("Connecting to %v", master)
withMasterClient(master, func(client master_pb.SeaweedClient) error {
stream, err := client.KeepConnected(context.Background())
if err != nil {
glog.V(0).Infof("failed to keep connected to %s: %v", master, err)
return err
}
glog.V(0).Infof("Connected to %v", master)
fs.currentMaster = master
for {
time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
if err = stream.Send(&master_pb.Empty{}); err != nil {
glog.V(0).Infof("failed to send to %s: %v", master, err)
return err
}
if _, err = stream.Recv(); err != nil {
glog.V(0).Infof("failed to receive from %s: %v", master, err)
return err
}
}
})
fs.currentMaster = ""
}
}
func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
grpcConnection, err := util.GrpcDial(master)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", master, err)
}
defer grpcConnection.Close()
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}

8
weed/operation/delete_content.go

@ -21,6 +21,14 @@ type DeleteResult struct {
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
func DeleteFromVolumeServer(fileUrlOnVolume string, jwt security.EncodedJwt) error {
err := util.Delete(fileUrlOnVolume, jwt)
if err != nil {
return fmt.Errorf("Failed to delete %s:%v", fileUrlOnVolume, err)
}
return nil
}
func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error { func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
fileUrl, err := LookupFileId(master, fileId) fileUrl, err := LookupFileId(master, fileId)
if err != nil { if err != nil {

167
weed/pb/master_pb/seaweed.pb.go

@ -14,6 +14,8 @@ It has these top-level messages:
VolumeInformationMessage VolumeInformationMessage
Empty Empty
SuperBlockExtra SuperBlockExtra
ClientListenRequest
VolumeLocation
*/ */
package master_pb package master_pb
@ -295,6 +297,62 @@ func (m *SuperBlockExtra_ErasureCoding) GetVolumeIds() []uint32 {
return nil return nil
} }
type ClientListenRequest struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
}
func (m *ClientListenRequest) Reset() { *m = ClientListenRequest{} }
func (m *ClientListenRequest) String() string { return proto.CompactTextString(m) }
func (*ClientListenRequest) ProtoMessage() {}
func (*ClientListenRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (m *ClientListenRequest) GetName() string {
if m != nil {
return m.Name
}
return ""
}
type VolumeLocation struct {
Url string `protobuf:"bytes,1,opt,name=url" json:"url,omitempty"`
PublicUrl string `protobuf:"bytes,2,opt,name=public_url,json=publicUrl" json:"public_url,omitempty"`
NewVids []uint32 `protobuf:"varint,3,rep,packed,name=new_vids,json=newVids" json:"new_vids,omitempty"`
DeletedVids []uint32 `protobuf:"varint,4,rep,packed,name=deleted_vids,json=deletedVids" json:"deleted_vids,omitempty"`
}
func (m *VolumeLocation) Reset() { *m = VolumeLocation{} }
func (m *VolumeLocation) String() string { return proto.CompactTextString(m) }
func (*VolumeLocation) ProtoMessage() {}
func (*VolumeLocation) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *VolumeLocation) GetUrl() string {
if m != nil {
return m.Url
}
return ""
}
func (m *VolumeLocation) GetPublicUrl() string {
if m != nil {
return m.PublicUrl
}
return ""
}
func (m *VolumeLocation) GetNewVids() []uint32 {
if m != nil {
return m.NewVids
}
return nil
}
func (m *VolumeLocation) GetDeletedVids() []uint32 {
if m != nil {
return m.DeletedVids
}
return nil
}
func init() { func init() {
proto.RegisterType((*Heartbeat)(nil), "master_pb.Heartbeat") proto.RegisterType((*Heartbeat)(nil), "master_pb.Heartbeat")
proto.RegisterType((*HeartbeatResponse)(nil), "master_pb.HeartbeatResponse") proto.RegisterType((*HeartbeatResponse)(nil), "master_pb.HeartbeatResponse")
@ -302,6 +360,8 @@ func init() {
proto.RegisterType((*Empty)(nil), "master_pb.Empty") proto.RegisterType((*Empty)(nil), "master_pb.Empty")
proto.RegisterType((*SuperBlockExtra)(nil), "master_pb.SuperBlockExtra") proto.RegisterType((*SuperBlockExtra)(nil), "master_pb.SuperBlockExtra")
proto.RegisterType((*SuperBlockExtra_ErasureCoding)(nil), "master_pb.SuperBlockExtra.ErasureCoding") proto.RegisterType((*SuperBlockExtra_ErasureCoding)(nil), "master_pb.SuperBlockExtra.ErasureCoding")
proto.RegisterType((*ClientListenRequest)(nil), "master_pb.ClientListenRequest")
proto.RegisterType((*VolumeLocation)(nil), "master_pb.VolumeLocation")
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -368,8 +428,8 @@ func (c *seaweedClient) KeepConnected(ctx context.Context, opts ...grpc.CallOpti
} }
type Seaweed_KeepConnectedClient interface { type Seaweed_KeepConnectedClient interface {
Send(*Empty) error
Recv() (*Empty, error)
Send(*ClientListenRequest) error
Recv() (*VolumeLocation, error)
grpc.ClientStream grpc.ClientStream
} }
@ -377,12 +437,12 @@ type seaweedKeepConnectedClient struct {
grpc.ClientStream grpc.ClientStream
} }
func (x *seaweedKeepConnectedClient) Send(m *Empty) error {
func (x *seaweedKeepConnectedClient) Send(m *ClientListenRequest) error {
return x.ClientStream.SendMsg(m) return x.ClientStream.SendMsg(m)
} }
func (x *seaweedKeepConnectedClient) Recv() (*Empty, error) {
m := new(Empty)
func (x *seaweedKeepConnectedClient) Recv() (*VolumeLocation, error) {
m := new(VolumeLocation)
if err := x.ClientStream.RecvMsg(m); err != nil { if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err return nil, err
} }
@ -431,8 +491,8 @@ func _Seaweed_KeepConnected_Handler(srv interface{}, stream grpc.ServerStream) e
} }
type Seaweed_KeepConnectedServer interface { type Seaweed_KeepConnectedServer interface {
Send(*Empty) error
Recv() (*Empty, error)
Send(*VolumeLocation) error
Recv() (*ClientListenRequest, error)
grpc.ServerStream grpc.ServerStream
} }
@ -440,12 +500,12 @@ type seaweedKeepConnectedServer struct {
grpc.ServerStream grpc.ServerStream
} }
func (x *seaweedKeepConnectedServer) Send(m *Empty) error {
func (x *seaweedKeepConnectedServer) Send(m *VolumeLocation) error {
return x.ServerStream.SendMsg(m) return x.ServerStream.SendMsg(m)
} }
func (x *seaweedKeepConnectedServer) Recv() (*Empty, error) {
m := new(Empty)
func (x *seaweedKeepConnectedServer) Recv() (*ClientListenRequest, error) {
m := new(ClientListenRequest)
if err := x.ServerStream.RecvMsg(m); err != nil { if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err return nil, err
} }
@ -476,45 +536,50 @@ var _Seaweed_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("seaweed.proto", fileDescriptor0) } func init() { proto.RegisterFile("seaweed.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 627 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x94, 0xcf, 0x6e, 0xd3, 0x40,
0x10, 0xc6, 0x71, 0x92, 0x26, 0xf5, 0xa4, 0x6e, 0xd3, 0x15, 0x42, 0x56, 0x29, 0x10, 0xc2, 0xc5,
0x12, 0x28, 0x42, 0xe5, 0xc4, 0x81, 0x4b, 0xa3, 0x22, 0xaa, 0x82, 0x5a, 0x39, 0x82, 0x03, 0x17,
0x6b, 0xe3, 0x9d, 0x56, 0xab, 0xae, 0xff, 0x68, 0x77, 0x53, 0xe2, 0xbe, 0x04, 0x4f, 0xc2, 0x2b,
0x70, 0xe2, 0xc1, 0xd0, 0x8e, 0xed, 0x34, 0x14, 0xb8, 0xcd, 0xfc, 0x66, 0xc6, 0x3b, 0xf9, 0xbe,
0xdd, 0x40, 0x60, 0x90, 0x7f, 0x43, 0x14, 0xd3, 0x52, 0x17, 0xb6, 0x60, 0x7e, 0xc6, 0x8d, 0x45,
0x9d, 0x94, 0x8b, 0xc9, 0x8f, 0x0e, 0xf8, 0x1f, 0x90, 0x6b, 0xbb, 0x40, 0x6e, 0xd9, 0x2e, 0x74,
0x64, 0x19, 0x7a, 0x63, 0x2f, 0xf2, 0xe3, 0x8e, 0x2c, 0x19, 0x83, 0x5e, 0x59, 0x68, 0x1b, 0x76,
0xc6, 0x5e, 0x14, 0xc4, 0x14, 0xb3, 0x27, 0x00, 0xe5, 0x72, 0xa1, 0x64, 0x9a, 0x2c, 0xb5, 0x0a,
0xbb, 0xd4, 0xeb, 0xd7, 0xe4, 0xb3, 0x56, 0x2c, 0x82, 0x51, 0xc6, 0x57, 0xc9, 0x4d, 0xa1, 0x96,
0x19, 0x26, 0x69, 0xb1, 0xcc, 0x6d, 0xd8, 0xa3, 0xf1, 0xdd, 0x8c, 0xaf, 0xbe, 0x10, 0x9e, 0x39,
0xca, 0xc6, 0xb0, 0xe3, 0x3a, 0x2f, 0xa5, 0xc2, 0xe4, 0x1a, 0xab, 0x70, 0x6b, 0xec, 0x45, 0xbd,
0x18, 0x32, 0xbe, 0x7a, 0x2f, 0x15, 0x9e, 0x61, 0xc5, 0x9e, 0xc1, 0x50, 0x70, 0xcb, 0x93, 0x14,
0x73, 0x8b, 0x3a, 0xec, 0xd3, 0x59, 0xe0, 0xd0, 0x8c, 0x88, 0xdb, 0x4f, 0xf3, 0xf4, 0x3a, 0x1c,
0x50, 0x85, 0x62, 0xb7, 0x1f, 0x17, 0x99, 0xcc, 0x13, 0xda, 0x7c, 0x9b, 0x8e, 0xf6, 0x89, 0x5c,
0xb8, 0xf5, 0xdf, 0xc1, 0xa0, 0xde, 0xcd, 0x84, 0xfe, 0xb8, 0x1b, 0x0d, 0x8f, 0x5e, 0x4c, 0xd7,
0x6a, 0x4c, 0xeb, 0xf5, 0x4e, 0xf3, 0xcb, 0x42, 0x67, 0xdc, 0xca, 0x22, 0xff, 0x84, 0xc6, 0xf0,
0x2b, 0x8c, 0xdb, 0x99, 0x89, 0x81, 0xfd, 0xb5, 0x5c, 0x31, 0x9a, 0xb2, 0xc8, 0x0d, 0xb2, 0x08,
0xf6, 0xea, 0xfa, 0x5c, 0xde, 0xe2, 0x47, 0x99, 0x49, 0x4b, 0x1a, 0xf6, 0xe2, 0xfb, 0x98, 0x1d,
0x82, 0x6f, 0x30, 0xd5, 0x68, 0xcf, 0xb0, 0x22, 0x55, 0xfd, 0xf8, 0x0e, 0xb0, 0x47, 0xd0, 0x57,
0xc8, 0x05, 0xea, 0x46, 0xd6, 0x26, 0x9b, 0xfc, 0xea, 0x40, 0xf8, 0xbf, 0xd5, 0xc8, 0x33, 0x41,
0xe7, 0x05, 0x71, 0x47, 0x0a, 0xa7, 0x89, 0x91, 0xb7, 0x48, 0x5f, 0xef, 0xc5, 0x14, 0xb3, 0xa7,
0x00, 0x69, 0xa1, 0x14, 0xa6, 0x6e, 0xb0, 0xf9, 0xf8, 0x06, 0x71, 0x9a, 0x91, 0x0d, 0x77, 0x76,
0xf5, 0x62, 0xdf, 0x91, 0xda, 0xa9, 0xe7, 0xb0, 0x23, 0x50, 0xa1, 0x6d, 0x1b, 0x6a, 0xa7, 0x86,
0x35, 0xab, 0x5b, 0x5e, 0x01, 0xab, 0x53, 0x91, 0x2c, 0xaa, 0x75, 0x63, 0x9f, 0x1a, 0x47, 0x4d,
0xe5, 0xb8, 0x6a, 0xbb, 0x1f, 0x83, 0xaf, 0x91, 0x8b, 0xa4, 0xc8, 0x55, 0x45, 0xe6, 0x6d, 0xc7,
0xdb, 0x0e, 0x9c, 0xe7, 0xaa, 0x62, 0x2f, 0x61, 0x5f, 0x63, 0xa9, 0x64, 0xca, 0x93, 0x52, 0xf1,
0x14, 0x33, 0xcc, 0x5b, 0x1f, 0x47, 0x4d, 0xe1, 0xa2, 0xe5, 0x2c, 0x84, 0xc1, 0x0d, 0x6a, 0xe3,
0x7e, 0x96, 0x4f, 0x2d, 0x6d, 0xca, 0x46, 0xd0, 0xb5, 0x56, 0x85, 0x40, 0xd4, 0x85, 0x93, 0x01,
0x6c, 0x9d, 0x64, 0xa5, 0xad, 0x26, 0x3f, 0x3d, 0xd8, 0x9b, 0x2f, 0x4b, 0xd4, 0xc7, 0xaa, 0x48,
0xaf, 0x4f, 0x56, 0x56, 0x73, 0x76, 0x0e, 0xbb, 0xa8, 0xb9, 0x59, 0x6a, 0xb7, 0xbb, 0x90, 0xf9,
0x15, 0x49, 0x3a, 0x3c, 0x8a, 0x36, 0xae, 0xc7, 0xbd, 0x99, 0xe9, 0x49, 0x3d, 0x30, 0xa3, 0xfe,
0x38, 0xc0, 0xcd, 0xf4, 0xe0, 0x2b, 0x04, 0x7f, 0xd4, 0x9d, 0x31, 0xee, 0xea, 0x36, 0x56, 0x51,
0xec, 0x1c, 0x2f, 0xb9, 0x96, 0xb6, 0x6a, 0x9e, 0x58, 0x93, 0x39, 0x43, 0x9a, 0x17, 0x24, 0x85,
0x09, 0xbb, 0xe3, 0xae, 0xbb, 0xc4, 0x35, 0x39, 0x15, 0xe6, 0xe8, 0xbb, 0x07, 0x83, 0x79, 0xfd,
0xa4, 0xd9, 0x29, 0x04, 0x73, 0xcc, 0xc5, 0xdd, 0x23, 0x7e, 0xb8, 0xb1, 0xf1, 0x9a, 0x1e, 0x1c,
0xfe, 0x8b, 0xb6, 0x37, 0x78, 0xf2, 0x20, 0xf2, 0x5e, 0x7b, 0xec, 0x2d, 0x04, 0x67, 0x88, 0xe5,
0xac, 0xc8, 0x73, 0x4c, 0x2d, 0x0a, 0x36, 0xda, 0x18, 0x22, 0xe9, 0x0e, 0xfe, 0x22, 0xf5, 0xe8,
0xa2, 0x4f, 0xff, 0x2c, 0x6f, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, 0x73, 0x2b, 0x68, 0x15, 0x6a,
0x04, 0x00, 0x00,
// 708 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x54, 0x41, 0x6f, 0xf3, 0x44,
0x10, 0xc5, 0x49, 0xbe, 0x24, 0x9e, 0x7c, 0xc9, 0x97, 0x2e, 0x08, 0xb9, 0xa5, 0x94, 0x60, 0x2e,
0x46, 0xa0, 0x08, 0x95, 0x33, 0x97, 0x46, 0x45, 0x54, 0x2d, 0x6a, 0xe5, 0x88, 0x1e, 0xb8, 0x58,
0x1b, 0xef, 0xb4, 0x5a, 0x75, 0xbd, 0x36, 0xbb, 0x9b, 0x36, 0xee, 0x85, 0x7f, 0xc3, 0x8d, 0xbf,
0xc0, 0x89, 0x1f, 0x86, 0x76, 0xd7, 0x4e, 0x42, 0x28, 0xb7, 0xd9, 0x37, 0x6f, 0xbc, 0xe3, 0xf7,
0x66, 0x16, 0xc6, 0x1a, 0xe9, 0x0b, 0x22, 0x9b, 0x57, 0xaa, 0x34, 0x25, 0x09, 0x0b, 0xaa, 0x0d,
0xaa, 0xac, 0x5a, 0xc5, 0x7f, 0x76, 0x20, 0xfc, 0x09, 0xa9, 0x32, 0x2b, 0xa4, 0x86, 0x4c, 0xa0,
0xc3, 0xab, 0x28, 0x98, 0x05, 0x49, 0x98, 0x76, 0x78, 0x45, 0x08, 0xf4, 0xaa, 0x52, 0x99, 0xa8,
0x33, 0x0b, 0x92, 0x71, 0xea, 0x62, 0xf2, 0x39, 0x40, 0xb5, 0x5e, 0x09, 0x9e, 0x67, 0x6b, 0x25,
0xa2, 0xae, 0xe3, 0x86, 0x1e, 0xf9, 0x45, 0x09, 0x92, 0xc0, 0xb4, 0xa0, 0x9b, 0xec, 0xb9, 0x14,
0xeb, 0x02, 0xb3, 0xbc, 0x5c, 0x4b, 0x13, 0xf5, 0x5c, 0xf9, 0xa4, 0xa0, 0x9b, 0x7b, 0x07, 0x2f,
0x2c, 0x4a, 0x66, 0xf0, 0xde, 0x32, 0x1f, 0xb8, 0xc0, 0xec, 0x09, 0xeb, 0xe8, 0xdd, 0x2c, 0x48,
0x7a, 0x29, 0x14, 0x74, 0xf3, 0x23, 0x17, 0x78, 0x8d, 0x35, 0xf9, 0x02, 0x46, 0x8c, 0x1a, 0x9a,
0xe5, 0x28, 0x0d, 0xaa, 0xa8, 0xef, 0xee, 0x02, 0x0b, 0x2d, 0x1c, 0x62, 0xfb, 0x53, 0x34, 0x7f,
0x8a, 0x06, 0x2e, 0xe3, 0x62, 0xdb, 0x1f, 0x65, 0x05, 0x97, 0x99, 0xeb, 0x7c, 0xe8, 0xae, 0x0e,
0x1d, 0x72, 0x67, 0xdb, 0xff, 0x01, 0x06, 0xbe, 0x37, 0x1d, 0x85, 0xb3, 0x6e, 0x32, 0x3a, 0xff,
0x6a, 0xbe, 0x55, 0x63, 0xee, 0xdb, 0xbb, 0x92, 0x0f, 0xa5, 0x2a, 0xa8, 0xe1, 0xa5, 0xfc, 0x19,
0xb5, 0xa6, 0x8f, 0x98, 0xb6, 0x35, 0xb1, 0x86, 0xa3, 0xad, 0x5c, 0x29, 0xea, 0xaa, 0x94, 0x1a,
0x49, 0x02, 0x1f, 0x7c, 0x7e, 0xc9, 0x5f, 0xf1, 0x86, 0x17, 0xdc, 0x38, 0x0d, 0x7b, 0xe9, 0x21,
0x4c, 0x4e, 0x21, 0xd4, 0x98, 0x2b, 0x34, 0xd7, 0x58, 0x3b, 0x55, 0xc3, 0x74, 0x07, 0x90, 0x4f,
0xa1, 0x2f, 0x90, 0x32, 0x54, 0x8d, 0xac, 0xcd, 0x29, 0xfe, 0xbb, 0x03, 0xd1, 0xff, 0xb5, 0xe6,
0x3c, 0x63, 0xee, 0xbe, 0x71, 0xda, 0xe1, 0xcc, 0x6a, 0xa2, 0xf9, 0x2b, 0xba, 0xaf, 0xf7, 0x52,
0x17, 0x93, 0x33, 0x80, 0xbc, 0x14, 0x02, 0x73, 0x5b, 0xd8, 0x7c, 0x7c, 0x0f, 0xb1, 0x9a, 0x39,
0x1b, 0x76, 0x76, 0xf5, 0xd2, 0xd0, 0x22, 0xde, 0xa9, 0x2f, 0xe1, 0x3d, 0x43, 0x81, 0xa6, 0x25,
0x78, 0xa7, 0x46, 0x1e, 0xf3, 0x94, 0x6f, 0x81, 0xf8, 0x23, 0xcb, 0x56, 0xf5, 0x96, 0xd8, 0x77,
0xc4, 0x69, 0x93, 0xb9, 0xa8, 0x5b, 0xf6, 0x67, 0x10, 0x2a, 0xa4, 0x2c, 0x2b, 0xa5, 0xa8, 0x9d,
0x79, 0xc3, 0x74, 0x68, 0x81, 0x5b, 0x29, 0x6a, 0xf2, 0x0d, 0x1c, 0x29, 0xac, 0x04, 0xcf, 0x69,
0x56, 0x09, 0x9a, 0x63, 0x81, 0xb2, 0xf5, 0x71, 0xda, 0x24, 0xee, 0x5a, 0x9c, 0x44, 0x30, 0x78,
0x46, 0xa5, 0xed, 0x6f, 0x85, 0x8e, 0xd2, 0x1e, 0xc9, 0x14, 0xba, 0xc6, 0x88, 0x08, 0x1c, 0x6a,
0xc3, 0x78, 0x00, 0xef, 0x2e, 0x8b, 0xca, 0xd4, 0xf1, 0x5f, 0x01, 0x7c, 0x58, 0xae, 0x2b, 0x54,
0x17, 0xa2, 0xcc, 0x9f, 0x2e, 0x37, 0x46, 0x51, 0x72, 0x0b, 0x13, 0x54, 0x54, 0xaf, 0x95, 0xed,
0x9d, 0x71, 0xf9, 0xe8, 0x24, 0x1d, 0x9d, 0x27, 0x7b, 0xe3, 0x71, 0x50, 0x33, 0xbf, 0xf4, 0x05,
0x0b, 0xc7, 0x4f, 0xc7, 0xb8, 0x7f, 0x3c, 0xf9, 0x15, 0xc6, 0xff, 0xca, 0x5b, 0x63, 0xec, 0xe8,
0x36, 0x56, 0xb9, 0xd8, 0x3a, 0x5e, 0x51, 0xc5, 0x4d, 0xdd, 0xac, 0x58, 0x73, 0xb2, 0x86, 0x34,
0x1b, 0xc4, 0x99, 0x8e, 0xba, 0xb3, 0xae, 0x1d, 0x62, 0x8f, 0x5c, 0x31, 0x1d, 0x7f, 0x0d, 0x1f,
0x2f, 0x04, 0x47, 0x69, 0x6e, 0xb8, 0x36, 0x28, 0x53, 0xfc, 0x6d, 0x8d, 0xda, 0xd8, 0x1b, 0x24,
0x2d, 0xb0, 0x59, 0x60, 0x17, 0xc7, 0xbf, 0xc3, 0xc4, 0x8f, 0xce, 0x4d, 0x99, 0xbb, 0xb9, 0xb1,
0xc2, 0xd8, 0xcd, 0xf5, 0x24, 0x1b, 0x1e, 0xac, 0x74, 0xe7, 0x70, 0xa5, 0x8f, 0x61, 0x28, 0xf1,
0x25, 0x7b, 0xde, 0xb5, 0x32, 0x90, 0xf8, 0x72, 0xcf, 0x99, 0xde, 0x4d, 0x06, 0xf3, 0xe9, 0x9e,
0x4b, 0x37, 0x93, 0xc1, 0x2c, 0xe5, 0xfc, 0x8f, 0x00, 0x06, 0x4b, 0xff, 0xfc, 0x90, 0x2b, 0x18,
0x2f, 0x51, 0xb2, 0xdd, 0x83, 0xf3, 0xc9, 0x9e, 0xba, 0x5b, 0xf4, 0xe4, 0xf4, 0x2d, 0xb4, 0xdd,
0xb6, 0xf8, 0xa3, 0x24, 0xf8, 0x2e, 0x20, 0x77, 0x30, 0xbe, 0x46, 0xac, 0x16, 0xa5, 0x94, 0x98,
0x1b, 0x64, 0xe4, 0x6c, 0xaf, 0xe8, 0x0d, 0x71, 0x4e, 0x8e, 0xff, 0xb3, 0xe7, 0xad, 0x22, 0xfe,
0x8b, 0xab, 0xbe, 0x7b, 0x1c, 0xbf, 0xff, 0x27, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x83, 0xd9, 0x6b,
0x2d, 0x05, 0x00, 0x00,
} }

13
weed/pb/seaweed.proto

@ -7,7 +7,7 @@ package master_pb;
service Seaweed { service Seaweed {
rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) { rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
} }
rpc KeepConnected (stream Empty) returns (stream Empty) {
rpc KeepConnected (stream ClientListenRequest) returns (stream VolumeLocation) {
} }
} }
@ -55,3 +55,14 @@ message SuperBlockExtra {
} }
ErasureCoding erasure_coding = 1; ErasureCoding erasure_coding = 1;
} }
message ClientListenRequest {
string name = 1;
}
message VolumeLocation {
string url = 1;
string public_url = 2;
repeated uint32 new_vids = 3;
repeated uint32 deleted_vids = 4;
}

20
weed/server/filer_grpc_server.go

@ -99,24 +99,24 @@ func (fs *FilerServer) GetEntryAttributes(ctx context.Context, req *filer_pb.Get
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) { func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster(), req.VolumeIds)
if err != nil {
return nil, err
}
resp := &filer_pb.LookupVolumeResponse{ resp := &filer_pb.LookupVolumeResponse{
LocationsMap: make(map[string]*filer_pb.Locations), LocationsMap: make(map[string]*filer_pb.Locations),
} }
for vid, locations := range lookupResult {
for _, vidString := range req.VolumeIds {
vid, err := strconv.Atoi(vidString)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)
return nil, err
}
var locs []*filer_pb.Location var locs []*filer_pb.Location
for _, loc := range locations.Locations {
for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) {
locs = append(locs, &filer_pb.Location{ locs = append(locs, &filer_pb.Location{
Url: loc.Url, Url: loc.Url,
PublicUrl: loc.PublicUrl, PublicUrl: loc.PublicUrl,
}) })
} }
resp.LocationsMap[vid] = &filer_pb.Locations{
resp.LocationsMap[vidString] = &filer_pb.Locations{
Locations: locs, Locations: locs,
} }
} }
@ -176,11 +176,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
if err = fs.filer.UpdateEntry(newEntry); err == nil { if err = fs.filer.UpdateEntry(newEntry); err == nil {
for _, garbage := range unusedChunks { for _, garbage := range unusedChunks {
glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
fs.filer.DeleteFileByFileId(garbage.FileId)
} }
for _, garbage := range garbages { for _, garbage := range garbages {
glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
fs.filer.DeleteFileByFileId(garbage.FileId)
} }
} }

5
weed/server/filer_server_handlers_read.go

@ -8,7 +8,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"mime" "mime"
"mime/multipart" "mime/multipart"
@ -63,7 +62,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request,
fileId := entry.Chunks[0].FileId fileId := entry.Chunks[0].FileId
urlString, err := operation.LookupFileId(fs.filer.GetMaster(), fileId)
urlString, err := fs.filer.MasterClient.LookupFileId(fileId)
if err != nil { if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
@ -223,7 +222,7 @@ func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int
for _, chunkView := range chunkViews { for _, chunkView := range chunkViews {
urlString, err := operation.LookupFileId(fs.filer.GetMaster(), chunkView.FileId)
urlString, err := fs.filer.MasterClient.LookupFileId(chunkView.FileId)
if err != nil { if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err return err

6
weed/server/filer_server_handlers_write.go

@ -49,7 +49,7 @@ func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Reques
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} else { } else {
fileId = entry.Chunks[0].FileId fileId = entry.Chunks[0].FileId
urlLocation, err = operation.LookupFileId(fs.filer.GetMaster(), fileId)
urlLocation, err = fs.filer.MasterClient.LookupFileId(fileId)
if err != nil { if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error()) glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
@ -176,7 +176,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if ret.Name != "" { if ret.Name != "" {
path += ret.Name path += ret.Name
} else { } else {
operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up
fs.filer.DeleteFileByFileId(fileId)
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
writeJsonError(w, r, http.StatusInternalServerError, writeJsonError(w, r, http.StatusInternalServerError,
errors.New("Can not to write to folder "+path+" without a file name")) errors.New("Can not to write to folder "+path+" without a file name"))
@ -205,7 +205,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}}, }},
} }
if db_err := fs.filer.CreateEntry(entry); db_err != nil { if db_err := fs.filer.CreateEntry(entry); db_err != nil {
operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up
fs.filer.DeleteFileByFileId(fileId)
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, http.StatusInternalServerError, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err)
return return

108
weed/server/master_grpc_server.go

@ -1,9 +1,11 @@
package weed_server package weed_server
import ( import (
"fmt"
"net" "net"
"strings" "strings"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/topology"
@ -16,8 +18,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
defer func() { defer func() {
if dn != nil { if dn != nil {
glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
t.UnRegisterDataNode(dn) t.UnRegisterDataNode(dn)
message := &master_pb.VolumeLocation{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
}
for _, v := range dn.GetVolumes() {
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
}
if len(message.DeletedVids) > 0 {
ms.clientChansLock.RLock()
for _, ch := range ms.clientChans {
ch <- message
}
ms.clientChansLock.RUnlock()
}
} }
}() }()
@ -49,7 +69,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
} }
} }
t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
message := &master_pb.VolumeLocation{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
}
for _, v := range newVolumes {
message.NewVids = append(message.NewVids, uint32(v.Id))
}
for _, v := range deletedVolumes {
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
}
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
ms.clientChansLock.RLock()
for _, ch := range ms.clientChans {
ch <- message
}
ms.clientChansLock.RUnlock()
}
} else { } else {
return err return err
@ -67,15 +106,76 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
} }
} }
// KeepConnected keep a stream gRPC call to the master. Used by filer to know the master is up.
// KeepConnected keep a stream gRPC call to the master. Used by clients to know the master is up.
// And clients gets the up-to-date list of volume locations
func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error { func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error {
req, err := stream.Recv()
if err != nil {
return err
}
if !ms.Topo.IsLeader() {
return raft.NotLeaderError
}
// remember client address
ctx := stream.Context()
// fmt.Printf("FromContext %+v\n", ctx)
pr, ok := peer.FromContext(ctx)
if !ok {
glog.Error("failed to get peer from ctx")
return fmt.Errorf("failed to get peer from ctx")
}
if pr.Addr == net.Addr(nil) {
glog.Error("failed to get peer address")
return fmt.Errorf("failed to get peer address")
}
clientName := req.Name + pr.Addr.String()
glog.V(0).Infof("+ client %v", clientName)
messageChan := make(chan *master_pb.VolumeLocation)
stopChan := make(chan bool)
ms.clientChansLock.Lock()
ms.clientChans[clientName] = messageChan
ms.clientChansLock.Unlock()
defer func() {
glog.V(0).Infof("- client %v", clientName)
ms.clientChansLock.Lock()
delete(ms.clientChans, clientName)
ms.clientChansLock.Unlock()
}()
for _, message := range ms.Topo.ToVolumeLocations() {
if err := stream.Send(message); err != nil {
return err
}
}
go func() {
for { for {
_, err := stream.Recv() _, err := stream.Recv()
if err != nil { if err != nil {
return err
glog.V(2).Infof("- client %v: %v", clientName, err)
stopChan <- true
break
}
} }
if err := stream.Send(&master_pb.Empty{}); err != nil {
}()
for {
select {
case message := <-messageChan:
if err := stream.Send(message); err != nil {
return err return err
} }
case <-stopChan:
return nil
} }
} }
return nil
}

6
weed/server/master_server.go

@ -9,6 +9,7 @@ import (
"github.com/chrislusf/raft" "github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/topology"
@ -31,6 +32,10 @@ type MasterServer struct {
vgLock sync.Mutex vgLock sync.Mutex
bounedLeaderChan chan int bounedLeaderChan chan int
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.VolumeLocation
} }
func NewMasterServer(r *mux.Router, port int, metaFolder string, func NewMasterServer(r *mux.Router, port int, metaFolder string,
@ -54,6 +59,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
pulseSeconds: pulseSeconds, pulseSeconds: pulseSeconds,
defaultReplicaPlacement: defaultReplicaPlacement, defaultReplicaPlacement: defaultReplicaPlacement,
garbageThreshold: garbageThreshold, garbageThreshold: garbageThreshold,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
} }
ms.bounedLeaderChan = make(chan int, 16) ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer() seq := sequence.NewMemorySequencer()

11
weed/topology/data_node.go

@ -32,7 +32,7 @@ func (dn *DataNode) String() string {
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
} }
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
dn.Lock() dn.Lock()
defer dn.Unlock() defer dn.Unlock()
if _, ok := dn.volumes[v.Id]; !ok { if _, ok := dn.volumes[v.Id]; !ok {
@ -42,12 +42,14 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
dn.UpAdjustActiveVolumeCountDelta(1) dn.UpAdjustActiveVolumeCountDelta(1)
} }
dn.UpAdjustMaxVolumeId(v.Id) dn.UpAdjustMaxVolumeId(v.Id)
isNew = true
} else { } else {
dn.volumes[v.Id] = v dn.volumes[v.Id] = v
} }
return
} }
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes { for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v actualVolumeMap[v.Id] = v
@ -64,7 +66,10 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
} }
dn.Unlock() dn.Unlock()
for _, v := range actualVolumes { for _, v := range actualVolumes {
dn.AddOrUpdateVolume(v)
isNew := dn.AddOrUpdateVolume(v)
if isNew {
newVolumes = append(newVolumes, v)
}
} }
return return
} }

5
weed/topology/topology.go

@ -151,7 +151,7 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
return dc return dc
} }
func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) {
func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) {
var volumeInfos []storage.VolumeInfo var volumeInfos []storage.VolumeInfo
for _, v := range volumes { for _, v := range volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil { if vi, err := storage.NewVolumeInfo(v); err == nil {
@ -160,11 +160,12 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
glog.V(0).Infof("Fail to convert joined volume information: %v", err) glog.V(0).Infof("Fail to convert joined volume information: %v", err)
} }
} }
deletedVolumes := dn.UpdateVolumes(volumeInfos)
newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos { for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn) t.RegisterVolumeLayout(v, dn)
} }
for _, v := range deletedVolumes { for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn) t.UnRegisterVolumeLayout(v, dn)
} }
return
} }

23
weed/topology/topology_map.go

@ -1,5 +1,7 @@
package topology package topology
import "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
func (t *Topology) ToMap() interface{} { func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount() m["Max"] = t.GetMaxVolumeCount()
@ -51,3 +53,24 @@ func (t *Topology) ToVolumeMap() interface{} {
m["DataCenters"] = dcs m["DataCenters"] = dcs
return m return m
} }
func (t *Topology) ToVolumeLocations() (volumeLocations []*master_pb.VolumeLocation) {
for _, c := range t.Children() {
dc := c.(*DataCenter)
for _, r := range dc.Children() {
rack := r.(*Rack)
for _, d := range rack.Children() {
dn := d.(*DataNode)
volumeLocation := &master_pb.VolumeLocation{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
}
for _, v := range dn.GetVolumes() {
volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(v.Id))
}
volumeLocations = append(volumeLocations, volumeLocation)
}
}
}
return
}

1
weed/util/http_util.go

@ -83,6 +83,7 @@ func Head(url string) (http.Header, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer r.Body.Close()
if r.StatusCode >= 400 { if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status) return nil, fmt.Errorf("%s: %s", url, r.Status)
} }

95
weed/wdclient/masterclient.go

@ -0,0 +1,95 @@
package wdclient
import (
"context"
"fmt"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
type MasterClient struct {
ctx context.Context
name string
currentMaster string
masters []string
vidMap
}
func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
return &MasterClient{
ctx: ctx,
name: clientName,
masters: masters,
vidMap: newVidMap(),
}
}
func (mc *MasterClient) GetMaster() string {
return mc.currentMaster
}
func (mc *MasterClient) KeepConnectedToMaster() {
glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
for {
mc.tryAllMasters()
time.Sleep(time.Second)
}
}
func (mc *MasterClient) tryAllMasters() {
for _, master := range mc.masters {
glog.V(0).Infof("Connecting to %v", master)
withMasterClient(master, func(client master_pb.SeaweedClient) error {
stream, err := client.KeepConnected(context.Background())
if err != nil {
glog.V(0).Infof("failed to keep connected to %s: %v", master, err)
return err
}
glog.V(0).Infof("Connected to %v", master)
mc.currentMaster = master
if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil {
glog.V(0).Infof("failed to send to %s: %v", master, err)
return err
}
for {
if volumeLocation, err := stream.Recv(); err != nil {
glog.V(0).Infof("failed to receive from %s: %v", master, err)
return err
} else {
glog.V(0).Infof("volume location: %+v", volumeLocation)
loc := Location{
Url: volumeLocation.Url,
PublicUrl: volumeLocation.PublicUrl,
}
for _, newVid := range volumeLocation.NewVids {
mc.addLocation(newVid, loc)
}
for _, deletedVid := range volumeLocation.DeletedVids {
mc.deleteLocation(deletedVid, loc)
}
}
}
})
mc.currentMaster = ""
}
}
func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
grpcConnection, err := util.GrpcDial(master)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", master, err)
}
defer grpcConnection.Close()
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}

99
weed/wdclient/vid_map.go

@ -0,0 +1,99 @@
package wdclient
import (
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
)
type Location struct {
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
}
type vidMap struct {
sync.RWMutex
vid2Locations map[uint32][]Location
}
func newVidMap() vidMap {
return vidMap{
vid2Locations: make(map[uint32][]Location),
}
}
func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) {
id, err := strconv.Atoi(vid)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)
return "", err
}
locations := vc.GetLocations(uint32(id))
if len(locations) == 0 {
return "", fmt.Errorf("volume %d not found", id)
}
return locations[rand.Intn(len(locations))].Url, nil
}
func (vc *vidMap) LookupFileId(fileId string) (fullUrl string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return "", errors.New("Invalid fileId " + fileId)
}
serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0])
if lookupError != nil {
return "", lookupError
}
return "http://" + serverUrl + "/" + fileId, nil
}
func (vc *vidMap) GetLocations(vid uint32) (locations []Location) {
vc.RLock()
defer vc.RUnlock()
return vc.vid2Locations[vid]
}
func (vc *vidMap) addLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
locations, found := vc.vid2Locations[vid]
if !found {
vc.vid2Locations[vid] = []Location{location}
return
}
for _, loc := range locations {
if loc.Url == location.Url {
return
}
}
vc.vid2Locations[vid] = append(locations, location)
}
func (vc *vidMap) deleteLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
locations, found := vc.vid2Locations[vid]
if !found {
return
}
for i, loc := range locations {
if loc.Url == location.Url {
vc.vid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
}
}
}

15
weed/wdclient/wdclient.go

@ -0,0 +1,15 @@
package wdclient
import (
"context"
)
type SeaweedClient struct {
*MasterClient
}
func NewSeaweedClient(ctx context.Context, clientName string, masters []string) *SeaweedClient {
return &SeaweedClient{
MasterClient: NewMasterClient(ctx, clientName, masters),
}
}
Loading…
Cancel
Save