Browse Source

weed volume: add grpc operation to relicate a volume to local

pull/902/head
Chris Lu 6 years ago
parent
commit
95e0520182
  1. 4
      weed/operation/sync_volume.go
  2. 60
      weed/pb/volume_server.proto
  3. 536
      weed/pb/volume_server_pb/volume_server.pb.go
  4. 8
      weed/server/volume_grpc_admin.go
  5. 155
      weed/server/volume_grpc_replicate.go
  6. 20
      weed/server/volume_grpc_sync.go
  7. 22
      weed/server/volume_grpc_vacuum.go
  8. 2
      weed/shell/shell_liner.go
  9. 4
      weed/storage/store.go
  10. 13
      weed/storage/volume.go
  11. 2
      weed/storage/volume_sync.go
  12. 2
      weed/topology/allocate_volume.go
  13. 8
      weed/topology/topology_vacuum.go

4
weed/operation/sync_volume.go

@ -15,7 +15,7 @@ func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint
WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{
VolumdId: vid,
VolumeId: vid,
})
return nil
})
@ -27,7 +27,7 @@ func GetVolumeIdxEntries(server string, grpcDialOption grpc.DialOption, vid uint
return WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
VolumdId: vid,
VolumeId: vid,
})
if err != nil {
return err

60
weed/pb/volume_server.proto

@ -36,7 +36,12 @@ service VolumeServer {
rpc VolumeDelete (VolumeDeleteRequest) returns (VolumeDeleteResponse) {
}
// rpc VolumeUiPage (VolumeUiPageRequest) returns (VolumeUiPageResponse) {}
rpc ReplicateVolume (ReplicateVolumeRequest) returns (ReplicateVolumeResponse) {
}
rpc ReadVolumeFileStatus (ReadVolumeFileStatusRequest) returns (ReadVolumeFileStatusResponse) {
}
rpc CopyFile (CopyFileRequest) returns (stream CopyFileResponse) {
}
}
@ -60,27 +65,27 @@ message Empty {
}
message VacuumVolumeCheckRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
}
message VacuumVolumeCheckResponse {
double garbage_ratio = 1;
}
message VacuumVolumeCompactRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
int64 preallocate = 2;
}
message VacuumVolumeCompactResponse {
}
message VacuumVolumeCommitRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
}
message VacuumVolumeCommitResponse {
}
message VacuumVolumeCleanupRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
}
message VacuumVolumeCleanupResponse {
}
@ -92,7 +97,7 @@ message DeleteCollectionResponse {
}
message AssignVolumeRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
string collection = 2;
int64 preallocate = 3;
string replication = 4;
@ -102,10 +107,10 @@ message AssignVolumeResponse {
}
message VolumeSyncStatusRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
}
message VolumeSyncStatusResponse {
uint32 volumd_id = 1;
uint32 volume_id = 1;
string collection = 2;
string replication = 4;
string ttl = 5;
@ -115,14 +120,14 @@ message VolumeSyncStatusResponse {
}
message VolumeSyncIndexRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
}
message VolumeSyncIndexResponse {
bytes index_file_content = 1;
}
message VolumeSyncDataRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
uint32 revision = 2;
uint32 offset = 3;
uint32 size = 4;
@ -133,26 +138,51 @@ message VolumeSyncDataResponse {
}
message VolumeMountRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
}
message VolumeMountResponse {
}
message VolumeUnmountRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
}
message VolumeUnmountResponse {
}
message VolumeDeleteRequest {
uint32 volumd_id = 1;
uint32 volume_id = 1;
}
message VolumeDeleteResponse {
}
message VolumeUiPageRequest {
message ReplicateVolumeRequest {
uint32 volume_id = 1;
string collection = 2;
string replication = 3;
string ttl = 4;
string source_data_node = 5;
}
message ReplicateVolumeResponse {
}
message CopyFileRequest {
uint32 volume_id = 1;
bool is_idx_file = 2;
bool is_dat_file = 3;
}
message CopyFileResponse {
bytes file_content = 1;
}
message ReadVolumeFileStatusRequest {
uint32 volume_id = 1;
}
message VolumeUiPageResponse {
message ReadVolumeFileStatusResponse {
uint32 volume_id = 1;
uint64 idx_file_timestamp = 2;
uint64 idx_file_size = 3;
uint64 dat_file_timestamp = 4;
uint64 dat_file_size = 5;
}
message DiskStatus {

536
weed/pb/volume_server_pb/volume_server.pb.go

@ -37,8 +37,12 @@ It has these top-level messages:
VolumeUnmountResponse
VolumeDeleteRequest
VolumeDeleteResponse
VolumeUiPageRequest
VolumeUiPageResponse
ReplicateVolumeRequest
ReplicateVolumeResponse
CopyFileRequest
CopyFileResponse
ReadVolumeFileStatusRequest
ReadVolumeFileStatusResponse
DiskStatus
MemStatus
*/
@ -145,7 +149,7 @@ func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
type VacuumVolumeCheckRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
}
func (m *VacuumVolumeCheckRequest) Reset() { *m = VacuumVolumeCheckRequest{} }
@ -153,9 +157,9 @@ func (m *VacuumVolumeCheckRequest) String() string { return proto.Com
func (*VacuumVolumeCheckRequest) ProtoMessage() {}
func (*VacuumVolumeCheckRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *VacuumVolumeCheckRequest) GetVolumdId() uint32 {
func (m *VacuumVolumeCheckRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -177,7 +181,7 @@ func (m *VacuumVolumeCheckResponse) GetGarbageRatio() float64 {
}
type VacuumVolumeCompactRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
Preallocate int64 `protobuf:"varint,2,opt,name=preallocate" json:"preallocate,omitempty"`
}
@ -186,9 +190,9 @@ func (m *VacuumVolumeCompactRequest) String() string { return proto.C
func (*VacuumVolumeCompactRequest) ProtoMessage() {}
func (*VacuumVolumeCompactRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *VacuumVolumeCompactRequest) GetVolumdId() uint32 {
func (m *VacuumVolumeCompactRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -209,7 +213,7 @@ func (*VacuumVolumeCompactResponse) ProtoMessage() {}
func (*VacuumVolumeCompactResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
type VacuumVolumeCommitRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
}
func (m *VacuumVolumeCommitRequest) Reset() { *m = VacuumVolumeCommitRequest{} }
@ -217,9 +221,9 @@ func (m *VacuumVolumeCommitRequest) String() string { return proto.Co
func (*VacuumVolumeCommitRequest) ProtoMessage() {}
func (*VacuumVolumeCommitRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
func (m *VacuumVolumeCommitRequest) GetVolumdId() uint32 {
func (m *VacuumVolumeCommitRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -233,7 +237,7 @@ func (*VacuumVolumeCommitResponse) ProtoMessage() {}
func (*VacuumVolumeCommitResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
type VacuumVolumeCleanupRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
}
func (m *VacuumVolumeCleanupRequest) Reset() { *m = VacuumVolumeCleanupRequest{} }
@ -241,9 +245,9 @@ func (m *VacuumVolumeCleanupRequest) String() string { return proto.C
func (*VacuumVolumeCleanupRequest) ProtoMessage() {}
func (*VacuumVolumeCleanupRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
func (m *VacuumVolumeCleanupRequest) GetVolumdId() uint32 {
func (m *VacuumVolumeCleanupRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -281,7 +285,7 @@ func (*DeleteCollectionResponse) ProtoMessage() {}
func (*DeleteCollectionResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} }
type AssignVolumeRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"`
Preallocate int64 `protobuf:"varint,3,opt,name=preallocate" json:"preallocate,omitempty"`
Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"`
@ -293,9 +297,9 @@ func (m *AssignVolumeRequest) String() string { return proto.CompactT
func (*AssignVolumeRequest) ProtoMessage() {}
func (*AssignVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} }
func (m *AssignVolumeRequest) GetVolumdId() uint32 {
func (m *AssignVolumeRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -337,7 +341,7 @@ func (*AssignVolumeResponse) ProtoMessage() {}
func (*AssignVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} }
type VolumeSyncStatusRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
}
func (m *VolumeSyncStatusRequest) Reset() { *m = VolumeSyncStatusRequest{} }
@ -345,15 +349,15 @@ func (m *VolumeSyncStatusRequest) String() string { return proto.Comp
func (*VolumeSyncStatusRequest) ProtoMessage() {}
func (*VolumeSyncStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} }
func (m *VolumeSyncStatusRequest) GetVolumdId() uint32 {
func (m *VolumeSyncStatusRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
type VolumeSyncStatusResponse struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"`
Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"`
Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"`
@ -367,9 +371,9 @@ func (m *VolumeSyncStatusResponse) String() string { return proto.Com
func (*VolumeSyncStatusResponse) ProtoMessage() {}
func (*VolumeSyncStatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} }
func (m *VolumeSyncStatusResponse) GetVolumdId() uint32 {
func (m *VolumeSyncStatusResponse) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -417,7 +421,7 @@ func (m *VolumeSyncStatusResponse) GetIdxFileSize() uint64 {
}
type VolumeSyncIndexRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
}
func (m *VolumeSyncIndexRequest) Reset() { *m = VolumeSyncIndexRequest{} }
@ -425,9 +429,9 @@ func (m *VolumeSyncIndexRequest) String() string { return proto.Compa
func (*VolumeSyncIndexRequest) ProtoMessage() {}
func (*VolumeSyncIndexRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} }
func (m *VolumeSyncIndexRequest) GetVolumdId() uint32 {
func (m *VolumeSyncIndexRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -449,7 +453,7 @@ func (m *VolumeSyncIndexResponse) GetIndexFileContent() []byte {
}
type VolumeSyncDataRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
Revision uint32 `protobuf:"varint,2,opt,name=revision" json:"revision,omitempty"`
Offset uint32 `protobuf:"varint,3,opt,name=offset" json:"offset,omitempty"`
Size uint32 `protobuf:"varint,4,opt,name=size" json:"size,omitempty"`
@ -461,9 +465,9 @@ func (m *VolumeSyncDataRequest) String() string { return proto.Compac
func (*VolumeSyncDataRequest) ProtoMessage() {}
func (*VolumeSyncDataRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20} }
func (m *VolumeSyncDataRequest) GetVolumdId() uint32 {
func (m *VolumeSyncDataRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -513,7 +517,7 @@ func (m *VolumeSyncDataResponse) GetFileContent() []byte {
}
type VolumeMountRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
}
func (m *VolumeMountRequest) Reset() { *m = VolumeMountRequest{} }
@ -521,9 +525,9 @@ func (m *VolumeMountRequest) String() string { return proto.CompactTe
func (*VolumeMountRequest) ProtoMessage() {}
func (*VolumeMountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{22} }
func (m *VolumeMountRequest) GetVolumdId() uint32 {
func (m *VolumeMountRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -537,7 +541,7 @@ func (*VolumeMountResponse) ProtoMessage() {}
func (*VolumeMountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{23} }
type VolumeUnmountRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
}
func (m *VolumeUnmountRequest) Reset() { *m = VolumeUnmountRequest{} }
@ -545,9 +549,9 @@ func (m *VolumeUnmountRequest) String() string { return proto.Compact
func (*VolumeUnmountRequest) ProtoMessage() {}
func (*VolumeUnmountRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{24} }
func (m *VolumeUnmountRequest) GetVolumdId() uint32 {
func (m *VolumeUnmountRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -561,7 +565,7 @@ func (*VolumeUnmountResponse) ProtoMessage() {}
func (*VolumeUnmountResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{25} }
type VolumeDeleteRequest struct {
VolumdId uint32 `protobuf:"varint,1,opt,name=volumd_id,json=volumdId" json:"volumd_id,omitempty"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
}
func (m *VolumeDeleteRequest) Reset() { *m = VolumeDeleteRequest{} }
@ -569,9 +573,9 @@ func (m *VolumeDeleteRequest) String() string { return proto.CompactT
func (*VolumeDeleteRequest) ProtoMessage() {}
func (*VolumeDeleteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{26} }
func (m *VolumeDeleteRequest) GetVolumdId() uint32 {
func (m *VolumeDeleteRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumdId
return m.VolumeId
}
return 0
}
@ -584,21 +588,173 @@ func (m *VolumeDeleteResponse) String() string { return proto.Compact
func (*VolumeDeleteResponse) ProtoMessage() {}
func (*VolumeDeleteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{27} }
type VolumeUiPageRequest struct {
type ReplicateVolumeRequest struct {
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"`
Replication string `protobuf:"bytes,3,opt,name=replication" json:"replication,omitempty"`
Ttl string `protobuf:"bytes,4,opt,name=ttl" json:"ttl,omitempty"`
SourceDataNode string `protobuf:"bytes,5,opt,name=source_data_node,json=sourceDataNode" json:"source_data_node,omitempty"`
}
func (m *VolumeUiPageRequest) Reset() { *m = VolumeUiPageRequest{} }
func (m *VolumeUiPageRequest) String() string { return proto.CompactTextString(m) }
func (*VolumeUiPageRequest) ProtoMessage() {}
func (*VolumeUiPageRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{28} }
func (m *ReplicateVolumeRequest) Reset() { *m = ReplicateVolumeRequest{} }
func (m *ReplicateVolumeRequest) String() string { return proto.CompactTextString(m) }
func (*ReplicateVolumeRequest) ProtoMessage() {}
func (*ReplicateVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{28} }
type VolumeUiPageResponse struct {
func (m *ReplicateVolumeRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumeId
}
return 0
}
func (m *ReplicateVolumeRequest) GetCollection() string {
if m != nil {
return m.Collection
}
return ""
}
func (m *ReplicateVolumeRequest) GetReplication() string {
if m != nil {
return m.Replication
}
return ""
}
func (m *ReplicateVolumeRequest) GetTtl() string {
if m != nil {
return m.Ttl
}
return ""
}
func (m *ReplicateVolumeRequest) GetSourceDataNode() string {
if m != nil {
return m.SourceDataNode
}
return ""
}
type ReplicateVolumeResponse struct {
}
func (m *ReplicateVolumeResponse) Reset() { *m = ReplicateVolumeResponse{} }
func (m *ReplicateVolumeResponse) String() string { return proto.CompactTextString(m) }
func (*ReplicateVolumeResponse) ProtoMessage() {}
func (*ReplicateVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{29} }
type CopyFileRequest struct {
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
IsIdxFile bool `protobuf:"varint,2,opt,name=is_idx_file,json=isIdxFile" json:"is_idx_file,omitempty"`
IsDatFile bool `protobuf:"varint,3,opt,name=is_dat_file,json=isDatFile" json:"is_dat_file,omitempty"`
}
func (m *CopyFileRequest) Reset() { *m = CopyFileRequest{} }
func (m *CopyFileRequest) String() string { return proto.CompactTextString(m) }
func (*CopyFileRequest) ProtoMessage() {}
func (*CopyFileRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{30} }
func (m *CopyFileRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumeId
}
return 0
}
func (m *CopyFileRequest) GetIsIdxFile() bool {
if m != nil {
return m.IsIdxFile
}
return false
}
func (m *CopyFileRequest) GetIsDatFile() bool {
if m != nil {
return m.IsDatFile
}
return false
}
type CopyFileResponse struct {
FileContent []byte `protobuf:"bytes,1,opt,name=file_content,json=fileContent,proto3" json:"file_content,omitempty"`
}
func (m *CopyFileResponse) Reset() { *m = CopyFileResponse{} }
func (m *CopyFileResponse) String() string { return proto.CompactTextString(m) }
func (*CopyFileResponse) ProtoMessage() {}
func (*CopyFileResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{31} }
func (m *CopyFileResponse) GetFileContent() []byte {
if m != nil {
return m.FileContent
}
return nil
}
type ReadVolumeFileStatusRequest struct {
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
}
func (m *ReadVolumeFileStatusRequest) Reset() { *m = ReadVolumeFileStatusRequest{} }
func (m *ReadVolumeFileStatusRequest) String() string { return proto.CompactTextString(m) }
func (*ReadVolumeFileStatusRequest) ProtoMessage() {}
func (*ReadVolumeFileStatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{32} }
func (m *ReadVolumeFileStatusRequest) GetVolumeId() uint32 {
if m != nil {
return m.VolumeId
}
return 0
}
type ReadVolumeFileStatusResponse struct {
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
IdxFileTimestamp uint64 `protobuf:"varint,2,opt,name=idx_file_timestamp,json=idxFileTimestamp" json:"idx_file_timestamp,omitempty"`
IdxFileSize uint64 `protobuf:"varint,3,opt,name=idx_file_size,json=idxFileSize" json:"idx_file_size,omitempty"`
DatFileTimestamp uint64 `protobuf:"varint,4,opt,name=dat_file_timestamp,json=datFileTimestamp" json:"dat_file_timestamp,omitempty"`
DatFileSize uint64 `protobuf:"varint,5,opt,name=dat_file_size,json=datFileSize" json:"dat_file_size,omitempty"`
}
func (m *ReadVolumeFileStatusResponse) Reset() { *m = ReadVolumeFileStatusResponse{} }
func (m *ReadVolumeFileStatusResponse) String() string { return proto.CompactTextString(m) }
func (*ReadVolumeFileStatusResponse) ProtoMessage() {}
func (*ReadVolumeFileStatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{33} }
func (m *ReadVolumeFileStatusResponse) GetVolumeId() uint32 {
if m != nil {
return m.VolumeId
}
return 0
}
func (m *ReadVolumeFileStatusResponse) GetIdxFileTimestamp() uint64 {
if m != nil {
return m.IdxFileTimestamp
}
return 0
}
func (m *ReadVolumeFileStatusResponse) GetIdxFileSize() uint64 {
if m != nil {
return m.IdxFileSize
}
return 0
}
func (m *ReadVolumeFileStatusResponse) GetDatFileTimestamp() uint64 {
if m != nil {
return m.DatFileTimestamp
}
return 0
}
func (m *VolumeUiPageResponse) Reset() { *m = VolumeUiPageResponse{} }
func (m *VolumeUiPageResponse) String() string { return proto.CompactTextString(m) }
func (*VolumeUiPageResponse) ProtoMessage() {}
func (*VolumeUiPageResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{29} }
func (m *ReadVolumeFileStatusResponse) GetDatFileSize() uint64 {
if m != nil {
return m.DatFileSize
}
return 0
}
type DiskStatus struct {
Dir string `protobuf:"bytes,1,opt,name=dir" json:"dir,omitempty"`
@ -610,7 +766,7 @@ type DiskStatus struct {
func (m *DiskStatus) Reset() { *m = DiskStatus{} }
func (m *DiskStatus) String() string { return proto.CompactTextString(m) }
func (*DiskStatus) ProtoMessage() {}
func (*DiskStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{30} }
func (*DiskStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{34} }
func (m *DiskStatus) GetDir() string {
if m != nil {
@ -653,7 +809,7 @@ type MemStatus struct {
func (m *MemStatus) Reset() { *m = MemStatus{} }
func (m *MemStatus) String() string { return proto.CompactTextString(m) }
func (*MemStatus) ProtoMessage() {}
func (*MemStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{31} }
func (*MemStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{35} }
func (m *MemStatus) GetGoroutines() int32 {
if m != nil {
@ -733,8 +889,12 @@ func init() {
proto.RegisterType((*VolumeUnmountResponse)(nil), "volume_server_pb.VolumeUnmountResponse")
proto.RegisterType((*VolumeDeleteRequest)(nil), "volume_server_pb.VolumeDeleteRequest")
proto.RegisterType((*VolumeDeleteResponse)(nil), "volume_server_pb.VolumeDeleteResponse")
proto.RegisterType((*VolumeUiPageRequest)(nil), "volume_server_pb.VolumeUiPageRequest")
proto.RegisterType((*VolumeUiPageResponse)(nil), "volume_server_pb.VolumeUiPageResponse")
proto.RegisterType((*ReplicateVolumeRequest)(nil), "volume_server_pb.ReplicateVolumeRequest")
proto.RegisterType((*ReplicateVolumeResponse)(nil), "volume_server_pb.ReplicateVolumeResponse")
proto.RegisterType((*CopyFileRequest)(nil), "volume_server_pb.CopyFileRequest")
proto.RegisterType((*CopyFileResponse)(nil), "volume_server_pb.CopyFileResponse")
proto.RegisterType((*ReadVolumeFileStatusRequest)(nil), "volume_server_pb.ReadVolumeFileStatusRequest")
proto.RegisterType((*ReadVolumeFileStatusResponse)(nil), "volume_server_pb.ReadVolumeFileStatusResponse")
proto.RegisterType((*DiskStatus)(nil), "volume_server_pb.DiskStatus")
proto.RegisterType((*MemStatus)(nil), "volume_server_pb.MemStatus")
}
@ -764,6 +924,9 @@ type VolumeServerClient interface {
VolumeMount(ctx context.Context, in *VolumeMountRequest, opts ...grpc.CallOption) (*VolumeMountResponse, error)
VolumeUnmount(ctx context.Context, in *VolumeUnmountRequest, opts ...grpc.CallOption) (*VolumeUnmountResponse, error)
VolumeDelete(ctx context.Context, in *VolumeDeleteRequest, opts ...grpc.CallOption) (*VolumeDeleteResponse, error)
ReplicateVolume(ctx context.Context, in *ReplicateVolumeRequest, opts ...grpc.CallOption) (*ReplicateVolumeResponse, error)
ReadVolumeFileStatus(ctx context.Context, in *ReadVolumeFileStatusRequest, opts ...grpc.CallOption) (*ReadVolumeFileStatusResponse, error)
CopyFile(ctx context.Context, in *CopyFileRequest, opts ...grpc.CallOption) (VolumeServer_CopyFileClient, error)
}
type volumeServerClient struct {
@ -937,6 +1100,56 @@ func (c *volumeServerClient) VolumeDelete(ctx context.Context, in *VolumeDeleteR
return out, nil
}
func (c *volumeServerClient) ReplicateVolume(ctx context.Context, in *ReplicateVolumeRequest, opts ...grpc.CallOption) (*ReplicateVolumeResponse, error) {
out := new(ReplicateVolumeResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/ReplicateVolume", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *volumeServerClient) ReadVolumeFileStatus(ctx context.Context, in *ReadVolumeFileStatusRequest, opts ...grpc.CallOption) (*ReadVolumeFileStatusResponse, error) {
out := new(ReadVolumeFileStatusResponse)
err := grpc.Invoke(ctx, "/volume_server_pb.VolumeServer/ReadVolumeFileStatus", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *volumeServerClient) CopyFile(ctx context.Context, in *CopyFileRequest, opts ...grpc.CallOption) (VolumeServer_CopyFileClient, error) {
stream, err := grpc.NewClientStream(ctx, &_VolumeServer_serviceDesc.Streams[2], c.cc, "/volume_server_pb.VolumeServer/CopyFile", opts...)
if err != nil {
return nil, err
}
x := &volumeServerCopyFileClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type VolumeServer_CopyFileClient interface {
Recv() (*CopyFileResponse, error)
grpc.ClientStream
}
type volumeServerCopyFileClient struct {
grpc.ClientStream
}
func (x *volumeServerCopyFileClient) Recv() (*CopyFileResponse, error) {
m := new(CopyFileResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for VolumeServer service
type VolumeServerServer interface {
@ -954,6 +1167,9 @@ type VolumeServerServer interface {
VolumeMount(context.Context, *VolumeMountRequest) (*VolumeMountResponse, error)
VolumeUnmount(context.Context, *VolumeUnmountRequest) (*VolumeUnmountResponse, error)
VolumeDelete(context.Context, *VolumeDeleteRequest) (*VolumeDeleteResponse, error)
ReplicateVolume(context.Context, *ReplicateVolumeRequest) (*ReplicateVolumeResponse, error)
ReadVolumeFileStatus(context.Context, *ReadVolumeFileStatusRequest) (*ReadVolumeFileStatusResponse, error)
CopyFile(*CopyFileRequest, VolumeServer_CopyFileServer) error
}
func RegisterVolumeServerServer(s *grpc.Server, srv VolumeServerServer) {
@ -1200,6 +1416,63 @@ func _VolumeServer_VolumeDelete_Handler(srv interface{}, ctx context.Context, de
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_ReplicateVolume_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReplicateVolumeRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).ReplicateVolume(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/ReplicateVolume",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).ReplicateVolume(ctx, req.(*ReplicateVolumeRequest))
}
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_ReadVolumeFileStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReadVolumeFileStatusRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).ReadVolumeFileStatus(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/ReadVolumeFileStatus",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).ReadVolumeFileStatus(ctx, req.(*ReadVolumeFileStatusRequest))
}
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_CopyFile_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(CopyFileRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(VolumeServerServer).CopyFile(m, &volumeServerCopyFileServer{stream})
}
type VolumeServer_CopyFileServer interface {
Send(*CopyFileResponse) error
grpc.ServerStream
}
type volumeServerCopyFileServer struct {
grpc.ServerStream
}
func (x *volumeServerCopyFileServer) Send(m *CopyFileResponse) error {
return x.ServerStream.SendMsg(m)
}
var _VolumeServer_serviceDesc = grpc.ServiceDesc{
ServiceName: "volume_server_pb.VolumeServer",
HandlerType: (*VolumeServerServer)(nil),
@ -1248,6 +1521,14 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
MethodName: "VolumeDelete",
Handler: _VolumeServer_VolumeDelete_Handler,
},
{
MethodName: "ReplicateVolume",
Handler: _VolumeServer_ReplicateVolume_Handler,
},
{
MethodName: "ReadVolumeFileStatus",
Handler: _VolumeServer_ReadVolumeFileStatus_Handler,
},
},
Streams: []grpc.StreamDesc{
{
@ -1260,6 +1541,11 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
Handler: _VolumeServer_VolumeSyncData_Handler,
ServerStreams: true,
},
{
StreamName: "CopyFile",
Handler: _VolumeServer_CopyFile_Handler,
ServerStreams: true,
},
},
Metadata: "volume_server.proto",
}
@ -1267,71 +1553,83 @@ var _VolumeServer_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("volume_server.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 1044 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xa4, 0x57, 0xdd, 0x72, 0xdb, 0x44,
0x14, 0x8e, 0x6a, 0x3b, 0x76, 0x8e, 0x6d, 0x6a, 0xd6, 0x69, 0xa2, 0xaa, 0x10, 0x8c, 0x80, 0xd4,
0x69, 0x43, 0x80, 0x74, 0x80, 0x32, 0xdc, 0x00, 0x09, 0x30, 0xb9, 0xe8, 0x94, 0xd9, 0x4c, 0x3b,
0xcc, 0xd0, 0x19, 0x8f, 0x22, 0xad, 0x9d, 0x25, 0xb2, 0xe4, 0x6a, 0x57, 0x99, 0x94, 0x37, 0xe1,
0x9a, 0x1b, 0x9e, 0x8e, 0x17, 0xe0, 0x86, 0xd9, 0x1f, 0xd9, 0xfa, 0x73, 0x24, 0xe0, 0x6e, 0xf7,
0xec, 0x39, 0xdf, 0xf9, 0xd9, 0xa3, 0xf3, 0xad, 0x60, 0x78, 0x1d, 0xfa, 0xf1, 0x9c, 0x4c, 0x18,
0x89, 0xae, 0x49, 0x74, 0xb4, 0x88, 0x42, 0x1e, 0xa2, 0x41, 0x46, 0x38, 0x59, 0x5c, 0xd8, 0x9f,
0x00, 0xfa, 0xce, 0xe1, 0xee, 0xe5, 0x29, 0xf1, 0x09, 0x27, 0x98, 0xbc, 0x8e, 0x09, 0xe3, 0xe8,
0x3e, 0x74, 0xa6, 0xd4, 0x27, 0x13, 0xea, 0x31, 0xd3, 0x18, 0x35, 0xc6, 0x5b, 0xb8, 0x2d, 0xf6,
0x67, 0x1e, 0xb3, 0x9f, 0xc3, 0x30, 0x63, 0xc0, 0x16, 0x61, 0xc0, 0x08, 0x7a, 0x0a, 0xed, 0x88,
0xb0, 0xd8, 0xe7, 0xca, 0xa0, 0x7b, 0xbc, 0x77, 0x94, 0xf7, 0x75, 0xb4, 0x34, 0x89, 0x7d, 0x8e,
0x13, 0x75, 0x9b, 0x42, 0x2f, 0x7d, 0x80, 0x76, 0xa1, 0xad, 0x7d, 0x9b, 0xc6, 0xc8, 0x18, 0x6f,
0xe1, 0x4d, 0xe5, 0x1a, 0xed, 0xc0, 0x26, 0xe3, 0x0e, 0x8f, 0x99, 0x79, 0x67, 0x64, 0x8c, 0x5b,
0x58, 0xef, 0xd0, 0x36, 0xb4, 0x48, 0x14, 0x85, 0x91, 0xd9, 0x90, 0xea, 0x6a, 0x83, 0x10, 0x34,
0x19, 0xfd, 0x8d, 0x98, 0xcd, 0x91, 0x31, 0xee, 0x63, 0xb9, 0xb6, 0xdb, 0xd0, 0xfa, 0x7e, 0xbe,
0xe0, 0x6f, 0xec, 0x2f, 0xc1, 0x7c, 0xe9, 0xb8, 0x71, 0x3c, 0x7f, 0x29, 0x63, 0x3c, 0xb9, 0x24,
0xee, 0x55, 0x92, 0xfb, 0x03, 0xd8, 0x92, 0x91, 0x7b, 0x49, 0x04, 0x7d, 0xdc, 0x51, 0x82, 0x33,
0xcf, 0xfe, 0x06, 0xee, 0x97, 0x18, 0xea, 0x1a, 0x7c, 0x00, 0xfd, 0x99, 0x13, 0x5d, 0x38, 0x33,
0x32, 0x89, 0x1c, 0x4e, 0x43, 0x69, 0x6d, 0xe0, 0x9e, 0x16, 0x62, 0x21, 0xb3, 0x7f, 0x01, 0x2b,
0x83, 0x10, 0xce, 0x17, 0x8e, 0xcb, 0xeb, 0x38, 0x47, 0x23, 0xe8, 0x2e, 0x22, 0xe2, 0xf8, 0x7e,
0xe8, 0x3a, 0x9c, 0xc8, 0x2a, 0x34, 0x70, 0x5a, 0x64, 0xbf, 0x0b, 0x0f, 0x4a, 0xc1, 0x55, 0x80,
0xf6, 0xd3, 0x5c, 0xf4, 0xe1, 0x7c, 0x4e, 0x6b, 0xb9, 0xb6, 0xdf, 0x29, 0x44, 0x2d, 0x2d, 0x35,
0xee, 0x57, 0xb9, 0x53, 0x9f, 0x38, 0x41, 0xbc, 0xa8, 0x05, 0x9c, 0x8f, 0x38, 0x31, 0x5d, 0x22,
0xef, 0xaa, 0xe6, 0x38, 0x09, 0x7d, 0x9f, 0xb8, 0x9c, 0x86, 0x41, 0x02, 0xbb, 0x07, 0xe0, 0x2e,
0x85, 0xba, 0x55, 0x52, 0x12, 0xdb, 0x02, 0xb3, 0x68, 0xaa, 0x61, 0xff, 0x34, 0x60, 0xf8, 0x2d,
0x63, 0x74, 0x16, 0x28, 0xb7, 0xb5, 0xca, 0x9f, 0x75, 0x78, 0x27, 0xef, 0x30, 0x7f, 0x3d, 0x8d,
0xc2, 0xf5, 0x08, 0x8d, 0x88, 0x2c, 0x7c, 0xea, 0x3a, 0x12, 0xa2, 0x29, 0x21, 0xd2, 0x22, 0x34,
0x80, 0x06, 0xe7, 0xbe, 0xd9, 0x92, 0x27, 0x62, 0x69, 0xef, 0xc0, 0x76, 0x36, 0x52, 0x9d, 0xc2,
0x17, 0xb0, 0xab, 0x24, 0xe7, 0x6f, 0x02, 0xf7, 0x5c, 0x7e, 0x09, 0xb5, 0x0a, 0xfe, 0xb7, 0x01,
0x66, 0xd1, 0x50, 0x77, 0xf0, 0xff, 0xcd, 0xff, 0xdf, 0x66, 0x87, 0xde, 0x83, 0x2e, 0x77, 0xa8,
0x3f, 0x09, 0xa7, 0x53, 0x46, 0xb8, 0xb9, 0x39, 0x32, 0xc6, 0x4d, 0x0c, 0x42, 0xf4, 0x5c, 0x4a,
0xd0, 0x01, 0x0c, 0x5c, 0xd5, 0xc5, 0x93, 0x88, 0x5c, 0x53, 0x26, 0x90, 0xdb, 0x32, 0xb0, 0xbb,
0x6e, 0xd2, 0xdd, 0x4a, 0x8c, 0x6c, 0xe8, 0x53, 0xef, 0x66, 0x22, 0x87, 0x87, 0xfc, 0xf4, 0x3b,
0x12, 0xad, 0x4b, 0xbd, 0x9b, 0x1f, 0xa8, 0x4f, 0xce, 0xc5, 0x04, 0xf8, 0x1c, 0x76, 0x56, 0xc9,
0x9f, 0x05, 0x1e, 0xb9, 0xa9, 0x55, 0xb4, 0x1f, 0xd3, 0xc5, 0xd6, 0x66, 0xba, 0x64, 0x87, 0x80,
0xa8, 0x10, 0x28, 0xbf, 0x6e, 0x18, 0x70, 0x12, 0x70, 0x09, 0xd0, 0xc3, 0x03, 0x79, 0x22, 0x9c,
0x9f, 0x28, 0xb9, 0xfd, 0xbb, 0x01, 0xf7, 0x56, 0x48, 0xa7, 0x0e, 0x77, 0x6a, 0xb5, 0x9e, 0x05,
0x9d, 0x65, 0xf6, 0x77, 0xd4, 0x59, 0xb2, 0x17, 0x63, 0x51, 0x57, 0xaf, 0x21, 0x4f, 0xf4, 0xae,
0x6c, 0x00, 0x0a, 0x27, 0x01, 0x21, 0x9e, 0x9a, 0xae, 0xea, 0x1a, 0x3a, 0x4a, 0x70, 0xe6, 0xd9,
0x5f, 0xa7, 0x6b, 0xa3, 0x42, 0xd3, 0x39, 0xbe, 0x0f, 0xbd, 0x92, 0xec, 0xba, 0xd3, 0x54, 0x62,
0x9f, 0x01, 0x52, 0xc6, 0xcf, 0xc2, 0x38, 0xa8, 0x37, 0x53, 0xee, 0xc1, 0x30, 0x63, 0xa2, 0x1b,
0xfb, 0x09, 0x6c, 0x2b, 0xf1, 0x8b, 0x60, 0x5e, 0x1b, 0x6b, 0x37, 0x29, 0xeb, 0xd2, 0x48, 0xa3,
0x1d, 0x27, 0x4e, 0xb2, 0x04, 0x77, 0x2b, 0xd8, 0x4e, 0x12, 0x41, 0x96, 0xe3, 0x56, 0x01, 0xbf,
0xa0, 0x3f, 0x89, 0x79, 0xae, 0xb0, 0x56, 0xea, 0x89, 0x58, 0xab, 0xff, 0x0c, 0x70, 0x4a, 0xd9,
0x95, 0xfa, 0xc4, 0x44, 0xef, 0x7b, 0x34, 0xd2, 0x73, 0x4a, 0x2c, 0x85, 0xc4, 0xf1, 0x7d, 0x79,
0x9f, 0x4d, 0x2c, 0x96, 0xe2, 0xca, 0x62, 0x46, 0x3c, 0x79, 0x91, 0x4d, 0x2c, 0xd7, 0x42, 0x36,
0x8d, 0x88, 0xba, 0xc6, 0x26, 0x96, 0x6b, 0xfb, 0x0f, 0x03, 0xb6, 0x9e, 0x91, 0xb9, 0x46, 0xde,
0x03, 0x98, 0x85, 0x51, 0x18, 0x73, 0x1a, 0x10, 0x26, 0x1d, 0xb4, 0x70, 0x4a, 0xf2, 0xdf, 0xfd,
0xc8, 0x16, 0x22, 0xfe, 0x54, 0x76, 0x4a, 0x13, 0xcb, 0xb5, 0x90, 0x5d, 0x12, 0x67, 0xa1, 0x3f,
0x55, 0xb9, 0x16, 0x0c, 0xcc, 0xb8, 0xe3, 0x5e, 0xc9, 0x2f, 0xb3, 0x89, 0xd5, 0xe6, 0xf8, 0x2f,
0x80, 0x9e, 0x6e, 0x28, 0xf9, 0x04, 0x40, 0xaf, 0xa0, 0x9b, 0x7a, 0x3a, 0xa0, 0x0f, 0x8b, 0x2f,
0x84, 0xe2, 0x53, 0xc4, 0xfa, 0xa8, 0x42, 0x4b, 0x17, 0x7b, 0x03, 0x05, 0xf0, 0x76, 0x81, 0x9a,
0xd1, 0xa3, 0xa2, 0xf5, 0x3a, 0xe2, 0xb7, 0x1e, 0xd7, 0xd2, 0x5d, 0xfa, 0xe3, 0x30, 0x2c, 0xe1,
0x5a, 0x74, 0x58, 0x81, 0x92, 0xe1, 0x7b, 0xeb, 0xe3, 0x9a, 0xda, 0x4b, 0xaf, 0xaf, 0x01, 0x15,
0x89, 0x18, 0x3d, 0xae, 0x84, 0x59, 0x11, 0xbd, 0x75, 0x58, 0x4f, 0x79, 0x6d, 0xa2, 0x8a, 0xa2,
0x2b, 0x13, 0xcd, 0x3c, 0x02, 0x2a, 0x13, 0xcd, 0xf1, 0xfe, 0x06, 0xba, 0x82, 0x41, 0x9e, 0xbe,
0xd1, 0xc1, 0xba, 0x37, 0x65, 0xe1, 0x75, 0x60, 0x3d, 0xaa, 0xa3, 0xba, 0x74, 0x36, 0x81, 0x5e,
0x9a, 0x64, 0x51, 0x49, 0xd3, 0x95, 0x3c, 0x17, 0xac, 0xfd, 0x2a, 0xb5, 0x74, 0x36, 0x79, 0xd2,
0x2d, 0xcb, 0x66, 0x0d, 0xa3, 0x97, 0x65, 0xb3, 0x8e, 0xc3, 0xed, 0x0d, 0xf4, 0x2b, 0xdc, 0xcd,
0xb1, 0x15, 0x1a, 0xdf, 0x06, 0x90, 0xe6, 0x41, 0xeb, 0xa0, 0x86, 0x66, 0xe2, 0xe9, 0x53, 0x03,
0xcd, 0xe0, 0xad, 0x2c, 0x69, 0xa0, 0x87, 0xb7, 0x01, 0xa4, 0x18, 0xcf, 0x1a, 0x57, 0x2b, 0xa6,
0x1c, 0xbd, 0x82, 0x6e, 0x8a, 0x2d, 0xca, 0x86, 0x47, 0x91, 0x7f, 0xca, 0x86, 0x47, 0x19, 0xe5,
0x6c, 0xa0, 0x0b, 0xe8, 0x67, 0xf8, 0x03, 0xed, 0xaf, 0xb3, 0xcc, 0xb2, 0x92, 0xf5, 0xb0, 0x52,
0x2f, 0xdd, 0x64, 0x69, 0x5a, 0x41, 0x6b, 0x83, 0xcb, 0x0e, 0xc0, 0xfd, 0x2a, 0xb5, 0xc4, 0xc1,
0xc5, 0xa6, 0xfc, 0xc9, 0x7b, 0xf2, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xdb, 0x3c, 0x6d, 0xd7,
0xfb, 0x0d, 0x00, 0x00,
// 1247 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xac, 0x58, 0x5b, 0x73, 0xdb, 0xc4,
0x17, 0x8f, 0x62, 0x3b, 0x71, 0x8e, 0xed, 0xc6, 0xff, 0x4d, 0x9a, 0x38, 0x4a, 0xff, 0xc1, 0x15,
0x90, 0x3a, 0x6d, 0x1a, 0x20, 0x9d, 0x42, 0x81, 0x17, 0x20, 0x01, 0x26, 0x0f, 0xa5, 0x33, 0x0a,
0xed, 0x30, 0x43, 0x67, 0x34, 0x1b, 0x69, 0x9d, 0x88, 0xc8, 0x92, 0xaa, 0x5d, 0x85, 0x84, 0x6f,
0xc2, 0x33, 0x2f, 0x7d, 0xe7, 0x03, 0xf1, 0x41, 0x78, 0x61, 0xf6, 0x22, 0x59, 0x37, 0xc7, 0xe2,
0xf2, 0xb6, 0x3a, 0x7b, 0xce, 0xef, 0x5c, 0xf6, 0xec, 0xd9, 0x9f, 0x0d, 0x6b, 0x57, 0x81, 0x17,
0x4f, 0x88, 0x45, 0x49, 0x74, 0x45, 0xa2, 0x83, 0x30, 0x0a, 0x58, 0x80, 0xfa, 0x39, 0xa1, 0x15,
0x9e, 0x19, 0x1f, 0x00, 0xfa, 0x0a, 0x33, 0xfb, 0xe2, 0x98, 0x78, 0x84, 0x11, 0x93, 0xbc, 0x89,
0x09, 0x65, 0x68, 0x0b, 0xda, 0x63, 0xd7, 0x23, 0x96, 0xeb, 0xd0, 0x81, 0x36, 0x6c, 0x8c, 0x56,
0xcc, 0x65, 0xfe, 0x7d, 0xe2, 0x50, 0xe3, 0x05, 0xac, 0xe5, 0x0c, 0x68, 0x18, 0xf8, 0x94, 0xa0,
0x67, 0xb0, 0x1c, 0x11, 0x1a, 0x7b, 0x4c, 0x1a, 0x74, 0x0e, 0x77, 0x0e, 0x8a, 0xbe, 0x0e, 0x52,
0x93, 0xd8, 0x63, 0x66, 0xa2, 0x6e, 0xb8, 0xd0, 0xcd, 0x6e, 0xa0, 0x4d, 0x58, 0x56, 0xbe, 0x07,
0xda, 0x50, 0x1b, 0xad, 0x98, 0x4b, 0xd2, 0x35, 0xda, 0x80, 0x25, 0xca, 0x30, 0x8b, 0xe9, 0x60,
0x71, 0xa8, 0x8d, 0x5a, 0xa6, 0xfa, 0x42, 0xeb, 0xd0, 0x22, 0x51, 0x14, 0x44, 0x83, 0x86, 0x50,
0x97, 0x1f, 0x08, 0x41, 0x93, 0xba, 0xbf, 0x90, 0x41, 0x73, 0xa8, 0x8d, 0x7a, 0xa6, 0x58, 0x1b,
0xcb, 0xd0, 0xfa, 0x7a, 0x12, 0xb2, 0x1b, 0xe3, 0x13, 0x18, 0xbc, 0xc2, 0x76, 0x1c, 0x4f, 0x5e,
0x89, 0x18, 0x8f, 0x2e, 0x88, 0x7d, 0x99, 0xe4, 0xbe, 0x0d, 0x2b, 0x2a, 0x72, 0x15, 0x41, 0xcf,
0x6c, 0x4b, 0xc1, 0x89, 0x63, 0x7c, 0x01, 0x5b, 0x15, 0x86, 0xaa, 0x06, 0xef, 0x42, 0xef, 0x1c,
0x47, 0x67, 0xf8, 0x9c, 0x58, 0x11, 0x66, 0x6e, 0x20, 0xac, 0x35, 0xb3, 0xab, 0x84, 0x26, 0x97,
0x19, 0x3f, 0x82, 0x9e, 0x43, 0x08, 0x26, 0x21, 0xb6, 0x59, 0x1d, 0xe7, 0x68, 0x08, 0x9d, 0x30,
0x22, 0xd8, 0xf3, 0x02, 0x1b, 0x33, 0x22, 0xaa, 0xd0, 0x30, 0xb3, 0x22, 0xe3, 0xff, 0xb0, 0x5d,
0x09, 0x2e, 0x03, 0x34, 0x9e, 0x15, 0xa2, 0x0f, 0x26, 0x13, 0xb7, 0x96, 0x6b, 0xe3, 0x5e, 0x29,
0x6a, 0x61, 0xa9, 0x70, 0x3f, 0x2d, 0xec, 0x7a, 0x04, 0xfb, 0x71, 0x58, 0x0b, 0xb8, 0x18, 0x71,
0x62, 0x9a, 0x22, 0x6f, 0xca, 0xe6, 0x38, 0x0a, 0x3c, 0x8f, 0xd8, 0xcc, 0x0d, 0xfc, 0x04, 0x76,
0x07, 0xc0, 0x4e, 0x85, 0xaa, 0x55, 0x32, 0x12, 0x43, 0x87, 0x41, 0xd9, 0x54, 0xc1, 0xbe, 0xd5,
0x60, 0xed, 0x4b, 0x4a, 0xdd, 0x73, 0x5f, 0xba, 0xad, 0x55, 0xfe, 0xbc, 0xc3, 0xc5, 0xa2, 0xc3,
0xe2, 0xf1, 0x34, 0x4a, 0xc7, 0xc3, 0x35, 0x22, 0x12, 0x7a, 0xae, 0x8d, 0x05, 0x44, 0x53, 0x40,
0x64, 0x45, 0xa8, 0x0f, 0x0d, 0xc6, 0xbc, 0x41, 0x4b, 0xec, 0xf0, 0xa5, 0xb1, 0x01, 0xeb, 0xf9,
0x48, 0x55, 0x0a, 0x1f, 0xc3, 0xa6, 0x94, 0x9c, 0xde, 0xf8, 0xf6, 0xa9, 0xb8, 0x09, 0xb5, 0x0a,
0xfe, 0xa7, 0x06, 0x83, 0xb2, 0xa1, 0xea, 0xe0, 0x7f, 0x9b, 0xff, 0xdf, 0xcd, 0x0e, 0xbd, 0x03,
0x1d, 0x86, 0x5d, 0xcf, 0x0a, 0xc6, 0x63, 0x4a, 0xd8, 0x60, 0x69, 0xa8, 0x8d, 0x9a, 0x26, 0x70,
0xd1, 0x0b, 0x21, 0x41, 0x7b, 0xd0, 0xb7, 0x65, 0x17, 0x5b, 0x11, 0xb9, 0x72, 0x29, 0x47, 0x5e,
0x16, 0x81, 0xad, 0xda, 0x49, 0x77, 0x4b, 0x31, 0x32, 0xa0, 0xe7, 0x3a, 0xd7, 0x96, 0x18, 0x1e,
0xe2, 0xea, 0xb7, 0x05, 0x5a, 0xc7, 0x75, 0xae, 0xbf, 0x71, 0x3d, 0x72, 0xca, 0x27, 0xc0, 0x53,
0xd8, 0x98, 0x26, 0x7f, 0xe2, 0x3b, 0xe4, 0xba, 0x56, 0xd1, 0xbe, 0xcd, 0x16, 0x5b, 0x99, 0xa9,
0x92, 0xed, 0x03, 0x72, 0xb9, 0x40, 0xfa, 0xb5, 0x03, 0x9f, 0x11, 0x9f, 0x09, 0x80, 0xae, 0xd9,
0x17, 0x3b, 0xdc, 0xf9, 0x91, 0x94, 0x1b, 0xbf, 0x6a, 0x70, 0x77, 0x8a, 0x74, 0x8c, 0x19, 0xae,
0xd5, 0x7a, 0x3a, 0xb4, 0xd3, 0xec, 0x17, 0xe5, 0x5e, 0xf2, 0xcd, 0xc7, 0xa2, 0xaa, 0x5e, 0x43,
0xec, 0xa8, 0xaf, 0xaa, 0x01, 0xc8, 0x9d, 0xf8, 0x84, 0x38, 0x72, 0xba, 0xca, 0x63, 0x68, 0x4b,
0xc1, 0x89, 0x63, 0x7c, 0x9e, 0xad, 0x8d, 0x0c, 0x4d, 0xe5, 0x78, 0x1f, 0xba, 0x15, 0xd9, 0x75,
0xc6, 0x99, 0xc4, 0x3e, 0x02, 0x24, 0x8d, 0x9f, 0x07, 0xb1, 0x5f, 0x6f, 0xa6, 0xdc, 0x85, 0xb5,
0x9c, 0x89, 0x6a, 0xec, 0x27, 0xb0, 0x2e, 0xc5, 0x2f, 0xfd, 0x49, 0x6d, 0xac, 0xcd, 0xa4, 0xac,
0xa9, 0x91, 0x42, 0x3b, 0x4c, 0x9c, 0xe4, 0x1f, 0xb8, 0x5b, 0xc1, 0x36, 0x92, 0x08, 0xf2, 0x6f,
0x9c, 0xf1, 0xbb, 0x06, 0x1b, 0xa6, 0x6a, 0x67, 0xf2, 0xdf, 0x0e, 0x8e, 0xec, 0xc5, 0x69, 0xcc,
0xbc, 0x38, 0xcd, 0xe9, 0xc5, 0x19, 0x41, 0x9f, 0x06, 0x71, 0x64, 0x13, 0xcb, 0xc1, 0x0c, 0x5b,
0x7e, 0xe0, 0x10, 0x75, 0xa0, 0x77, 0xa4, 0x9c, 0x1f, 0xe0, 0x77, 0x81, 0x43, 0x8c, 0x2d, 0xd8,
0x2c, 0x05, 0xad, 0x12, 0xf2, 0x61, 0xf5, 0x28, 0x08, 0x6f, 0x78, 0x83, 0xd6, 0x4c, 0xa4, 0xe3,
0x52, 0x2b, 0xb9, 0x64, 0x22, 0x93, 0xb6, 0xb9, 0xe2, 0xd2, 0x13, 0x79, 0xc3, 0xd4, 0xbe, 0x83,
0x99, 0xdc, 0x6f, 0x24, 0xfb, 0xc7, 0x98, 0xf1, 0x7d, 0xe3, 0x29, 0xf4, 0xa7, 0xfe, 0xea, 0xf7,
0xd6, 0x67, 0xb0, 0x6d, 0x12, 0xec, 0xc8, 0xe0, 0xc5, 0x55, 0xae, 0x3f, 0xee, 0xfe, 0xd0, 0xe0,
0x5e, 0xb5, 0x71, 0x9d, 0x91, 0xc7, 0x2f, 0x77, 0x32, 0x52, 0x98, 0x3b, 0x21, 0x94, 0xe1, 0x49,
0x28, 0xf2, 0x6e, 0x9a, 0x7d, 0x35, 0x57, 0xbe, 0x4f, 0xe4, 0xe5, 0x01, 0xd4, 0x28, 0x0d, 0x20,
0x8e, 0x98, 0xd4, 0x27, 0x83, 0xd8, 0x94, 0x88, 0x8e, 0xac, 0x53, 0x0e, 0x31, 0xd5, 0x16, 0x88,
0x2d, 0x89, 0xa8, 0x14, 0xc5, 0x48, 0xfb, 0x01, 0xe0, 0xd8, 0xa5, 0x97, 0x32, 0x2d, 0xde, 0x29,
0x8e, 0x1b, 0xa9, 0xe7, 0x90, 0x2f, 0xb9, 0x04, 0x7b, 0x9e, 0x0a, 0x9a, 0x2f, 0xf9, 0x64, 0x88,
0x29, 0x71, 0x54, 0x78, 0x62, 0xcd, 0x65, 0xe3, 0x88, 0x10, 0x15, 0x89, 0x58, 0x1b, 0xbf, 0x69,
0xb0, 0xf2, 0x9c, 0x4c, 0x14, 0xf2, 0x0e, 0xc0, 0x79, 0x10, 0x05, 0x31, 0x73, 0x7d, 0x42, 0x85,
0x83, 0x96, 0x99, 0x91, 0xfc, 0x73, 0x3f, 0x62, 0x52, 0x11, 0x6f, 0xac, 0x92, 0x13, 0x6b, 0x2e,
0xbb, 0x20, 0x38, 0x54, 0x2f, 0x82, 0x58, 0x73, 0xa2, 0x47, 0x19, 0xb6, 0x2f, 0xc5, 0x03, 0xd0,
0x34, 0xe5, 0xc7, 0xe1, 0xdb, 0x1e, 0x74, 0xd5, 0xdc, 0x12, 0x4c, 0x13, 0xbd, 0x86, 0x4e, 0x86,
0xa1, 0xa2, 0xf7, 0xca, 0x44, 0xb4, 0xcc, 0x78, 0xf5, 0xf7, 0xe7, 0x68, 0xa9, 0x1b, 0xb3, 0x80,
0x7c, 0xf8, 0x5f, 0x89, 0x01, 0xa2, 0x87, 0x65, 0xeb, 0x59, 0xfc, 0x52, 0x7f, 0x54, 0x4b, 0x37,
0xf5, 0xc7, 0x60, 0xad, 0x82, 0xd2, 0xa1, 0xfd, 0x39, 0x28, 0x39, 0x5a, 0xa9, 0x3f, 0xae, 0xa9,
0x9d, 0x7a, 0x7d, 0x03, 0xa8, 0xcc, 0xf7, 0xd0, 0xa3, 0xb9, 0x30, 0x53, 0x3e, 0xa9, 0xef, 0xd7,
0x53, 0x9e, 0x99, 0xa8, 0x64, 0x82, 0x73, 0x13, 0xcd, 0x71, 0xcd, 0xb9, 0x89, 0x16, 0xe8, 0xe5,
0x02, 0xba, 0x84, 0x7e, 0x91, 0x25, 0xa2, 0xbd, 0x59, 0x3f, 0x5d, 0x4a, 0x24, 0x54, 0x7f, 0x58,
0x47, 0x35, 0x75, 0x66, 0x41, 0x37, 0xcb, 0xe5, 0x50, 0x45, 0xd3, 0x55, 0xb0, 0x52, 0x7d, 0x77,
0x9e, 0x5a, 0x36, 0x9b, 0x22, 0xb7, 0xab, 0xca, 0x66, 0x06, 0x71, 0xac, 0xca, 0x66, 0x16, 0x55,
0x34, 0x16, 0xd0, 0x4f, 0xb0, 0x5a, 0x20, 0x45, 0x68, 0x74, 0x1b, 0x40, 0x96, 0x6e, 0xe9, 0x7b,
0x35, 0x34, 0x13, 0x4f, 0x1f, 0x6a, 0xe8, 0x1c, 0xee, 0xe4, 0xb9, 0x09, 0x7a, 0x70, 0x1b, 0x40,
0x86, 0x58, 0xe9, 0xa3, 0xf9, 0x8a, 0x19, 0x47, 0xaf, 0xa1, 0x93, 0x21, 0x25, 0x55, 0xc3, 0xa3,
0x4c, 0x73, 0xaa, 0x86, 0x47, 0x15, 0xb3, 0x59, 0x40, 0x67, 0xd0, 0xcb, 0xd1, 0x14, 0xb4, 0x3b,
0xcb, 0x32, 0x4f, 0x7e, 0xf4, 0x07, 0x73, 0xf5, 0xb2, 0x4d, 0x96, 0x65, 0x2f, 0x68, 0x66, 0x70,
0xf9, 0x01, 0xb8, 0x3b, 0x4f, 0x2d, 0x75, 0x70, 0x01, 0xab, 0x05, 0x42, 0x51, 0x75, 0xee, 0xd5,
0x44, 0xa9, 0xea, 0xdc, 0x67, 0xb1, 0x93, 0x05, 0xf4, 0x33, 0xac, 0x57, 0xbd, 0xdd, 0xe8, 0x71,
0x15, 0xc8, 0x4c, 0x82, 0xa0, 0x1f, 0xd4, 0x55, 0x4f, 0x1d, 0xbf, 0x84, 0x76, 0x42, 0x54, 0xd0,
0xfd, 0xb2, 0x75, 0x81, 0x34, 0xe9, 0xc6, 0x6d, 0x2a, 0xd3, 0xe6, 0x3a, 0x5b, 0x12, 0xff, 0xc2,
0x3c, 0xf9, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x6c, 0x7d, 0x0d, 0xbb, 0x9c, 0x11, 0x00, 0x00,
}

8
weed/server/volume_grpc_admin.go

@ -29,7 +29,7 @@ func (vs *VolumeServer) AssignVolume(ctx context.Context, req *volume_server_pb.
resp := &volume_server_pb.AssignVolumeResponse{}
err := vs.store.AddVolume(
storage.VolumeId(req.VolumdId),
storage.VolumeId(req.VolumeId),
req.Collection,
vs.needleMapKind,
req.Replication,
@ -51,7 +51,7 @@ func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.V
resp := &volume_server_pb.VolumeMountResponse{}
err := vs.store.MountVolume(storage.VolumeId(req.VolumdId))
err := vs.store.MountVolume(storage.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume mount %v: %v", req, err)
@ -67,7 +67,7 @@ func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb
resp := &volume_server_pb.VolumeUnmountResponse{}
err := vs.store.UnmountVolume(storage.VolumeId(req.VolumdId))
err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume unmount %v: %v", req, err)
@ -83,7 +83,7 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
resp := &volume_server_pb.VolumeDeleteResponse{}
err := vs.store.DeleteVolume(storage.VolumeId(req.VolumdId))
err := vs.store.DeleteVolume(storage.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume delete %v: %v", req, err)

155
weed/server/volume_grpc_replicate.go

@ -0,0 +1,155 @@
package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"io"
"os"
)
func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_pb.ReplicateVolumeRequest) (*volume_server_pb.ReplicateVolumeResponse, error) {
v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
if v != nil {
// unmount the volume
err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId))
if err != nil {
return nil, fmt.Errorf("failed to unmount volume %d: %v", req.VolumeId, err)
}
}
location := vs.store.FindFreeLocation()
if location == nil {
return nil, fmt.Errorf("no space left")
}
volumeFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId))
// the master will not start compaction for read-only volumes, so it is safe to just copy files directly
// copy .dat and .idx files
// read .idx .dat file size and timestamp
// send .idx file
// send .dat file
// confirm size and timestamp
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// TODO read file sizes before copying
client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{})
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
VolumeId: req.VolumeId,
IsIdxFile: true,
})
if err != nil {
return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err)
}
err = writeToFile(copyFileClient, volumeFileName+".idx")
if err != nil {
return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err)
}
copyFileClient, err = client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
VolumeId: req.VolumeId,
IsDatFile: true,
})
if err != nil {
return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err)
}
err = writeToFile(copyFileClient, volumeFileName+".dat")
if err != nil {
return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err)
}
return nil
})
if err != nil {
return nil, err
}
// TODO: check the timestamp and size
// mount the volume
err = vs.store.MountVolume(storage.VolumeId(req.VolumeId))
if err != nil {
return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
}
return &volume_server_pb.ReplicateVolumeResponse{}, err
}
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
println("writing to ", fileName)
dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return nil
}
defer dst.Close()
for {
resp, receiveErr := client.Recv()
if receiveErr == io.EOF {
break
}
if receiveErr != nil {
return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
}
dst.Write(resp.FileContent)
}
return nil
}
func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
return resp, nil
}
func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) (error) {
v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
const BufferSize = 1024 * 16
var fileName = v.FileName()
if req.IsDatFile {
fileName += ".dat"
} else if req.IsIdxFile {
fileName += ".idx"
}
file, err := os.Open(fileName)
if err != nil {
return err
}
defer file.Close()
buffer := make([]byte, BufferSize)
for {
bytesread, err := file.Read(buffer)
if err != nil {
if err != io.EOF {
return err
}
break
}
stream.Send(&volume_server_pb.CopyFileResponse{
FileContent: buffer[:bytesread],
})
}
return nil
}

20
weed/server/volume_grpc_sync.go

@ -12,14 +12,14 @@ import (
func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
if v == nil {
return nil, fmt.Errorf("not found volume id %d", req.VolumdId)
return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
}
resp := v.GetVolumeSyncStatus()
glog.V(2).Infof("volume sync status %d", req.VolumdId)
glog.V(2).Infof("volume sync status %d", req.VolumeId)
return resp, nil
@ -27,17 +27,17 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server
func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexRequest, stream volume_server_pb.VolumeServer_VolumeSyncIndexServer) error {
v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumdId)
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
content, err := v.IndexFileContent()
if err != nil {
glog.Errorf("sync volume %d index: %v", req.VolumdId, err)
glog.Errorf("sync volume %d index: %v", req.VolumeId, err)
} else {
glog.V(2).Infof("sync volume %d index", req.VolumdId)
glog.V(2).Infof("sync volume %d index", req.VolumeId)
}
const blockSizeLimit = 1024 * 1024 * 2
@ -57,9 +57,9 @@ func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexReq
func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataRequest, stream volume_server_pb.VolumeServer_VolumeSyncDataServer) error {
v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumdId)
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
if uint32(v.SuperBlock.CompactRevision) != req.Revision {
@ -82,7 +82,7 @@ func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataReque
}
if err != nil {
glog.Errorf("sync volume %d data: %v", req.VolumdId, err)
glog.Errorf("sync volume %d data: %v", req.VolumeId, err)
}
const blockSizeLimit = 1024 * 1024 * 2

22
weed/server/volume_grpc_vacuum.go

@ -12,12 +12,12 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
resp := &volume_server_pb.VacuumVolumeCheckResponse{}
garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumdId))
garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumeId))
resp.GarbageRatio = garbageRatio
if err != nil {
glog.V(3).Infof("check volume %d: %v", req.VolumdId, err)
glog.V(3).Infof("check volume %d: %v", req.VolumeId, err)
}
return resp, err
@ -28,12 +28,12 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
err := vs.store.CompactVolume(storage.VolumeId(req.VolumdId), req.Preallocate)
err := vs.store.CompactVolume(storage.VolumeId(req.VolumeId), req.Preallocate)
if err != nil {
glog.Errorf("compact volume %d: %v", req.VolumdId, err)
glog.Errorf("compact volume %d: %v", req.VolumeId, err)
} else {
glog.V(1).Infof("compact volume %d", req.VolumdId)
glog.V(1).Infof("compact volume %d", req.VolumeId)
}
return resp, err
@ -44,12 +44,12 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv
resp := &volume_server_pb.VacuumVolumeCommitResponse{}
err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumdId))
err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("commit volume %d: %v", req.VolumdId, err)
glog.Errorf("commit volume %d: %v", req.VolumeId, err)
} else {
glog.V(1).Infof("commit volume %d", req.VolumdId)
glog.V(1).Infof("commit volume %d", req.VolumeId)
}
return resp, err
@ -60,12 +60,12 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser
resp := &volume_server_pb.VacuumVolumeCleanupResponse{}
err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumdId))
err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("cleanup volume %d: %v", req.VolumdId, err)
glog.Errorf("cleanup volume %d: %v", req.VolumeId, err)
} else {
glog.V(1).Infof("cleanup volume %d", req.VolumdId)
glog.V(1).Infof("cleanup volume %d", req.VolumeId)
}
return resp, err

2
weed/shell/shell_liner.go

@ -71,7 +71,7 @@ func RunShell(options ShellOptions) {
} else {
for _, c := range commands {
if c.Name() == cmd {
if err := c.Do(args, commandEnv, os.Stderr); err != nil {
if err := c.Do(args, commandEnv, os.Stdout); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
}
}

4
weed/storage/store.go

@ -77,7 +77,7 @@ func (s *Store) findVolume(vid VolumeId) *Volume {
}
return nil
}
func (s *Store) findFreeLocation() (ret *DiskLocation) {
func (s *Store) FindFreeLocation() (ret *DiskLocation) {
max := 0
for _, location := range s.Locations {
currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
@ -92,7 +92,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
if location := s.FindFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil {

13
weed/storage/volume.go

@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"os"
"path"
"strconv"
"sync"
"time"
@ -42,14 +43,18 @@ func (v *Volume) String() string {
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
}
func (v *Volume) FileName() (fileName string) {
if v.Collection == "" {
fileName = path.Join(v.dir, v.Id.String())
func VolumeFileName(collection string, dir string, id int) (fileName string) {
idString := strconv.Itoa(id)
if collection == "" {
fileName = path.Join(dir, idString)
} else {
fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
fileName = path.Join(dir, collection+"_"+idString)
}
return
}
func (v *Volume) FileName() (fileName string) {
return VolumeFileName(v.Collection, v.dir, int(v.Id))
}
func (v *Volume) DataFile() *os.File {
return v.dataFile
}

2
weed/storage/volume_sync.go

@ -192,7 +192,7 @@ func (v *Volume) fetchNeedle(volumeServer string, grpcDialOption grpc.DialOption
return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
VolumdId: uint32(v.Id),
VolumeId: uint32(v.Id),
Revision: uint32(compactRevision),
Offset: uint32(needleValue.Offset),
Size: uint32(needleValue.Size),

2
weed/topology/allocate_volume.go

@ -17,7 +17,7 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid storage.Vo
return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.AssignVolume(context.Background(), &volume_server_pb.AssignVolumeRequest{
VolumdId: uint32(vid),
VolumeId: uint32(vid),
Collection: option.Collection,
Replication: option.ReplicaPlacement.String(),
Ttl: option.Ttl.String(),

8
weed/topology/topology_vacuum.go

@ -17,7 +17,7 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
go func(index int, url string, vid storage.VolumeId) {
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumdId: uint32(vid),
VolumeId: uint32(vid),
})
if err != nil {
ch <- false
@ -52,7 +52,7 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout,
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
VolumdId: uint32(vid),
VolumeId: uint32(vid),
})
return err
})
@ -83,7 +83,7 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumdId: uint32(vid),
VolumeId: uint32(vid),
})
return err
})
@ -104,7 +104,7 @@ func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout,
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
VolumdId: uint32(vid),
VolumeId: uint32(vid),
})
return err
})

Loading…
Cancel
Save