Browse Source

upgrade proto2 to proto3

master add a new interface `/dir/join2` to process new join message, it use a `JoinKey` sync the settings betweed master and volume
pull/279/head
tnextday 10 years ago
parent
commit
9016fbcc57
  1. 5
      Makefile
  2. 7
      go/operation/data_struts.go
  3. 278
      go/operation/system_message.pb.go
  4. 45
      go/proto/system_message.proto
  5. 110
      go/storage/store.go
  6. 22
      go/storage/volume_info.go
  7. 3
      go/topology/store_replicate.go
  8. 73
      go/topology/topology.go
  9. 29
      go/util/http_util.go
  10. 65
      go/weed/weed_server/common.go
  11. 1
      go/weed/weed_server/master_server.go
  12. 52
      go/weed/weed_server/master_server_handlers_admin.go
  13. 2
      go/weed/weed_server/volume_server_handlers_admin.go
  14. 2
      go/weedpb/Makefile
  15. 180
      go/weedpb/system_message.pb.go
  16. 61
      go/weedpb/system_message.proto
  17. 40
      go/weedpb/system_message_test.go

5
Makefile

@ -5,7 +5,7 @@ SOURCE_DIR = ./go/weed/
all: build
.PHONY : clean deps build linux
.PHONY : clean deps build linux vet
clean:
go clean -i $(GO_FLAGS) $(SOURCE_DIR)
@ -17,6 +17,9 @@ deps:
fmt:
gofmt -w -s ./go/
vet:
go vet ./go/...
build: deps fmt
go build $(GO_FLAGS) -o $(BINARY) $(SOURCE_DIR)

7
go/operation/data_struts.go

@ -1,7 +0,0 @@
package operation
type JoinResult struct {
VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
SecretKey string `json:"secretKey,omitempty"`
Error string `json:"error,omitempty"`
}

278
go/operation/system_message.pb.go

