Browse Source

Change to protocol buffer for volume-join-masster message

Reduced size to about 1/5 of the previous json format message
pull/2/head
Chris Lu 11 years ago
parent
commit
1818a2a2da
  1. 189
      go/operation/system_message.pb.go
  2. 58
      go/operation/system_message_test.go
  3. 4
      go/proto/Makefile
  4. 25
      go/proto/system_message.proto
  5. 60
      go/storage/store.go
  6. 23
      go/storage/volume_info.go
  7. 21
      go/topology/topology.go
  8. 16
      go/util/http_util.go
  9. 40
      go/weed/weed_server/master_server_handlers_admin.go

189
go/operation/system_message.pb.go

@ -0,0 +1,189 @@
// Code generated by protoc-gen-go.
// source: system_message.proto
// DO NOT EDIT!
/*
Package operation is a generated protocol buffer package.
It is generated from these files:
system_message.proto
It has these top-level messages:
VolumeInformationMessage
JoinMessage
*/
package operation
import proto "code.google.com/p/goprotobuf/proto"
import json "encoding/json"
import math "math"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type VolumeInformationMessage struct {
Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"`
Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"`
Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"`
DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"`
DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"`
ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"`
Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} }
func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) }
func (*VolumeInformationMessage) ProtoMessage() {}
const Default_VolumeInformationMessage_Version uint32 = 2
func (m *VolumeInformationMessage) GetId() uint32 {
if m != nil && m.Id != nil {
return *m.Id
}
return 0
}
func (m *VolumeInformationMessage) GetSize() uint64 {
if m != nil && m.Size != nil {
return *m.Size
}
return 0
}
func (m *VolumeInformationMessage) GetCollection() string {
if m != nil && m.Collection != nil {
return *m.Collection
}
return ""
}
func (m *VolumeInformationMessage) GetFileCount() uint64 {
if m != nil && m.FileCount != nil {
return *m.FileCount
}
return 0
}
func (m *VolumeInformationMessage) GetDeleteCount() uint64 {
if m != nil && m.DeleteCount != nil {
return *m.DeleteCount
}
return 0
}
func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 {
if m != nil && m.DeletedByteCount != nil {
return *m.DeletedByteCount
}
return 0
}
func (m *VolumeInformationMessage) GetReadOnly() bool {
if m != nil && m.ReadOnly != nil {
return *m.ReadOnly
}
return false
}
func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 {
if m != nil && m.ReplicaPlacement != nil {
return *m.ReplicaPlacement
}
return 0
}
func (m *VolumeInformationMessage) GetVersion() uint32 {
if m != nil && m.Version != nil {
return *m.Version
}
return Default_VolumeInformationMessage_Version
}
type JoinMessage struct {
IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"`
Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"`
Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"`
PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"`
MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"`
MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"`
DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"`
Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *JoinMessage) Reset() { *m = JoinMessage{} }
func (m *JoinMessage) String() string { return proto.CompactTextString(m) }
func (*JoinMessage) ProtoMessage() {}
func (m *JoinMessage) GetIsInit() bool {
if m != nil && m.IsInit != nil {
return *m.IsInit
}
return false
}
func (m *JoinMessage) GetIp() string {
if m != nil && m.Ip != nil {
return *m.Ip
}
return ""
}
func (m *JoinMessage) GetPort() uint32 {
if m != nil && m.Port != nil {
return *m.Port
}
return 0
}
func (m *JoinMessage) GetPublicUrl() string {
if m != nil && m.PublicUrl != nil {
return *m.PublicUrl
}
return ""
}
func (m *JoinMessage) GetMaxVolumeCount() uint32 {
if m != nil && m.MaxVolumeCount != nil {
return *m.MaxVolumeCount
}
return 0
}
func (m *JoinMessage) GetMaxFileKey() uint64 {
if m != nil && m.MaxFileKey != nil {
return *m.MaxFileKey
}
return 0
}
func (m *JoinMessage) GetDataCenter() string {
if m != nil && m.DataCenter != nil {
return *m.DataCenter
}
return ""
}
func (m *JoinMessage) GetRack() string {
if m != nil && m.Rack != nil {
return *m.Rack
}
return ""
}
func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage {
if m != nil {
return m.Volumes
}
return nil
}
func init() {
}

58
go/operation/system_message_test.go

