diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index e8d4ae61e..3bb643db1 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -50,6 +50,8 @@ service VolumeServer { // erasure coding rpc VolumeEcShardsGenerate (VolumeEcShardsGenerateRequest) returns (VolumeEcShardsGenerateResponse) { } + rpc VolumeEcShardsRebuild (VolumeEcShardsRebuildRequest) returns (VolumeEcShardsRebuildResponse) { + } rpc VolumeEcShardsCopy (VolumeEcShardsCopyRequest) returns (VolumeEcShardsCopyResponse) { } rpc VolumeEcShardsDelete (VolumeEcShardsDeleteRequest) returns (VolumeEcShardsDeleteResponse) { @@ -61,7 +63,6 @@ service VolumeServer { rpc VolumeEcShardRead (VolumeEcShardReadRequest) returns (stream VolumeEcShardReadResponse) { } - } ////////////////////////////////////////////////// @@ -180,6 +181,8 @@ message CopyFileRequest { string ext = 2; uint32 compaction_revision = 3; uint64 stop_offset = 4; + string collection = 5; + bool is_ec_volume = 6; } message CopyFileResponse { bytes file_content = 1; @@ -212,10 +215,19 @@ message VolumeEcShardsGenerateRequest { message VolumeEcShardsGenerateResponse { } +message VolumeEcShardsRebuildRequest { + uint32 volume_id = 1; + string collection = 2; +} +message VolumeEcShardsRebuildResponse { + repeated uint32 rebuilt_shard_ids = 1; +} + message VolumeEcShardsCopyRequest { uint32 volume_id = 1; string collection = 2; repeated uint32 shard_ids = 3; + bool copy_ecx_file = 4; string source_data_node = 5; } message VolumeEcShardsCopyResponse { @@ -223,7 +235,7 @@ message VolumeEcShardsCopyResponse { message VolumeEcShardsDeleteRequest { uint32 volume_id = 1; - bool should_delete_ecx = 2; + string collection = 2; repeated uint32 shard_ids = 3; } message VolumeEcShardsDeleteResponse { diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 180fbc0af..f31b5f2fd 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -45,6 +45,8 @@ It has these top-level messages: VolumeTailReceiverResponse VolumeEcShardsGenerateRequest VolumeEcShardsGenerateResponse + VolumeEcShardsRebuildRequest + VolumeEcShardsRebuildResponse VolumeEcShardsCopyRequest VolumeEcShardsCopyResponse VolumeEcShardsDeleteRequest @@ -615,6 +617,8 @@ type CopyFileRequest struct { Ext string `protobuf:"bytes,2,opt,name=ext" json:"ext,omitempty"` CompactionRevision uint32 `protobuf:"varint,3,opt,name=compaction_revision,json=compactionRevision" json:"compaction_revision,omitempty"` StopOffset uint64 `protobuf:"varint,4,opt,name=stop_offset,json=stopOffset" json:"stop_offset,omitempty"` + Collection string `protobuf:"bytes,5,opt,name=collection" json:"collection,omitempty"` + IsEcVolume bool `protobuf:"varint,6,opt,name=is_ec_volume,json=isEcVolume" json:"is_ec_volume,omitempty"` } func (m *CopyFileRequest) Reset() { *m = CopyFileRequest{} } @@ -650,6 +654,20 @@ func (m *CopyFileRequest) GetStopOffset() uint64 { return 0 } +func (m *CopyFileRequest) GetCollection() string { + if m != nil { + return m.Collection + } + return "" +} + +func (m *CopyFileRequest) GetIsEcVolume() bool { + if m != nil { + return m.IsEcVolume + } + return false +} + type CopyFileResponse struct { FileContent []byte `protobuf:"bytes,1,opt,name=file_content,json=fileContent,proto3" json:"file_content,omitempty"` } @@ -810,17 +828,58 @@ func (m *VolumeEcShardsGenerateResponse) String() string { return pro func (*VolumeEcShardsGenerateResponse) ProtoMessage() {} func (*VolumeEcShardsGenerateResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{35} } +type VolumeEcShardsRebuildRequest struct { + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` + Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"` +} + +func (m *VolumeEcShardsRebuildRequest) Reset() { *m = VolumeEcShardsRebuildRequest{} } +func (m *VolumeEcShardsRebuildRequest) String() string { return proto.CompactTextString(m) } +func (*VolumeEcShardsRebuildRequest) ProtoMessage() {} +func (*VolumeEcShardsRebuildRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{36} } + +func (m *VolumeEcShardsRebuildRequest) GetVolumeId() uint32 { + if m != nil { + return m.VolumeId + } + return 0 +} + +func (m *VolumeEcShardsRebuildRequest) GetCollection() string { + if m != nil { + return m.Collection + } + return "" +} + +type VolumeEcShardsRebuildResponse struct { + RebuiltShardIds []uint32 `protobuf:"varint,1,rep,packed,name=rebuilt_shard_ids,json=rebuiltShardIds" json:"rebuilt_shard_ids,omitempty"` +} + +func (m *VolumeEcShardsRebuildResponse) Reset() { *m = VolumeEcShardsRebuildResponse{} } +func (m *VolumeEcShardsRebuildResponse) String() string { return proto.CompactTextString(m) } +func (*VolumeEcShardsRebuildResponse) ProtoMessage() {} +func (*VolumeEcShardsRebuildResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{37} } + +func (m *VolumeEcShardsRebuildResponse) GetRebuiltShardIds() []uint32 { + if m != nil { + return m.RebuiltShardIds + } + return nil +} + type VolumeEcShardsCopyRequest struct { VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"` ShardIds []uint32 `protobuf:"varint,3,rep,packed,name=shard_ids,json=shardIds" json:"shard_ids,omitempty"` + CopyEcxFile bool `protobuf:"varint,4,opt,name=copy_ecx_file,json=copyEcxFile" json:"copy_ecx_file,omitempty"` SourceDataNode string `protobuf:"bytes,5,opt,name=source_data_node,json=sourceDataNode" json:"source_data_node,omitempty"` } func (m *VolumeEcShardsCopyRequest) Reset() { *m = VolumeEcShardsCopyRequest{} } func (m *VolumeEcShardsCopyRequest) String() string { return proto.CompactTextString(m) } func (*VolumeEcShardsCopyRequest) ProtoMessage() {} -func (*VolumeEcShardsCopyRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{36} } +func (*VolumeEcShardsCopyRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{38} } func (m *VolumeEcShardsCopyRequest) GetVolumeId() uint32 { if m != nil { @@ -843,6 +902,13 @@ func (m *VolumeEcShardsCopyRequest) GetShardIds() []uint32 { return nil } +func (m *VolumeEcShardsCopyRequest) GetCopyEcxFile() bool { + if m != nil { + return m.CopyEcxFile + } + return false +} + func (m *VolumeEcShardsCopyRequest) GetSourceDataNode() string { if m != nil { return m.SourceDataNode @@ -856,18 +922,18 @@ type VolumeEcShardsCopyResponse struct { func (m *VolumeEcShardsCopyResponse) Reset() { *m = VolumeEcShardsCopyResponse{} } func (m *VolumeEcShardsCopyResponse) String() string { return proto.CompactTextString(m) } func (*VolumeEcShardsCopyResponse) ProtoMessage() {} -func (*VolumeEcShardsCopyResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{37} } +func (*VolumeEcShardsCopyResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{39} } type VolumeEcShardsDeleteRequest struct { - VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` - ShouldDeleteEcx bool `protobuf:"varint,2,opt,name=should_delete_ecx,json=shouldDeleteEcx" json:"should_delete_ecx,omitempty"` - ShardIds []uint32 `protobuf:"varint,3,rep,packed,name=shard_ids,json=shardIds" json:"shard_ids,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` + Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"` + ShardIds []uint32 `protobuf:"varint,3,rep,packed,name=shard_ids,json=shardIds" json:"shard_ids,omitempty"` } func (m *VolumeEcShardsDeleteRequest) Reset() { *m = VolumeEcShardsDeleteRequest{} } func (m *VolumeEcShardsDeleteRequest) String() string { return proto.CompactTextString(m) } func (*VolumeEcShardsDeleteRequest) ProtoMessage() {} -func (*VolumeEcShardsDeleteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{38} } +func (*VolumeEcShardsDeleteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{40} } func (m *VolumeEcShardsDeleteRequest) GetVolumeId() uint32 { if m != nil { @@ -876,11 +942,11 @@ func (m *VolumeEcShardsDeleteRequest) GetVolumeId() uint32 { return 0 } -func (m *VolumeEcShardsDeleteRequest) GetShouldDeleteEcx() bool { +func (m *VolumeEcShardsDeleteRequest) GetCollection() string { if m != nil { - return m.ShouldDeleteEcx + return m.Collection } - return false + return "" } func (m *VolumeEcShardsDeleteRequest) GetShardIds() []uint32 { @@ -896,7 +962,7 @@ type VolumeEcShardsDeleteResponse struct { func (m *VolumeEcShardsDeleteResponse) Reset() { *m = VolumeEcShardsDeleteResponse{} } func (m *VolumeEcShardsDeleteResponse) String() string { return proto.CompactTextString(m) } func (*VolumeEcShardsDeleteResponse) ProtoMessage() {} -func (*VolumeEcShardsDeleteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{39} } +func (*VolumeEcShardsDeleteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{41} } type VolumeEcShardsMountRequest struct { VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` @@ -907,7 +973,7 @@ type VolumeEcShardsMountRequest struct { func (m *VolumeEcShardsMountRequest) Reset() { *m = VolumeEcShardsMountRequest{} } func (m *VolumeEcShardsMountRequest) String() string { return proto.CompactTextString(m) } func (*VolumeEcShardsMountRequest) ProtoMessage() {} -func (*VolumeEcShardsMountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{40} } +func (*VolumeEcShardsMountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{42} } func (m *VolumeEcShardsMountRequest) GetVolumeId() uint32 { if m != nil { @@ -936,7 +1002,7 @@ type VolumeEcShardsMountResponse struct { func (m *VolumeEcShardsMountResponse) Reset() { *m = VolumeEcShardsMountResponse{} } func (m *VolumeEcShardsMountResponse) String() string { return proto.CompactTextString(m) } func (*VolumeEcShardsMountResponse) ProtoMessage() {} -func (*VolumeEcShardsMountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{41} } +func (*VolumeEcShardsMountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{43} } type VolumeEcShardsUnmountRequest struct { VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` @@ -946,7 +1012,7 @@ type VolumeEcShardsUnmountRequest struct { func (m *VolumeEcShardsUnmountRequest) Reset() { *m = VolumeEcShardsUnmountRequest{} } func (m *VolumeEcShardsUnmountRequest) String() string { return proto.CompactTextString(m) } func (*VolumeEcShardsUnmountRequest) ProtoMessage() {} -func (*VolumeEcShardsUnmountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{42} } +func (*VolumeEcShardsUnmountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{44} } func (m *VolumeEcShardsUnmountRequest) GetVolumeId() uint32 { if m != nil { @@ -968,7 +1034,7 @@ type VolumeEcShardsUnmountResponse struct { func (m *VolumeEcShardsUnmountResponse) Reset() { *m = VolumeEcShardsUnmountResponse{} } func (m *VolumeEcShardsUnmountResponse) String() string { return proto.CompactTextString(m) } func (*VolumeEcShardsUnmountResponse) ProtoMessage() {} -func (*VolumeEcShardsUnmountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{43} } +func (*VolumeEcShardsUnmountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{45} } type VolumeEcShardReadRequest struct { VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` @@ -980,7 +1046,7 @@ type VolumeEcShardReadRequest struct { func (m *VolumeEcShardReadRequest) Reset() { *m = VolumeEcShardReadRequest{} } func (m *VolumeEcShardReadRequest) String() string { return proto.CompactTextString(m) } func (*VolumeEcShardReadRequest) ProtoMessage() {} -func (*VolumeEcShardReadRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{44} } +func (*VolumeEcShardReadRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{46} } func (m *VolumeEcShardReadRequest) GetVolumeId() uint32 { if m != nil { @@ -1017,7 +1083,7 @@ type VolumeEcShardReadResponse struct { func (m *VolumeEcShardReadResponse) Reset() { *m = VolumeEcShardReadResponse{} } func (m *VolumeEcShardReadResponse) String() string { return proto.CompactTextString(m) } func (*VolumeEcShardReadResponse) ProtoMessage() {} -func (*VolumeEcShardReadResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{45} } +func (*VolumeEcShardReadResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{47} } func (m *VolumeEcShardReadResponse) GetData() []byte { if m != nil { @@ -1033,7 +1099,7 @@ type ReadVolumeFileStatusRequest struct { func (m *ReadVolumeFileStatusRequest) Reset() { *m = ReadVolumeFileStatusRequest{} } func (m *ReadVolumeFileStatusRequest) String() string { return proto.CompactTextString(m) } func (*ReadVolumeFileStatusRequest) ProtoMessage() {} -func (*ReadVolumeFileStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{46} } +func (*ReadVolumeFileStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{48} } func (m *ReadVolumeFileStatusRequest) GetVolumeId() uint32 { if m != nil { @@ -1056,7 +1122,7 @@ type ReadVolumeFileStatusResponse struct { func (m *ReadVolumeFileStatusResponse) Reset() { *m = ReadVolumeFileStatusResponse{} } func (m *ReadVolumeFileStatusResponse) String() string { return proto.CompactTextString(m) } func (*ReadVolumeFileStatusResponse) ProtoMessage() {} -func (*ReadVolumeFileStatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{47} } +func (*ReadVolumeFileStatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{49} } func (m *ReadVolumeFileStatusResponse) GetVolumeId() uint32 { if m != nil { @@ -1124,7 +1190,7 @@ type DiskStatus struct { func (m *DiskStatus) Reset() { *m = DiskStatus{} } func (m *DiskStatus) String() string { return proto.CompactTextString(m) } func (*DiskStatus) ProtoMessage() {} -func (*DiskStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{48} } +func (*DiskStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{50} } func (m *DiskStatus) GetDir() string { if m != nil { @@ -1167,7 +1233,7 @@ type MemStatus struct { func (m *MemStatus) Reset() { *m = MemStatus{} } func (m *MemStatus) String() string { return proto.CompactTextString(m) } func (*MemStatus) ProtoMessage() {} -func (*MemStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{49} } +func (*MemStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{51} } func (m *MemStatus) GetGoroutines() int32 { if m != nil { @@ -1255,6 +1321,8 @@ func init() { proto.RegisterType((*VolumeTailReceiverResponse)(nil), "volume_server_pb.VolumeTailReceiverResponse") proto.RegisterType((*VolumeEcShardsGenerateRequest)(nil), "volume_server_pb.VolumeEcShardsGenerateRequest") proto.RegisterType((*VolumeEcShardsGenerateResponse)(nil), "volume_server_pb.VolumeEcShardsGenerateResponse") + proto.RegisterType((*VolumeEcShardsRebuildRequest)(nil), "volume_server_pb.VolumeEcShardsRebuildRequest") + proto.RegisterType((*VolumeEcShardsRebuildResponse)(nil), "volume_server_pb.VolumeEcShardsRebuildResponse") proto.RegisterType((*VolumeEcShardsCopyRequest)(nil), "volume_server_pb.VolumeEcShardsCopyRequest") proto.RegisterType((*VolumeEcShardsCopyResponse)(nil), "volume_server_pb.VolumeEcShardsCopyResponse") proto.RegisterType((*VolumeEcShardsDeleteRequest)(nil), "volume_server_pb.VolumeEcShardsDeleteRequest") @@ -1303,6 +1371,7 @@ type VolumeServerClient interface { VolumeTailReceiver(ctx context.Context, in *VolumeTailReceiverRequest, opts ...grpc.CallOption) (*VolumeTailReceiverResponse, error) // erasure coding VolumeEcShardsGenerate(ctx context.Context, in *VolumeEcShardsGenerateRequest, opts ...grpc.CallOption) (*VolumeEcShardsGenerateResponse, error) + VolumeEcShardsRebuild(ctx context.Context, in *VolumeEcShardsRebuildRequest, opts ...grpc.CallOption) (*VolumeEcShardsRebuildResponse, error) VolumeEcShardsCopy(ctx context.Context, in *VolumeEcShardsCopyRequest, opts ...grpc.CallOption) (*VolumeEcShardsCopyResponse, error) VolumeEcShardsDelete(ctx context.Context, in *VolumeEcShardsDeleteRequest, opts ...grpc.CallOption) (*VolumeEcShardsDeleteResponse, error) VolumeEcShardsMount(ctx context.Context, in *VolumeEcShardsMountRequest, opts ...grpc.CallOption) (*VolumeEcShardsMountResponse, error) @@ -1549,6 +1618,15 @@ func (c *volumeServerClient) VolumeEcShardsGenerate(ctx context.Context, in *Vol return out, nil } +func (c *volumeServerClient) VolumeEcShardsRebuild(ctx context.Context, in *VolumeEcShardsRebuildRequest, opts ...grpc.CallOption) (*VolumeEcShardsRebuildResponse, error) { + out := new(VolumeEcShardsRebuildResponse) + err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VolumeEcShardsRebuild", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *volumeServerClient) VolumeEcShardsCopy(ctx context.Context, in *VolumeEcShardsCopyRequest, opts ...grpc.CallOption) (*VolumeEcShardsCopyResponse, error) { out := new(VolumeEcShardsCopyResponse) err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/VolumeEcShardsCopy", in, out, c.cc, opts...) @@ -1641,6 +1719,7 @@ type VolumeServerServer interface { VolumeTailReceiver(context.Context, *VolumeTailReceiverRequest) (*VolumeTailReceiverResponse, error) // erasure coding VolumeEcShardsGenerate(context.Context, *VolumeEcShardsGenerateRequest) (*VolumeEcShardsGenerateResponse, error) + VolumeEcShardsRebuild(context.Context, *VolumeEcShardsRebuildRequest) (*VolumeEcShardsRebuildResponse, error) VolumeEcShardsCopy(context.Context, *VolumeEcShardsCopyRequest) (*VolumeEcShardsCopyResponse, error) VolumeEcShardsDelete(context.Context, *VolumeEcShardsDeleteRequest) (*VolumeEcShardsDeleteResponse, error) VolumeEcShardsMount(context.Context, *VolumeEcShardsMountRequest) (*VolumeEcShardsMountResponse, error) @@ -1985,6 +2064,24 @@ func _VolumeServer_VolumeEcShardsGenerate_Handler(srv interface{}, ctx context.C return interceptor(ctx, in, info, handler) } +func _VolumeServer_VolumeEcShardsRebuild_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(VolumeEcShardsRebuildRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(VolumeServerServer).VolumeEcShardsRebuild(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/volume_server_pb.VolumeServer/VolumeEcShardsRebuild", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(VolumeServerServer).VolumeEcShardsRebuild(ctx, req.(*VolumeEcShardsRebuildRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _VolumeServer_VolumeEcShardsCopy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(VolumeEcShardsCopyRequest) if err := dec(in); err != nil { @@ -2142,6 +2239,10 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{ MethodName: "VolumeEcShardsGenerate", Handler: _VolumeServer_VolumeEcShardsGenerate_Handler, }, + { + MethodName: "VolumeEcShardsRebuild", + Handler: _VolumeServer_VolumeEcShardsRebuild_Handler, + }, { MethodName: "VolumeEcShardsCopy", Handler: _VolumeServer_VolumeEcShardsCopy_Handler, @@ -2187,113 +2288,118 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1724 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xc4, 0x59, 0xdd, 0x6e, 0xd4, 0xc6, - 0x17, 0x8f, 0xd9, 0x4d, 0x76, 0x73, 0x36, 0x81, 0x64, 0x12, 0x92, 0x8d, 0x03, 0x61, 0x31, 0xfc, - 0x21, 0x04, 0x48, 0xf8, 0x83, 0xda, 0xd2, 0xf6, 0xa2, 0x85, 0x40, 0xdb, 0x48, 0x05, 0x24, 0x07, - 0x10, 0x55, 0x91, 0xac, 0x89, 0x3d, 0x21, 0x56, 0xbc, 0xf6, 0xe2, 0x19, 0xa7, 0x09, 0x6a, 0x7b, - 0x43, 0x9f, 0xa0, 0x2f, 0xd0, 0x8b, 0xde, 0xf5, 0xa2, 0xb7, 0x7d, 0xaa, 0x3e, 0x40, 0xd5, 0x9b, - 0x6a, 0x3e, 0xec, 0xf5, 0x67, 0xd6, 0x29, 0x48, 0xbd, 0x9b, 0x3d, 0x73, 0xbe, 0x7d, 0xce, 0x99, - 0xf9, 0xcd, 0xc2, 0xdc, 0x41, 0xe0, 0x45, 0x7d, 0x62, 0x51, 0x12, 0x1e, 0x90, 0x70, 0x7d, 0x10, - 0x06, 0x2c, 0x40, 0x33, 0x19, 0xa2, 0x35, 0xd8, 0x31, 0x36, 0x00, 0xdd, 0xc7, 0xcc, 0xde, 0x7b, - 0x40, 0x3c, 0xc2, 0x88, 0x49, 0x5e, 0x47, 0x84, 0x32, 0xb4, 0x04, 0xed, 0x5d, 0xd7, 0x23, 0x96, - 0xeb, 0xd0, 0xae, 0xd6, 0x6b, 0xac, 0x4e, 0x9a, 0x2d, 0xfe, 0x7b, 0xcb, 0xa1, 0xc6, 0x13, 0x98, - 0xcb, 0x08, 0xd0, 0x41, 0xe0, 0x53, 0x82, 0xee, 0x42, 0x2b, 0x24, 0x34, 0xf2, 0x98, 0x14, 0xe8, - 0xdc, 0x5e, 0x59, 0xcf, 0xdb, 0x5a, 0x4f, 0x44, 0x22, 0x8f, 0x99, 0x31, 0xbb, 0xe1, 0xc2, 0x54, - 0x7a, 0x03, 0x2d, 0x42, 0x4b, 0xd9, 0xee, 0x6a, 0x3d, 0x6d, 0x75, 0xd2, 0x9c, 0x90, 0xa6, 0xd1, - 0x02, 0x4c, 0x50, 0x86, 0x59, 0x44, 0xbb, 0xa7, 0x7a, 0xda, 0xea, 0xb8, 0xa9, 0x7e, 0xa1, 0x79, - 0x18, 0x27, 0x61, 0x18, 0x84, 0xdd, 0x86, 0x60, 0x97, 0x3f, 0x10, 0x82, 0x26, 0x75, 0xdf, 0x90, - 0x6e, 0xb3, 0xa7, 0xad, 0x4e, 0x9b, 0x62, 0x6d, 0xb4, 0x60, 0xfc, 0x61, 0x7f, 0xc0, 0x8e, 0x8c, - 0x8f, 0xa0, 0xfb, 0x1c, 0xdb, 0x51, 0xd4, 0x7f, 0x2e, 0x7c, 0xdc, 0xdc, 0x23, 0xf6, 0x7e, 0x1c, - 0xfb, 0x32, 0x4c, 0x2a, 0xcf, 0x95, 0x07, 0xd3, 0x66, 0x5b, 0x12, 0xb6, 0x1c, 0xe3, 0x73, 0x58, - 0x2a, 0x11, 0x54, 0x39, 0xb8, 0x04, 0xd3, 0xaf, 0x70, 0xb8, 0x83, 0x5f, 0x11, 0x2b, 0xc4, 0xcc, - 0x0d, 0x84, 0xb4, 0x66, 0x4e, 0x29, 0xa2, 0xc9, 0x69, 0xc6, 0xb7, 0xa0, 0x67, 0x34, 0x04, 0xfd, - 0x01, 0xb6, 0x59, 0x1d, 0xe3, 0xa8, 0x07, 0x9d, 0x41, 0x48, 0xb0, 0xe7, 0x05, 0x36, 0x66, 0x44, - 0x64, 0xa1, 0x61, 0xa6, 0x49, 0xc6, 0x79, 0x58, 0x2e, 0x55, 0x2e, 0x1d, 0x34, 0xee, 0xe6, 0xbc, - 0x0f, 0xfa, 0x7d, 0xb7, 0x96, 0x69, 0xe3, 0x5c, 0xc1, 0x6b, 0x21, 0xa9, 0xf4, 0x7e, 0x9c, 0xdb, - 0xf5, 0x08, 0xf6, 0xa3, 0x41, 0x2d, 0xc5, 0x79, 0x8f, 0x63, 0xd1, 0x44, 0xf3, 0xa2, 0x2c, 0x8e, - 0xcd, 0xc0, 0xf3, 0x88, 0xcd, 0xdc, 0xc0, 0x8f, 0xd5, 0xae, 0x00, 0xd8, 0x09, 0x51, 0x95, 0x4a, - 0x8a, 0x62, 0xe8, 0xd0, 0x2d, 0x8a, 0x2a, 0xb5, 0xbf, 0x69, 0x70, 0xf6, 0x9e, 0x4a, 0x9a, 0x34, - 0x5c, 0xeb, 0x03, 0x64, 0x4d, 0x9e, 0xca, 0x9b, 0xcc, 0x7f, 0xa0, 0x46, 0xe1, 0x03, 0x71, 0x8e, - 0x90, 0x0c, 0x3c, 0xd7, 0xc6, 0x42, 0x45, 0x53, 0xa8, 0x48, 0x93, 0xd0, 0x0c, 0x34, 0x18, 0xf3, - 0xba, 0xe3, 0x62, 0x87, 0x2f, 0x8d, 0x2e, 0x2c, 0xe4, 0x7d, 0x55, 0x61, 0x7c, 0x08, 0x8b, 0x92, - 0xb2, 0x7d, 0xe4, 0xdb, 0xdb, 0xa2, 0x1b, 0x6a, 0x25, 0xfd, 0x6f, 0x0d, 0xba, 0x45, 0x41, 0x55, - 0xc5, 0xef, 0x9a, 0x81, 0x93, 0xc6, 0x87, 0x2e, 0x40, 0x87, 0x61, 0xd7, 0xb3, 0x82, 0xdd, 0x5d, - 0x4a, 0x58, 0x77, 0xa2, 0xa7, 0xad, 0x36, 0x4d, 0xe0, 0xa4, 0x27, 0x82, 0x82, 0xae, 0xc1, 0x8c, - 0x2d, 0x2b, 0xd9, 0x0a, 0xc9, 0x81, 0x4b, 0xb9, 0xe6, 0x96, 0x70, 0xec, 0x8c, 0x1d, 0x57, 0xb8, - 0x24, 0x23, 0x03, 0xa6, 0x5d, 0xe7, 0xd0, 0x12, 0x03, 0x44, 0xb4, 0x7f, 0x5b, 0x68, 0xeb, 0xb8, - 0xce, 0xe1, 0x17, 0xae, 0x47, 0xb6, 0xf9, 0x14, 0x78, 0x0e, 0xe7, 0x64, 0xf0, 0x5b, 0xbe, 0x1d, - 0x92, 0x3e, 0xf1, 0x19, 0xf6, 0x36, 0x83, 0xc1, 0x51, 0xad, 0x12, 0x58, 0x82, 0x36, 0x75, 0x7d, - 0x9b, 0x58, 0xbe, 0x1c, 0x43, 0x4d, 0xb3, 0x25, 0x7e, 0x3f, 0xa6, 0xc6, 0x7d, 0x38, 0x5f, 0xa1, - 0x57, 0x65, 0xf6, 0x22, 0x4c, 0x09, 0xc7, 0xec, 0xc0, 0x67, 0xc4, 0x67, 0x42, 0xf7, 0x94, 0xd9, - 0xe1, 0xb4, 0x4d, 0x49, 0x32, 0xfe, 0x0f, 0x48, 0xea, 0x78, 0x14, 0x44, 0x7e, 0xbd, 0xd6, 0x3c, - 0x0b, 0x73, 0x19, 0x11, 0x55, 0x1b, 0x77, 0x60, 0x5e, 0x92, 0x9f, 0xf9, 0xfd, 0xda, 0xba, 0x16, - 0xe1, 0x6c, 0x4e, 0x48, 0x69, 0xbb, 0x1d, 0x1b, 0xc9, 0x9e, 0x13, 0xc7, 0x2a, 0x5b, 0x88, 0x3d, - 0xc8, 0x1e, 0x15, 0xc6, 0xef, 0x1a, 0xcc, 0xc6, 0x63, 0xa4, 0x66, 0xd6, 0x4f, 0x58, 0x76, 0x8d, - 0xca, 0xb2, 0x6b, 0x0e, 0xcb, 0x6e, 0x15, 0x66, 0x68, 0x10, 0x85, 0x36, 0xb1, 0x1c, 0xcc, 0xb0, - 0xe5, 0x07, 0x0e, 0x51, 0x55, 0x79, 0x5a, 0xd2, 0x1f, 0x60, 0x86, 0x1f, 0x07, 0x0e, 0x31, 0x3e, - 0x8b, 0x3f, 0x4a, 0xe6, 0x6b, 0x5e, 0x83, 0x59, 0x0f, 0x53, 0x66, 0xe1, 0xc1, 0x80, 0xf8, 0x8e, - 0x85, 0x19, 0x2f, 0x09, 0x4d, 0x94, 0xc4, 0x69, 0xbe, 0x71, 0x4f, 0xd0, 0xef, 0xb1, 0xc7, 0xd4, - 0xf8, 0x59, 0x83, 0x33, 0x5c, 0x96, 0x97, 0x60, 0xad, 0x78, 0x67, 0xa0, 0x41, 0x0e, 0x99, 0x0a, - 0x94, 0x2f, 0xd1, 0x06, 0xcc, 0xa9, 0x5a, 0x77, 0x03, 0x7f, 0xd8, 0x06, 0x0d, 0x21, 0x88, 0x86, - 0x5b, 0x49, 0x27, 0x5c, 0x80, 0x0e, 0x65, 0xc1, 0x20, 0xee, 0xaa, 0xa6, 0xec, 0x2a, 0x4e, 0x92, - 0x5d, 0x65, 0x7c, 0x00, 0x33, 0x43, 0x9f, 0xea, 0x57, 0xe8, 0x5b, 0x2d, 0x1e, 0x3a, 0x4f, 0xb1, - 0xeb, 0x6d, 0x13, 0xdf, 0x21, 0xe1, 0x3b, 0x76, 0x0e, 0xba, 0x05, 0xf3, 0xae, 0xe3, 0x11, 0x8b, - 0xb9, 0x7d, 0x12, 0x44, 0xcc, 0xa2, 0xc4, 0x0e, 0x7c, 0x87, 0xc6, 0xd1, 0xf1, 0xbd, 0xa7, 0x72, - 0x6b, 0x5b, 0xee, 0x18, 0x3f, 0x25, 0x13, 0x2c, 0xed, 0xc5, 0xf0, 0x1c, 0xf6, 0x09, 0xe1, 0x0a, - 0xf7, 0x08, 0x76, 0x48, 0xa8, 0xc2, 0x98, 0x92, 0xc4, 0xaf, 0x04, 0x8d, 0xe7, 0x47, 0x31, 0xed, - 0x04, 0xce, 0x91, 0xf0, 0x68, 0xca, 0x04, 0x49, 0xba, 0x1f, 0x38, 0x47, 0x62, 0x94, 0x50, 0x4b, - 0x7c, 0x62, 0x7b, 0x2f, 0xf2, 0xf7, 0x85, 0x37, 0x6d, 0xb3, 0xe3, 0xd2, 0xaf, 0x31, 0x65, 0x9b, - 0x9c, 0x64, 0xfc, 0xa1, 0xc1, 0xd2, 0xd0, 0x0d, 0x93, 0xd8, 0xc4, 0x3d, 0xf8, 0x0f, 0xd2, 0xc1, - 0x25, 0x54, 0x2d, 0x67, 0x2e, 0x5d, 0xaa, 0xdc, 0x91, 0xdc, 0x53, 0x13, 0x5f, 0xec, 0x88, 0x03, - 0xbd, 0xc4, 0x71, 0xd5, 0xa2, 0x2f, 0xe3, 0x51, 0xf6, 0xd0, 0xde, 0xde, 0xc3, 0xa1, 0x43, 0xbf, - 0x24, 0x3e, 0x09, 0x31, 0x7b, 0x2f, 0xc7, 0xa4, 0xd1, 0x83, 0x95, 0x2a, 0xed, 0xca, 0xfe, 0x2f, - 0x49, 0x5e, 0x63, 0x96, 0xf7, 0x36, 0x2a, 0x96, 0x61, 0x92, 0x72, 0x8d, 0xe2, 0x6e, 0xdb, 0xe8, - 0x35, 0xb8, 0xb0, 0x20, 0x6c, 0x39, 0xf4, 0x04, 0x33, 0x21, 0xc9, 0x5f, 0xd6, 0x41, 0xe5, 0xff, - 0x5b, 0x0d, 0x96, 0xb3, 0xdb, 0xf5, 0xe7, 0x26, 0x5a, 0x83, 0x59, 0xba, 0x17, 0x44, 0x9e, 0x63, - 0x39, 0x42, 0xc8, 0x22, 0xf6, 0xa1, 0x08, 0xa4, 0x6d, 0x9e, 0x91, 0x1b, 0x52, 0xd9, 0x43, 0xfb, - 0xf0, 0xd8, 0x68, 0x8c, 0x95, 0xf8, 0xa0, 0xcb, 0x3b, 0xa1, 0xbc, 0x3c, 0xc8, 0xc7, 0x50, 0xfb, - 0xd0, 0x79, 0xa7, 0x2c, 0x8b, 0x3b, 0x5f, 0x99, 0x5d, 0xe5, 0xd6, 0x8b, 0xbc, 0xdb, 0x27, 0x38, - 0xc1, 0x8e, 0x37, 0x7c, 0x21, 0x5f, 0xd6, 0xf9, 0x63, 0xee, 0xc7, 0x78, 0xaa, 0x28, 0x06, 0x93, - 0x60, 0xa7, 0x76, 0x37, 0x2b, 0xb3, 0x22, 0x1b, 0xd3, 0x66, 0x4b, 0x59, 0xe5, 0xb0, 0x45, 0xcd, - 0x60, 0x79, 0x1f, 0x54, 0xbf, 0x32, 0x00, 0xa5, 0xa1, 0x00, 0xca, 0x46, 0xae, 0xec, 0xa5, 0x7d, - 0x35, 0xd6, 0x10, 0x34, 0x79, 0x55, 0xaa, 0x69, 0x26, 0xd6, 0xc6, 0x27, 0xb0, 0xcc, 0x79, 0xa4, - 0x90, 0xb8, 0xe1, 0xd4, 0xbf, 0x05, 0xfe, 0x79, 0x0a, 0xce, 0x95, 0x0b, 0xd7, 0xb9, 0x09, 0x7e, - 0x0a, 0x7a, 0x72, 0xd3, 0xe2, 0x83, 0x8a, 0x32, 0xdc, 0x1f, 0x24, 0xa3, 0x4a, 0x4e, 0xb4, 0x45, - 0x75, 0xed, 0x7a, 0x1a, 0xef, 0xc7, 0xf3, 0xaa, 0x70, 0x4d, 0x6b, 0x14, 0xae, 0x69, 0xdc, 0x80, - 0x83, 0x59, 0x95, 0x01, 0x79, 0x9e, 0x2d, 0x3a, 0x98, 0x55, 0x19, 0x48, 0x84, 0x85, 0x81, 0x71, - 0x69, 0x40, 0xf1, 0x0b, 0x03, 0xe7, 0x01, 0xd4, 0x61, 0x17, 0xf9, 0xf1, 0xb5, 0x73, 0x52, 0x1e, - 0x75, 0x91, 0x5f, 0x79, 0xe2, 0xb6, 0x2a, 0x4f, 0xdc, 0x6c, 0x4f, 0xb4, 0x0b, 0x63, 0xef, 0x05, - 0xc0, 0x03, 0x97, 0xee, 0xcb, 0x24, 0xf3, 0x23, 0xde, 0x71, 0x43, 0x85, 0x5b, 0xf8, 0x92, 0x53, - 0xb0, 0xe7, 0xa9, 0xd4, 0xf1, 0x25, 0xff, 0xe2, 0x11, 0x25, 0x8e, 0xca, 0x8e, 0x58, 0x73, 0xda, - 0x6e, 0x48, 0x88, 0x4a, 0x80, 0x58, 0x1b, 0xbf, 0x6a, 0x30, 0xf9, 0x88, 0xf4, 0x95, 0xe6, 0x15, - 0x80, 0x57, 0x41, 0x18, 0x44, 0xcc, 0xf5, 0x89, 0xbc, 0x91, 0x8c, 0x9b, 0x29, 0xca, 0xbf, 0xb7, - 0x23, 0x4a, 0x96, 0x78, 0xbb, 0x2a, 0x99, 0x62, 0xcd, 0x69, 0x7b, 0x04, 0x0f, 0x54, 0xfe, 0xc4, - 0x9a, 0x23, 0x72, 0xca, 0xb0, 0xbd, 0x2f, 0x92, 0xd5, 0x34, 0xe5, 0x8f, 0xdb, 0x7f, 0xcd, 0xc1, - 0x54, 0xfa, 0x0c, 0x42, 0x2f, 0xa1, 0x93, 0x7a, 0x4a, 0x40, 0x97, 0x8b, 0x2f, 0x06, 0xc5, 0xa7, - 0x09, 0xfd, 0x7f, 0x23, 0xb8, 0x54, 0x27, 0x8f, 0x21, 0x1f, 0x66, 0x0b, 0x50, 0x1d, 0xad, 0x15, - 0xa5, 0xab, 0x1e, 0x02, 0xf4, 0xeb, 0xb5, 0x78, 0x13, 0x7b, 0x0c, 0xe6, 0x4a, 0xb0, 0x37, 0xba, - 0x31, 0x42, 0x4b, 0x06, 0xff, 0xeb, 0x37, 0x6b, 0x72, 0x27, 0x56, 0x5f, 0x03, 0x2a, 0x02, 0x73, - 0x74, 0x7d, 0xa4, 0x9a, 0x21, 0xf0, 0xd7, 0x6f, 0xd4, 0x63, 0xae, 0x0c, 0x54, 0x42, 0xf6, 0x91, - 0x81, 0x66, 0x1e, 0x05, 0x46, 0x06, 0x9a, 0x7b, 0x07, 0x18, 0x43, 0xfb, 0x30, 0x93, 0x87, 0xf3, - 0xe8, 0x5a, 0xd5, 0x1b, 0x53, 0xe1, 0xb5, 0x40, 0x5f, 0xab, 0xc3, 0x9a, 0x18, 0x23, 0x70, 0x3a, - 0x0b, 0xb9, 0xd1, 0xd5, 0xa2, 0x7c, 0xe9, 0x03, 0x82, 0xbe, 0x3a, 0x9a, 0x31, 0x1d, 0x53, 0x1e, - 0x86, 0x97, 0xc5, 0x54, 0x81, 0xf1, 0xcb, 0x62, 0xaa, 0x42, 0xf5, 0xc6, 0x18, 0xfa, 0x3e, 0xc6, - 0x76, 0x39, 0x78, 0x8a, 0xd6, 0xab, 0xd4, 0x94, 0xe3, 0x63, 0x7d, 0xa3, 0x36, 0x7f, 0x6c, 0xfb, - 0x96, 0xc6, 0x7b, 0x3d, 0x85, 0x52, 0xcb, 0x7a, 0xbd, 0x88, 0x7b, 0xcb, 0x7a, 0xbd, 0x0c, 0xea, - 0x8e, 0xa1, 0x1d, 0x98, 0xce, 0xe0, 0x56, 0x74, 0xa5, 0x4a, 0x32, 0x7b, 0x97, 0xd0, 0xaf, 0x8e, - 0xe4, 0x4b, 0x6c, 0x58, 0xf1, 0xf4, 0x52, 0xe3, 0xaa, 0xd2, 0xb9, 0xec, 0xbc, 0xba, 0x32, 0x8a, - 0x2d, 0x31, 0xf0, 0x0d, 0xc0, 0x10, 0x66, 0xa2, 0x4b, 0x55, 0x72, 0xe9, 0x4f, 0x71, 0xf9, 0x78, - 0xa6, 0x44, 0xf5, 0x77, 0x30, 0x5f, 0x76, 0xd2, 0xa3, 0x92, 0x2e, 0x3c, 0xe6, 0x3a, 0xa1, 0xaf, - 0xd7, 0x65, 0x4f, 0x0c, 0x3f, 0x83, 0x76, 0x0c, 0x32, 0xd1, 0xc5, 0xa2, 0x74, 0x0e, 0x14, 0xeb, - 0xc6, 0x71, 0x2c, 0xa9, 0x6a, 0xea, 0xc7, 0x8d, 0x33, 0x44, 0x7f, 0xd5, 0x8d, 0x53, 0xc0, 0xa9, - 0xd5, 0x8d, 0x53, 0x04, 0x93, 0xc2, 0xdc, 0xeb, 0xf8, 0x01, 0x20, 0x0d, 0x96, 0x4a, 0x87, 0x6c, - 0x15, 0x16, 0x2c, 0x1d, 0xb2, 0xd5, 0xf8, 0x6b, 0x0c, 0xfd, 0x00, 0x0b, 0xe5, 0x18, 0x09, 0x55, - 0xb6, 0x5f, 0x05, 0x56, 0xd3, 0x6f, 0xd5, 0x17, 0xc8, 0x1c, 0x2b, 0x05, 0x78, 0x53, 0x1d, 0x71, - 0x09, 0x4a, 0xab, 0x8e, 0xb8, 0x14, 0x31, 0x89, 0x1a, 0x2d, 0x43, 0x2b, 0x65, 0x35, 0x7a, 0x0c, - 0xb4, 0xd2, 0xd7, 0xeb, 0xb2, 0x67, 0xce, 0xb3, 0x22, 0x1c, 0x41, 0x23, 0xfd, 0xcf, 0x8c, 0xaa, - 0x9b, 0x35, 0xb9, 0x13, 0xab, 0x6f, 0xe2, 0x71, 0x9c, 0xc3, 0x22, 0x68, 0x64, 0x00, 0xb9, 0x11, - 0xb6, 0x51, 0x9b, 0x3f, 0xb1, 0x3d, 0x88, 0x1f, 0xe0, 0x52, 0x30, 0x03, 0xad, 0x8d, 0xd0, 0x93, - 0xc2, 0x42, 0xfa, 0xf5, 0x5a, 0xbc, 0xc3, 0x0e, 0xda, 0x99, 0x10, 0xff, 0x3f, 0xdd, 0xf9, 0x27, - 0x00, 0x00, 0xff, 0xff, 0x67, 0x03, 0x26, 0xea, 0x96, 0x1a, 0x00, 0x00, + // 1795 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xc4, 0x19, 0x4d, 0x73, 0xd4, 0xc8, + 0xd5, 0x62, 0x66, 0x3c, 0xe3, 0x37, 0x63, 0xb0, 0xdb, 0xc6, 0x1e, 0xcb, 0xd8, 0x18, 0x41, 0xc0, + 0x18, 0xb0, 0x09, 0x54, 0x12, 0x92, 0x1c, 0x12, 0x30, 0x4e, 0x42, 0x25, 0x40, 0x95, 0x0c, 0x14, + 0x29, 0xa8, 0x52, 0xb5, 0xa5, 0x36, 0x56, 0x59, 0x23, 0x09, 0x75, 0xcb, 0x60, 0x2a, 0xd9, 0xcb, + 0xee, 0x3f, 0xd9, 0xdb, 0x1e, 0xf6, 0xba, 0x3f, 0x60, 0xff, 0xc2, 0xfe, 0x8d, 0x3d, 0xee, 0x69, + 0x2f, 0x5b, 0xfd, 0x21, 0xcd, 0xe8, 0xcb, 0x23, 0xaf, 0xa9, 0xda, 0x9b, 0xe6, 0xf5, 0xfb, 0xea, + 0xd7, 0xef, 0x7b, 0x60, 0xee, 0x28, 0xf0, 0xe2, 0x01, 0xb1, 0x28, 0x89, 0x8e, 0x48, 0xb4, 0x19, + 0x46, 0x01, 0x0b, 0xd0, 0x4c, 0x06, 0x68, 0x85, 0x7b, 0xc6, 0x16, 0xa0, 0x47, 0x98, 0xd9, 0x07, + 0x8f, 0x89, 0x47, 0x18, 0x31, 0xc9, 0xfb, 0x98, 0x50, 0x86, 0x96, 0xa0, 0xb3, 0xef, 0x7a, 0xc4, + 0x72, 0x1d, 0xda, 0xd7, 0xd6, 0x1a, 0xeb, 0x53, 0x66, 0x9b, 0xff, 0x7e, 0xe2, 0x50, 0xe3, 0x39, + 0xcc, 0x65, 0x08, 0x68, 0x18, 0xf8, 0x94, 0xa0, 0x07, 0xd0, 0x8e, 0x08, 0x8d, 0x3d, 0x26, 0x09, + 0xba, 0xf7, 0x56, 0x37, 0xf3, 0xb2, 0x36, 0x53, 0x92, 0xd8, 0x63, 0x66, 0x82, 0x6e, 0xb8, 0xd0, + 0x1b, 0x3d, 0x40, 0x8b, 0xd0, 0x56, 0xb2, 0xfb, 0xda, 0x9a, 0xb6, 0x3e, 0x65, 0x4e, 0x4a, 0xd1, + 0x68, 0x01, 0x26, 0x29, 0xc3, 0x2c, 0xa6, 0xfd, 0x73, 0x6b, 0xda, 0x7a, 0xcb, 0x54, 0xbf, 0xd0, + 0x3c, 0xb4, 0x48, 0x14, 0x05, 0x51, 0xbf, 0x21, 0xd0, 0xe5, 0x0f, 0x84, 0xa0, 0x49, 0xdd, 0x4f, + 0xa4, 0xdf, 0x5c, 0xd3, 0xd6, 0xa7, 0x4d, 0xf1, 0x6d, 0xb4, 0xa1, 0xb5, 0x33, 0x08, 0xd9, 0xb1, + 0xf1, 0x27, 0xe8, 0xbf, 0xc2, 0x76, 0x1c, 0x0f, 0x5e, 0x09, 0x1d, 0xb7, 0x0f, 0x88, 0x7d, 0x98, + 0xdc, 0x7d, 0x19, 0xa6, 0x94, 0xe6, 0x4a, 0x83, 0x69, 0xb3, 0x23, 0x01, 0x4f, 0x1c, 0xe3, 0xef, + 0xb0, 0x54, 0x42, 0xa8, 0x6c, 0x70, 0x15, 0xa6, 0xdf, 0xe1, 0x68, 0x0f, 0xbf, 0x23, 0x56, 0x84, + 0x99, 0x1b, 0x08, 0x6a, 0xcd, 0xec, 0x29, 0xa0, 0xc9, 0x61, 0xc6, 0x1b, 0xd0, 0x33, 0x1c, 0x82, + 0x41, 0x88, 0x6d, 0x56, 0x47, 0x38, 0x5a, 0x83, 0x6e, 0x18, 0x11, 0xec, 0x79, 0x81, 0x8d, 0x19, + 0x11, 0x56, 0x68, 0x98, 0xa3, 0x20, 0x63, 0x05, 0x96, 0x4b, 0x99, 0x4b, 0x05, 0x8d, 0x07, 0x39, + 0xed, 0x83, 0xc1, 0xc0, 0xad, 0x25, 0xda, 0xb8, 0x54, 0xd0, 0x5a, 0x50, 0x2a, 0xbe, 0x7f, 0xce, + 0x9d, 0x7a, 0x04, 0xfb, 0x71, 0x58, 0x8b, 0x71, 0x5e, 0xe3, 0x84, 0x34, 0xe5, 0xbc, 0x28, 0x9d, + 0x63, 0x3b, 0xf0, 0x3c, 0x62, 0x33, 0x37, 0xf0, 0x13, 0xb6, 0xab, 0x00, 0x76, 0x0a, 0x54, 0xae, + 0x32, 0x02, 0x31, 0x74, 0xe8, 0x17, 0x49, 0x15, 0xdb, 0x6f, 0x34, 0xb8, 0xf8, 0x50, 0x19, 0x4d, + 0x0a, 0xae, 0xf5, 0x00, 0x59, 0x91, 0xe7, 0xf2, 0x22, 0xf3, 0x0f, 0xd4, 0x28, 0x3c, 0x10, 0xc7, + 0x88, 0x48, 0xe8, 0xb9, 0x36, 0x16, 0x2c, 0x9a, 0x82, 0xc5, 0x28, 0x08, 0xcd, 0x40, 0x83, 0x31, + 0xaf, 0xdf, 0x12, 0x27, 0xfc, 0xd3, 0xe8, 0xc3, 0x42, 0x5e, 0x57, 0x75, 0x8d, 0x3f, 0xc2, 0xa2, + 0x84, 0xec, 0x1e, 0xfb, 0xf6, 0xae, 0x88, 0x86, 0x5a, 0x46, 0xff, 0x59, 0x83, 0x7e, 0x91, 0x50, + 0x79, 0xf1, 0x59, 0x2d, 0x70, 0xda, 0xfb, 0xa1, 0xcb, 0xd0, 0x65, 0xd8, 0xf5, 0xac, 0x60, 0x7f, + 0x9f, 0x12, 0xd6, 0x9f, 0x5c, 0xd3, 0xd6, 0x9b, 0x26, 0x70, 0xd0, 0x73, 0x01, 0x41, 0x37, 0x61, + 0xc6, 0x96, 0x9e, 0x6c, 0x45, 0xe4, 0xc8, 0xa5, 0x9c, 0x73, 0x5b, 0x28, 0x76, 0xc1, 0x4e, 0x3c, + 0x5c, 0x82, 0x91, 0x01, 0xd3, 0xae, 0xf3, 0xd1, 0x12, 0x09, 0x44, 0x84, 0x7f, 0x47, 0x70, 0xeb, + 0xba, 0xce, 0xc7, 0x7f, 0xb8, 0x1e, 0xd9, 0xe5, 0x59, 0xe0, 0x15, 0x5c, 0x92, 0x97, 0x7f, 0xe2, + 0xdb, 0x11, 0x19, 0x10, 0x9f, 0x61, 0x6f, 0x3b, 0x08, 0x8f, 0x6b, 0xb9, 0xc0, 0x12, 0x74, 0xa8, + 0xeb, 0xdb, 0xc4, 0xf2, 0x65, 0x1a, 0x6a, 0x9a, 0x6d, 0xf1, 0xfb, 0x19, 0x35, 0x1e, 0xc1, 0x4a, + 0x05, 0x5f, 0x65, 0xd9, 0x2b, 0xd0, 0x13, 0x8a, 0xd9, 0x81, 0xcf, 0x88, 0xcf, 0x04, 0xef, 0x9e, + 0xd9, 0xe5, 0xb0, 0x6d, 0x09, 0x32, 0x7e, 0x0f, 0x48, 0xf2, 0x78, 0x1a, 0xc4, 0x7e, 0xbd, 0xd0, + 0xbc, 0x08, 0x73, 0x19, 0x12, 0xe5, 0x1b, 0xf7, 0x61, 0x5e, 0x82, 0x5f, 0xfa, 0x83, 0xda, 0xbc, + 0x16, 0xe1, 0x62, 0x8e, 0x48, 0x71, 0xbb, 0x97, 0x08, 0xc9, 0xd6, 0x89, 0x13, 0x99, 0x2d, 0x24, + 0x1a, 0x64, 0x4b, 0x85, 0xf1, 0xad, 0x06, 0xb3, 0x49, 0x1a, 0xa9, 0x69, 0xf5, 0x53, 0xba, 0x5d, + 0xa3, 0xd2, 0xed, 0x9a, 0x43, 0xb7, 0x5b, 0x87, 0x19, 0x1a, 0xc4, 0x91, 0x4d, 0x2c, 0x07, 0x33, + 0x6c, 0xf9, 0x81, 0x43, 0x94, 0x57, 0x9e, 0x97, 0xf0, 0xc7, 0x98, 0xe1, 0x67, 0x81, 0x43, 0x8c, + 0xbf, 0x25, 0x8f, 0x92, 0x79, 0xcd, 0x9b, 0x30, 0xeb, 0x61, 0xca, 0x2c, 0x1c, 0x86, 0xc4, 0x77, + 0x2c, 0xcc, 0xb8, 0x4b, 0x68, 0xc2, 0x25, 0xce, 0xf3, 0x83, 0x87, 0x02, 0xfe, 0x90, 0x3d, 0xa3, + 0xc6, 0x0f, 0x1a, 0x5c, 0xe0, 0xb4, 0xdc, 0x05, 0x6b, 0xdd, 0x77, 0x06, 0x1a, 0xe4, 0x23, 0x53, + 0x17, 0xe5, 0x9f, 0x68, 0x0b, 0xe6, 0x94, 0xaf, 0xbb, 0x81, 0x3f, 0x0c, 0x83, 0x86, 0x20, 0x44, + 0xc3, 0xa3, 0x34, 0x12, 0x2e, 0x43, 0x97, 0xb2, 0x20, 0x4c, 0xa2, 0xaa, 0x29, 0xa3, 0x8a, 0x83, + 0x54, 0x54, 0x65, 0x6d, 0xda, 0x2a, 0xb1, 0x69, 0xcf, 0xa5, 0x16, 0xb1, 0x2d, 0xa9, 0x95, 0x88, + 0xcb, 0x8e, 0x09, 0x2e, 0xdd, 0xb1, 0xa5, 0x35, 0x8c, 0x3f, 0xc0, 0xcc, 0xf0, 0x56, 0xf5, 0x7d, + 0xfc, 0x4b, 0x2d, 0x49, 0x5b, 0x2f, 0xb0, 0xeb, 0xed, 0x12, 0xdf, 0x21, 0xd1, 0x19, 0x63, 0x0f, + 0xdd, 0x85, 0x79, 0xd7, 0xf1, 0x88, 0xc5, 0xdc, 0x01, 0x09, 0x62, 0x66, 0x51, 0x62, 0x07, 0xbe, + 0x43, 0x13, 0xfb, 0xf0, 0xb3, 0x17, 0xf2, 0x68, 0x57, 0x9e, 0x18, 0x5f, 0xa5, 0x39, 0x70, 0x54, + 0x8b, 0x61, 0x25, 0xf7, 0x09, 0xe1, 0x0c, 0x0f, 0x08, 0x76, 0x48, 0xa4, 0xae, 0xd1, 0x93, 0xc0, + 0x7f, 0x09, 0x18, 0xb7, 0xb0, 0x42, 0xda, 0x0b, 0x9c, 0x63, 0xa1, 0x51, 0xcf, 0x04, 0x09, 0x7a, + 0x14, 0x38, 0xc7, 0x22, 0x19, 0x51, 0x4b, 0x38, 0x89, 0x7d, 0x10, 0xfb, 0x87, 0x42, 0x9b, 0x8e, + 0xd9, 0x75, 0xe9, 0x7f, 0x30, 0x65, 0xdb, 0x1c, 0x64, 0x7c, 0xa7, 0xc1, 0xd2, 0x50, 0x0d, 0x93, + 0xd8, 0xc4, 0x3d, 0xfa, 0x0d, 0xcc, 0xc1, 0x29, 0x54, 0x34, 0x64, 0xda, 0x36, 0x15, 0x30, 0x48, + 0x9e, 0xa9, 0x9a, 0x21, 0x4e, 0x44, 0x4b, 0x50, 0xa2, 0xb8, 0x0a, 0xf2, 0xb7, 0x49, 0x32, 0xdc, + 0xb1, 0x77, 0x0f, 0x70, 0xe4, 0xd0, 0x7f, 0x12, 0x9f, 0x44, 0x98, 0x7d, 0x96, 0x42, 0x6b, 0xac, + 0xc1, 0x6a, 0x15, 0x77, 0x25, 0xff, 0x4d, 0x92, 0xe4, 0x13, 0x0c, 0x93, 0xec, 0xc5, 0xae, 0xe7, + 0x7c, 0x16, 0xf1, 0xff, 0xce, 0x5f, 0x2e, 0x65, 0xae, 0xfc, 0x67, 0x03, 0x66, 0x23, 0x01, 0x62, + 0x16, 0xe5, 0x08, 0x69, 0x23, 0x3d, 0x6d, 0x5e, 0x50, 0x07, 0x82, 0x90, 0x37, 0xd4, 0xdf, 0xa7, + 0x1e, 0x90, 0x70, 0xfb, 0x6c, 0x69, 0x71, 0x19, 0xa6, 0x86, 0xe2, 0x1b, 0x42, 0x7c, 0x87, 0x2a, + 0xb9, 0xdc, 0x3b, 0xed, 0x20, 0x3c, 0xb6, 0x88, 0x2d, 0xeb, 0xa5, 0x78, 0xea, 0x8e, 0xd9, 0xe5, + 0xc0, 0x1d, 0x5b, 0x94, 0xcb, 0x53, 0xe4, 0xc8, 0xd4, 0x1b, 0xb2, 0x97, 0x50, 0xaf, 0xf1, 0x01, + 0x96, 0xb3, 0xa7, 0xf5, 0xcb, 0xc8, 0x99, 0x2e, 0x69, 0xac, 0xe6, 0xdd, 0x20, 0x57, 0x8b, 0x8e, + 0xf2, 0x6a, 0xd7, 0xae, 0xbb, 0x67, 0xd3, 0x6b, 0x25, 0x6f, 0x90, 0x6c, 0xf1, 0x7e, 0x9d, 0x57, + 0xfb, 0x14, 0x45, 0xfc, 0x64, 0xc1, 0x97, 0xf3, 0xae, 0x9b, 0xaf, 0xf4, 0x5f, 0x24, 0x69, 0x51, + 0x21, 0x98, 0x04, 0x3b, 0xb5, 0xd3, 0x91, 0x12, 0x2b, 0xac, 0x31, 0x6d, 0xb6, 0x95, 0x54, 0x3e, + 0xb9, 0xa9, 0x32, 0x24, 0x5b, 0x62, 0xf5, 0x2b, 0x33, 0xa3, 0x35, 0xd4, 0x8c, 0xb6, 0x95, 0x8b, + 0x06, 0x29, 0x5f, 0xc5, 0x15, 0x82, 0x26, 0x77, 0x44, 0x95, 0x8e, 0xc5, 0xb7, 0xf1, 0x17, 0x58, + 0xe6, 0x38, 0x92, 0x48, 0x34, 0x79, 0xf5, 0x1b, 0xe1, 0x1f, 0xcf, 0xc1, 0xa5, 0x72, 0xe2, 0x3a, + 0xcd, 0xf0, 0x5f, 0x41, 0x4f, 0x9b, 0x4d, 0x9e, 0x69, 0x29, 0xc3, 0x83, 0x30, 0xcd, 0xb5, 0x32, + 0x25, 0x2f, 0xaa, 0xce, 0xf3, 0x45, 0x72, 0x9e, 0x24, 0xdc, 0x42, 0xa7, 0xda, 0x28, 0x74, 0xaa, + 0x5c, 0x80, 0x83, 0x59, 0x95, 0x00, 0x59, 0xd2, 0x17, 0x1d, 0xcc, 0xaa, 0x04, 0xa4, 0xc4, 0x42, + 0x40, 0x4b, 0x0a, 0x50, 0xf8, 0x42, 0xc0, 0x0a, 0x80, 0xaa, 0xd6, 0xb1, 0x9f, 0x74, 0xde, 0x53, + 0xb2, 0x56, 0xc7, 0x7e, 0x65, 0xd3, 0xd1, 0xae, 0x6c, 0x3a, 0xb2, 0x31, 0xd1, 0x29, 0x24, 0xce, + 0xd7, 0x00, 0x8f, 0x5d, 0x7a, 0x28, 0x8d, 0xcc, 0xbb, 0x1c, 0xc7, 0x8d, 0xd4, 0xe8, 0xc6, 0x3f, + 0x39, 0x04, 0x7b, 0x9e, 0x32, 0x1d, 0xff, 0xe4, 0x2f, 0x1e, 0x53, 0xe2, 0x28, 0xeb, 0x88, 0x6f, + 0x0e, 0xdb, 0x8f, 0x08, 0x51, 0x06, 0x10, 0xdf, 0xc6, 0xd7, 0x1a, 0x4c, 0x3d, 0x25, 0x03, 0xc5, + 0x79, 0x15, 0xe0, 0x5d, 0x10, 0x05, 0x31, 0x73, 0x7d, 0x22, 0x9b, 0xb2, 0x96, 0x39, 0x02, 0xf9, + 0xf5, 0x72, 0x84, 0xcb, 0x12, 0x6f, 0x5f, 0x19, 0x53, 0x7c, 0x73, 0xd8, 0x01, 0xc1, 0xa1, 0xb2, + 0x9f, 0xf8, 0x46, 0xf3, 0xd0, 0xa2, 0x0c, 0xdb, 0x87, 0xc2, 0x58, 0x4d, 0x53, 0xfe, 0xb8, 0xf7, + 0xd3, 0x3c, 0xf4, 0x46, 0x8b, 0x28, 0x7a, 0x0b, 0xdd, 0x91, 0x6d, 0x0a, 0xba, 0x56, 0x5c, 0x9a, + 0x14, 0xb7, 0x33, 0xfa, 0xef, 0xc6, 0x60, 0xa9, 0x48, 0x9e, 0x40, 0x3e, 0xcc, 0x16, 0xb6, 0x15, + 0x68, 0xa3, 0x48, 0x5d, 0xb5, 0x0b, 0xd1, 0x6f, 0xd5, 0xc2, 0x4d, 0xe5, 0x31, 0x98, 0x2b, 0x59, + 0x3f, 0xa0, 0xdb, 0x63, 0xb8, 0x64, 0x56, 0x20, 0xfa, 0x9d, 0x9a, 0xd8, 0xa9, 0xd4, 0xf7, 0x80, + 0x8a, 0xbb, 0x09, 0x74, 0x6b, 0x2c, 0x9b, 0xe1, 0xee, 0x43, 0xbf, 0x5d, 0x0f, 0xb9, 0xf2, 0xa2, + 0x72, 0x6b, 0x31, 0xf6, 0xa2, 0x99, 0xbd, 0xc8, 0xd8, 0x8b, 0xe6, 0x56, 0x21, 0x13, 0xe8, 0x10, + 0x66, 0xf2, 0x1b, 0x0d, 0x74, 0xb3, 0x6a, 0xcd, 0x56, 0x58, 0x98, 0xe8, 0x1b, 0x75, 0x50, 0x53, + 0x61, 0x04, 0xce, 0x67, 0xb7, 0x0e, 0xe8, 0x46, 0x91, 0xbe, 0x74, 0x87, 0xa2, 0xaf, 0x8f, 0x47, + 0x1c, 0xbd, 0x53, 0x7e, 0x13, 0x51, 0x76, 0xa7, 0x8a, 0x35, 0x47, 0xd9, 0x9d, 0xaa, 0x16, 0x1b, + 0xc6, 0x04, 0xfa, 0x5f, 0x32, 0xde, 0xe6, 0x26, 0x74, 0xb4, 0x59, 0xc5, 0xa6, 0x7c, 0x45, 0xa0, + 0x6f, 0xd5, 0xc6, 0x4f, 0x64, 0xdf, 0xd5, 0x78, 0xac, 0x8f, 0x0c, 0xea, 0x65, 0xb1, 0x5e, 0x1c, + 0xfd, 0xcb, 0x62, 0xbd, 0x6c, 0xda, 0x9f, 0x40, 0x7b, 0x30, 0x9d, 0x19, 0xdd, 0xd1, 0xf5, 0x2a, + 0xca, 0x6c, 0x2f, 0xa1, 0xdf, 0x18, 0x8b, 0x97, 0xca, 0xb0, 0x92, 0xec, 0xa5, 0xd2, 0x55, 0xa5, + 0x72, 0xd9, 0x7c, 0x75, 0x7d, 0x1c, 0x5a, 0x2a, 0xe0, 0xbf, 0x00, 0xc3, 0x49, 0x1b, 0x5d, 0xad, + 0xa2, 0x1b, 0x7d, 0x8a, 0x6b, 0x27, 0x23, 0xa5, 0xac, 0x3f, 0xc0, 0x7c, 0x59, 0xa5, 0x47, 0x25, + 0x51, 0x78, 0x42, 0x3b, 0xa1, 0x6f, 0xd6, 0x45, 0x4f, 0x05, 0xbf, 0x84, 0x4e, 0x32, 0x25, 0xa3, + 0x2b, 0x45, 0xea, 0xdc, 0x5e, 0x40, 0x37, 0x4e, 0x42, 0x19, 0xf1, 0xa6, 0x41, 0x12, 0x38, 0xc3, + 0xf1, 0xb5, 0x3a, 0x70, 0x0a, 0x83, 0x76, 0x75, 0xe0, 0x14, 0xa7, 0x61, 0x21, 0xee, 0x7d, 0xb2, + 0x03, 0x19, 0x9d, 0xf6, 0x4a, 0x93, 0x6c, 0xd5, 0x30, 0x5b, 0x9a, 0x64, 0xab, 0x07, 0xc8, 0x09, + 0xf4, 0x7f, 0x58, 0x28, 0x1f, 0xf2, 0x50, 0x65, 0xf8, 0x55, 0x0c, 0x9b, 0xfa, 0xdd, 0xfa, 0x04, + 0xa9, 0xf8, 0x4f, 0x49, 0xb2, 0xc8, 0x0d, 0x79, 0xd5, 0xc9, 0xa2, 0x7c, 0xd4, 0xd4, 0xb7, 0x6a, + 0xe3, 0x67, 0x4a, 0x5a, 0x61, 0x9a, 0xaa, 0xb6, 0x76, 0xc9, 0xe0, 0x58, 0x6d, 0xed, 0xd2, 0x01, + 0x4d, 0xc4, 0x47, 0xd9, 0xa4, 0x54, 0x16, 0x1f, 0x27, 0x8c, 0x72, 0xfa, 0x66, 0x5d, 0xf4, 0x4c, + 0x2d, 0x2d, 0x8e, 0x42, 0x68, 0xac, 0xfe, 0x99, 0x34, 0x79, 0xa7, 0x26, 0x76, 0xf5, 0xeb, 0x26, + 0x69, 0x73, 0xec, 0x05, 0x72, 0xe9, 0x73, 0xab, 0x36, 0x7e, 0x2a, 0x3b, 0x4c, 0xf6, 0x9f, 0x23, + 0x23, 0x0e, 0xda, 0x18, 0xc3, 0x67, 0x64, 0x0e, 0xd3, 0x6f, 0xd5, 0xc2, 0x1d, 0x46, 0xef, 0xde, + 0xa4, 0xf8, 0xfb, 0xef, 0xfe, 0x2f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x04, 0x0d, 0xa3, 0x5a, 0x15, + 0x1c, 0x00, 0x00, } diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 20778b71f..393ea357d 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -53,11 +53,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // println("source:", volFileInfoResp.String()) // copy ecx file - if err := vs.doCopyFile(ctx, client, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx"); err != nil { + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx"); err != nil { return err } - if err := vs.doCopyFile(ctx, client, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat"); err != nil { + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat"); err != nil { return err } @@ -92,7 +92,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }, err } -func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, vid uint32, +func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32, compactRevision uint32, stopOffset uint64, baseFileName, ext string) error { copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ @@ -100,6 +100,8 @@ func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb. Ext: ext, CompactionRevision: compactRevision, StopOffset: stopOffset, + Collection: collection, + IsEcVolume: isEcVolume, }) if err != nil { return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) @@ -107,7 +109,7 @@ func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb. err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond)) if err != nil { - return fmt.Errorf("failed to copy volume %d %s file: %v", vid, ext, err) + return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) } return nil @@ -185,18 +187,27 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se // The copying still stop at req.StopOffset, but you can set it to math.MaxUint64 in order to read all data. func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error { - v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) - if v == nil { - return fmt.Errorf("not found volume id %d", req.VolumeId) - } + var fileName string + if !req.IsEcVolume { + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("not found volume id %d", req.VolumeId) + } - if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 { - return fmt.Errorf("volume %d is compacted", req.VolumeId) + if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 { + return fmt.Errorf("volume %d is compacted", req.VolumeId) + } + fileName = v.FileName() + req.Ext + } else { + ecv, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId)) + if !found { + return fmt.Errorf("not found ec volume id %d", req.VolumeId) + } + fileName = ecv.FileName() + req.Ext } bytesToRead := int64(req.StopOffset) - var fileName = v.FileName() + req.Ext file, err := os.Open(fileName) if err != nil { return err @@ -235,3 +246,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v return nil } + +func (vs *VolumeServer) findVolumeOrEcVolumeLocation(volumeId needle.VolumeId) { + +} diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index a2ba10323..da2146ccb 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -4,8 +4,11 @@ import ( "context" "fmt" "io" + "io/ioutil" "math" "os" + "path" + "strings" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" @@ -13,6 +16,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" ) /* @@ -54,6 +58,30 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil } +// VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files +func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) { + + baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + + var rebuiltShardIds []uint32 + + for _, location := range vs.store.Locations { + if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) { + // write .ec01 ~ .ec14 files + if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err) + } else { + rebuiltShardIds = generatedShardIds + } + break + } + } + + return &volume_server_pb.VolumeEcShardsRebuildResponse{ + RebuiltShardIds: rebuiltShardIds, + }, nil +} + // VolumeEcShardsCopy copy the .ecx and some ec data slices func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) { @@ -62,22 +90,26 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv return nil, fmt.Errorf("no space left") } - baseFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId)) + baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - // copy ecx file - if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil { - return err - } - // copy ec data slices for _, shardId := range req.ShardIds { - if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil { + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil { return err } } + if !req.CopyEcxFile { + return nil + } + + // copy ecx file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil { + return err + } + return nil }) if err != nil { @@ -87,65 +119,43 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil } -// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume +// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed +// the shard should not be mounted before calling this. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) { - foundExistingVolume, err := vs.doDeleteUnmountedShards(ctx, req) - if err != nil { - return nil, err - } - - if !foundExistingVolume { - err = vs.doDeleteMountedShards(ctx, req) - } + baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) - return &volume_server_pb.VolumeEcShardsDeleteResponse{}, err -} - -// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume -func (vs *VolumeServer) doDeleteUnmountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (foundVolume bool, err error) { - - v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) - if v == nil { - return false, nil + for _, shardId := range req.ShardIds { + os.Remove(baseFilename + erasure_coding.ToExt(int(shardId))) } - baseFileName := v.FileName() + // check whether to delete the ecx file also + hasEcxFile := false + existingShardCount := 0 - for _, shardId := range req.ShardIds { - if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardId))); err != nil { - return true, err + for _, location := range vs.store.Locations { + fileInfos, err := ioutil.ReadDir(location.Directory) + if err != nil { + continue } - } - - if req.ShouldDeleteEcx { - if err := os.Remove(baseFileName + ".ecx"); err != nil { - return true, err + for _, fileInfo := range fileInfos { + if fileInfo.Name() == baseFilename+".ecx" { + hasEcxFile = true + continue + } + if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") { + existingShardCount++ + } } } - return true, nil -} - -// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume -func (vs *VolumeServer) doDeleteMountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (error) { - - ecv, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId)) - if !found { - return fmt.Errorf("volume %d not found", req.VolumeId) - } - - for _, shardId := range req.ShardIds { - if shard, found := ecv.DeleteEcVolumeShard(erasure_coding.ShardId(shardId)); found { - shard.Destroy() + if hasEcxFile && existingShardCount == 0 { + if err := os.Remove(baseFilename + ".ecx"); err != nil { + return nil, err } } - if len(ecv.Shards) == 0 { - vs.store.DestroyEcVolume(needle.VolumeId(req.VolumeId)) - } - - return nil + return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil } func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) { diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 7118f2195..0934c79fd 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -56,9 +56,9 @@ func (c *commandEcBalance) Help() string { func (c *commandEcBalance) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) - collection := balanceCommand.String("c", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") + collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") - applyBalancing := balanceCommand.Bool("f", false, "apply the balancing plan") + applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan") if err = balanceCommand.Parse(args); err != nil { return nil } @@ -211,7 +211,7 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, a fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id) - err := moveOneShardToEcNode(ctx, commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing) + err := moveMountedShardToEcNode(ctx, commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing) if err != nil { return err } @@ -223,25 +223,6 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, a return nil } -func moveOneShardToEcNode(ctx context.Context, commandEnv *commandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error { - - fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id) - - if !applyBalancing { - return nil - } - - // ask destination node to copy shard and the ecx file from source node, and mount it - copiedShardIds, err := oneServerCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id) - if err != nil { - return err - } - - // ask source node to delete the shard, and maybe the ecx file - return sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds) - -} - func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits { for _, shardInfo := range ecNode.info.EcShardInfos { diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go new file mode 100644 index 000000000..0cbf694cd --- /dev/null +++ b/weed/shell/command_ec_common.go @@ -0,0 +1,198 @@ +package shell + +import ( + "context" + "fmt" + "sort" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +func moveMountedShardToEcNode(ctx context.Context, commandEnv *commandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error { + + fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id) + + if !applyBalancing { + return nil + } + + // ask destination node to copy shard and the ecx file from source node, and mount it + copiedShardIds, err := oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id) + if err != nil { + return err + } + + // unmount the to be deleted shards + err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds) + if err != nil { + return err + } + + // ask source node to delete the shard, and maybe the ecx file + return sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds) + +} + +func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, + targetServer *EcNode, startFromShardId uint32, shardCount int, + volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) { + + var shardIdsToCopy []uint32 + for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ { + shardIdsToCopy = append(shardIdsToCopy, shardId) + } + fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) + + err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + + if targetServer.info.Id != existingLocation { + + fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) + _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + ShardIds: shardIdsToCopy, + CopyEcxFile: true, + SourceDataNode: existingLocation, + }) + if copyErr != nil { + return copyErr + } + } + + fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id) + _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + ShardIds: shardIdsToCopy, + }) + if mountErr != nil { + return mountErr + } + + if targetServer.info.Id != existingLocation { + copiedShardIds = shardIdsToCopy + glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds) + } + + return nil + }) + + if err != nil { + return + } + + return +} + +func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) { + for _, dc := range topo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, dn := range rack.DataNodeInfos { + fn(dn) + } + } + } +} + +func sortEcNodes(ecNodes []*EcNode) { + sort.Slice(ecNodes, func(i, j int) bool { + return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot + }) +} + +func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) { + for _, ecShardInfo := range ecShardInfos { + shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + count += shardBits.ShardIdCount() + } + return +} + +func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) { + return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos) +} + +type EcNode struct { + info *master_pb.DataNodeInfo + freeEcSlot int +} + +func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { + + // list all possible locations + var resp *master_pb.VolumeListResponse + err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return nil, 0, err + } + + // find out all volume servers with one slot left. + eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) { + if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 { + ecNodes = append(ecNodes, &EcNode{ + info: dn, + freeEcSlot: int(freeEcSlots), + }) + totalFreeEcSlots += freeEcSlots + } + }) + + sortEcNodes(ecNodes) + + return +} + +func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, + collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { + + fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) + + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + ShardIds: toBeDeletedShardIds, + }) + return deleteErr + }) + +} + +func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, + volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error { + + fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) + + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{ + VolumeId: uint32(volumeId), + ShardIds: toBeUnmountedhardIds, + }) + return deleteErr + }) +} + +func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, + collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error { + + fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) + + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + ShardIds: toBeMountedhardIds, + }) + return mountErr + }) +} diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 14d1ae96b..94265a874 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -5,11 +5,9 @@ import ( "flag" "fmt" "io" - "sort" "sync" "time" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -73,6 +71,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr if err != nil { return err } + fmt.Printf("ec encode volumes: %v\n", volumeIds) for _, vid := range volumeIds { if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil { return err @@ -96,9 +95,9 @@ func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, } // balance the ec shards to current cluster - err = balanceEcShards(ctx, commandEnv, vid, collection, locations) + err = spreadEcShards(ctx, commandEnv, vid, collection, locations) if err != nil { - return fmt.Errorf("balance ec shards for volume %d on %s: %v", vid, locations[0].Url, err) + return fmt.Errorf("spread ec shards for volume %d to %s: %v", vid, locations[0].Url, err) } return nil @@ -118,7 +117,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum } -func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { +func spreadEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv) if err != nil { @@ -139,13 +138,19 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl // ask the data nodes to copy from the source volume server copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0]) if err != nil { - return nil + return err + } + + // unmount the to be deleted shards + err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) + if err != nil { + return err } // ask the source volume server to clean up copied ec shards - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) + err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds) if err != nil { - return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err) + return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err) } // ask the source volume server to delete the original volume @@ -176,7 +181,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia wg.Add(1) go func(server *EcNode, startFromShardId uint32, shardCount int) { defer wg.Done() - copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server, + copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server, startFromShardId, shardCount, volumeId, collection, existingLocation.Url) if copyErr != nil { err = copyErr @@ -201,81 +206,12 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia return } -func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServer *EcNode, startFromShardId uint32, shardCount int, - volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) { - - var shardIdsToCopy []uint32 - for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ { - fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation, targetServer.info.Id) - shardIdsToCopy = append(shardIdsToCopy, shardId) - } - - err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - - if targetServer.info.Id != existingLocation { - - _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ - VolumeId: uint32(volumeId), - Collection: collection, - ShardIds: shardIdsToCopy, - SourceDataNode: existingLocation, - }) - if copyErr != nil { - return copyErr - } - } - - _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ - VolumeId: uint32(volumeId), - Collection: collection, - ShardIds: shardIdsToCopy, - }) - if mountErr != nil { - return mountErr - } - - if targetServer.info.Id != existingLocation { - copiedShardIds = shardIdsToCopy - glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds) - } - - return nil - }) - - if err != nil { - return - } - - return -} - -func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, - volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { - - shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount - - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ - VolumeId: uint32(volumeId), - ShardIds: toBeDeletedShardIds, - ShouldDeleteEcx: shouldDeleteEcx, - }) - return deleteErr - }) - -} - func balancedEcDistribution(servers []*EcNode) (allocated []int) { - freeSlots := make([]int, len(servers)) allocated = make([]int, len(servers)) - for i, server := range servers { - freeSlots[i] = countFreeShardSlots(server.info) - } allocatedCount := 0 for allocatedCount < erasure_coding.TotalShardsCount { - for i, _ := range servers { - if freeSlots[i]-allocated[i] > 0 { + for i, server := range servers { + if server.freeEcSlot-allocated[i] > 0 { allocated[i] += 1 allocatedCount += 1 } @@ -288,67 +224,6 @@ func balancedEcDistribution(servers []*EcNode) (allocated []int) { return allocated } -func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) { - for _, dc := range topo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, dn := range rack.DataNodeInfos { - fn(dn) - } - } - } -} - -func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) { - for _, ecShardInfo := range ecShardInfos { - shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) - count += shardBits.ShardIdCount() - } - return -} - -func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) { - return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos) -} - -type EcNode struct { - info *master_pb.DataNodeInfo - freeEcSlot int -} - -func sortEcNodes(ecNodes []*EcNode) { - sort.Slice(ecNodes, func(i, j int) bool { - return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot - }) -} - -func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { - - // list all possible locations - var resp *master_pb.VolumeListResponse - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) - return err - }) - if err != nil { - return nil, 0, err - } - - // find out all volume servers with one slot left. - eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) { - if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 { - ecNodes = append(ecNodes, &EcNode{ - info: dn, - freeEcSlot: int(freeEcSlots), - }) - totalFreeEcSlots += freeEcSlots - } - }) - - sortEcNodes(ecNodes) - - return -} - func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp *master_pb.VolumeListResponse @@ -360,9 +235,11 @@ func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, se return } - quietSeconds := int64((quietPeriod * time.Second).Seconds()) + quietSeconds := int64(quietPeriod / time.Second) nowUnixSeconds := time.Now().Unix() + fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds) + vidMap := make(map[uint32]bool) for _, dc := range resp.TopologyInfo.DataCenterInfos { for _, r := range dc.RackInfos { diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go new file mode 100644 index 000000000..479b51484 --- /dev/null +++ b/weed/shell/command_ec_rebuild.go @@ -0,0 +1,272 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +func init() { + commands = append(commands, &commandEcRebuild{}) +} + +type commandEcRebuild struct { +} + +func (c *commandEcRebuild) Name() string { + return "ec.rebuild" +} + +func (c *commandEcRebuild) Help() string { + return `find and rebuild missing ec shards among volume servers + + ec.rebuild [-c EACH_COLLECTION|] [-f] + + Algorithm: + + For each type of volume server (different max volume count limit){ + for each collection { + rebuildEcVolumes() + } + } + + func rebuildEcVolumes(){ + idealWritableVolumes = totalWritableVolumes / numVolumeServers + for { + sort all volume servers ordered by the number of local writable volumes + pick the volume server A with the lowest number of writable volumes x + pick the volume server B with the highest number of writable volumes y + if y > idealWritableVolumes and x +1 <= idealWritableVolumes { + if B has a writable volume id v that A does not have { + move writable volume v from A to B + } + } + } + } + +` +} + +func (c *commandEcRebuild) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { + + fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") + applyChanges := fixCommand.Bool("force", false, "apply the changes") + if err = fixCommand.Parse(args); err != nil { + return nil + } + + // collect all ec nodes + allEcNodes, _, err := collectEcNodes(context.Background(), commandEnv) + if err != nil { + return err + } + + if *collection == "EACH_COLLECTION" { + collections, err := ListCollectionNames(commandEnv, false, true) + if err != nil { + return err + } + fmt.Printf("rebuildEcVolumes collections %+v\n", len(collections)) + for _, c := range collections { + fmt.Printf("rebuildEcVolumes collection %+v\n", c) + if err = rebuildEcVolumes(commandEnv, allEcNodes, c, writer, *applyChanges); err != nil { + return err + } + } + } else { + if err = rebuildEcVolumes(commandEnv, allEcNodes, *collection, writer, *applyChanges); err != nil { + return err + } + } + + return nil +} + +func rebuildEcVolumes(commandEnv *commandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error { + + ctx := context.Background() + + fmt.Printf("rebuildEcVolumes %s\n", collection) + + // collect vid => each shard locations, similar to ecShardMap in topology.go + ecShardMap := make(EcShardMap) + for _, ecNode := range allEcNodes { + ecShardMap.registerEcNode(ecNode, collection) + } + + for vid, locations := range ecShardMap { + shardCount := locations.shardCount() + if shardCount == erasure_coding.TotalShardsCount { + continue + } + if shardCount < erasure_coding.DataShardsCount { + return fmt.Errorf("ec volume %d is unrepairable with %d shards\n", vid, shardCount) + } + + sortEcNodes(allEcNodes) + + if allEcNodes[0].freeEcSlot < erasure_coding.TotalShardsCount { + return fmt.Errorf("disk space is not enough") + } + + if err := rebuildOneEcVolume(ctx, commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil { + return err + } + } + + return nil +} + +func rebuildOneEcVolume(ctx context.Context, commandEnv *commandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error { + + fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId) + + // collect shard files to rebuilder local disk + var generatedShardIds []uint32 + copiedShardIds, _, err := prepareDataToRecover(ctx, commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges) + if err != nil { + return err + } + defer func() { + // clean up working files + + // ask the rebuilder to delete the copied shards + err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds) + if err != nil { + fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds) + } + + // ask the rebuilder to delete the copied shards + err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) + if err != nil { + fmt.Fprintf(writer, "%s delete generated ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, generatedShardIds) + } + + }() + + if !applyChanges { + return nil + } + + // generate ec shards, and maybe ecx file, and mount them + generatedShardIds, err = generateMissingShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id) + if err != nil { + return err + } + + // mount the generated shards + err = mountEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) + if err != nil { + return err + } + + return nil +} + +func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption, + collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) { + + err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + }) + if rebultErr == nil { + rebuiltShardIds = resp.RebuiltShardIds + } + return rebultErr + }) + return +} + +func prepareDataToRecover(ctx context.Context, commandEnv *commandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) { + + needEcxFile := true + var localShardBits erasure_coding.ShardBits + for _, ecShardInfo := range rebuilder.info.EcShardInfos { + if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId { + needEcxFile = false + localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + } + } + + for shardId, ecNodes := range locations { + + if len(ecNodes) == 0 { + fmt.Fprintf(writer, "missing shard %d.%d\n", volumeId, shardId) + continue + } + + if localShardBits.HasShardId(erasure_coding.ShardId(shardId)){ + localShardIds = append(localShardIds, uint32(shardId)) + fmt.Fprintf(writer, "use existing shard %d.%d\n", volumeId, shardId) + continue + } + + var copyErr error + if applyBalancing{ + copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + ShardIds: []uint32{uint32(shardId)}, + CopyEcxFile: needEcxFile, + SourceDataNode: ecNodes[0].info.Id, + }) + return copyErr + }) + if copyErr == nil && needEcxFile { + needEcxFile = false + } + } + if copyErr != nil { + fmt.Fprintf(writer, "%s failed to copy %d.%d from %s: %v\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id, copyErr) + } else { + fmt.Fprintf(writer, "%s copied %d.%d from %s\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id) + copiedShardIds = append(copiedShardIds, uint32(shardId)) + } + + } + + if len(copiedShardIds)+len(localShardIds) >= erasure_coding.DataShardsCount { + return copiedShardIds, localShardIds, nil + } + + return nil, nil, fmt.Errorf("%d shards are not enough to recover volume %d", len(copiedShardIds)+len(localShardIds), volumeId) + +} + +type EcShardMap map[needle.VolumeId]EcShardLocations +type EcShardLocations [][]*EcNode + +func (ecShardMap EcShardMap) registerEcNode(ecNode *EcNode, collection string) { + for _, shardInfo := range ecNode.info.EcShardInfos { + if shardInfo.Collection == collection { + existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)] + if !found { + existing = make([][]*EcNode, erasure_coding.TotalShardsCount) + ecShardMap[needle.VolumeId(shardInfo.Id)] = existing + } + for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() { + existing[shardId] = append(existing[shardId], ecNode) + } + } + } +} + +func (ecShardLocations EcShardLocations) shardCount() (count int) { + for _, locations := range ecShardLocations { + if len(locations) > 0 { + count++ + } + } + return +} diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 2a5731333..9108fccaa 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -62,9 +62,9 @@ func (c *commandVolumeBalance) Help() string { func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) - collection := balanceCommand.String("c", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection") + collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection") dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") - applyBalancing := balanceCommand.Bool("f", false, "apply the balancing plan.") + applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan.") if err = balanceCommand.Parse(args); err != nil { return nil } diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index 1cbb2c380..13797ed4b 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -61,6 +61,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err) } l.ecVolumesLock.Lock() + defer l.ecVolumesLock.Unlock() ecVolume, found := l.ecVolumes[vid] if !found { ecVolume, err = erasure_coding.NewEcVolume(l.Directory, collection, vid) @@ -70,7 +71,6 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard l.ecVolumes[vid] = ecVolume } ecVolume.AddEcVolumeShard(ecVolumeShard) - l.ecVolumesLock.Unlock() return nil } diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go index da0cfcde8..090e6e075 100644 --- a/weed/storage/erasure_coding/ec_encoder.go +++ b/weed/storage/erasure_coding/ec_encoder.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/klauspost/reedsolomon" ) @@ -53,6 +54,10 @@ func WriteEcFiles(baseFileName string) error { return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize) } +func RebuildEcFiles(baseFileName string) ([]uint32, error) { + return generateMissingEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize) +} + func ToExt(ecIndex int) string { return fmt.Sprintf(".ec%02d", ecIndex) } @@ -75,6 +80,37 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, return nil } +func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64) (generatedShardIds []uint32, err error) { + + shardHasData := make([]bool, TotalShardsCount) + inputFiles := make([]*os.File, TotalShardsCount) + outputFiles := make([]*os.File, TotalShardsCount) + for shardId := 0; shardId < TotalShardsCount; shardId++ { + shardFileName := baseFileName + ToExt(shardId) + if util.FileExists(shardFileName) { + shardHasData[shardId] = true + inputFiles[shardId], err = os.OpenFile(shardFileName, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer inputFiles[shardId].Close() + } else { + outputFiles[shardId], err = os.OpenFile(shardFileName, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + defer outputFiles[shardId].Close() + generatedShardIds = append(generatedShardIds, uint32(shardId)) + } + } + + err = rebuildEcFiles(shardHasData, inputFiles, outputFiles) + if err != nil { + return nil, fmt.Errorf("rebuildEcFiles: %v", err) + } + return +} + func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error { bufferSize := int64(len(buffers[0])) @@ -150,20 +186,25 @@ func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blo } func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSize int, largeBlockSize int64, file *os.File, smallBlockSize int64) error { + var processedSize int64 + enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount) if err != nil { return fmt.Errorf("failed to create encoder: %v", err) } + buffers := make([][]byte, TotalShardsCount) + for i, _ := range buffers { + buffers[i] = make([]byte, bufferSize) + } + outputs, err := openEcFiles(baseFileName, false) defer closeEcFiles(outputs) if err != nil { return fmt.Errorf("failed to open dat file: %v", err) } - for i, _ := range buffers { - buffers[i] = make([]byte, bufferSize) - } + for remainingSize > largeBlockSize*DataShardsCount { err = encodeData(file, enc, processedSize, largeBlockSize, buffers, outputs) if err != nil { @@ -183,6 +224,64 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi return nil } +func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*os.File) error { + + enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount) + if err != nil { + return fmt.Errorf("failed to create encoder: %v", err) + } + + buffers := make([][]byte, TotalShardsCount) + for i, _ := range buffers { + if shardHasData[i] { + buffers[i] = make([]byte, ErasureCodingSmallBlockSize) + } + } + + var startOffset int64 + var inputBufferDataSize int + for { + + // read the input data from files + for i := 0; i < TotalShardsCount; i++ { + if shardHasData[i] { + n, _ := inputFiles[i].ReadAt(buffers[i], startOffset) + if n == 0 { + return nil + } + if inputBufferDataSize != 0 { + inputBufferDataSize = n + } + if inputBufferDataSize != n { + return fmt.Errorf("ec shard size need to be the same") + } + } else { + buffers[i] = nil + } + } + + fmt.Printf("reconstructing [%d,%d)\n", startOffset, startOffset+int64(inputBufferDataSize)) + + // encode the data + err = enc.Reconstruct(buffers) + if err != nil { + return fmt.Errorf("reconstruct: %v", err) + } + + // write the data to output files + for i := 0; i < TotalShardsCount; i++ { + if !shardHasData[i] { + n, _ := outputFiles[i].WriteAt(buffers[i][:inputBufferDataSize], startOffset) + if inputBufferDataSize != n { + return fmt.Errorf("fail to write to %s", outputFiles[i].Name()) + } + } + } + startOffset += int64(inputBufferDataSize) + } + +} + func readCompactMap(baseFileName string) (*needle_map.CompactMap, error) { indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644) if err != nil { diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index 687a582a7..9eeb35725 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -57,6 +57,14 @@ func EcShardFileName(collection string, dir string, id int) (fileName string) { return } +func EcShardBaseFileName(collection string, id int) (baseFileName string) { + baseFileName = strconv.Itoa(id) + if collection != "" { + baseFileName = collection+"_"+baseFileName + } + return +} + func (shard *EcVolumeShard) Close() { if shard.ecdFile != nil { _ = shard.ecdFile.Close() diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 0c9f674d5..1fddb8285 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -45,6 +45,8 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er EcIndexBits: uint32(shardBits.AddShardId(shardId)), } return nil + } else { + return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err) } } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 9e6f52a72..44e96015e 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -47,7 +47,7 @@ func (v *Volume) String() string { return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) } -func VolumeFileName(collection string, dir string, id int) (fileName string) { +func VolumeFileName(dir string, collection string, id int) (fileName string) { idString := strconv.Itoa(id) if collection == "" { fileName = path.Join(dir, idString) @@ -57,7 +57,7 @@ func VolumeFileName(collection string, dir string, id int) (fileName string) { return } func (v *Volume) FileName() (fileName string) { - return VolumeFileName(v.Collection, v.dir, int(v.Id)) + return VolumeFileName(v.dir, v.Collection, int(v.Id)) } func (v *Volume) DataFile() *os.File { return v.dataFile diff --git a/weed/util/file_util.go b/weed/util/file_util.go index 8ff2978ba..78add6724 100644 --- a/weed/util/file_util.go +++ b/weed/util/file_util.go @@ -30,3 +30,13 @@ func GetFileSize(file *os.File) (size int64, err error) { } return } + +func FileExists(filename string) bool { + + _, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return true + +}