@ -1,278 +0,0 @@
// Code generated by protoc-gen-go.
// source: system_message.proto
// DO NOT EDIT!
/*
Package operation is a generated protocol buffer package.
It is generated from these files:
system_message.proto
It has these top-level messages:
VolumeInformationMessage
JoinMessage
CollectionSetting
GlobalSetting
JoinResponse
*/
package operation
import proto "github.com/golang/protobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type VolumeInformationMessage struct {
Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"`
Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"`
Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"`
DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"`
DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"`
ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
ReplicaPlacement *uint32 `protobuf:"varint,8,opt,name=replica_placement" json:"replica_placement,omitempty"`
Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} }
func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) }
func (*VolumeInformationMessage) ProtoMessage() {}
const Default_VolumeInformationMessage_Version uint32 = 2
func (m *VolumeInformationMessage) GetId() uint32 {
if m != nil && m.Id != nil {
return *m.Id
}
return 0
}
func (m *VolumeInformationMessage) GetSize() uint64 {
if m != nil && m.Size != nil {
return *m.Size
}
return 0
}
func (m *VolumeInformationMessage) GetCollection() string {
if m != nil && m.Collection != nil {
return *m.Collection
}
return ""
}
func (m *VolumeInformationMessage) GetFileCount() uint64 {
if m != nil && m.FileCount != nil {
return *m.FileCount
}
return 0
}
func (m *VolumeInformationMessage) GetDeleteCount() uint64 {
if m != nil && m.DeleteCount != nil {
return *m.DeleteCount
}
return 0
}
func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 {
if m != nil && m.DeletedByteCount != nil {
return *m.DeletedByteCount
}
return 0
}
func (m *VolumeInformationMessage) GetReadOnly() bool {
if m != nil && m.ReadOnly != nil {
return *m.ReadOnly
}
return false
}
func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 {
if m != nil && m.ReplicaPlacement != nil {
return *m.ReplicaPlacement
}
return 0
}
func (m *VolumeInformationMessage) GetVersion() uint32 {
if m != nil && m.Version != nil {
return *m.Version
}
return Default_VolumeInformationMessage_Version
}
func (m *VolumeInformationMessage) GetTtl() uint32 {
if m != nil && m.Ttl != nil {
return *m.Ttl
}
return 0
}
type JoinMessage struct {
IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"`
Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"`
Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"`
PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"`
MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"`
MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"`
DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"`
Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *JoinMessage) Reset() { *m = JoinMessage{} }
func (m *JoinMessage) String() string { return proto.CompactTextString(m) }
func (*JoinMessage) ProtoMessage() {}
func (m *JoinMessage) GetIsInit() bool {
if m != nil && m.IsInit != nil {
return *m.IsInit
}
return false
}
func (m *JoinMessage) GetIp() string {
if m != nil && m.Ip != nil {
return *m.Ip
}
return ""
}
func (m *JoinMessage) GetPort() uint32 {
if m != nil && m.Port != nil {
return *m.Port
}
return 0
}
func (m *JoinMessage) GetPublicUrl() string {
if m != nil && m.PublicUrl != nil {
return *m.PublicUrl
}
return ""
}
func (m *JoinMessage) GetMaxVolumeCount() uint32 {
if m != nil && m.MaxVolumeCount != nil {
return *m.MaxVolumeCount
}
return 0
}
func (m *JoinMessage) GetMaxFileKey() uint64 {
if m != nil && m.MaxFileKey != nil {
return *m.MaxFileKey
}
return 0
}
func (m *JoinMessage) GetDataCenter() string {
if m != nil && m.DataCenter != nil {
return *m.DataCenter
}
return ""
}
func (m *JoinMessage) GetRack() string {
if m != nil && m.Rack != nil {
return *m.Rack
}
return ""
}
func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage {
if m != nil {
return m.Volumes
}
return nil
}
func (m *JoinMessage) GetAdminPort() uint32 {
if m != nil && m.AdminPort != nil {
return *m.AdminPort
}
return 0
}
type CollectionSetting struct {
Collection *string `protobuf:"bytes,1,opt,name=collection" json:"collection,omitempty"`
ReplicaPlacement *string `protobuf:"bytes,2,opt,name=replica_placement" json:"replica_placement,omitempty"`
VacuumGarbageThreshold *float32 `protobuf:"fixed32,3,opt,name=vacuum_garbage_threshold" json:"vacuum_garbage_threshold,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CollectionSetting) Reset() { *m = CollectionSetting{} }
func (m *CollectionSetting) String() string { return proto.CompactTextString(m) }
func (*CollectionSetting) ProtoMessage() {}
func (m *CollectionSetting) GetCollection() string {
if m != nil && m.Collection != nil {
return *m.Collection
}
return ""
}
func (m *CollectionSetting) GetReplicaPlacement() string {
if m != nil && m.ReplicaPlacement != nil {
return *m.ReplicaPlacement
}
return ""
}
func (m *CollectionSetting) GetVacuumGarbageThreshold() float32 {
if m != nil && m.VacuumGarbageThreshold != nil {
return *m.VacuumGarbageThreshold
}
return 0
}
type GlobalSetting struct {
Settings []*CollectionSetting `protobuf:"bytes,1,rep,name=settings" json:"settings,omitempty"`
MasterPeers []string `protobuf:"bytes,2,rep,name=master_peers" json:"master_peers,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *GlobalSetting) Reset() { *m = GlobalSetting{} }
func (m *GlobalSetting) String() string { return proto.CompactTextString(m) }
func (*GlobalSetting) ProtoMessage() {}
func (m *GlobalSetting) GetSettings() []*CollectionSetting {
if m != nil {
return m.Settings
}
return nil
}
func (m *GlobalSetting) GetMasterPeers() []string {
if m != nil {
return m.MasterPeers
}
return nil
}
type JoinResponse struct {
Settings *GlobalSetting `protobuf:"bytes,1,opt,name=settings" json:"settings,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *JoinResponse) Reset() { *m = JoinResponse{} }
func (m *JoinResponse) String() string { return proto.CompactTextString(m) }
func (*JoinResponse) ProtoMessage() {}
func (m *JoinResponse) GetSettings() *GlobalSetting {
if m != nil {
return m.Settings
}
return nil
}
func init() {
}

45
go/proto/system_message.proto

@ -1,45 +0,0 @@
package operation;
message VolumeInformationMessage {
required uint32 id = 1;
required uint64 size = 2;
optional string collection = 3;
required uint64 file_count = 4;
required uint64 delete_count = 5;
required uint64 deleted_byte_count = 6;
optional bool read_only = 7;
optional uint32 replica_placement = 8;
optional uint32 version = 9 [default=2];
optional uint32 ttl = 10;
}
message JoinMessage {
optional bool is_init = 1;
required string ip = 2;
required uint32 port = 3;
optional string public_url = 4;
required uint32 max_volume_count = 5;
required uint64 max_file_key = 6;
optional string data_center = 7;
optional string rack = 8;
repeated VolumeInformationMessage volumes = 9;
optional uint32 admin_port = 10;
}
message CollectionSetting {
optional string collection = 1;
optional string replica_placement = 2;
optional float vacuum_garbage_threshold = 3;
}
message GlobalSetting {
repeated CollectionSetting settings = 1;
repeated string master_peers = 2;
}
message JoinResponse {
optional GlobalSetting settings = 1;
}

110
go/storage/store.go

@ -1,7 +1,6 @@
package storage
import (
"encoding/json"
"errors"
"fmt"
"math/rand"
@ -10,11 +9,12 @@ import (
"sync"
"encoding/json"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/security"
"github.com/chrislusf/seaweedfs/go/util"
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/go/weedpb"
)
const (
@ -71,13 +71,13 @@ func (mn *MasterNodes) findMaster() (string, error) {
* A VolumeServer contains one Store
*/
type Store struct {
Ip string
joinKey string
ip string
Port int
PublicUrl string
Locations []*DiskLocation
dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
volumeSizeLimit uint64 //read from the master
masterNodes *MasterNodes
needleMapKind NeedleMapType
@ -86,15 +86,15 @@ type Store struct {
}
func (s *Store) String() (str string) {
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s",
s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.IsConnected(), s.GetVolumeSizeLimit(), s.masterNodes)
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, joinKey:%v, volumeSizeLimit:%d, masterNodes:%s",
s.GetIP(), s.Port, s.PublicUrl, s.dataCenter, s.rack, s.GetJoinKey(), s.GetVolumeSizeLimit(), s.masterNodes)
return
}
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
s = &Store{
Port: port,
Ip: ip,
ip: ip,
PublicUrl: publicUrl,
TaskManager: NewTaskManager(),
needleMapKind: needleMapKind,
@ -219,7 +219,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
if e != nil {
return
}
var volumeMessages []*operation.VolumeInformationMessage
var volumeMessages []*weedpb.VolumeInformationMessage
maxVolumeCount := 0
var maxFileKey uint64
for _, location := range s.Locations {
@ -230,16 +230,16 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
maxFileKey = v.nm.MaxFileKey()
}
if !v.expired(s.GetVolumeSizeLimit()) {
volumeMessage := &operation.VolumeInformationMessage{
Id: proto.Uint32(uint32(v.Id)),
Size: proto.Uint64(uint64(v.Size())),
Collection: proto.String(v.Collection),
FileCount: proto.Uint64(uint64(v.nm.FileCount())),
DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
ReadOnly: proto.Bool(v.IsReadOnly()),
Version: proto.Uint32(uint32(v.Version())),
Ttl: proto.Uint32(v.Ttl.ToUint32()),
volumeMessage := &weedpb.VolumeInformationMessage{
Id: uint32(v.Id),
Size: uint64(v.Size()),
Collection: v.Collection,
FileCount: uint64(v.nm.FileCount()),
DeleteCount: uint64(v.nm.DeletedCount()),
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.IsReadOnly(),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
}
volumeMessages = append(volumeMessages, volumeMessage)
} else {
@ -257,44 +257,44 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
}
}
joinMessage := &operation.JoinMessage{
IsInit: proto.Bool(!s.IsConnected()),
Ip: proto.String(s.Ip),
Port: proto.Uint32(uint32(s.Port)),
PublicUrl: proto.String(s.PublicUrl),
MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
MaxFileKey: proto.Uint64(maxFileKey),
DataCenter: proto.String(s.dataCenter),
Rack: proto.String(s.rack),
joinMsgV2 := &weedpb.JoinMessageV2{
JoinKey: s.GetJoinKey(),
Ip: s.GetIP(),
Port: uint32(s.Port),
PublicUrl: s.PublicUrl,
MaxVolumeCount: uint32(maxVolumeCount),
MaxFileKey: maxFileKey,
DataCenter: s.dataCenter,
Rack: s.rack,
Volumes: volumeMessages,
}
data, err := proto.Marshal(joinMessage)
if err != nil {
return "", "", err
}
joinUrl := util.MkUrl(masterNode, "/dir/join", nil)
ret := &weedpb.JoinResponse{}
joinUrl := util.MkUrl(masterNode, "/dir/join2", nil)
glog.V(4).Infof("Connecting to %s ...", joinUrl)
jsonBlob, err := util.PostBytes(joinUrl, data)
if err != nil {
if err := util.PostPbMsg(joinUrl, joinMsgV2, ret); err != nil {
s.masterNodes.reset()
return "", "", err
}
var ret operation.JoinResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
s.masterNodes.reset()
return masterNode, "", err
}
if ret.Error != "" {
s.masterNodes.reset()
return masterNode, "", errors.New(ret.Error)
}
s.SetVolumeSizeLimit(ret.VolumeSizeLimit)
if ret.JoinKey != s.GetJoinKey() {
if glog.V(4) {
jsonData, _ := json.Marshal(ret)
glog.V(4).Infof("dir join sync settings: %v", string(jsonData))
}
s.SetJoinKey(ret.JoinKey)
if ret.JoinIp != "" {
s.SetIP(ret.JoinIp)
}
if ret.VolumeSizeLimit != 0 {
s.SetVolumeSizeLimit(ret.VolumeSizeLimit)
}
}
//todo
secretKey = security.Secret(ret.SecretKey)
s.SetConnected(true)
return
}
func (s *Store) Close() {
@ -367,14 +367,26 @@ func (s *Store) SetVolumeSizeLimit(sz uint64) {
s.volumeSizeLimit = sz
}
func (s *Store) IsConnected() bool {
func (s *Store) GetIP() string {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.ip
}
func (s *Store) SetIP(ip string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.ip = ip
}
func (s *Store) GetJoinKey() string {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.connected
return s.joinKey
}
func (s *Store) SetConnected(b bool) {
func (s *Store) SetJoinKey(k string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.connected = b
s.joinKey = k
}

22
go/storage/volume_info.go

@ -4,7 +4,7 @@ import (
"fmt"
"sort"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/weedpb"
)
type VolumeInfo struct {
@ -19,18 +19,18 @@ type VolumeInfo struct {
ReadOnly bool
}
func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi *VolumeInfo, err error) {
func NewVolumeInfo(m *weedpb.VolumeInformationMessage) (vi *VolumeInfo, err error) {
vi = &VolumeInfo{
Id: VolumeId(*m.Id),
Size: *m.Size,
Collection: *m.Collection,
FileCount: int(*m.FileCount),
DeleteCount: int(*m.DeleteCount),
DeletedByteCount: *m.DeletedByteCount,
ReadOnly: *m.ReadOnly,
Version: Version(*m.Version),
Id: VolumeId(m.Id),
Size: m.Size,
Collection: m.Collection,
FileCount: int(m.FileCount),
DeleteCount: int(m.DeleteCount),
DeletedByteCount: m.DeletedByteCount,
ReadOnly: m.ReadOnly,
Version: Version(m.Version),
}
vi.Ttl = LoadTTLFromUint32(*m.Ttl)
vi.Ttl = LoadTTLFromUint32(m.Ttl)
return vi, nil
}

3
go/topology/store_replicate.go

@ -41,6 +41,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
}
u := util.MkUrl(location.Url, r.URL.Path, args)
glog.V(4).Infoln("write replication to", u)
_, err := operation.Upload(u,
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
jwt)
@ -85,7 +86,7 @@ func distributedOperation(masterNode string, store *storage.Store, volumeId stor
}
if lookupResult, lookupErr := operation.LookupNoCache(masterNode, volumeId.String(), collection); lookupErr == nil {
length := 0
selfUrl := net.JoinHostPort(store.Ip, strconv.Itoa(store.Port))
selfUrl := net.JoinHostPort(store.GetIP(), strconv.Itoa(store.Port))
results := make(chan bool)
for _, location := range lookupResult.Locations {
if location.Url != selfUrl {

73
go/topology/topology.go

@ -5,12 +5,15 @@ import (
"io/ioutil"
"math/rand"
"strconv"
"time"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/sequence"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/chrislusf/seaweedfs/go/util"
"github.com/chrislusf/seaweedfs/go/weedpb"
)
type Topology struct {
@ -19,6 +22,7 @@ type Topology struct {
collectionMap *util.ConcurrentMap
pulse int64
volumeSizeLimit uint64
joinKey string
Sequence sequence.Sequencer
CollectionSettings *storage.CollectionSettings
configuration *Configuration
@ -39,6 +43,7 @@ func NewTopology(id string, confFile string, cs *storage.CollectionSettings, seq
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.CollectionSettings = cs
t.ReGenJoinKey()
t.Sequence = seq
@ -86,6 +91,25 @@ func (t *Topology) Leader() (string, error) {
return l, nil
}
func (t *Topology) GetJoinKey() string {
t.mutex.RLock()
defer t.mutex.RUnlock()
return t.joinKey
}
func (t *Topology) ReGenJoinKey() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.joinKey = strconv.FormatInt(time.Now().UnixNano(), 16)
}
func (t *Topology) GetVolumeSizeLimit() uint64 {
// volumeSizeLimit is only for read
//t.mutex.RLock()
//defer t.mutex.RUnlock()
return t.volumeSizeLimit
}
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
@ -159,18 +183,18 @@ func (t *Topology) UnRegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.Ttl).UnRegisterVolume(v, dn)
}
func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
t.Sequence.SetMax(*joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
func (t *Topology) ProcessJoinMessage(joinMessage *weedpb.JoinMessage) {
t.Sequence.SetMax(joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(joinMessage.Ip, joinMessage.DataCenter, joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port))
if *joinMessage.IsInit && dn != nil {
dn := rack.FindDataNode(joinMessage.Ip, int(joinMessage.Port))
if joinMessage.IsInit && dn != nil {
t.UnRegisterDataNode(dn)
}
dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
int(*joinMessage.Port), *joinMessage.PublicUrl,
int(*joinMessage.MaxVolumeCount))
dn = rack.GetOrCreateDataNode(joinMessage.Ip,
int(joinMessage.Port), joinMessage.PublicUrl,
int(joinMessage.MaxVolumeCount))
var volumeInfos []*storage.VolumeInfo
for _, v := range joinMessage.Volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil {
@ -190,6 +214,37 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
}
func (t *Topology) ProcessJoinMessageV2(joinMsgV2 *weedpb.JoinMessageV2) {
t.Sequence.SetMax(joinMsgV2.MaxFileKey)
dcName, rackName := t.configuration.Locate(joinMsgV2.Ip, joinMsgV2.DataCenter, joinMsgV2.Rack)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn := rack.FindDataNode(joinMsgV2.Ip, int(joinMsgV2.Port))
if joinMsgV2.JoinKey == "" && dn != nil {
t.UnRegisterDataNode(dn)
}
dn = rack.GetOrCreateDataNode(joinMsgV2.Ip,
int(joinMsgV2.Port), joinMsgV2.PublicUrl,
int(joinMsgV2.MaxVolumeCount))
var volumeInfos []*storage.VolumeInfo
for _, v := range joinMsgV2.Volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil {
volumeInfos = append(volumeInfos, vi)
} else {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
n := t.GetChildren(NodeId(dcName))
if n != nil {

29
go/util/http_util.go

@ -15,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/security"
"github.com/golang/protobuf/proto"
"github.com/pierrec/lz4"
"strconv"
)
@ -56,6 +57,34 @@ func PostBytes(url string, body []byte) ([]byte, error) {
return b, nil
}
func PostPbMsg(url string, msg proto.Message, ret proto.Message) error {
data, err := proto.Marshal(msg)
if err != nil {
return err
}
req, err := http.NewRequest("POST", url, bytes.NewReader(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/protobuf")
req.Header.Set("Accept", "application/protobuf")
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("Post to %s: %v", url, err)
}
defer resp.Body.Close()
retBlob, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("Read response body: %v", err)
}
if err := proto.Unmarshal(retBlob, ret); err != nil {
glog.V(0).Infof("Failed to umarshal pb %s with response: %s", url, string(retBlob))
return err
}
return nil
}
func PostEx(host, path string, values url.Values) (content []byte, statusCode int, e error) {
url := MkUrl(host, path, nil)
glog.V(4).Infoln("Post", url+"?"+values.Encode())

65
go/weed/weed_server/common.go

@ -4,19 +4,21 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"
"io/ioutil"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/security"
"github.com/chrislusf/seaweedfs/go/stats"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/chrislusf/seaweedfs/go/util"
"github.com/golang/protobuf/proto"
)
var serverStats *stats.ServerStats
@ -28,42 +30,57 @@ func init() {
}
func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
var bytes []byte
if r.FormValue("pretty") != "" {
bytes, err = json.MarshalIndent(obj, "", " ")
} else {
bytes, err = json.Marshal(obj)
}
func readObjRequest(r *http.Request, obj interface{}) error {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return
return err
}
callback := r.FormValue("callback")
if callback == "" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(httpStatus)
_, err = w.Write(bytes)
if pbMsg, ok := obj.(proto.Message); ok && strings.Contains(r.Header.Get("Content-Type"), "protobuf") {
if err := proto.Unmarshal(body, pbMsg); err != nil {
return err
}
} else {
w.Header().Set("Content-Type", "application/javascript")
w.WriteHeader(httpStatus)
if _, err = w.Write([]uint8(callback)); err != nil {
return
if err := json.Unmarshal(body, obj); err != nil {
return err
}
if _, err = w.Write([]uint8("(")); err != nil {
return
}
return nil
}
func writeObjResponse(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
var (
bytes []byte
contentType string
)
if pbMsg, ok := obj.(proto.Message); ok && strings.Contains(r.Header.Get("Accept"), "protobuf") {
bytes, err = proto.Marshal(pbMsg)
contentType = "application/protobuf"
} else {
if r.FormValue("pretty") != "" {
bytes, err = json.MarshalIndent(obj, "", " ")
} else {
bytes, err = json.Marshal(obj)
}
fmt.Fprint(w, string(bytes))
if _, err = w.Write([]uint8(")")); err != nil {
return
if callback := r.FormValue("callback"); callback != "" {
contentType = "application/javascript"
bytes = []byte(callback + "(" + string(bytes) + ")")
} else {
contentType = "application/json"
}
}
if err != nil {
return
}
w.Header().Set("Content-Type", contentType)
w.WriteHeader(httpStatus)
_, err = w.Write(bytes)
return
}
// wrapper for writeJson - just logs errors
func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) {
if err := writeJson(w, r, httpStatus, obj); err != nil {
if err := writeObjResponse(w, r, httpStatus, obj); err != nil {
glog.V(0).Infof("error writing JSON %s: %v", obj, err)
}
}

1
go/weed/weed_server/master_server.go

@ -67,6 +67,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler)))
r.HandleFunc("/dir/join2", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoin2Handler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler)))

52
go/weed/weed_server/master_server_handlers_admin.go

@ -14,10 +14,10 @@ import (
"net"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/chrislusf/seaweedfs/go/topology"
"github.com/chrislusf/seaweedfs/go/util"
"github.com/chrislusf/seaweedfs/go/weedpb"
"github.com/golang/protobuf/proto"
)
@ -37,23 +37,24 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
ms.Topo.DeleteCollection(r.FormValue("collection"))
}
// deprecated
func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
joinMessage := &operation.JoinMessage{}
joinMessage := &weedpb.JoinMessage{}
if err = proto.Unmarshal(body, joinMessage); err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
if *joinMessage.Ip == "" {
if joinMessage.Ip == "" {
if ip, _, e := net.SplitHostPort(r.RemoteAddr); e == nil {
*joinMessage.Ip = ip
joinMessage.Ip = ip
} else {
glog.V(2).Infof("SplitHostPort (%s) error, %v", r.RemoteAddr, e)
*joinMessage.Ip = r.RemoteAddr
joinMessage.Ip = r.RemoteAddr
}
}
if glog.V(4) {
@ -65,14 +66,51 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData))
}
}
ms.Topo.ProcessJoinMessage(joinMessage)
writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{
type JoinResult struct {
VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
SecretKey string `json:"secretKey,omitempty"`
Error string `json:"error,omitempty"`
}
writeJsonQuiet(w, r, http.StatusOK, JoinResult{
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
SecretKey: string(ms.guard.SecretKey),
})
}
func (ms *MasterServer) dirJoin2Handler(w http.ResponseWriter, r *http.Request) {
joinResp := &weedpb.JoinResponse{}
joinMsgV2 := &weedpb.JoinMessageV2{}
if err := readObjRequest(r, joinMsgV2); err != nil {
joinResp.Error = err.Error()
writeObjResponse(w, r, http.StatusBadRequest, joinResp)
return
}
if joinMsgV2.Ip == "" {
if ip, _, e := net.SplitHostPort(r.RemoteAddr); e == nil {
joinMsgV2.Ip = ip
} else {
glog.V(2).Infof("SplitHostPort (%s) error, %v", r.RemoteAddr, e)
joinMsgV2.Ip = r.RemoteAddr
}
}
if glog.V(4) {
jsonData, _ := json.Marshal(joinMsgV2)
glog.V(4).Infoln("join proto:", string(jsonData))
}
ms.Topo.ProcessJoinMessageV2(joinMsgV2)
joinResp.JoinKey = ms.Topo.GetJoinKey()
if joinMsgV2.JoinKey != joinResp.JoinKey {
joinResp.JoinIp = joinMsgV2.Ip
joinResp.VolumeSizeLimit = ms.Topo.GetVolumeSizeLimit()
joinResp.SecretKey = string(ms.guard.SecretKey)
}
writeObjResponse(w, r, http.StatusOK, joinResp)
}
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION

2
go/weed/weed_server/volume_server_handlers_admin.go

@ -117,5 +117,5 @@ func (vs *VolumeServer) setVolumeOptionHandler(w http.ResponseWriter, r *http.Re
result["errors"] = errs
}
writeJson(w, r, http.StatusAccepted, result)
writeObjResponse(w, r, http.StatusAccepted, result)
}

2
go/proto/Makefile → go/weedpb/Makefile

@ -1,4 +1,4 @@
TARG=../operation
TARG=./
all:
protoc --go_out=$(TARG) system_message.proto

180
go/weedpb/system_message.pb.go

@ -0,0 +1,180 @@
// Code generated by protoc-gen-go.
// source: system_message.proto
// DO NOT EDIT!
/*
Package weedpb is a generated protocol buffer package.
It is generated from these files:
system_message.proto
It has these top-level messages:
VolumeInformationMessage
JoinMessage
JoinMessageV2
CollectionSetting
JoinResponse
*/
package weedpb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
const _ = proto.ProtoPackageIsVersion1
type VolumeInformationMessage struct {
Id uint32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"`
Size uint64 `protobuf:"varint,2,opt,name=size" json:"size,omitempty"`
Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
FileCount uint64 `protobuf:"varint,4,opt,name=file_count,json=fileCount" json:"file_count,omitempty"`
DeleteCount uint64 `protobuf:"varint,5,opt,name=delete_count,json=deleteCount" json:"delete_count,omitempty"`
DeletedByteCount uint64 `protobuf:"varint,6,opt,name=deleted_byte_count,json=deletedByteCount" json:"deleted_byte_count,omitempty"`
ReadOnly bool `protobuf:"varint,7,opt,name=read_only,json=readOnly" json:"read_only,omitempty"`
ReplicaPlacement uint32 `protobuf:"varint,8,opt,name=replica_placement,json=replicaPlacement" json:"replica_placement,omitempty"`
Version uint32 `protobuf:"varint,9,opt,name=version" json:"version,omitempty"`
Ttl uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
}
func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} }
func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) }
func (*VolumeInformationMessage) ProtoMessage() {}
func (*VolumeInformationMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
// deprecated
type JoinMessage struct {
IsInit bool `protobuf:"varint,1,opt,name=is_init,json=isInit" json:"is_init,omitempty"`
Ip string `protobuf:"bytes,2,opt,name=ip" json:"ip,omitempty"`
Port uint32 `protobuf:"varint,3,opt,name=port" json:"port,omitempty"`
PublicUrl string `protobuf:"bytes,4,opt,name=public_url,json=publicUrl" json:"public_url,omitempty"`
MaxVolumeCount uint32 `protobuf:"varint,5,opt,name=max_volume_count,json=maxVolumeCount" json:"max_volume_count,omitempty"`
MaxFileKey uint64 `protobuf:"varint,6,opt,name=max_file_key,json=maxFileKey" json:"max_file_key,omitempty"`
DataCenter string `protobuf:"bytes,7,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"`
Rack string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
AdminPort uint32 `protobuf:"varint,10,opt,name=admin_port,json=adminPort" json:"admin_port,omitempty"`
}
func (m *JoinMessage) Reset() { *m = JoinMessage{} }
func (m *JoinMessage) String() string { return proto.CompactTextString(m) }
func (*JoinMessage) ProtoMessage() {}
func (*JoinMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage {
if m != nil {
return m.Volumes
}
return nil
}
type JoinMessageV2 struct {
JoinKey string `protobuf:"bytes,1,opt,name=join_key,json=joinKey" json:"join_key,omitempty"`
Ip string `protobuf:"bytes,2,opt,name=ip" json:"ip,omitempty"`
Port uint32 `protobuf:"varint,3,opt,name=port" json:"port,omitempty"`
PublicUrl string `protobuf:"bytes,4,opt,name=public_url,json=publicUrl" json:"public_url,omitempty"`
MaxVolumeCount uint32 `protobuf:"varint,5,opt,name=max_volume_count,json=maxVolumeCount" json:"max_volume_count,omitempty"`
MaxFileKey uint64 `protobuf:"varint,6,opt,name=max_file_key,json=maxFileKey" json:"max_file_key,omitempty"`
DataCenter string `protobuf:"bytes,7,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"`
Rack string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
}
func (m *JoinMessageV2) Reset() { *m = JoinMessageV2{} }
func (m *JoinMessageV2) String() string { return proto.CompactTextString(m) }
func (*JoinMessageV2) ProtoMessage() {}
func (*JoinMessageV2) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *JoinMessageV2) GetVolumes() []*VolumeInformationMessage {
if m != nil {
return m.Volumes
}
return nil
}
type CollectionSetting struct {
Collection string `protobuf:"bytes,1,opt,name=collection" json:"collection,omitempty"`
ReplicaPlacement string `protobuf:"bytes,2,opt,name=replica_placement,json=replicaPlacement" json:"replica_placement,omitempty"`
}
func (m *CollectionSetting) Reset() { *m = CollectionSetting{} }
func (m *CollectionSetting) String() string { return proto.CompactTextString(m) }
func (*CollectionSetting) ProtoMessage() {}
func (*CollectionSetting) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
type JoinResponse struct {
Error string `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
JoinKey string `protobuf:"bytes,2,opt,name=join_key,json=joinKey" json:"join_key,omitempty"`
JoinIp string `protobuf:"bytes,3,opt,name=join_ip,json=joinIp" json:"join_ip,omitempty"`
VolumeSizeLimit uint64 `protobuf:"varint,4,opt,name=volume_size_limit,json=volumeSizeLimit" json:"volume_size_limit,omitempty"`
CollectionSettings []*CollectionSetting `protobuf:"bytes,5,rep,name=collection_settings,json=collectionSettings" json:"collection_settings,omitempty"`
SecretKey string `protobuf:"bytes,6,opt,name=secret_key,json=secretKey" json:"secret_key,omitempty"`
}
func (m *JoinResponse) Reset() { *m = JoinResponse{} }
func (m *JoinResponse) String() string { return proto.CompactTextString(m) }
func (*JoinResponse) ProtoMessage() {}
func (*JoinResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *JoinResponse) GetCollectionSettings() []*CollectionSetting {
if m != nil {
return m.CollectionSettings
}
return nil
}
func init() {
proto.RegisterType((*VolumeInformationMessage)(nil), "weedpb.VolumeInformationMessage")
proto.RegisterType((*JoinMessage)(nil), "weedpb.JoinMessage")
proto.RegisterType((*JoinMessageV2)(nil), "weedpb.JoinMessageV2")
proto.RegisterType((*CollectionSetting)(nil), "weedpb.CollectionSetting")
proto.RegisterType((*JoinResponse)(nil), "weedpb.JoinResponse")
}
var fileDescriptor0 = []byte{
// 593 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe4, 0x94, 0x5d, 0x6e, 0xd3, 0x40,
0x10, 0xc7, 0x15, 0xb7, 0x4d, 0xe2, 0x49, 0x53, 0xd2, 0xa5, 0x12, 0xae, 0x10, 0x50, 0xf2, 0x54,
0x01, 0x2a, 0x52, 0x79, 0xe3, 0xb1, 0x95, 0x90, 0x5a, 0x40, 0x54, 0x5b, 0xd1, 0x57, 0xe3, 0xd8,
0xd3, 0x6a, 0xa9, 0xed, 0xb5, 0x76, 0x37, 0x05, 0x73, 0x24, 0xee, 0x81, 0xc4, 0x6d, 0xb8, 0x02,
0xb3, 0xb3, 0x71, 0xe8, 0x07, 0x9c, 0x80, 0xb7, 0x99, 0xff, 0xcc, 0xda, 0x33, 0xbf, 0xd9, 0x59,
0xd8, 0xb2, 0xad, 0x75, 0x58, 0xa5, 0x15, 0x5a, 0x9b, 0x5d, 0xe0, 0x5e, 0x63, 0xb4, 0xd3, 0xa2,
0xff, 0x05, 0xb1, 0x68, 0x66, 0xd3, 0x9f, 0x11, 0x24, 0x67, 0xba, 0x9c, 0x57, 0x78, 0x54, 0x9f,
0x6b, 0x53, 0x65, 0x4e, 0xe9, 0xfa, 0x7d, 0x48, 0x15, 0x1b, 0x10, 0xa9, 0x22, 0xe9, 0xed, 0xf4,
0x76, 0xc7, 0x92, 0x2c, 0x21, 0x60, 0xd5, 0xaa, 0x6f, 0x98, 0x44, 0xa4, 0xac, 0x4a, 0xb6, 0xc5,
0x63, 0x80, 0x5c, 0x97, 0x25, 0xe6, 0xfe, 0x60, 0xb2, 0x42, 0x91, 0x58, 0x5e, 0x53, 0xc4, 0x23,
0x80, 0x73, 0x55, 0x62, 0x9a, 0xeb, 0x79, 0xed, 0x92, 0x55, 0x3e, 0x19, 0x7b, 0xe5, 0xd0, 0x0b,
0xe2, 0x29, 0xac, 0x17, 0x58, 0xa2, 0xeb, 0x12, 0xd6, 0x38, 0x61, 0x14, 0xb4, 0x90, 0xf2, 0x02,
0x44, 0x70, 0x8b, 0x74, 0xd6, 0x2e, 0x13, 0xfb, 0x9c, 0x38, 0x59, 0x44, 0x0e, 0xda, 0x2e, 0xfb,
0x21, 0xc4, 0x06, 0xb3, 0x22, 0xd5, 0x75, 0xd9, 0x26, 0x03, 0x4a, 0x1a, 0xca, 0xa1, 0x17, 0x3e,
0x90, 0x2f, 0x5e, 0xc2, 0xa6, 0xc1, 0xa6, 0x54, 0x79, 0x96, 0x36, 0x65, 0x96, 0x63, 0x85, 0xf4,
0xa5, 0xa1, 0xef, 0xef, 0x20, 0x4a, 0x7a, 0x72, 0xb2, 0x08, 0x9e, 0x74, 0x31, 0x91, 0xc0, 0xe0,
0x0a, 0x8d, 0xf5, 0xad, 0xc5, 0x8c, 0xa1, 0x73, 0xc5, 0x04, 0x56, 0x9c, 0x2b, 0x13, 0x60, 0xd5,
0x9b, 0xd3, 0x1f, 0x11, 0x8c, 0x8e, 0xb5, 0x5a, 0xd2, 0x7b, 0x00, 0x03, 0x65, 0x53, 0x55, 0x2b,
0xc7, 0x08, 0x87, 0xb2, 0xaf, 0xec, 0x11, 0x79, 0x8c, 0xb5, 0x61, 0x88, 0x31, 0x61, 0x6d, 0x3c,
0xd6, 0x46, 0x1b, 0xc7, 0xf0, 0xc6, 0x92, 0x6d, 0x8f, 0xad, 0x99, 0xcf, 0xa8, 0x98, 0x74, 0x6e,
0x4a, 0xc6, 0x16, 0xcb, 0x38, 0x28, 0x1f, 0x4d, 0x29, 0x76, 0x61, 0x52, 0x65, 0x5f, 0xd3, 0x2b,
0x9e, 0xdc, 0x35, 0x74, 0x63, 0xb9, 0x41, 0x7a, 0x18, 0x68, 0xe0, 0xb1, 0x03, 0xeb, 0x3e, 0x93,
0x67, 0x70, 0x89, 0xed, 0x82, 0x1b, 0x90, 0xf6, 0x86, 0xa4, 0xb7, 0xd8, 0x8a, 0x27, 0x30, 0x2a,
0x32, 0x97, 0xa5, 0x39, 0x35, 0x8c, 0x86, 0x99, 0xd1, 0x08, 0xbd, 0x74, 0xc8, 0x8a, 0xaf, 0xcf,
0x64, 0xf9, 0x25, 0x83, 0x8a, 0x25, 0xdb, 0xe2, 0x35, 0x81, 0xe1, 0xbf, 0x58, 0x02, 0xb3, 0xb2,
0x3b, 0xda, 0xdf, 0xd9, 0x0b, 0x37, 0x6a, 0xef, 0x5f, 0xb7, 0x49, 0x76, 0x07, 0x7c, 0x6f, 0x59,
0x51, 0xa9, 0x3a, 0xe5, 0xae, 0x03, 0xc1, 0x98, 0x95, 0x13, 0x12, 0xa6, 0xdf, 0x23, 0x18, 0x5f,
0xe3, 0x78, 0xb6, 0x2f, 0xb6, 0x61, 0xf8, 0x99, 0x04, 0xae, 0xbf, 0xc7, 0x45, 0x0c, 0xbc, 0xef,
0x8b, 0xff, 0xcf, 0x59, 0x4e, 0x3f, 0xc1, 0xe6, 0xe1, 0x72, 0xd9, 0x4e, 0xd1, 0x39, 0x55, 0x5f,
0xdc, 0xda, 0xc9, 0xde, 0x9d, 0x9d, 0x7c, 0xfe, 0xb7, 0x35, 0x08, 0x0c, 0xef, 0xac, 0xc0, 0xf4,
0x57, 0x0f, 0xd6, 0xfd, 0x38, 0x24, 0xda, 0x46, 0xd7, 0x16, 0xc5, 0x16, 0xac, 0xa1, 0x31, 0xda,
0x2c, 0x3e, 0x1c, 0x9c, 0x1b, 0x33, 0x8a, 0x6e, 0xce, 0x88, 0x16, 0x81, 0x43, 0x34, 0xa8, 0xf0,
0x3e, 0xf4, 0xbd, 0x7b, 0xd4, 0x88, 0x67, 0xb0, 0xb9, 0xa0, 0xee, 0x9f, 0x92, 0xb4, 0x54, 0x95,
0xea, 0x9e, 0x88, 0x7b, 0x21, 0x70, 0x4a, 0xfa, 0x3b, 0x2f, 0x8b, 0x63, 0xb8, 0xff, 0xa7, 0x83,
0xd4, 0x86, 0x4e, 0x2d, 0x0d, 0xca, 0x03, 0xdb, 0xee, 0x80, 0xdd, 0x61, 0x21, 0x45, 0x7e, 0x5b,
0xe2, 0x0b, 0x68, 0x31, 0x37, 0xe8, 0x96, 0x53, 0xa4, 0x0b, 0x11, 0x14, 0xaa, 0x77, 0xd6, 0xe7,
0x27, 0xf2, 0xd5, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0xdf, 0x7d, 0xe0, 0x05, 0x3a, 0x05, 0x00,
0x00,
}

61
go/weedpb/system_message.proto

@ -0,0 +1,61 @@
syntax = "proto3";
package weedpb;
message VolumeInformationMessage {
uint32 id = 1;
uint64 size = 2;
string collection = 3;
uint64 file_count = 4;
uint64 delete_count = 5;
uint64 deleted_byte_count = 6;
bool read_only = 7;
uint32 replica_placement = 8 [deprecated=true];
uint32 version = 9;
uint32 ttl = 10;
}
// deprecated
message JoinMessage {
bool is_init = 1;
string ip = 2;
uint32 port = 3;
string public_url = 4;
uint32 max_volume_count = 5;
uint64 max_file_key = 6;
string data_center = 7;
string rack = 8;
repeated VolumeInformationMessage volumes = 9;
uint32 admin_port = 10;
}
message JoinMessageV2 {
string join_key = 1; //if data node is init, set join key empty
string ip = 2;
uint32 port = 3;
string public_url = 4;
uint32 max_volume_count = 5;
uint64 max_file_key = 6;
string data_center = 7;
string rack = 8;
repeated VolumeInformationMessage volumes = 9;
}
message CollectionSetting {
string collection = 1;
string replica_placement = 2;
// float vacuum_garbage_threshold = 3;
}
message JoinResponse {
string error = 1;
string join_key = 2;
string join_ip = 3;
uint64 volume_size_limit = 4;
repeated CollectionSetting collection_settings = 5;
string secret_key = 6;
// repeated string master_peers = 7;
}

40
go/operation/system_message_test.go → go/weedpb/system_message_test.go

@ -1,4 +1,4 @@
package operation
package weedpb
import (
"encoding/json"
@ -10,28 +10,28 @@ import (
func TestSerialDeserial(t *testing.T) {
volumeMessage := &VolumeInformationMessage{
Id: proto.Uint32(12),
Size: proto.Uint64(2341234),
Collection: proto.String("benchmark"),
FileCount: proto.Uint64(2341234),
DeleteCount: proto.Uint64(234),
DeletedByteCount: proto.Uint64(21234),
ReadOnly: proto.Bool(false),
ReplicaPlacement: proto.Uint32(210),
Version: proto.Uint32(2),
Id: 12,
Size: 2341234,
Collection: "benchmark",
FileCount: 2341234,
DeleteCount: 234,
DeletedByteCount: 21234,
ReadOnly: false,
ReplicaPlacement: 210,
Version: 2,
}
var volumeMessages []*VolumeInformationMessage
volumeMessages = append(volumeMessages, volumeMessage)
joinMessage := &JoinMessage{
IsInit: proto.Bool(true),
Ip: proto.String("127.0.3.12"),
Port: proto.Uint32(34546),
PublicUrl: proto.String("localhost:2342"),
MaxVolumeCount: proto.Uint32(210),
MaxFileKey: proto.Uint64(324234423),
DataCenter: proto.String("dc1"),
Rack: proto.String("rack2"),
IsInit: true,
Ip: "127.0.3.12",
Port: 34546,
PublicUrl: "localhost:2342",
MaxVolumeCount: 210,
MaxFileKey: 324234423,
DataCenter: "dc1",
Rack: "rack2",
Volumes: volumeMessages,
}
@ -53,7 +53,7 @@ func TestSerialDeserial(t *testing.T) {
log.Println("The json data size is", len(jsonData), string(jsonData))
// Now test and newTest contain the same data.
if *joinMessage.PublicUrl != *newMessage.PublicUrl {
log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl)
if joinMessage.PublicUrl != newMessage.PublicUrl {
log.Fatalf("data mismatch %q != %q", joinMessage.PublicUrl, newMessage.PublicUrl)
}
}
Loading…
Cancel
Save