@ -0,0 +1,58 @@
package operation
import (
proto "code.google.com/p/goprotobuf/proto"
"encoding/json"
"log"
"testing"
)
func TestSerialDeserial(t *testing.T) {
volumeMessage := &VolumeInformationMessage{
Id: proto.Uint32(12),
Size: proto.Uint64(2341234),
Collection: proto.String("benchmark"),
FileCount: proto.Uint64(2341234),
DeleteCount: proto.Uint64(234),
DeletedByteCount: proto.Uint64(21234),
ReadOnly: proto.Bool(false),
ReplicaPlacement: proto.Uint32(210),
Version: proto.Uint32(2),
}
var volumeMessages []*VolumeInformationMessage
volumeMessages = append(volumeMessages, volumeMessage)
joinMessage := &JoinMessage{
IsInit: proto.Bool(true),
Ip: proto.String("127.0.3.12"),
Port: proto.Uint32(34546),
PublicUrl: proto.String("localhost:2342"),
MaxVolumeCount: proto.Uint32(210),
MaxFileKey: proto.Uint64(324234423),
DataCenter: proto.String("dc1"),
Rack: proto.String("rack2"),
Volumes: volumeMessages,
}
data, err := proto.Marshal(joinMessage)
if err != nil {
log.Fatal("marshaling error: ", err)
}
newMessage := &JoinMessage{}
err = proto.Unmarshal(data, newMessage)
if err != nil {
log.Fatal("unmarshaling error: ", err)
}
log.Println("The pb data size is", len(data))
jsonData, jsonError := json.Marshal(joinMessage)
if jsonError != nil {
log.Fatal("json marshaling error: ", jsonError)
}
log.Println("The json data size is", len(jsonData), string(jsonData))
// Now test and newTest contain the same data.
if *joinMessage.PublicUrl != *newMessage.PublicUrl {
log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl)
}
}

4
go/proto/Makefile

@ -0,0 +1,4 @@
TARG=../operation
all:
protoc --go_out=$(TARG) system_message.proto

25
go/proto/system_message.proto

@ -0,0 +1,25 @@
package operation;
message VolumeInformationMessage {
required uint32 id = 1;
required uint64 size = 2;
optional string collection = 3;
required uint64 file_count = 4;
required uint64 delete_count = 5;
required uint64 deleted_byte_count = 6;
optional bool read_only = 7;
required uint32 replica_placement = 8;
optional uint32 version = 9 [default=2];
}
message JoinMessage {
optional bool is_init = 1;
required string ip = 2;
required uint32 port = 3;
optional string public_url = 4;
required uint32 max_volume_count = 5;
required uint64 max_file_key = 6;
optional string data_center = 7;
optional string rack = 8;
repeated VolumeInformationMessage volumes = 9;
}

60
go/storage/store.go

@ -1,6 +1,7 @@
package storage
import (
proto "code.google.com/p/goprotobuf/proto"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/util"
@ -9,7 +10,6 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"net/url"
"strconv"
"strings"
)
@ -269,40 +269,50 @@ func (s *Store) Join() (masterNode string, e error) {
if e != nil {
return
}
stats := new([]*VolumeInfo)
var volumeMessages []*operation.VolumeInformationMessage
maxVolumeCount := 0
var maxFileKey uint64
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
Collection: v.Collection,
ReplicaPlacement: v.ReplicaPlacement,
Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.readOnly}
*stats = append(*stats, s)
volumeMessage := &operation.VolumeInformationMessage{
Id: proto.Uint32(uint32(k)),
Size: proto.Uint64(uint64(v.Size())),
Collection: proto.String(v.Collection),
FileCount: proto.Uint64(uint64(v.nm.FileCount())),
DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
ReadOnly: proto.Bool(v.readOnly),
ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
Version: proto.Uint32(uint32(v.Version())),
}
volumeMessages = append(volumeMessages, volumeMessage)
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
}
}
bytes, _ := json.Marshal(stats)
values := make(url.Values)
if !s.connected {
values.Add("init", "true")
}
values.Add("port", strconv.Itoa(s.Port))
values.Add("ip", s.Ip)
values.Add("publicUrl", s.PublicUrl)
values.Add("volumes", string(bytes))
values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
values.Add("maxFileKey", strconv.FormatUint(maxFileKey, 10))
values.Add("dataCenter", s.dataCenter)
values.Add("rack", s.rack)
jsonBlob, err := util.Post("http://"+masterNode+"/dir/join", values)
joinMessage := &operation.JoinMessage{
IsInit: proto.Bool(!s.connected),
Ip: proto.String(s.Ip),
Port: proto.Uint32(uint32(s.Port)),
PublicUrl: proto.String(s.PublicUrl),
MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
MaxFileKey: proto.Uint64(maxFileKey),
DataCenter: proto.String(s.dataCenter),
Rack: proto.String(s.rack),
Volumes: volumeMessages,
}
data, err := proto.Marshal(joinMessage)
if err != nil {
return "", err
}
println("join data size", len(data))
jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
if err != nil {
s.masterNodes.reset()
return "", err

23
go/storage/volume_info.go

@ -1,6 +1,8 @@
package storage
import ()
import (
"code.google.com/p/weed-fs/go/operation"
)
type VolumeInfo struct {
Id VolumeId
@ -13,3 +15,22 @@ type VolumeInfo struct {
DeletedByteCount uint64
ReadOnly bool
}
func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) {
vi = VolumeInfo{
Id: VolumeId(*m.Id),
Size: *m.Size,
Collection: *m.Collection,
FileCount: int(*m.FileCount),
DeleteCount: int(*m.DeleteCount),
DeletedByteCount: *m.DeletedByteCount,
ReadOnly: *m.ReadOnly,
Version: Version(*m.Version),
}
rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement))
if e != nil {
return vi, e
}
vi.ReplicaPlacement = rp
return vi, nil
}

