From 1818a2a2da412710f57d58f0228009f782b760af Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 21 Apr 2014 02:11:10 -0700 Subject: [PATCH] Change to protocol buffer for volume-join-masster message Reduced size to about 1/5 of the previous json format message --- go/operation/system_message.pb.go | 189 ++++++++++++++++++ go/operation/system_message_test.go | 58 ++++++ go/proto/Makefile | 4 + go/proto/system_message.proto | 25 +++ go/storage/store.go | 60 +++--- go/storage/volume_info.go | 23 ++- go/topology/topology.go | 21 +- go/util/http_util.go | 16 ++ .../master_server_handlers_admin.go | 40 ++-- 9 files changed, 389 insertions(+), 47 deletions(-) create mode 100644 go/operation/system_message.pb.go create mode 100644 go/operation/system_message_test.go create mode 100644 go/proto/Makefile create mode 100644 go/proto/system_message.proto diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go new file mode 100644 index 000000000..45ae8a648 --- /dev/null +++ b/go/operation/system_message.pb.go @@ -0,0 +1,189 @@ +// Code generated by protoc-gen-go. +// source: system_message.proto +// DO NOT EDIT! + +/* +Package operation is a generated protocol buffer package. + +It is generated from these files: + system_message.proto + +It has these top-level messages: + VolumeInformationMessage + JoinMessage +*/ +package operation + +import proto "code.google.com/p/goprotobuf/proto" +import json "encoding/json" +import math "math" + +// Reference proto, json, and math imports to suppress error if they are not otherwise used. +var _ = proto.Marshal +var _ = &json.SyntaxError{} +var _ = math.Inf + +type VolumeInformationMessage struct { + Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"` + Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"` + Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` + FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"` + DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"` + DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"` + ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"` + ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"` + Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} } +func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) } +func (*VolumeInformationMessage) ProtoMessage() {} + +const Default_VolumeInformationMessage_Version uint32 = 2 + +func (m *VolumeInformationMessage) GetId() uint32 { + if m != nil && m.Id != nil { + return *m.Id + } + return 0 +} + +func (m *VolumeInformationMessage) GetSize() uint64 { + if m != nil && m.Size != nil { + return *m.Size + } + return 0 +} + +func (m *VolumeInformationMessage) GetCollection() string { + if m != nil && m.Collection != nil { + return *m.Collection + } + return "" +} + +func (m *VolumeInformationMessage) GetFileCount() uint64 { + if m != nil && m.FileCount != nil { + return *m.FileCount + } + return 0 +} + +func (m *VolumeInformationMessage) GetDeleteCount() uint64 { + if m != nil && m.DeleteCount != nil { + return *m.DeleteCount + } + return 0 +} + +func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 { + if m != nil && m.DeletedByteCount != nil { + return *m.DeletedByteCount + } + return 0 +} + +func (m *VolumeInformationMessage) GetReadOnly() bool { + if m != nil && m.ReadOnly != nil { + return *m.ReadOnly + } + return false +} + +func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 { + if m != nil && m.ReplicaPlacement != nil { + return *m.ReplicaPlacement + } + return 0 +} + +func (m *VolumeInformationMessage) GetVersion() uint32 { + if m != nil && m.Version != nil { + return *m.Version + } + return Default_VolumeInformationMessage_Version +} + +type JoinMessage struct { + IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"` + Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"` + Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"` + PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"` + MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"` + MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"` + DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"` + Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"` + Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *JoinMessage) Reset() { *m = JoinMessage{} } +func (m *JoinMessage) String() string { return proto.CompactTextString(m) } +func (*JoinMessage) ProtoMessage() {} + +func (m *JoinMessage) GetIsInit() bool { + if m != nil && m.IsInit != nil { + return *m.IsInit + } + return false +} + +func (m *JoinMessage) GetIp() string { + if m != nil && m.Ip != nil { + return *m.Ip + } + return "" +} + +func (m *JoinMessage) GetPort() uint32 { + if m != nil && m.Port != nil { + return *m.Port + } + return 0 +} + +func (m *JoinMessage) GetPublicUrl() string { + if m != nil && m.PublicUrl != nil { + return *m.PublicUrl + } + return "" +} + +func (m *JoinMessage) GetMaxVolumeCount() uint32 { + if m != nil && m.MaxVolumeCount != nil { + return *m.MaxVolumeCount + } + return 0 +} + +func (m *JoinMessage) GetMaxFileKey() uint64 { + if m != nil && m.MaxFileKey != nil { + return *m.MaxFileKey + } + return 0 +} + +func (m *JoinMessage) GetDataCenter() string { + if m != nil && m.DataCenter != nil { + return *m.DataCenter + } + return "" +} + +func (m *JoinMessage) GetRack() string { + if m != nil && m.Rack != nil { + return *m.Rack + } + return "" +} + +func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage { + if m != nil { + return m.Volumes + } + return nil +} + +func init() { +} diff --git a/go/operation/system_message_test.go b/go/operation/system_message_test.go new file mode 100644 index 000000000..2731d0b2f --- /dev/null +++ b/go/operation/system_message_test.go @@ -0,0 +1,58 @@ +package operation + +import ( + proto "code.google.com/p/goprotobuf/proto" + "encoding/json" + "log" + "testing" +) + +func TestSerialDeserial(t *testing.T) { + volumeMessage := &VolumeInformationMessage{ + Id: proto.Uint32(12), + Size: proto.Uint64(2341234), + Collection: proto.String("benchmark"), + FileCount: proto.Uint64(2341234), + DeleteCount: proto.Uint64(234), + DeletedByteCount: proto.Uint64(21234), + ReadOnly: proto.Bool(false), + ReplicaPlacement: proto.Uint32(210), + Version: proto.Uint32(2), + } + var volumeMessages []*VolumeInformationMessage + volumeMessages = append(volumeMessages, volumeMessage) + + joinMessage := &JoinMessage{ + IsInit: proto.Bool(true), + Ip: proto.String("127.0.3.12"), + Port: proto.Uint32(34546), + PublicUrl: proto.String("localhost:2342"), + MaxVolumeCount: proto.Uint32(210), + MaxFileKey: proto.Uint64(324234423), + DataCenter: proto.String("dc1"), + Rack: proto.String("rack2"), + Volumes: volumeMessages, + } + + data, err := proto.Marshal(joinMessage) + if err != nil { + log.Fatal("marshaling error: ", err) + } + newMessage := &JoinMessage{} + err = proto.Unmarshal(data, newMessage) + if err != nil { + log.Fatal("unmarshaling error: ", err) + } + log.Println("The pb data size is", len(data)) + + jsonData, jsonError := json.Marshal(joinMessage) + if jsonError != nil { + log.Fatal("json marshaling error: ", jsonError) + } + log.Println("The json data size is", len(jsonData), string(jsonData)) + + // Now test and newTest contain the same data. + if *joinMessage.PublicUrl != *newMessage.PublicUrl { + log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl) + } +} diff --git a/go/proto/Makefile b/go/proto/Makefile new file mode 100644 index 000000000..73af851dd --- /dev/null +++ b/go/proto/Makefile @@ -0,0 +1,4 @@ +TARG=../operation + +all: + protoc --go_out=$(TARG) system_message.proto diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto new file mode 100644 index 000000000..15574ad56 --- /dev/null +++ b/go/proto/system_message.proto @@ -0,0 +1,25 @@ +package operation; + +message VolumeInformationMessage { + required uint32 id = 1; + required uint64 size = 2; + optional string collection = 3; + required uint64 file_count = 4; + required uint64 delete_count = 5; + required uint64 deleted_byte_count = 6; + optional bool read_only = 7; + required uint32 replica_placement = 8; + optional uint32 version = 9 [default=2]; +} + +message JoinMessage { + optional bool is_init = 1; + required string ip = 2; + required uint32 port = 3; + optional string public_url = 4; + required uint32 max_volume_count = 5; + required uint64 max_file_key = 6; + optional string data_center = 7; + optional string rack = 8; + repeated VolumeInformationMessage volumes = 9; +} diff --git a/go/storage/store.go b/go/storage/store.go index 8fe8f2c4c..54764cc56 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -1,6 +1,7 @@ package storage import ( + proto "code.google.com/p/goprotobuf/proto" "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/operation" "code.google.com/p/weed-fs/go/util" @@ -9,7 +10,6 @@ import ( "fmt" "io/ioutil" "math/rand" - "net/url" "strconv" "strings" ) @@ -269,40 +269,50 @@ func (s *Store) Join() (masterNode string, e error) { if e != nil { return } - stats := new([]*VolumeInfo) + var volumeMessages []*operation.VolumeInformationMessage maxVolumeCount := 0 var maxFileKey uint64 for _, location := range s.Locations { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount for k, v := range location.volumes { - s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()), - Collection: v.Collection, - ReplicaPlacement: v.ReplicaPlacement, - Version: v.Version(), - FileCount: v.nm.FileCount(), - DeleteCount: v.nm.DeletedCount(), - DeletedByteCount: v.nm.DeletedSize(), - ReadOnly: v.readOnly} - *stats = append(*stats, s) + volumeMessage := &operation.VolumeInformationMessage{ + Id: proto.Uint32(uint32(k)), + Size: proto.Uint64(uint64(v.Size())), + Collection: proto.String(v.Collection), + FileCount: proto.Uint64(uint64(v.nm.FileCount())), + DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), + DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), + ReadOnly: proto.Bool(v.readOnly), + ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), + Version: proto.Uint32(uint32(v.Version())), + } + volumeMessages = append(volumeMessages, volumeMessage) if maxFileKey < v.nm.MaxFileKey() { maxFileKey = v.nm.MaxFileKey() } } } - bytes, _ := json.Marshal(stats) - values := make(url.Values) - if !s.connected { - values.Add("init", "true") - } - values.Add("port", strconv.Itoa(s.Port)) - values.Add("ip", s.Ip) - values.Add("publicUrl", s.PublicUrl) - values.Add("volumes", string(bytes)) - values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount)) - values.Add("maxFileKey", strconv.FormatUint(maxFileKey, 10)) - values.Add("dataCenter", s.dataCenter) - values.Add("rack", s.rack) - jsonBlob, err := util.Post("http://"+masterNode+"/dir/join", values) + + joinMessage := &operation.JoinMessage{ + IsInit: proto.Bool(!s.connected), + Ip: proto.String(s.Ip), + Port: proto.Uint32(uint32(s.Port)), + PublicUrl: proto.String(s.PublicUrl), + MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)), + MaxFileKey: proto.Uint64(maxFileKey), + DataCenter: proto.String(s.dataCenter), + Rack: proto.String(s.rack), + Volumes: volumeMessages, + } + + data, err := proto.Marshal(joinMessage) + if err != nil { + return "", err + } + + println("join data size", len(data)) + + jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data) if err != nil { s.masterNodes.reset() return "", err diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index 1dfb3dcae..165af1a19 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -1,6 +1,8 @@ package storage -import () +import ( + "code.google.com/p/weed-fs/go/operation" +) type VolumeInfo struct { Id VolumeId @@ -13,3 +15,22 @@ type VolumeInfo struct { DeletedByteCount uint64 ReadOnly bool } + +func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) { + vi = VolumeInfo{ + Id: VolumeId(*m.Id), + Size: *m.Size, + Collection: *m.Collection, + FileCount: int(*m.FileCount), + DeleteCount: int(*m.DeleteCount), + DeletedByteCount: *m.DeletedByteCount, + ReadOnly: *m.ReadOnly, + Version: Version(*m.Version), + } + rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement)) + if e != nil { + return vi, e + } + vi.ReplicaPlacement = rp + return vi, nil +} diff --git a/go/topology/topology.go b/go/topology/topology.go index 9db3e78ae..f1daffb53 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -2,6 +2,7 @@ package topology import ( "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/operation" "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/storage" "errors" @@ -143,16 +144,24 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn) } -func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, maxFileKey uint64, dcName string, rackName string) { - t.Sequence.SetMax(maxFileKey) - dcName, rackName = t.configuration.Locate(ip, dcName, rackName) +func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { + t.Sequence.SetMax(*joinMessage.MaxFileKey) + dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) dc := t.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) - dn := rack.FindDataNode(ip, port) - if init && dn != nil { + dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port)) + if *joinMessage.IsInit && dn != nil { t.UnRegisterDataNode(dn) } - dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) + dn = rack.GetOrCreateDataNode(*joinMessage.Ip, int(*joinMessage.Port), *joinMessage.PublicUrl, int(*joinMessage.MaxVolumeCount)) + var volumeInfos []storage.VolumeInfo + for _, v := range joinMessage.Volumes { + if vi, err := storage.NewVolumeInfo(v); err == nil { + volumeInfos = append(volumeInfos, vi) + } else { + glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) + } + } dn.UpdateVolumes(volumeInfos) for _, v := range volumeInfos { t.RegisterVolumeLayout(v, dn) diff --git a/go/util/http_util.go b/go/util/http_util.go index a33db9199..6562e964c 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -1,6 +1,7 @@ package util import ( + "bytes" "code.google.com/p/weed-fs/go/glog" "fmt" "io/ioutil" @@ -21,6 +22,21 @@ func init() { client = &http.Client{Transport: Transport} } +func PostBytes(url string, body []byte) ([]byte, error) { + r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body)) + if err != nil { + glog.V(0).Infoln(err) + return nil, err + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + glog.V(0).Infoln("read post result from", url, err) + return nil, err + } + return b, nil +} + func Post(url string, values url.Values) ([]byte, error) { r, err := client.PostForm(url, values) if err != nil { diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index e549a1dfb..bd6747e99 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -1,12 +1,15 @@ package weed_server import ( + proto "code.google.com/p/goprotobuf/proto" + "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/operation" "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/topology" "code.google.com/p/weed-fs/go/util" "encoding/json" "errors" + "io/ioutil" "net/http" "strconv" "strings" @@ -29,23 +32,30 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R } func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { - init := r.FormValue("init") == "true" - ip := r.FormValue("ip") - if ip == "" { - ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] - } - port, _ := strconv.Atoi(r.FormValue("port")) - maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) - maxFileKey, _ := strconv.ParseUint(r.FormValue("maxFileKey"), 10, 64) - s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") - publicUrl := r.FormValue("publicUrl") - volumes := new([]storage.VolumeInfo) - if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil { - writeJsonQuiet(w, r, operation.JoinResult{Error: "Cannot unmarshal \"volumes\": " + err.Error()}) + body, err := ioutil.ReadAll(r.Body) + if err != nil { + writeJsonError(w, r, err) + return + } + joinMessage := &operation.JoinMessage{} + if err = proto.Unmarshal(body, joinMessage); err != nil { + writeJsonError(w, r, err) return } - debug(s, "volumes", r.FormValue("volumes")) - ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, maxFileKey, r.FormValue("dataCenter"), r.FormValue("rack")) + if *joinMessage.Ip == "" { + *joinMessage.Ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] + } + if glog.V(4) { + if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil { + glog.V(0).Infoln("json marshaling error: ", jsonError) + writeJsonError(w, r, jsonError) + return + } else { + glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData)) + } + } + + ms.Topo.RegisterVolumes(joinMessage) writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024}) }