Browse Source

migrate volume sync to gRpc

pull/753/head
Chris Lu 6 years ago
parent
commit
eec951cad2
  1. 25
      weed/operation/sync_volume.go
  2. 23
      weed/pb/volume_server.proto
  3. 283
      weed/pb/volume_server_pb/volume_server.pb.go
  4. 62
      weed/server/volume_grpc_sync.go
  5. 2
      weed/server/volume_server.go
  6. 20
      weed/server/volume_server_handlers_admin.go
  7. 74
      weed/server/volume_server_handlers_sync.go
  8. 39
      weed/storage/volume_sync.go

25
weed/operation/sync_volume.go

@ -2,7 +2,6 @@ package operation
import (
"context"
"net/url"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
@ -21,18 +20,26 @@ func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.Volu
return
}
func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
values := make(url.Values)
values.Add("volume", vid)
line := make([]byte, NeedleEntrySize)
err := util.GetBufferStream("http://"+server+"/admin/sync/index", values, line, func(bytes []byte) {
func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
VolumdId: vid,
})
if err != nil {
return err
}
dataSize := len(resp.IndexFileContent)
for idx := 0; idx+NeedleEntrySize <= dataSize; idx += NeedleEntrySize {
line := resp.IndexFileContent[idx : idx+NeedleEntrySize]
key := BytesToNeedleId(line[:NeedleIdSize])
offset := BytesToOffset(line[NeedleIdSize : NeedleIdSize+OffsetSize])
size := util.BytesToUint32(line[NeedleIdSize+OffsetSize : NeedleIdSize+OffsetSize+SizeSize])
eachEntryFn(key, offset, size)
})
if err != nil {
return err
}
return nil
})
}

23
weed/pb/volume_server.proto

@ -21,8 +21,13 @@ service VolumeServer {
}
rpc AssignVolume (AssignVolumeRequest) returns (AssignVolumeResponse) {
}
rpc VolumeSyncStatus (VolumeSyncStatusRequest) returns (VolumeSyncStatusResponse) {
}
rpc VolumeSyncIndex (VolumeSyncIndexRequest) returns (VolumeSyncIndexResponse) {
}
rpc VolumeSyncData (VolumeSyncDataRequest) returns (VolumeSyncDataResponse) {
}
rpc VolumeMount (VolumeMountRequest) returns (VolumeMountResponse) {
}
@ -104,6 +109,24 @@ message VolumeSyncStatusResponse {
uint64 idx_file_size = 8;
}
message VolumeSyncIndexRequest {
uint32 volumd_id = 1;
}
message VolumeSyncIndexResponse {
bytes index_file_content = 1;
}
message VolumeSyncDataRequest {
uint32 volumd_id = 1;
uint32 revision = 2;
uint32 offset = 3;
uint32 size = 4;
string needle_id = 5;
}
message VolumeSyncDataResponse {
bytes file_content = 1;
}
message VolumeMountRequest {
uint32 volumd_id = 1;
}

283
weed/pb/volume_server_pb/volume_server.pb.go

