diff --git a/Makefile b/Makefile index 14c07fa86..bbca92151 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ SOURCE_DIR = ./go/weed/ all: build -.PHONY : clean deps build linux +.PHONY : clean deps build linux vet clean: go clean -i $(GO_FLAGS) $(SOURCE_DIR) @@ -17,6 +17,9 @@ deps: fmt: gofmt -w -s ./go/ +vet: + go vet ./go/... + build: deps fmt go build $(GO_FLAGS) -o $(BINARY) $(SOURCE_DIR) diff --git a/go/operation/data_struts.go b/go/operation/data_struts.go deleted file mode 100644 index bfc53aa50..000000000 --- a/go/operation/data_struts.go +++ /dev/null @@ -1,7 +0,0 @@ -package operation - -type JoinResult struct { - VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"` - SecretKey string `json:"secretKey,omitempty"` - Error string `json:"error,omitempty"` -} diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go deleted file mode 100644 index 2574b2af6..000000000 --- a/go/operation/system_message.pb.go +++ /dev/null @@ -1,278 +0,0 @@ -// Code generated by protoc-gen-go. -// source: system_message.proto -// DO NOT EDIT! - -/* -Package operation is a generated protocol buffer package. - -It is generated from these files: - system_message.proto - -It has these top-level messages: - VolumeInformationMessage - JoinMessage - CollectionSetting - GlobalSetting - JoinResponse -*/ -package operation - -import proto "github.com/golang/protobuf/proto" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = math.Inf - -type VolumeInformationMessage struct { - Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"` - Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"` - Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` - FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"` - DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"` - DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"` - ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"` - ReplicaPlacement *uint32 `protobuf:"varint,8,opt,name=replica_placement" json:"replica_placement,omitempty"` - Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"` - Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} } -func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) } -func (*VolumeInformationMessage) ProtoMessage() {} - -const Default_VolumeInformationMessage_Version uint32 = 2 - -func (m *VolumeInformationMessage) GetId() uint32 { - if m != nil && m.Id != nil { - return *m.Id - } - return 0 -} - -func (m *VolumeInformationMessage) GetSize() uint64 { - if m != nil && m.Size != nil { - return *m.Size - } - return 0 -} - -func (m *VolumeInformationMessage) GetCollection() string { - if m != nil && m.Collection != nil { - return *m.Collection - } - return "" -} - -func (m *VolumeInformationMessage) GetFileCount() uint64 { - if m != nil && m.FileCount != nil { - return *m.FileCount - } - return 0 -} - -func (m *VolumeInformationMessage) GetDeleteCount() uint64 { - if m != nil && m.DeleteCount != nil { - return *m.DeleteCount - } - return 0 -} - -func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 { - if m != nil && m.DeletedByteCount != nil { - return *m.DeletedByteCount - } - return 0 -} - -func (m *VolumeInformationMessage) GetReadOnly() bool { - if m != nil && m.ReadOnly != nil { - return *m.ReadOnly - } - return false -} - -func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 { - if m != nil && m.ReplicaPlacement != nil { - return *m.ReplicaPlacement - } - return 0 -} - -func (m *VolumeInformationMessage) GetVersion() uint32 { - if m != nil && m.Version != nil { - return *m.Version - } - return Default_VolumeInformationMessage_Version -} - -func (m *VolumeInformationMessage) GetTtl() uint32 { - if m != nil && m.Ttl != nil { - return *m.Ttl - } - return 0 -} - -type JoinMessage struct { - IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"` - Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"` - Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"` - PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"` - MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"` - MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"` - DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"` - Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"` - Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"` - AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *JoinMessage) Reset() { *m = JoinMessage{} } -func (m *JoinMessage) String() string { return proto.CompactTextString(m) } -func (*JoinMessage) ProtoMessage() {} - -func (m *JoinMessage) GetIsInit() bool { - if m != nil && m.IsInit != nil { - return *m.IsInit - } - return false -} - -func (m *JoinMessage) GetIp() string { - if m != nil && m.Ip != nil { - return *m.Ip - } - return "" -} - -func (m *JoinMessage) GetPort() uint32 { - if m != nil && m.Port != nil { - return *m.Port - } - return 0 -} - -func (m *JoinMessage) GetPublicUrl() string { - if m != nil && m.PublicUrl != nil { - return *m.PublicUrl - } - return "" -} - -func (m *JoinMessage) GetMaxVolumeCount() uint32 { - if m != nil && m.MaxVolumeCount != nil { - return *m.MaxVolumeCount - } - return 0 -} - -func (m *JoinMessage) GetMaxFileKey() uint64 { - if m != nil && m.MaxFileKey != nil { - return *m.MaxFileKey - } - return 0 -} - -func (m *JoinMessage) GetDataCenter() string { - if m != nil && m.DataCenter != nil { - return *m.DataCenter - } - return "" -} - -func (m *JoinMessage) GetRack() string { - if m != nil && m.Rack != nil { - return *m.Rack - } - return "" -} - -func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage { - if m != nil { - return m.Volumes - } - return nil -} - -func (m *JoinMessage) GetAdminPort() uint32 { - if m != nil && m.AdminPort != nil { - return *m.AdminPort - } - return 0 -} - -type CollectionSetting struct { - Collection *string `protobuf:"bytes,1,opt,name=collection" json:"collection,omitempty"` - ReplicaPlacement *string `protobuf:"bytes,2,opt,name=replica_placement" json:"replica_placement,omitempty"` - VacuumGarbageThreshold *float32 `protobuf:"fixed32,3,opt,name=vacuum_garbage_threshold" json:"vacuum_garbage_threshold,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *CollectionSetting) Reset() { *m = CollectionSetting{} } -func (m *CollectionSetting) String() string { return proto.CompactTextString(m) } -func (*CollectionSetting) ProtoMessage() {} - -func (m *CollectionSetting) GetCollection() string { - if m != nil && m.Collection != nil { - return *m.Collection - } - return "" -} - -func (m *CollectionSetting) GetReplicaPlacement() string { - if m != nil && m.ReplicaPlacement != nil { - return *m.ReplicaPlacement - } - return "" -} - -func (m *CollectionSetting) GetVacuumGarbageThreshold() float32 { - if m != nil && m.VacuumGarbageThreshold != nil { - return *m.VacuumGarbageThreshold - } - return 0 -} - -type GlobalSetting struct { - Settings []*CollectionSetting `protobuf:"bytes,1,rep,name=settings" json:"settings,omitempty"` - MasterPeers []string `protobuf:"bytes,2,rep,name=master_peers" json:"master_peers,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *GlobalSetting) Reset() { *m = GlobalSetting{} } -func (m *GlobalSetting) String() string { return proto.CompactTextString(m) } -func (*GlobalSetting) ProtoMessage() {} - -func (m *GlobalSetting) GetSettings() []*CollectionSetting { - if m != nil { - return m.Settings - } - return nil -} - -func (m *GlobalSetting) GetMasterPeers() []string { - if m != nil { - return m.MasterPeers - } - return nil -} - -type JoinResponse struct { - Settings *GlobalSetting `protobuf:"bytes,1,opt,name=settings" json:"settings,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *JoinResponse) Reset() { *m = JoinResponse{} } -func (m *JoinResponse) String() string { return proto.CompactTextString(m) } -func (*JoinResponse) ProtoMessage() {} - -func (m *JoinResponse) GetSettings() *GlobalSetting { - if m != nil { - return m.Settings - } - return nil -} - -func init() { -} diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto deleted file mode 100644 index 30dd2a22c..000000000 --- a/go/proto/system_message.proto +++ /dev/null @@ -1,45 +0,0 @@ -package operation; - -message VolumeInformationMessage { - required uint32 id = 1; - required uint64 size = 2; - optional string collection = 3; - required uint64 file_count = 4; - required uint64 delete_count = 5; - required uint64 deleted_byte_count = 6; - optional bool read_only = 7; - optional uint32 replica_placement = 8; - optional uint32 version = 9 [default=2]; - optional uint32 ttl = 10; -} - -message JoinMessage { - optional bool is_init = 1; - required string ip = 2; - required uint32 port = 3; - optional string public_url = 4; - required uint32 max_volume_count = 5; - required uint64 max_file_key = 6; - optional string data_center = 7; - optional string rack = 8; - repeated VolumeInformationMessage volumes = 9; - optional uint32 admin_port = 10; -} - - -message CollectionSetting { - optional string collection = 1; - optional string replica_placement = 2; - optional float vacuum_garbage_threshold = 3; -} - -message GlobalSetting { - repeated CollectionSetting settings = 1; - repeated string master_peers = 2; -} - -message JoinResponse { - optional GlobalSetting settings = 1; -} - - diff --git a/go/storage/store.go b/go/storage/store.go index b2fcdedc7..53f5edebf 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -1,7 +1,6 @@ package storage import ( - "encoding/json" "errors" "fmt" "math/rand" @@ -10,11 +9,12 @@ import ( "sync" + "encoding/json" "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/security" "github.com/chrislusf/seaweedfs/go/util" - "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/go/weedpb" ) const ( @@ -71,13 +71,13 @@ func (mn *MasterNodes) findMaster() (string, error) { * A VolumeServer contains one Store */ type Store struct { - Ip string + joinKey string + ip string Port int PublicUrl string Locations []*DiskLocation dataCenter string //optional informaton, overwriting master setting if exists rack string //optional information, overwriting master setting if exists - connected bool volumeSizeLimit uint64 //read from the master masterNodes *MasterNodes needleMapKind NeedleMapType @@ -86,15 +86,15 @@ type Store struct { } func (s *Store) String() (str string) { - str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", - s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.IsConnected(), s.GetVolumeSizeLimit(), s.masterNodes) + str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, joinKey:%v, volumeSizeLimit:%d, masterNodes:%s", + s.GetIP(), s.Port, s.PublicUrl, s.dataCenter, s.rack, s.GetJoinKey(), s.GetVolumeSizeLimit(), s.masterNodes) return } func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { s = &Store{ Port: port, - Ip: ip, + ip: ip, PublicUrl: publicUrl, TaskManager: NewTaskManager(), needleMapKind: needleMapKind, @@ -219,7 +219,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S if e != nil { return } - var volumeMessages []*operation.VolumeInformationMessage + var volumeMessages []*weedpb.VolumeInformationMessage maxVolumeCount := 0 var maxFileKey uint64 for _, location := range s.Locations { @@ -230,16 +230,16 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S maxFileKey = v.nm.MaxFileKey() } if !v.expired(s.GetVolumeSizeLimit()) { - volumeMessage := &operation.VolumeInformationMessage{ - Id: proto.Uint32(uint32(v.Id)), - Size: proto.Uint64(uint64(v.Size())), - Collection: proto.String(v.Collection), - FileCount: proto.Uint64(uint64(v.nm.FileCount())), - DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), - DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), - ReadOnly: proto.Bool(v.IsReadOnly()), - Version: proto.Uint32(uint32(v.Version())), - Ttl: proto.Uint32(v.Ttl.ToUint32()), + volumeMessage := &weedpb.VolumeInformationMessage{ + Id: uint32(v.Id), + Size: uint64(v.Size()), + Collection: v.Collection, + FileCount: uint64(v.nm.FileCount()), + DeleteCount: uint64(v.nm.DeletedCount()), + DeletedByteCount: v.nm.DeletedSize(), + ReadOnly: v.IsReadOnly(), + Version: uint32(v.Version()), + Ttl: v.Ttl.ToUint32(), } volumeMessages = append(volumeMessages, volumeMessage) } else { @@ -257,44 +257,44 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S } } - joinMessage := &operation.JoinMessage{ - IsInit: proto.Bool(!s.IsConnected()), - Ip: proto.String(s.Ip), - Port: proto.Uint32(uint32(s.Port)), - PublicUrl: proto.String(s.PublicUrl), - MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)), - MaxFileKey: proto.Uint64(maxFileKey), - DataCenter: proto.String(s.dataCenter), - Rack: proto.String(s.rack), + joinMsgV2 := &weedpb.JoinMessageV2{ + JoinKey: s.GetJoinKey(), + Ip: s.GetIP(), + Port: uint32(s.Port), + PublicUrl: s.PublicUrl, + MaxVolumeCount: uint32(maxVolumeCount), + MaxFileKey: maxFileKey, + DataCenter: s.dataCenter, + Rack: s.rack, Volumes: volumeMessages, } - - data, err := proto.Marshal(joinMessage) - if err != nil { - return "", "", err - } - - joinUrl := util.MkUrl(masterNode, "/dir/join", nil) + ret := &weedpb.JoinResponse{} + joinUrl := util.MkUrl(masterNode, "/dir/join2", nil) glog.V(4).Infof("Connecting to %s ...", joinUrl) - - jsonBlob, err := util.PostBytes(joinUrl, data) - if err != nil { + if err := util.PostPbMsg(joinUrl, joinMsgV2, ret); err != nil { s.masterNodes.reset() return "", "", err } - var ret operation.JoinResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) - s.masterNodes.reset() - return masterNode, "", err - } + if ret.Error != "" { s.masterNodes.reset() return masterNode, "", errors.New(ret.Error) } - s.SetVolumeSizeLimit(ret.VolumeSizeLimit) + if ret.JoinKey != s.GetJoinKey() { + if glog.V(4) { + jsonData, _ := json.Marshal(ret) + glog.V(4).Infof("dir join sync settings: %v", string(jsonData)) + } + s.SetJoinKey(ret.JoinKey) + if ret.JoinIp != "" { + s.SetIP(ret.JoinIp) + } + if ret.VolumeSizeLimit != 0 { + s.SetVolumeSizeLimit(ret.VolumeSizeLimit) + } + } + //todo secretKey = security.Secret(ret.SecretKey) - s.SetConnected(true) return } func (s *Store) Close() { @@ -367,14 +367,26 @@ func (s *Store) SetVolumeSizeLimit(sz uint64) { s.volumeSizeLimit = sz } -func (s *Store) IsConnected() bool { +func (s *Store) GetIP() string { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.ip +} + +func (s *Store) SetIP(ip string) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.ip = ip +} + +func (s *Store) GetJoinKey() string { s.mutex.RLock() defer s.mutex.RUnlock() - return s.connected + return s.joinKey } -func (s *Store) SetConnected(b bool) { +func (s *Store) SetJoinKey(k string) { s.mutex.Lock() defer s.mutex.Unlock() - s.connected = b + s.joinKey = k } diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index 1dab44cb2..f2973cb8f 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -4,7 +4,7 @@ import ( "fmt" "sort" - "github.com/chrislusf/seaweedfs/go/operation" + "github.com/chrislusf/seaweedfs/go/weedpb" ) type VolumeInfo struct { @@ -19,18 +19,18 @@ type VolumeInfo struct { ReadOnly bool } -func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi *VolumeInfo, err error) { +func NewVolumeInfo(m *weedpb.VolumeInformationMessage) (vi *VolumeInfo, err error) { vi = &VolumeInfo{ - Id: VolumeId(*m.Id), - Size: *m.Size, - Collection: *m.Collection, - FileCount: int(*m.FileCount), - DeleteCount: int(*m.DeleteCount), - DeletedByteCount: *m.DeletedByteCount, - ReadOnly: *m.ReadOnly, - Version: Version(*m.Version), + Id: VolumeId(m.Id), + Size: m.Size, + Collection: m.Collection, + FileCount: int(m.FileCount), + DeleteCount: int(m.DeleteCount), + DeletedByteCount: m.DeletedByteCount, + ReadOnly: m.ReadOnly, + Version: Version(m.Version), } - vi.Ttl = LoadTTLFromUint32(*m.Ttl) + vi.Ttl = LoadTTLFromUint32(m.Ttl) return vi, nil } diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go index 01d9b88ea..b030cd8ca 100644 --- a/go/topology/store_replicate.go +++ b/go/topology/store_replicate.go @@ -41,6 +41,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, } u := util.MkUrl(location.Url, r.URL.Path, args) + glog.V(4).Infoln("write replication to", u) _, err := operation.Upload(u, string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), jwt) @@ -85,7 +86,7 @@ func distributedOperation(masterNode string, store *storage.Store, volumeId stor } if lookupResult, lookupErr := operation.LookupNoCache(masterNode, volumeId.String(), collection); lookupErr == nil { length := 0 - selfUrl := net.JoinHostPort(store.Ip, strconv.Itoa(store.Port)) + selfUrl := net.JoinHostPort(store.GetIP(), strconv.Itoa(store.Port)) results := make(chan bool) for _, location := range lookupResult.Locations { if location.Url != selfUrl { diff --git a/go/topology/topology.go b/go/topology/topology.go index b8a22e94f..f6d015f88 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -5,12 +5,15 @@ import ( "io/ioutil" "math/rand" + "strconv" + "time" + "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/sequence" "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/util" + "github.com/chrislusf/seaweedfs/go/weedpb" ) type Topology struct { @@ -19,6 +22,7 @@ type Topology struct { collectionMap *util.ConcurrentMap pulse int64 volumeSizeLimit uint64 + joinKey string Sequence sequence.Sequencer CollectionSettings *storage.CollectionSettings configuration *Configuration @@ -39,6 +43,7 @@ func NewTopology(id string, confFile string, cs *storage.CollectionSettings, seq t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit t.CollectionSettings = cs + t.ReGenJoinKey() t.Sequence = seq @@ -86,6 +91,25 @@ func (t *Topology) Leader() (string, error) { return l, nil } +func (t *Topology) GetJoinKey() string { + t.mutex.RLock() + defer t.mutex.RUnlock() + return t.joinKey +} + +func (t *Topology) ReGenJoinKey() { + t.mutex.Lock() + defer t.mutex.Unlock() + t.joinKey = strconv.FormatInt(time.Now().UnixNano(), 16) +} + +func (t *Topology) GetVolumeSizeLimit() uint64 { + // volumeSizeLimit is only for read + //t.mutex.RLock() + //defer t.mutex.RUnlock() + return t.volumeSizeLimit +} + func (t *Topology) loadConfiguration(configurationFile string) error { b, e := ioutil.ReadFile(configurationFile) if e == nil { @@ -159,18 +183,18 @@ func (t *Topology) UnRegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { t.GetVolumeLayout(v.Collection, v.Ttl).UnRegisterVolume(v, dn) } -func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { - t.Sequence.SetMax(*joinMessage.MaxFileKey) - dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) +func (t *Topology) ProcessJoinMessage(joinMessage *weedpb.JoinMessage) { + t.Sequence.SetMax(joinMessage.MaxFileKey) + dcName, rackName := t.configuration.Locate(joinMessage.Ip, joinMessage.DataCenter, joinMessage.Rack) dc := t.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) - dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port)) - if *joinMessage.IsInit && dn != nil { + dn := rack.FindDataNode(joinMessage.Ip, int(joinMessage.Port)) + if joinMessage.IsInit && dn != nil { t.UnRegisterDataNode(dn) } - dn = rack.GetOrCreateDataNode(*joinMessage.Ip, - int(*joinMessage.Port), *joinMessage.PublicUrl, - int(*joinMessage.MaxVolumeCount)) + dn = rack.GetOrCreateDataNode(joinMessage.Ip, + int(joinMessage.Port), joinMessage.PublicUrl, + int(joinMessage.MaxVolumeCount)) var volumeInfos []*storage.VolumeInfo for _, v := range joinMessage.Volumes { if vi, err := storage.NewVolumeInfo(v); err == nil { @@ -190,6 +214,37 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { } +func (t *Topology) ProcessJoinMessageV2(joinMsgV2 *weedpb.JoinMessageV2) { + t.Sequence.SetMax(joinMsgV2.MaxFileKey) + dcName, rackName := t.configuration.Locate(joinMsgV2.Ip, joinMsgV2.DataCenter, joinMsgV2.Rack) + dc := t.GetOrCreateDataCenter(dcName) + rack := dc.GetOrCreateRack(rackName) + dn := rack.FindDataNode(joinMsgV2.Ip, int(joinMsgV2.Port)) + if joinMsgV2.JoinKey == "" && dn != nil { + t.UnRegisterDataNode(dn) + } + dn = rack.GetOrCreateDataNode(joinMsgV2.Ip, + int(joinMsgV2.Port), joinMsgV2.PublicUrl, + int(joinMsgV2.MaxVolumeCount)) + var volumeInfos []*storage.VolumeInfo + for _, v := range joinMsgV2.Volumes { + if vi, err := storage.NewVolumeInfo(v); err == nil { + volumeInfos = append(volumeInfos, vi) + } else { + glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) + } + } + + deletedVolumes := dn.UpdateVolumes(volumeInfos) + for _, v := range volumeInfos { + t.RegisterVolumeLayout(v, dn) + } + for _, v := range deletedVolumes { + t.UnRegisterVolumeLayout(v, dn) + } + +} + func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { n := t.GetChildren(NodeId(dcName)) if n != nil { diff --git a/go/util/http_util.go b/go/util/http_util.go index 722c63276..13cc1d8ed 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -15,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/security" + "github.com/golang/protobuf/proto" "github.com/pierrec/lz4" "strconv" ) @@ -56,6 +57,34 @@ func PostBytes(url string, body []byte) ([]byte, error) { return b, nil } +func PostPbMsg(url string, msg proto.Message, ret proto.Message) error { + data, err := proto.Marshal(msg) + if err != nil { + return err + } + req, err := http.NewRequest("POST", url, bytes.NewReader(data)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/protobuf") + req.Header.Set("Accept", "application/protobuf") + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("Post to %s: %v", url, err) + } + defer resp.Body.Close() + retBlob, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("Read response body: %v", err) + } + + if err := proto.Unmarshal(retBlob, ret); err != nil { + glog.V(0).Infof("Failed to umarshal pb %s with response: %s", url, string(retBlob)) + return err + } + return nil +} + func PostEx(host, path string, values url.Values) (content []byte, statusCode int, e error) { url := MkUrl(host, path, nil) glog.V(4).Infoln("Post", url+"?"+values.Encode()) diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index 582e3e4ef..ada571f49 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -4,19 +4,21 @@ import ( "bytes" "encoding/json" "errors" - "fmt" "net/http" "path/filepath" "strconv" "strings" "time" + "io/ioutil" + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/security" "github.com/chrislusf/seaweedfs/go/stats" "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/util" + "github.com/golang/protobuf/proto" ) var serverStats *stats.ServerStats @@ -28,42 +30,57 @@ func init() { } -func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) { - var bytes []byte - if r.FormValue("pretty") != "" { - bytes, err = json.MarshalIndent(obj, "", " ") - } else { - bytes, err = json.Marshal(obj) - } +func readObjRequest(r *http.Request, obj interface{}) error { + body, err := ioutil.ReadAll(r.Body) if err != nil { - return + return err } - callback := r.FormValue("callback") - if callback == "" { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(httpStatus) - _, err = w.Write(bytes) + if pbMsg, ok := obj.(proto.Message); ok && strings.Contains(r.Header.Get("Content-Type"), "protobuf") { + if err := proto.Unmarshal(body, pbMsg); err != nil { + return err + } } else { - w.Header().Set("Content-Type", "application/javascript") - w.WriteHeader(httpStatus) - if _, err = w.Write([]uint8(callback)); err != nil { - return + if err := json.Unmarshal(body, obj); err != nil { + return err } - if _, err = w.Write([]uint8("(")); err != nil { - return + } + return nil +} + +func writeObjResponse(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) { + var ( + bytes []byte + contentType string + ) + if pbMsg, ok := obj.(proto.Message); ok && strings.Contains(r.Header.Get("Accept"), "protobuf") { + bytes, err = proto.Marshal(pbMsg) + contentType = "application/protobuf" + } else { + if r.FormValue("pretty") != "" { + bytes, err = json.MarshalIndent(obj, "", " ") + } else { + bytes, err = json.Marshal(obj) } - fmt.Fprint(w, string(bytes)) - if _, err = w.Write([]uint8(")")); err != nil { - return + if callback := r.FormValue("callback"); callback != "" { + contentType = "application/javascript" + bytes = []byte(callback + "(" + string(bytes) + ")") + } else { + contentType = "application/json" } } + if err != nil { + return + } + w.Header().Set("Content-Type", contentType) + w.WriteHeader(httpStatus) + _, err = w.Write(bytes) return } // wrapper for writeJson - just logs errors func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) { - if err := writeJson(w, r, httpStatus, obj); err != nil { + if err := writeObjResponse(w, r, httpStatus, obj); err != nil { glog.V(0).Infof("error writing JSON %s: %v", obj, err) } } diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 07d9a4722..301ff2160 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -67,6 +67,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler))) + r.HandleFunc("/dir/join2", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoin2Handler))) r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler))) diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index b06440d85..767ecb85d 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -14,10 +14,10 @@ import ( "net" "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/topology" "github.com/chrislusf/seaweedfs/go/util" + "github.com/chrislusf/seaweedfs/go/weedpb" "github.com/golang/protobuf/proto" ) @@ -37,23 +37,24 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R ms.Topo.DeleteCollection(r.FormValue("collection")) } +// deprecated func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if err != nil { writeJsonError(w, r, http.StatusBadRequest, err) return } - joinMessage := &operation.JoinMessage{} + joinMessage := &weedpb.JoinMessage{} if err = proto.Unmarshal(body, joinMessage); err != nil { writeJsonError(w, r, http.StatusBadRequest, err) return } - if *joinMessage.Ip == "" { + if joinMessage.Ip == "" { if ip, _, e := net.SplitHostPort(r.RemoteAddr); e == nil { - *joinMessage.Ip = ip + joinMessage.Ip = ip } else { glog.V(2).Infof("SplitHostPort (%s) error, %v", r.RemoteAddr, e) - *joinMessage.Ip = r.RemoteAddr + joinMessage.Ip = r.RemoteAddr } } if glog.V(4) { @@ -65,14 +66,51 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData)) } } - ms.Topo.ProcessJoinMessage(joinMessage) - writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{ + type JoinResult struct { + VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"` + SecretKey string `json:"secretKey,omitempty"` + Error string `json:"error,omitempty"` + } + writeJsonQuiet(w, r, http.StatusOK, JoinResult{ VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, SecretKey: string(ms.guard.SecretKey), }) } +func (ms *MasterServer) dirJoin2Handler(w http.ResponseWriter, r *http.Request) { + joinResp := &weedpb.JoinResponse{} + joinMsgV2 := &weedpb.JoinMessageV2{} + + if err := readObjRequest(r, joinMsgV2); err != nil { + joinResp.Error = err.Error() + writeObjResponse(w, r, http.StatusBadRequest, joinResp) + return + } + if joinMsgV2.Ip == "" { + if ip, _, e := net.SplitHostPort(r.RemoteAddr); e == nil { + joinMsgV2.Ip = ip + } else { + glog.V(2).Infof("SplitHostPort (%s) error, %v", r.RemoteAddr, e) + joinMsgV2.Ip = r.RemoteAddr + } + } + if glog.V(4) { + jsonData, _ := json.Marshal(joinMsgV2) + glog.V(4).Infoln("join proto:", string(jsonData)) + } + + ms.Topo.ProcessJoinMessageV2(joinMsgV2) + + joinResp.JoinKey = ms.Topo.GetJoinKey() + if joinMsgV2.JoinKey != joinResp.JoinKey { + joinResp.JoinIp = joinMsgV2.Ip + joinResp.VolumeSizeLimit = ms.Topo.GetVolumeSizeLimit() + joinResp.SecretKey = string(ms.guard.SecretKey) + } + writeObjResponse(w, r, http.StatusOK, joinResp) +} + func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go index 779d6f99d..40a4642e5 100644 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ b/go/weed/weed_server/volume_server_handlers_admin.go @@ -117,5 +117,5 @@ func (vs *VolumeServer) setVolumeOptionHandler(w http.ResponseWriter, r *http.Re result["errors"] = errs } - writeJson(w, r, http.StatusAccepted, result) + writeObjResponse(w, r, http.StatusAccepted, result) } diff --git a/go/proto/Makefile b/go/weedpb/Makefile similarity index 74% rename from go/proto/Makefile rename to go/weedpb/Makefile index 73af851dd..3f6ec2cc6 100644 --- a/go/proto/Makefile +++ b/go/weedpb/Makefile @@ -1,4 +1,4 @@ -TARG=../operation +TARG=./ all: protoc --go_out=$(TARG) system_message.proto diff --git a/go/weedpb/system_message.pb.go b/go/weedpb/system_message.pb.go new file mode 100644 index 000000000..fa17b7bf0 --- /dev/null +++ b/go/weedpb/system_message.pb.go @@ -0,0 +1,180 @@ +// Code generated by protoc-gen-go. +// source: system_message.proto +// DO NOT EDIT! + +/* +Package weedpb is a generated protocol buffer package. + +It is generated from these files: + system_message.proto + +It has these top-level messages: + VolumeInformationMessage + JoinMessage + JoinMessageV2 + CollectionSetting + JoinResponse +*/ +package weedpb + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +const _ = proto.ProtoPackageIsVersion1 + +type VolumeInformationMessage struct { + Id uint32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"` + Size uint64 `protobuf:"varint,2,opt,name=size" json:"size,omitempty"` + Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` + FileCount uint64 `protobuf:"varint,4,opt,name=file_count,json=fileCount" json:"file_count,omitempty"` + DeleteCount uint64 `protobuf:"varint,5,opt,name=delete_count,json=deleteCount" json:"delete_count,omitempty"` + DeletedByteCount uint64 `protobuf:"varint,6,opt,name=deleted_byte_count,json=deletedByteCount" json:"deleted_byte_count,omitempty"` + ReadOnly bool `protobuf:"varint,7,opt,name=read_only,json=readOnly" json:"read_only,omitempty"` + ReplicaPlacement uint32 `protobuf:"varint,8,opt,name=replica_placement,json=replicaPlacement" json:"replica_placement,omitempty"` + Version uint32 `protobuf:"varint,9,opt,name=version" json:"version,omitempty"` + Ttl uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` +} + +func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} } +func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) } +func (*VolumeInformationMessage) ProtoMessage() {} +func (*VolumeInformationMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +// deprecated +type JoinMessage struct { + IsInit bool `protobuf:"varint,1,opt,name=is_init,json=isInit" json:"is_init,omitempty"` + Ip string `protobuf:"bytes,2,opt,name=ip" json:"ip,omitempty"` + Port uint32 `protobuf:"varint,3,opt,name=port" json:"port,omitempty"` + PublicUrl string `protobuf:"bytes,4,opt,name=public_url,json=publicUrl" json:"public_url,omitempty"` + MaxVolumeCount uint32 `protobuf:"varint,5,opt,name=max_volume_count,json=maxVolumeCount" json:"max_volume_count,omitempty"` + MaxFileKey uint64 `protobuf:"varint,6,opt,name=max_file_key,json=maxFileKey" json:"max_file_key,omitempty"` + DataCenter string `protobuf:"bytes,7,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"` + Rack string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"` + Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"` + AdminPort uint32 `protobuf:"varint,10,opt,name=admin_port,json=adminPort" json:"admin_port,omitempty"` +} + +func (m *JoinMessage) Reset() { *m = JoinMessage{} } +func (m *JoinMessage) String() string { return proto.CompactTextString(m) } +func (*JoinMessage) ProtoMessage() {} +func (*JoinMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage { + if m != nil { + return m.Volumes + } + return nil +} + +type JoinMessageV2 struct { + JoinKey string `protobuf:"bytes,1,opt,name=join_key,json=joinKey" json:"join_key,omitempty"` + Ip string `protobuf:"bytes,2,opt,name=ip" json:"ip,omitempty"` + Port uint32 `protobuf:"varint,3,opt,name=port" json:"port,omitempty"` + PublicUrl string `protobuf:"bytes,4,opt,name=public_url,json=publicUrl" json:"public_url,omitempty"` + MaxVolumeCount uint32 `protobuf:"varint,5,opt,name=max_volume_count,json=maxVolumeCount" json:"max_volume_count,omitempty"` + MaxFileKey uint64 `protobuf:"varint,6,opt,name=max_file_key,json=maxFileKey" json:"max_file_key,omitempty"` + DataCenter string `protobuf:"bytes,7,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"` + Rack string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"` + Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"` +} + +func (m *JoinMessageV2) Reset() { *m = JoinMessageV2{} } +func (m *JoinMessageV2) String() string { return proto.CompactTextString(m) } +func (*JoinMessageV2) ProtoMessage() {} +func (*JoinMessageV2) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *JoinMessageV2) GetVolumes() []*VolumeInformationMessage { + if m != nil { + return m.Volumes + } + return nil +} + +type CollectionSetting struct { + Collection string `protobuf:"bytes,1,opt,name=collection" json:"collection,omitempty"` + ReplicaPlacement string `protobuf:"bytes,2,opt,name=replica_placement,json=replicaPlacement" json:"replica_placement,omitempty"` +} + +func (m *CollectionSetting) Reset() { *m = CollectionSetting{} } +func (m *CollectionSetting) String() string { return proto.CompactTextString(m) } +func (*CollectionSetting) ProtoMessage() {} +func (*CollectionSetting) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +type JoinResponse struct { + Error string `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"` + JoinKey string `protobuf:"bytes,2,opt,name=join_key,json=joinKey" json:"join_key,omitempty"` + JoinIp string `protobuf:"bytes,3,opt,name=join_ip,json=joinIp" json:"join_ip,omitempty"` + VolumeSizeLimit uint64 `protobuf:"varint,4,opt,name=volume_size_limit,json=volumeSizeLimit" json:"volume_size_limit,omitempty"` + CollectionSettings []*CollectionSetting `protobuf:"bytes,5,rep,name=collection_settings,json=collectionSettings" json:"collection_settings,omitempty"` + SecretKey string `protobuf:"bytes,6,opt,name=secret_key,json=secretKey" json:"secret_key,omitempty"` +} + +func (m *JoinResponse) Reset() { *m = JoinResponse{} } +func (m *JoinResponse) String() string { return proto.CompactTextString(m) } +func (*JoinResponse) ProtoMessage() {} +func (*JoinResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *JoinResponse) GetCollectionSettings() []*CollectionSetting { + if m != nil { + return m.CollectionSettings + } + return nil +} + +func init() { + proto.RegisterType((*VolumeInformationMessage)(nil), "weedpb.VolumeInformationMessage") + proto.RegisterType((*JoinMessage)(nil), "weedpb.JoinMessage") + proto.RegisterType((*JoinMessageV2)(nil), "weedpb.JoinMessageV2") + proto.RegisterType((*CollectionSetting)(nil), "weedpb.CollectionSetting") + proto.RegisterType((*JoinResponse)(nil), "weedpb.JoinResponse") +} + +var fileDescriptor0 = []byte{ + // 593 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe4, 0x94, 0x5d, 0x6e, 0xd3, 0x40, + 0x10, 0xc7, 0x15, 0xb7, 0x4d, 0xe2, 0x49, 0x53, 0xd2, 0xa5, 0x12, 0xae, 0x10, 0x50, 0xf2, 0x54, + 0x01, 0x2a, 0x52, 0x79, 0xe3, 0xb1, 0x95, 0x90, 0x5a, 0x40, 0x54, 0x5b, 0xd1, 0x57, 0xe3, 0xd8, + 0xd3, 0x6a, 0xa9, 0xed, 0xb5, 0x76, 0x37, 0x05, 0x73, 0x24, 0xee, 0x81, 0xc4, 0x6d, 0xb8, 0x02, + 0xb3, 0xb3, 0x71, 0xe8, 0x07, 0x9c, 0x80, 0xb7, 0x99, 0xff, 0xcc, 0xda, 0x33, 0xbf, 0xd9, 0x59, + 0xd8, 0xb2, 0xad, 0x75, 0x58, 0xa5, 0x15, 0x5a, 0x9b, 0x5d, 0xe0, 0x5e, 0x63, 0xb4, 0xd3, 0xa2, + 0xff, 0x05, 0xb1, 0x68, 0x66, 0xd3, 0x9f, 0x11, 0x24, 0x67, 0xba, 0x9c, 0x57, 0x78, 0x54, 0x9f, + 0x6b, 0x53, 0x65, 0x4e, 0xe9, 0xfa, 0x7d, 0x48, 0x15, 0x1b, 0x10, 0xa9, 0x22, 0xe9, 0xed, 0xf4, + 0x76, 0xc7, 0x92, 0x2c, 0x21, 0x60, 0xd5, 0xaa, 0x6f, 0x98, 0x44, 0xa4, 0xac, 0x4a, 0xb6, 0xc5, + 0x63, 0x80, 0x5c, 0x97, 0x25, 0xe6, 0xfe, 0x60, 0xb2, 0x42, 0x91, 0x58, 0x5e, 0x53, 0xc4, 0x23, + 0x80, 0x73, 0x55, 0x62, 0x9a, 0xeb, 0x79, 0xed, 0x92, 0x55, 0x3e, 0x19, 0x7b, 0xe5, 0xd0, 0x0b, + 0xe2, 0x29, 0xac, 0x17, 0x58, 0xa2, 0xeb, 0x12, 0xd6, 0x38, 0x61, 0x14, 0xb4, 0x90, 0xf2, 0x02, + 0x44, 0x70, 0x8b, 0x74, 0xd6, 0x2e, 0x13, 0xfb, 0x9c, 0x38, 0x59, 0x44, 0x0e, 0xda, 0x2e, 0xfb, + 0x21, 0xc4, 0x06, 0xb3, 0x22, 0xd5, 0x75, 0xd9, 0x26, 0x03, 0x4a, 0x1a, 0xca, 0xa1, 0x17, 0x3e, + 0x90, 0x2f, 0x5e, 0xc2, 0xa6, 0xc1, 0xa6, 0x54, 0x79, 0x96, 0x36, 0x65, 0x96, 0x63, 0x85, 0xf4, + 0xa5, 0xa1, 0xef, 0xef, 0x20, 0x4a, 0x7a, 0x72, 0xb2, 0x08, 0x9e, 0x74, 0x31, 0x91, 0xc0, 0xe0, + 0x0a, 0x8d, 0xf5, 0xad, 0xc5, 0x8c, 0xa1, 0x73, 0xc5, 0x04, 0x56, 0x9c, 0x2b, 0x13, 0x60, 0xd5, + 0x9b, 0xd3, 0x1f, 0x11, 0x8c, 0x8e, 0xb5, 0x5a, 0xd2, 0x7b, 0x00, 0x03, 0x65, 0x53, 0x55, 0x2b, + 0xc7, 0x08, 0x87, 0xb2, 0xaf, 0xec, 0x11, 0x79, 0x8c, 0xb5, 0x61, 0x88, 0x31, 0x61, 0x6d, 0x3c, + 0xd6, 0x46, 0x1b, 0xc7, 0xf0, 0xc6, 0x92, 0x6d, 0x8f, 0xad, 0x99, 0xcf, 0xa8, 0x98, 0x74, 0x6e, + 0x4a, 0xc6, 0x16, 0xcb, 0x38, 0x28, 0x1f, 0x4d, 0x29, 0x76, 0x61, 0x52, 0x65, 0x5f, 0xd3, 0x2b, + 0x9e, 0xdc, 0x35, 0x74, 0x63, 0xb9, 0x41, 0x7a, 0x18, 0x68, 0xe0, 0xb1, 0x03, 0xeb, 0x3e, 0x93, + 0x67, 0x70, 0x89, 0xed, 0x82, 0x1b, 0x90, 0xf6, 0x86, 0xa4, 0xb7, 0xd8, 0x8a, 0x27, 0x30, 0x2a, + 0x32, 0x97, 0xa5, 0x39, 0x35, 0x8c, 0x86, 0x99, 0xd1, 0x08, 0xbd, 0x74, 0xc8, 0x8a, 0xaf, 0xcf, + 0x64, 0xf9, 0x25, 0x83, 0x8a, 0x25, 0xdb, 0xe2, 0x35, 0x81, 0xe1, 0xbf, 0x58, 0x02, 0xb3, 0xb2, + 0x3b, 0xda, 0xdf, 0xd9, 0x0b, 0x37, 0x6a, 0xef, 0x5f, 0xb7, 0x49, 0x76, 0x07, 0x7c, 0x6f, 0x59, + 0x51, 0xa9, 0x3a, 0xe5, 0xae, 0x03, 0xc1, 0x98, 0x95, 0x13, 0x12, 0xa6, 0xdf, 0x23, 0x18, 0x5f, + 0xe3, 0x78, 0xb6, 0x2f, 0xb6, 0x61, 0xf8, 0x99, 0x04, 0xae, 0xbf, 0xc7, 0x45, 0x0c, 0xbc, 0xef, + 0x8b, 0xff, 0xcf, 0x59, 0x4e, 0x3f, 0xc1, 0xe6, 0xe1, 0x72, 0xd9, 0x4e, 0xd1, 0x39, 0x55, 0x5f, + 0xdc, 0xda, 0xc9, 0xde, 0x9d, 0x9d, 0x7c, 0xfe, 0xb7, 0x35, 0x08, 0x0c, 0xef, 0xac, 0xc0, 0xf4, + 0x57, 0x0f, 0xd6, 0xfd, 0x38, 0x24, 0xda, 0x46, 0xd7, 0x16, 0xc5, 0x16, 0xac, 0xa1, 0x31, 0xda, + 0x2c, 0x3e, 0x1c, 0x9c, 0x1b, 0x33, 0x8a, 0x6e, 0xce, 0x88, 0x16, 0x81, 0x43, 0x34, 0xa8, 0xf0, + 0x3e, 0xf4, 0xbd, 0x7b, 0xd4, 0x88, 0x67, 0xb0, 0xb9, 0xa0, 0xee, 0x9f, 0x92, 0xb4, 0x54, 0x95, + 0xea, 0x9e, 0x88, 0x7b, 0x21, 0x70, 0x4a, 0xfa, 0x3b, 0x2f, 0x8b, 0x63, 0xb8, 0xff, 0xa7, 0x83, + 0xd4, 0x86, 0x4e, 0x2d, 0x0d, 0xca, 0x03, 0xdb, 0xee, 0x80, 0xdd, 0x61, 0x21, 0x45, 0x7e, 0x5b, + 0xe2, 0x0b, 0x68, 0x31, 0x37, 0xe8, 0x96, 0x53, 0xa4, 0x0b, 0x11, 0x14, 0xaa, 0x77, 0xd6, 0xe7, + 0x27, 0xf2, 0xd5, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0xdf, 0x7d, 0xe0, 0x05, 0x3a, 0x05, 0x00, + 0x00, +} diff --git a/go/weedpb/system_message.proto b/go/weedpb/system_message.proto new file mode 100644 index 000000000..b780bb929 --- /dev/null +++ b/go/weedpb/system_message.proto @@ -0,0 +1,61 @@ +syntax = "proto3"; +package weedpb; + + +message VolumeInformationMessage { + uint32 id = 1; + uint64 size = 2; + string collection = 3; + uint64 file_count = 4; + uint64 delete_count = 5; + uint64 deleted_byte_count = 6; + bool read_only = 7; + uint32 replica_placement = 8 [deprecated=true]; + uint32 version = 9; + uint32 ttl = 10; +} + +// deprecated +message JoinMessage { + bool is_init = 1; + string ip = 2; + uint32 port = 3; + string public_url = 4; + uint32 max_volume_count = 5; + uint64 max_file_key = 6; + string data_center = 7; + string rack = 8; + repeated VolumeInformationMessage volumes = 9; + uint32 admin_port = 10; +} + +message JoinMessageV2 { + string join_key = 1; //if data node is init, set join key empty + string ip = 2; + uint32 port = 3; + string public_url = 4; + uint32 max_volume_count = 5; + uint64 max_file_key = 6; + string data_center = 7; + string rack = 8; + repeated VolumeInformationMessage volumes = 9; +} + +message CollectionSetting { + string collection = 1; + string replica_placement = 2; +// float vacuum_garbage_threshold = 3; +} + +message JoinResponse { + string error = 1; + string join_key = 2; + string join_ip = 3; + uint64 volume_size_limit = 4; + repeated CollectionSetting collection_settings = 5; + string secret_key = 6; +// repeated string master_peers = 7; + +} + + diff --git a/go/operation/system_message_test.go b/go/weedpb/system_message_test.go similarity index 50% rename from go/operation/system_message_test.go rename to go/weedpb/system_message_test.go index d18ca49a4..21d2108ea 100644 --- a/go/operation/system_message_test.go +++ b/go/weedpb/system_message_test.go @@ -1,4 +1,4 @@ -package operation +package weedpb import ( "encoding/json" @@ -10,28 +10,28 @@ import ( func TestSerialDeserial(t *testing.T) { volumeMessage := &VolumeInformationMessage{ - Id: proto.Uint32(12), - Size: proto.Uint64(2341234), - Collection: proto.String("benchmark"), - FileCount: proto.Uint64(2341234), - DeleteCount: proto.Uint64(234), - DeletedByteCount: proto.Uint64(21234), - ReadOnly: proto.Bool(false), - ReplicaPlacement: proto.Uint32(210), - Version: proto.Uint32(2), + Id: 12, + Size: 2341234, + Collection: "benchmark", + FileCount: 2341234, + DeleteCount: 234, + DeletedByteCount: 21234, + ReadOnly: false, + ReplicaPlacement: 210, + Version: 2, } var volumeMessages []*VolumeInformationMessage volumeMessages = append(volumeMessages, volumeMessage) joinMessage := &JoinMessage{ - IsInit: proto.Bool(true), - Ip: proto.String("127.0.3.12"), - Port: proto.Uint32(34546), - PublicUrl: proto.String("localhost:2342"), - MaxVolumeCount: proto.Uint32(210), - MaxFileKey: proto.Uint64(324234423), - DataCenter: proto.String("dc1"), - Rack: proto.String("rack2"), + IsInit: true, + Ip: "127.0.3.12", + Port: 34546, + PublicUrl: "localhost:2342", + MaxVolumeCount: 210, + MaxFileKey: 324234423, + DataCenter: "dc1", + Rack: "rack2", Volumes: volumeMessages, } @@ -53,7 +53,7 @@ func TestSerialDeserial(t *testing.T) { log.Println("The json data size is", len(jsonData), string(jsonData)) // Now test and newTest contain the same data. - if *joinMessage.PublicUrl != *newMessage.PublicUrl { - log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl) + if joinMessage.PublicUrl != newMessage.PublicUrl { + log.Fatalf("data mismatch %q != %q", joinMessage.PublicUrl, newMessage.PublicUrl) } }