21
go/topology/topology.go

@ -2,6 +2,7 @@ package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"errors"
@ -143,16 +144,24 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
}
func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, maxFileKey uint64, dcName string, rackName string) {
t.Sequence.SetMax(maxFileKey)
dcName, rackName = t.configuration.Locate(ip, dcName, rackName)
func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
t.Sequence.SetMax(*joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn := rack.FindDataNode(ip, port)
if init && dn != nil {
dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port))
if *joinMessage.IsInit && dn != nil {
t.UnRegisterDataNode(dn)
}
dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
dn = rack.GetOrCreateDataNode(*joinMessage.Ip, int(*joinMessage.Port), *joinMessage.PublicUrl, int(*joinMessage.MaxVolumeCount))
var volumeInfos []storage.VolumeInfo
for _, v := range joinMessage.Volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil {
volumeInfos = append(volumeInfos, vi)
} else {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)

16
go/util/http_util.go

@ -1,6 +1,7 @@
package util
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"fmt"
"io/ioutil"
@ -21,6 +22,21 @@ func init() {
client = &http.Client{Transport: Transport}
}
func PostBytes(url string, body []byte) ([]byte, error) {
r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body))
if err != nil {
glog.V(0).Infoln(err)
return nil, err
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
glog.V(0).Infoln("read post result from", url, err)
return nil, err
}
return b, nil
}
func Post(url string, values url.Values) ([]byte, error) {
r, err := client.PostForm(url, values)
if err != nil {

40
go/weed/weed_server/master_server_handlers_admin.go

@ -1,12 +1,15 @@
package weed_server
import (
proto "code.google.com/p/goprotobuf/proto"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology"
"code.google.com/p/weed-fs/go/util"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"strconv"
"strings"
@ -29,23 +32,30 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
}
func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
init := r.FormValue("init") == "true"
ip := r.FormValue("ip")
if ip == "" {
ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
}
port, _ := strconv.Atoi(r.FormValue("port"))
maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
maxFileKey, _ := strconv.ParseUint(r.FormValue("maxFileKey"), 10, 64)
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
publicUrl := r.FormValue("publicUrl")
volumes := new([]storage.VolumeInfo)
if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil {
writeJsonQuiet(w, r, operation.JoinResult{Error: "Cannot unmarshal \"volumes\": " + err.Error()})
body, err := ioutil.ReadAll(r.Body)
if err != nil {
writeJsonError(w, r, err)
return
}
joinMessage := &operation.JoinMessage{}
if err = proto.Unmarshal(body, joinMessage); err != nil {
writeJsonError(w, r, err)
return
}
debug(s, "volumes", r.FormValue("volumes"))
ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, maxFileKey, r.FormValue("dataCenter"), r.FormValue("rack"))
if *joinMessage.Ip == "" {
*joinMessage.Ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
}
if glog.V(4) {
if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil {
glog.V(0).Infoln("json marshaling error: ", jsonError)
writeJsonError(w, r, jsonError)
return
} else {
glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData))
}
}
ms.Topo.RegisterVolumes(joinMessage)
writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
}

Loading…
Cancel
Save