@ -27,6 +27,10 @@ It has these top-level messages:
AssignVolumeResponse
VolumeSyncStatusRequest
VolumeSyncStatusResponse
VolumeSyncIndexRequest
VolumeSyncIndexResponse
VolumeSyncDataRequest
VolumeSyncDataResponse
VolumeMountRequest
VolumeMountResponse
VolumeUnmountRequest
@ -398,6 +402,102 @@ func (m *VolumeSyncStatusResponse) GetIdxFileSize() uint64 {
return 0
}
type VolumeSyncIndexRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
}
func (m *VolumeSyncIndexRequest) Reset() { *m = VolumeSyncIndexRequest{} }
func (m *VolumeSyncIndexRequest) String() string { return proto.CompactTextString(m) }
func (*VolumeSyncIndexRequest) ProtoMessage() {}
func (*VolumeSyncIndexRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} }
func (m *VolumeSyncIndexRequest) GetVolumdId() uint32 {
if m != nil {
return m.VolumdId
}
return 0
}
type VolumeSyncIndexResponse struct {
IndexFileContent []byte `protobuf:"bytes,1,opt,name=index_file_content,json=indexFileContent,proto3" json:"index_file_content,omitempty"`
}
func (m *VolumeSyncIndexResponse) Reset() { *m = VolumeSyncIndexResponse{} }
func (m *VolumeSyncIndexResponse) String() string { return proto.CompactTextString(m) }
func (*VolumeSyncIndexResponse) ProtoMessage() {}
func (*VolumeSyncIndexResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{19} }
func (m *VolumeSyncIndexResponse) GetIndexFileContent() []byte {
if m != nil {
return m.IndexFileContent
}
return nil
}
type VolumeSyncDataRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
Revision uint32 `protobuf:"varint,2,opt,name=revision" json:"revision,omitempty"`
Offset uint32 `protobuf:"varint,3,opt,name=offset" json:"offset,omitempty"`
Size uint32 `protobuf:"varint,4,opt,name=size" json:"size,omitempty"`
NeedleId string `protobuf:"bytes,5,opt,name=needle_id,json=needleId" json:"needle_id,omitempty"`
}
func (m *VolumeSyncDataRequest) Reset() { *m = VolumeSyncDataRequest{} }
func (m *VolumeSyncDataRequest) String() string { return proto.CompactTextString(m) }
func (*VolumeSyncDataRequest) ProtoMessage() {}
func (*VolumeSyncDataRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20} }
func (m *VolumeSyncDataRequest) GetVolumdId() uint32 {
if m != nil {
return m.VolumdId
}
return 0
}
func (m *VolumeSyncDataRequest) GetRevision() uint32 {
if m != nil {
return m.Revision
}
return 0
}
func (m *VolumeSyncDataRequest) GetOffset() uint32 {
if m != nil {
return m.Offset
}
return 0
}
func (m *VolumeSyncDataRequest) GetSize() uint32 {
if m != nil {
return m.Size
}
return 0
}
func (m *VolumeSyncDataRequest) GetNeedleId() string {
if m != nil {
return m.NeedleId
}
return ""
}
type VolumeSyncDataResponse struct {
FileContent []byte `protobuf:"bytes,1,opt,name=file_content,json=fileContent,proto3" json:"file_content,omitempty"`
}
func (m *VolumeSyncDataResponse) Reset() { *m = VolumeSyncDataResponse{} }
func (m *VolumeSyncDataResponse) String() string { return proto.CompactTextString(m) }
func (*VolumeSyncDataResponse) ProtoMessage() {}
func (*VolumeSyncDataResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{21} }
func (m *VolumeSyncDataResponse) GetFileContent() []byte {
if m != nil {
return m.FileContent
}
return nil
}
type VolumeMountRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
}
@ -405,7 +505,7 @@ type VolumeMountRequest struct {
func (m *VolumeMountRequest) Reset() { *m = VolumeMountRequest{} }
func (m *VolumeMountRequest) String() string { return proto.CompactTextString(m) }
func (*VolumeMountRequest) ProtoMessage() {}
func (*VolumeMountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} }
func (*VolumeMountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{22} }
func (m *VolumeMountRequest) GetVolumdId() uint32 {
if m != nil {
@ -420,7 +520,7 @@ type VolumeMountResponse struct {
func (m *VolumeMountResponse) Reset() { *m = VolumeMountResponse{} }
func (m *VolumeMountResponse) String() string { return proto.CompactTextString(m) }
func (*VolumeMountResponse) ProtoMessage() {}
func (*VolumeMountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{19} }
func (*VolumeMountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{23} }
type VolumeUnmountRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
@ -429,7 +529,7 @@ type VolumeUnmountRequest struct {
func (m *VolumeUnmountRequest) Reset() { *m = VolumeUnmountRequest{} }
func (m *VolumeUnmountRequest) String() string { return proto.CompactTextString(m) }
func (*VolumeUnmountRequest) ProtoMessage() {}
func (*VolumeUnmountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20} }
func (*VolumeUnmountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{24} }
func (m *VolumeUnmountRequest) GetVolumdId() uint32 {
if m != nil {
@ -444,7 +544,7 @@ type VolumeUnmountResponse struct {
func (m *VolumeUnmountResponse) Reset() { *m = VolumeUnmountResponse{} }
func (m *VolumeUnmountResponse) String() string { return proto.CompactTextString(m) }
func (*VolumeUnmountResponse) ProtoMessage() {}
func (*VolumeUnmountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{21} }
func (*VolumeUnmountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{25} }
func init() {
proto.RegisterType((*BatchDeleteRequest)(nil), "volume_server_pb.BatchDeleteRequest")
@ -465,6 +565,10 @@ func init() {
proto.RegisterType((*AssignVolumeResponse)(nil), "volume_server_pb.AssignVolumeResponse")
proto.RegisterType((*VolumeSyncStatusRequest)(nil), "volume_server_pb.VolumeSyncStatusRequest")
proto.RegisterType((*VolumeSyncStatusResponse)(nil), "volume_server_pb.VolumeSyncStatusResponse")
proto.RegisterType((*VolumeSyncIndexRequest)(nil), "volume_server_pb.VolumeSyncIndexRequest")
proto.RegisterType((*VolumeSyncIndexResponse)(nil), "volume_server_pb.VolumeSyncIndexResponse")
proto.RegisterType((*VolumeSyncDataRequest)(nil), "volume_server_pb.VolumeSyncDataRequest")
proto.RegisterType((*VolumeSyncDataResponse)(nil), "volume_server_pb.VolumeSyncDataResponse")
proto.RegisterType((*VolumeMountRequest)(nil), "volume_server_pb.VolumeMountRequest")
proto.RegisterType((*VolumeMountResponse)(nil), "volume_server_pb.VolumeMountResponse")
proto.RegisterType((*VolumeUnmountRequest)(nil), "volume_server_pb.VolumeUnmountRequest")
@ -491,6 +595,8 @@ type VolumeServerClient interface {
DeleteCollection(ctx context.Context, in *DeleteCollectionRequest, opts ...grpc.CallOption) (*DeleteCollectionResponse, error)
AssignVolume(ctx context.Context, in *AssignVolumeRequest, opts ...grpc.CallOption) (*AssignVolumeResponse, error)
VolumeSyncStatus(ctx context.Context, in *VolumeSyncStatusRequest, opts ...grpc.CallOption) (*VolumeSyncStatusResponse, error)
VolumeSyncIndex(ctx context.Context, in *VolumeSyncIndexRequest, opts ...grpc.CallOption) (*VolumeSyncIndexResponse, error)
VolumeSyncData(ctx context.Context, in *VolumeSyncDataRequest, opts ...grpc.CallOption) (*VolumeSyncDataResponse, error)
VolumeMount(ctx context.Context, in *VolumeMountRequest, opts ...grpc.CallOption) (*VolumeMountResponse, error)
VolumeUnmount(ctx context.Context, in *VolumeUnmountRequest, opts ...grpc.CallOption) (*VolumeUnmountResponse, error)
}
@ -575,6 +681,24 @@ func (c *volumeServerClient) VolumeSyncStatus(ctx context.Context, in *VolumeSyn
return out, nil
}
func (c *volumeServerClient) VolumeSyncIndex(ctx context.Context, in *VolumeSyncIndexRequest, opts ...grpc.CallOption) (*VolumeSyncIndexResponse, error) {
out := new(VolumeSyncIndexResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VolumeSyncIndex", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *volumeServerClient) VolumeSyncData(ctx context.Context, in *VolumeSyncDataRequest, opts ...grpc.CallOption) (*VolumeSyncDataResponse, error) {
out := new(VolumeSyncDataResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VolumeSyncData", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *volumeServerClient) VolumeMount(ctx context.Context, in *VolumeMountRequest, opts ...grpc.CallOption) (*VolumeMountResponse, error) {
out := new(VolumeMountResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VolumeMount", in, out, c.cc, opts...)
@ -605,6 +729,8 @@ type VolumeServerServer interface {
DeleteCollection(context.Context, *DeleteCollectionRequest) (*DeleteCollectionResponse, error)
AssignVolume(context.Context, *AssignVolumeRequest) (*AssignVolumeResponse, error)
VolumeSyncStatus(context.Context, *VolumeSyncStatusRequest) (*VolumeSyncStatusResponse, error)
VolumeSyncIndex(context.Context, *VolumeSyncIndexRequest) (*VolumeSyncIndexResponse, error)
VolumeSyncData(context.Context, *VolumeSyncDataRequest) (*VolumeSyncDataResponse, error)
VolumeMount(context.Context, *VolumeMountRequest) (*VolumeMountResponse, error)
VolumeUnmount(context.Context, *VolumeUnmountRequest) (*VolumeUnmountResponse, error)
}
@ -757,6 +883,42 @@ func _VolumeServer_VolumeSyncStatus_Handler(srv interface{}, ctx context.Context
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_VolumeSyncIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VolumeSyncIndexRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).VolumeSyncIndex(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/VolumeSyncIndex",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).VolumeSyncIndex(ctx, req.(*VolumeSyncIndexRequest))
}
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_VolumeSyncData_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VolumeSyncDataRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).VolumeSyncData(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/VolumeSyncData",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).VolumeSyncData(ctx, req.(*VolumeSyncDataRequest))
}
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_VolumeMount_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(VolumeMountRequest)
if err := dec(in); err != nil {
@ -829,6 +991,14 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
MethodName: "VolumeSyncStatus",
Handler: _VolumeServer_VolumeSyncStatus_Handler,
},
{
MethodName: "VolumeSyncIndex",
Handler: _VolumeServer_VolumeSyncIndex_Handler,
},
{
MethodName: "VolumeSyncData",
Handler: _VolumeServer_VolumeSyncData_Handler,
},
{
MethodName: "VolumeMount",
Handler: _VolumeServer_VolumeMount_Handler,
@ -845,52 +1015,61 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 748 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x56, 0xdb, 0x6e, 0xd3, 0x40,
0x10, 0x8d, 0x9b, 0x6b, 0x27, 0x89, 0x08, 0x9b, 0xb6, 0x71, 0x5d, 0x28, 0x91, 0x81, 0x92, 0x5e,
0x28, 0xa2, 0x95, 0xa0, 0xbc, 0x01, 0x05, 0xa4, 0x3e, 0xa0, 0x4a, 0xae, 0xe8, 0x0b, 0x48, 0x96,
0xe3, 0x6c, 0xdb, 0x55, 0x37, 0xb6, 0xeb, 0x5d, 0x57, 0x2d, 0x5f, 0xc4, 0x27, 0xf0, 0x2f, 0xfc,
0x0c, 0xf2, 0xae, 0x13, 0x7c, 0x8b, 0x6c, 0xde, 0xec, 0xd9, 0x39, 0xe7, 0xcc, 0xac, 0x67, 0x4e,
0x02, 0xfd, 0x5b, 0x97, 0x06, 0x53, 0x6c, 0x32, 0xec, 0xdf, 0x62, 0x7f, 0xdf, 0xf3, 0x5d, 0xee,
0xa2, 0x5e, 0x22, 0x68, 0x7a, 0x63, 0xfd, 0x15, 0xa0, 0x8f, 0x16, 0xb7, 0xaf, 0x3e, 0x61, 0x8a,
0x39, 0x36, 0xf0, 0x4d, 0x80, 0x19, 0x47, 0xeb, 0xd0, 0xba, 0x20, 0x14, 0x9b, 0x64, 0xc2, 0x54,
0x65, 0x58, 0x1d, 0x2d, 0x1b, 0xcd, 0xf0, 0xfd, 0x64, 0xc2, 0xf4, 0x53, 0xe8, 0x27, 0x00, 0xcc,
0x73, 0x1d, 0x86, 0xd1, 0x11, 0x34, 0x7d, 0xcc, 0x02, 0xca, 0x25, 0xa0, 0x7d, 0xb0, 0xb9, 0x9f,
0xd6, 0xda, 0x9f, 0x43, 0x02, 0xca, 0x8d, 0x59, 0xba, 0x4e, 0xa0, 0x13, 0x3f, 0x40, 0x03, 0x68,
0x46, 0xda, 0xaa, 0x32, 0x54, 0x46, 0xcb, 0x46, 0x43, 0x4a, 0xa3, 0x35, 0x68, 0x30, 0x6e, 0xf1,
0x80, 0xa9, 0x4b, 0x43, 0x65, 0x54, 0x37, 0xa2, 0x37, 0xb4, 0x02, 0x75, 0xec, 0xfb, 0xae, 0xaf,
0x56, 0x45, 0xba, 0x7c, 0x41, 0x08, 0x6a, 0x8c, 0xfc, 0xc4, 0x6a, 0x6d, 0xa8, 0x8c, 0xba, 0x86,
0x78, 0xd6, 0x9b, 0x50, 0xff, 0x3c, 0xf5, 0xf8, 0xbd, 0xfe, 0x16, 0xd4, 0x73, 0xcb, 0x0e, 0x82,
0xe9, 0xb9, 0xa8, 0xf1, 0xf8, 0x0a, 0xdb, 0xd7, 0xb3, 0xde, 0x37, 0x60, 0x59, 0x54, 0x3e, 0x99,
0x55, 0xd0, 0x35, 0x5a, 0x32, 0x70, 0x32, 0xd1, 0xdf, 0xc3, 0x7a, 0x0e, 0x30, 0xba, 0x83, 0xa7,
0xd0, 0xbd, 0xb4, 0xfc, 0xb1, 0x75, 0x89, 0x4d, 0xdf, 0xe2, 0xc4, 0x15, 0x68, 0xc5, 0xe8, 0x44,
0x41, 0x23, 0x8c, 0xe9, 0xdf, 0x41, 0x4b, 0x30, 0xb8, 0x53, 0xcf, 0xb2, 0x79, 0x19, 0x71, 0x34,
0x84, 0xb6, 0xe7, 0x63, 0x8b, 0x52, 0xd7, 0xb6, 0x38, 0x16, 0xb7, 0x50, 0x35, 0xe2, 0x21, 0xfd,
0x31, 0x6c, 0xe4, 0x92, 0xcb, 0x02, 0xf5, 0xa3, 0x54, 0xf5, 0xee, 0x74, 0x4a, 0x4a, 0x49, 0xeb,
0x8f, 0x32, 0x55, 0x0b, 0x64, 0xc4, 0xfb, 0x2e, 0x75, 0x4a, 0xb1, 0xe5, 0x04, 0x5e, 0x29, 0xe2,
0x74, 0xc5, 0x33, 0xe8, 0x9c, 0x79, 0x20, 0x87, 0xe3, 0xd8, 0xa5, 0x14, 0xdb, 0x9c, 0xb8, 0xce,
0x8c, 0x76, 0x13, 0xc0, 0x9e, 0x07, 0xa3, 0x51, 0x89, 0x45, 0x74, 0x0d, 0xd4, 0x2c, 0x34, 0xa2,
0xfd, 0xa5, 0x40, 0xff, 0x03, 0x63, 0xe4, 0xd2, 0x91, 0xb2, 0xa5, 0xae, 0x3f, 0x29, 0xb8, 0x94,
0x16, 0x4c, 0x7f, 0x9e, 0x6a, 0xe6, 0xf3, 0x84, 0x19, 0x3e, 0xf6, 0x28, 0xb1, 0x2d, 0x41, 0x51,
0x13, 0x14, 0xf1, 0x10, 0xea, 0x41, 0x95, 0x73, 0xaa, 0xd6, 0xc5, 0x49, 0xf8, 0xa8, 0xaf, 0xc1,
0x4a, 0xb2, 0xd2, 0xa8, 0x85, 0x37, 0x30, 0x90, 0x91, 0xb3, 0x7b, 0xc7, 0x3e, 0x13, 0x9b, 0x50,
0xea, 0xc2, 0xff, 0x28, 0xa0, 0x66, 0x81, 0xd1, 0x04, 0x17, 0x8d, 0xdf, 0xff, 0x56, 0x8f, 0x9e,
0x40, 0x9b, 0x5b, 0x84, 0x9a, 0xee, 0xc5, 0x05, 0xc3, 0x5c, 0x6d, 0x0c, 0x95, 0x51, 0xcd, 0x80,
0x30, 0x74, 0x2a, 0x22, 0x68, 0x1b, 0x7a, 0xb6, 0x9c, 0x52, 0xd3, 0xc7, 0xb7, 0x84, 0x85, 0xcc,
0x4d, 0x21, 0xfc, 0xc0, 0x9e, 0x4d, 0xaf, 0x0c, 0x23, 0x1d, 0xba, 0x64, 0x72, 0x67, 0x0a, 0x73,
0x10, 0xab, 0xdd, 0x12, 0x6c, 0x6d, 0x32, 0xb9, 0xfb, 0x42, 0x28, 0x3e, 0x0b, 0x37, 0xfc, 0x35,
0x20, 0xd9, 0xdc, 0x57, 0x37, 0x70, 0xca, 0x8d, 0xf6, 0x2a, 0xf4, 0x13, 0x90, 0xe8, 0x7e, 0x0f,
0x61, 0x45, 0x86, 0xbf, 0x39, 0xd3, 0xd2, 0x5c, 0x03, 0x58, 0x4d, 0x81, 0x24, 0xdb, 0xc1, 0xef,
0x16, 0x74, 0xa2, 0x5b, 0x17, 0x76, 0x88, 0x7e, 0x40, 0x3b, 0x66, 0xa3, 0xe8, 0x59, 0xd6, 0x2d,
0xb3, 0xb6, 0xac, 0x3d, 0x2f, 0xc8, 0x8a, 0x4a, 0xaf, 0x20, 0x07, 0x1e, 0x66, 0x6c, 0x0a, 0xed,
0x64, 0xd1, 0x8b, 0x4c, 0x50, 0xdb, 0x2d, 0x95, 0x3b, 0xd7, 0xe3, 0xd0, 0xcf, 0xf1, 0x1d, 0xb4,
0x57, 0xc0, 0x92, 0xf0, 0x3e, 0xed, 0x65, 0xc9, 0xec, 0xb9, 0xea, 0x0d, 0xa0, 0xac, 0x29, 0xa1,
0xdd, 0x42, 0x9a, 0x7f, 0xa6, 0xa7, 0xed, 0x95, 0x4b, 0x5e, 0xd8, 0xa8, 0xb4, 0xab, 0xc2, 0x46,
0x13, 0x86, 0x58, 0xd8, 0x68, 0xca, 0x03, 0x2b, 0xe8, 0x1a, 0x7a, 0x69, 0x2b, 0x43, 0xdb, 0x8b,
0x7e, 0x5f, 0x33, 0x4e, 0xa9, 0xed, 0x94, 0x49, 0x9d, 0x8b, 0x99, 0xd0, 0x89, 0x1b, 0x0e, 0xca,
0x19, 0xba, 0x1c, 0xeb, 0xd4, 0xb6, 0x8a, 0xd2, 0xe2, 0xdd, 0xa4, 0x0d, 0x28, 0xaf, 0x9b, 0x05,
0xee, 0x96, 0xd7, 0xcd, 0x22, 0x3f, 0xd3, 0x2b, 0xe1, 0x9e, 0xc5, 0xb6, 0x3b, 0x6f, 0xcf, 0xb2,
0x7e, 0x91, 0xb7, 0x67, 0x79, 0x16, 0x51, 0x41, 0x63, 0xe8, 0x26, 0xf6, 0x1d, 0x6d, 0x2d, 0x42,
0x26, 0x5d, 0x44, 0x7b, 0x51, 0x98, 0x37, 0xd3, 0x18, 0x37, 0xc4, 0x5f, 0xb7, 0xc3, 0xbf, 0x01,
0x00, 0x00, 0xff, 0xff, 0x32, 0xfe, 0x38, 0x2b, 0xd1, 0x09, 0x00, 0x00,
// 892 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x56, 0xdd, 0x6e, 0xeb, 0x44,
0x10, 0x8e, 0x9b, 0xdf, 0x4e, 0x12, 0x4e, 0xd8, 0xf4, 0x34, 0x3e, 0x3e, 0x70, 0x08, 0x0b, 0xb4,
0xe9, 0x0f, 0x45, 0xb4, 0x02, 0x8a, 0xb8, 0x01, 0x5a, 0x40, 0xb9, 0x40, 0x95, 0x5c, 0xd1, 0x1b,
0x2a, 0x59, 0x8e, 0xbd, 0x69, 0xad, 0x3a, 0xb6, 0x6b, 0xaf, 0xab, 0x96, 0x37, 0xe0, 0x11, 0x78,
0x03, 0xde, 0x89, 0x97, 0x41, 0xde, 0x5d, 0xbb, 0xfe, 0x4b, 0xed, 0x73, 0xe7, 0x9d, 0x9d, 0xf9,
0xbe, 0x99, 0xf1, 0xf8, 0x1b, 0xc3, 0xf8, 0xc1, 0xb5, 0xc3, 0x15, 0xd1, 0x02, 0xe2, 0x3f, 0x10,
0xff, 0xc8, 0xf3, 0x5d, 0xea, 0xa2, 0x51, 0xc6, 0xa8, 0x79, 0x0b, 0xfc, 0x15, 0xa0, 0x9f, 0x75,
0x6a, 0xdc, 0x9e, 0x13, 0x9b, 0x50, 0xa2, 0x92, 0xfb, 0x90, 0x04, 0x14, 0xbd, 0x81, 0xde, 0xd2,
0xb2, 0x89, 0x66, 0x99, 0x81, 0x2c, 0x4d, 0x9b, 0xb3, 0x4d, 0xb5, 0x1b, 0x9d, 0xe7, 0x66, 0x80,
0x2f, 0x60, 0x9c, 0x09, 0x08, 0x3c, 0xd7, 0x09, 0x08, 0x3a, 0x85, 0xae, 0x4f, 0x82, 0xd0, 0xa6,
0x3c, 0xa0, 0x7f, 0xfc, 0xee, 0x28, 0xcf, 0x75, 0x94, 0x84, 0x84, 0x36, 0x55, 0x63, 0x77, 0x6c,
0xc1, 0x20, 0x7d, 0x81, 0x26, 0xd0, 0x15, 0xdc, 0xb2, 0x34, 0x95, 0x66, 0x9b, 0x6a, 0x87, 0x53,
0xa3, 0x6d, 0xe8, 0x04, 0x54, 0xa7, 0x61, 0x20, 0x6f, 0x4c, 0xa5, 0x59, 0x5b, 0x15, 0x27, 0xb4,
0x05, 0x6d, 0xe2, 0xfb, 0xae, 0x2f, 0x37, 0x99, 0x3b, 0x3f, 0x20, 0x04, 0xad, 0xc0, 0xfa, 0x8b,
0xc8, 0xad, 0xa9, 0x34, 0x1b, 0xaa, 0xec, 0x19, 0x77, 0xa1, 0xfd, 0xcb, 0xca, 0xa3, 0x4f, 0xf8,
0x3b, 0x90, 0xaf, 0x74, 0x23, 0x0c, 0x57, 0x57, 0x2c, 0xc7, 0xb3, 0x5b, 0x62, 0xdc, 0xc5, 0xb5,
0xbf, 0x85, 0x4d, 0x96, 0xb9, 0x19, 0x67, 0x30, 0x54, 0x7b, 0xdc, 0x30, 0x37, 0xf1, 0x8f, 0xf0,
0xa6, 0x24, 0x50, 0xf4, 0xe0, 0x33, 0x18, 0xde, 0xe8, 0xfe, 0x42, 0xbf, 0x21, 0x9a, 0xaf, 0x53,
0xcb, 0x65, 0xd1, 0x92, 0x3a, 0x10, 0x46, 0x35, 0xb2, 0xe1, 0x3f, 0x41, 0xc9, 0x20, 0xb8, 0x2b,
0x4f, 0x37, 0x68, 0x1d, 0x72, 0x34, 0x85, 0xbe, 0xe7, 0x13, 0xdd, 0xb6, 0x5d, 0x43, 0xa7, 0x84,
0x75, 0xa1, 0xa9, 0xa6, 0x4d, 0xf8, 0x63, 0x78, 0x5b, 0x0a, 0xce, 0x13, 0xc4, 0xa7, 0xb9, 0xec,
0xdd, 0xd5, 0xca, 0xaa, 0x45, 0x8d, 0x3f, 0x2a, 0x64, 0xcd, 0x22, 0x05, 0xee, 0xf7, 0xb9, 0x5b,
0x9b, 0xe8, 0x4e, 0xe8, 0xd5, 0x02, 0xce, 0x67, 0x1c, 0x87, 0x26, 0xc8, 0x13, 0x3e, 0x1c, 0x67,
0xae, 0x6d, 0x13, 0x83, 0x5a, 0xae, 0x13, 0xc3, 0xbe, 0x03, 0x30, 0x12, 0xa3, 0x18, 0x95, 0x94,
0x05, 0x2b, 0x20, 0x17, 0x43, 0x05, 0xec, 0xbf, 0x12, 0x8c, 0x7f, 0x0a, 0x02, 0xeb, 0xc6, 0xe1,
0xb4, 0xb5, 0xda, 0x9f, 0x25, 0xdc, 0xc8, 0x13, 0xe6, 0x5f, 0x4f, 0xb3, 0xf0, 0x7a, 0x22, 0x0f,
0x9f, 0x78, 0xb6, 0x65, 0xe8, 0x0c, 0xa2, 0xc5, 0x20, 0xd2, 0x26, 0x34, 0x82, 0x26, 0xa5, 0xb6,
0xdc, 0x66, 0x37, 0xd1, 0x23, 0xde, 0x86, 0xad, 0x6c, 0xa6, 0xa2, 0x84, 0x6f, 0x61, 0xc2, 0x2d,
0x97, 0x4f, 0x8e, 0x71, 0xc9, 0xbe, 0x84, 0x5a, 0x0d, 0xff, 0x4f, 0x02, 0xb9, 0x18, 0x28, 0x26,
0xb8, 0x6a, 0xfc, 0xde, 0x37, 0x7b, 0xf4, 0x09, 0xf4, 0xa9, 0x6e, 0xd9, 0x9a, 0xbb, 0x5c, 0x06,
0x84, 0xca, 0x9d, 0xa9, 0x34, 0x6b, 0xa9, 0x10, 0x99, 0x2e, 0x98, 0x05, 0xed, 0xc1, 0xc8, 0xe0,
0x53, 0xaa, 0xf9, 0xe4, 0xc1, 0x0a, 0x22, 0xe4, 0x2e, 0x23, 0x7e, 0x65, 0xc4, 0xd3, 0xcb, 0xcd,
0x08, 0xc3, 0xd0, 0x32, 0x1f, 0x35, 0x26, 0x0e, 0xec, 0xd3, 0xee, 0x31, 0xb4, 0xbe, 0x65, 0x3e,
0xfe, 0x6a, 0xd9, 0xe4, 0x32, 0xfa, 0xc2, 0xbf, 0x81, 0xed, 0xe7, 0xe2, 0xe6, 0x8e, 0x49, 0x1e,
0x6b, 0x35, 0xe5, 0xb7, 0x74, 0x33, 0x45, 0x98, 0x68, 0xc9, 0x21, 0x20, 0x2b, 0x32, 0x70, 0x5e,
0xc3, 0x75, 0x28, 0x71, 0x28, 0x03, 0x18, 0xa8, 0x23, 0x76, 0x13, 0x91, 0x9f, 0x71, 0x3b, 0xfe,
0x47, 0x82, 0xd7, 0xcf, 0x48, 0xe7, 0x3a, 0xd5, 0x6b, 0x8d, 0x96, 0x02, 0xbd, 0xa4, 0xfa, 0x0d,
0x7e, 0x17, 0x9f, 0x23, 0xd9, 0x13, 0xdd, 0x6b, 0xb2, 0x1b, 0x71, 0x2a, 0x13, 0xb8, 0x88, 0xc4,
0x21, 0xc4, 0xe4, 0xea, 0xc9, 0x5f, 0x43, 0x8f, 0x1b, 0xe6, 0x26, 0xfe, 0x21, 0xdd, 0x1b, 0x9e,
0x9a, 0xa8, 0xf1, 0x53, 0x18, 0x94, 0x54, 0xd7, 0x5f, 0xa6, 0x0a, 0xfb, 0x1a, 0x10, 0x0f, 0xfe,
0xdd, 0x0d, 0x9d, 0x7a, 0x9a, 0xf1, 0x1a, 0xc6, 0x99, 0x10, 0x31, 0xb8, 0x27, 0xb0, 0xc5, 0xcd,
0x7f, 0x38, 0xab, 0xda, 0x58, 0x93, 0xb8, 0xad, 0x49, 0x10, 0x47, 0x3b, 0xfe, 0x1b, 0x60, 0x20,
0xaa, 0x62, 0x7b, 0x06, 0x5d, 0x43, 0x3f, 0xb5, 0x9f, 0xd0, 0xe7, 0xc5, 0x35, 0x54, 0xdc, 0x77,
0xca, 0x17, 0x15, 0x5e, 0x22, 0xf5, 0x06, 0x72, 0xe0, 0xc3, 0x82, 0xfe, 0xa3, 0xfd, 0x62, 0xf4,
0xba, 0xed, 0xa2, 0x1c, 0xd4, 0xf2, 0x4d, 0xf8, 0x28, 0x8c, 0x4b, 0x04, 0x1d, 0x1d, 0x56, 0xa0,
0x64, 0x96, 0x8a, 0xf2, 0x65, 0x4d, 0xef, 0x84, 0xf5, 0x1e, 0x50, 0x51, 0xed, 0xd1, 0x41, 0x25,
0xcc, 0xf3, 0x36, 0x51, 0x0e, 0xeb, 0x39, 0xaf, 0x2d, 0x94, 0xef, 0x81, 0xca, 0x42, 0x33, 0x9b,
0xa6, 0xb2, 0xd0, 0xdc, 0x72, 0x69, 0xa0, 0x3b, 0x18, 0xe5, 0x77, 0x04, 0xda, 0x5b, 0xf7, 0xe3,
0x52, 0x58, 0x41, 0xca, 0x7e, 0x1d, 0xd7, 0x84, 0x4c, 0x83, 0x41, 0x5a, 0xc9, 0x51, 0xc9, 0xd0,
0x95, 0xec, 0x24, 0x65, 0xa7, 0xca, 0x2d, 0x5d, 0x4d, 0x5e, 0xd9, 0xcb, 0xaa, 0x59, 0xb3, 0x36,
0xca, 0xaa, 0x59, 0xb7, 0x28, 0x70, 0x03, 0xdd, 0xc2, 0xab, 0x9c, 0x64, 0xa2, 0xd9, 0x4b, 0x00,
0x69, 0x31, 0x56, 0xf6, 0x6a, 0x78, 0x26, 0x4c, 0x04, 0x3e, 0xc8, 0xea, 0x16, 0xda, 0x7d, 0x29,
0x3c, 0x25, 0xba, 0xca, 0xac, 0xda, 0x31, 0xa1, 0xb9, 0x86, 0x7e, 0x4a, 0xae, 0xca, 0x84, 0xa3,
0x28, 0x80, 0x65, 0xc2, 0x51, 0xa6, 0x79, 0x0d, 0xb4, 0x80, 0x61, 0x46, 0xc0, 0xd0, 0xce, 0xba,
0xc8, 0xac, 0x2c, 0x2a, 0xbb, 0x95, 0x7e, 0x31, 0xc7, 0xa2, 0xc3, 0x7e, 0xf2, 0x4f, 0xfe, 0x0f,
0x00, 0x00, 0xff, 0xff, 0x5b, 0x29, 0xa9, 0x95, 0xfb, 0x0b, 0x00, 0x00,
}

62
weed/server/volume_grpc_sync.go

@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
@ -23,3 +24,64 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server
return resp, nil
}
func (vs *VolumeServer) VolumeSyncIndex(ctx context.Context, req *volume_server_pb.VolumeSyncIndexRequest) (*volume_server_pb.VolumeSyncIndexResponse, error) {
resp := &volume_server_pb.VolumeSyncIndexResponse{}
v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
if v == nil {
return nil, fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
}
content, err := v.IndexFileContent()
if err != nil {
glog.Errorf("sync volume %d index: %v", req.VolumdId, err)
} else {
glog.V(2).Infof("sync volume %d index", req.VolumdId)
}
resp.IndexFileContent = content
return resp, nil
}
func (vs *VolumeServer) VolumeSyncData(ctx context.Context, req *volume_server_pb.VolumeSyncDataRequest) (*volume_server_pb.VolumeSyncDataResponse, error) {
resp := &volume_server_pb.VolumeSyncDataResponse{}
v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
if v == nil {
return nil, fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
}
if uint32(v.SuperBlock.CompactRevision) != req.Revision {
return nil, fmt.Errorf("Requested Volume Revision is %d, but current revision is %d", req.Revision, v.SuperBlock.CompactRevision)
}
content, err := storage.ReadNeedleBlob(v.DataFile(), int64(req.Offset)*types.NeedlePaddingSize, req.Size, v.Version())
if err != nil {
return nil, fmt.Errorf("read offset:%d size:%d", req.Offset, req.Size)
}
id, err := types.ParseNeedleId(req.NeedleId)
if err != nil {
return nil, fmt.Errorf("parsing needle id %s: %v", req.NeedleId, err)
}
n := new(storage.Needle)
n.ParseNeedleHeader(content)
if id != n.Id {
return nil, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)
}
if err != nil {
glog.Errorf("sync volume %d data: %v", req.VolumdId, err)
}
resp.FileContent = content
return resp, nil
}

2
weed/server/volume_server.go

@ -47,8 +47,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
handleStaticResources(adminMux)
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))
adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))

20
weed/server/volume_server_handlers_admin.go

@ -1,12 +1,11 @@
package weed_server
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"path/filepath"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
@ -28,16 +27,3 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request)
m["DiskStatuses"] = ds
writeJsonQuiet(w, r, http.StatusOK, m)
}
// TODO delete this when volume sync is all moved to grpc
func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) {
vid, err := vs.getVolumeId(volumeParameterName, r)
if err != nil {
return nil, err
}
v := vs.store.GetVolume(vid)
if v == nil {
return nil, fmt.Errorf("Not Found Volume Id %d", vid)
}
return v, nil
}

74
weed/server/volume_server_handlers_sync.go

@ -1,74 +0,0 @@
package weed_server
import (
"fmt"
"net/http"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) getVolumeIndexContentHandler(w http.ResponseWriter, r *http.Request) {
v, err := vs.getVolume("volume", r)
if v == nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
content, err := v.IndexFileContent()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
w.Write(content)
}
func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *http.Request) {
v, err := vs.getVolume("volume", r)
if v == nil {
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("Not Found volume: %v", err))
return
}
if int(v.SuperBlock.CompactRevision) != util.ParseInt(r.FormValue("revision"), 0) {
writeJsonError(w, r, http.StatusExpectationFailed, fmt.Errorf("Requested Volume Revision is %s, but current revision is %d", r.FormValue("revision"), v.SuperBlock.CompactRevision))
return
}
offset := uint32(util.ParseUint64(r.FormValue("offset"), 0))
size := uint32(util.ParseUint64(r.FormValue("size"), 0))
content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*types.NeedlePaddingSize, size, v.Version())
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
id, err := types.ParseNeedleId(r.FormValue("id"))
if err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
n := new(storage.Needle)
n.ParseNeedleHeader(content)
if id != n.Id {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id))
return
}
w.Write(content)
}
func (vs *VolumeServer) getVolumeId(volumeParameterName string, r *http.Request) (storage.VolumeId, error) {
volumeIdString := r.FormValue(volumeParameterName)
if volumeIdString == "" {
err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName)
return 0, err
}
vid, err := storage.NewVolumeId(volumeIdString)
if err != nil {
err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
return 0, err
}
return vid, err
}

39
weed/storage/volume_sync.go

@ -1,20 +1,16 @@
package storage
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"sort"
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
// The volume sync with a master volume via 2 steps:
@ -122,7 +118,6 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.Compact
// make up the delta
fetchCount := 0
volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data"
for _, needleValue := range delta {
if needleValue.Size == 0 {
// remove file entry from local
@ -130,7 +125,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.Compact
continue
}
// add master file entry to local data file
if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil {
if err := v.fetchNeedle(volumeServer, needleValue, compactRevision); err != nil {
glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
return err
}
@ -149,7 +144,7 @@ func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.Compac
}
total := 0
err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key NeedleId, offset Offset, size uint32) {
err = operation.GetVolumeIdxEntries(volumeServer, uint32(vid), func(key NeedleId, offset Offset, size uint32) {
// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
if offset > 0 && size != TombstoneFileSize {
m.Set(NeedleId(key), offset, size)
@ -190,22 +185,21 @@ func (v *Volume) removeNeedle(key NeedleId) {
// fetchNeedle fetches a remote volume needle by vid, id, offset
// The compact revision is checked first in case the remote volume
// is compacted and the offset is invalid any more.
func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
needleValue needle.NeedleValue, compactRevision uint16) error {
// add master file entry to local data file
values := make(url.Values)
values.Add("revision", strconv.Itoa(int(compactRevision)))
values.Add("volume", v.Id.String())
values.Add("id", needleValue.Key.String())
values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10))
values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10))
glog.V(4).Infof("Fetch %+v", needleValue)
return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error {
b, err := ioutil.ReadAll(r)
func (v *Volume) fetchNeedle(volumeServer string, needleValue needle.NeedleValue, compactRevision uint16) error {
return operation.WithVolumeServerClient(volumeServer, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
VolumdId: uint32(v.Id),
Revision: uint32(compactRevision),
Offset: uint32(needleValue.Offset),
Size: uint32(needleValue.Size),
NeedleId: needleValue.Key.String(),
})
if err != nil {
return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err)
return err
}
offset, err := v.AppendBlob(b)
offset, err := v.AppendBlob(resp.FileContent)
if err != nil {
return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
}
@ -213,4 +207,5 @@ func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
v.nm.Put(needleValue.Key, Offset(offset/NeedlePaddingSize), needleValue.Size)
return nil
})
}
Loading…
Cancel
Save