diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index 6b4a3eab0..1bf283c76 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -283,6 +283,7 @@ message CopyFileRequest { string collection = 5; bool is_ec_volume = 6; bool ignore_source_file_not_found = 7; + uint32 generation = 8; // generation of files to copy, defaults to 0 for backward compatibility } message CopyFileResponse { bytes file_content = 1; diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 1030b957b..72accfcdb 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -1831,6 +1831,7 @@ type CopyFileRequest struct { Collection string `protobuf:"bytes,5,opt,name=collection,proto3" json:"collection,omitempty"` IsEcVolume bool `protobuf:"varint,6,opt,name=is_ec_volume,json=isEcVolume,proto3" json:"is_ec_volume,omitempty"` IgnoreSourceFileNotFound bool `protobuf:"varint,7,opt,name=ignore_source_file_not_found,json=ignoreSourceFileNotFound,proto3" json:"ignore_source_file_not_found,omitempty"` + Generation uint32 `protobuf:"varint,8,opt,name=generation,proto3" json:"generation,omitempty"` // generation of files to copy, defaults to 0 for backward compatibility unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1914,6 +1915,13 @@ func (x *CopyFileRequest) GetIgnoreSourceFileNotFound() bool { return false } +func (x *CopyFileRequest) GetGeneration() uint32 { + if x != nil { + return x.Generation + } + return 0 +} + type CopyFileResponse struct { state protoimpl.MessageState `protogen:"open.v1"` FileContent []byte `protobuf:"bytes,1,opt,name=file_content,json=fileContent,proto3" json:"file_content,omitempty"` @@ -6265,7 +6273,7 @@ const file_volume_server_proto_rawDesc = "" + "\x12io_byte_per_second\x18\a \x01(\x03R\x0fioBytePerSecond\"h\n" + "\x12VolumeCopyResponse\x12)\n" + "\x11last_append_at_ns\x18\x01 \x01(\x04R\x0elastAppendAtNs\x12'\n" + - "\x0fprocessed_bytes\x18\x02 \x01(\x03R\x0eprocessedBytes\"\x94\x02\n" + + "\x0fprocessed_bytes\x18\x02 \x01(\x03R\x0eprocessedBytes\"\xb4\x02\n" + "\x0fCopyFileRequest\x12\x1b\n" + "\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x10\n" + "\x03ext\x18\x02 \x01(\tR\x03ext\x12/\n" + @@ -6277,7 +6285,10 @@ const file_volume_server_proto_rawDesc = "" + "collection\x12 \n" + "\fis_ec_volume\x18\x06 \x01(\bR\n" + "isEcVolume\x12>\n" + - "\x1cignore_source_file_not_found\x18\a \x01(\bR\x18ignoreSourceFileNotFound\"[\n" + + "\x1cignore_source_file_not_found\x18\a \x01(\bR\x18ignoreSourceFileNotFound\x12\x1e\n" + + "\n" + + "generation\x18\b \x01(\rR\n" + + "generation\"[\n" + "\x10CopyFileResponse\x12!\n" + "\ffile_content\x18\x01 \x01(\fR\vfileContent\x12$\n" + "\x0emodified_ts_ns\x18\x02 \x01(\x03R\fmodifiedTsNs\"z\n" + diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 84a9035ca..17089b46f 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -131,7 +131,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre nextReportTarget = processed + reportInterval } return true - }, throttler); err != nil { + }, throttler, 0); err != nil { // regular volumes use generation 0 return err } if sendErr != nil { @@ -142,14 +142,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre } } - if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler); err != nil { + if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler, 0); err != nil { // regular volumes use generation 0 return err } if modifiedTsNs > 0 { os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) } - if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, 1024*1024, dataBaseFileName, ".vif", false, true, nil, throttler); err != nil { + if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, 1024*1024, dataBaseFileName, ".vif", false, true, nil, throttler, 0); err != nil { // regular volumes use generation 0 return err } if modifiedTsNs > 0 { @@ -199,10 +199,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre } func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) { - return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond)) + return vs.doCopyFileWithGeneration(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, 0) } -func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler) (modifiedTsNs int64, err error) { +func (vs *VolumeServer) doCopyFileWithGeneration(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, generation uint32) (modifiedTsNs int64, err error) { + return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond), generation) +} + +func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler, generation uint32) (modifiedTsNs int64, err error) { copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: vid, @@ -212,6 +216,7 @@ func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeSe Collection: collection, IsEcVolume: isEcVolume, IgnoreSourceFileNotFound: ignoreSourceFileNotFound, + Generation: generation, // pass generation to source server }) if err != nil { return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) @@ -332,22 +337,29 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v v.SyncToDisk() fileName = v.FileName(req.Ext) } else { - baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext + // Use generation-aware filename for EC volumes + generation := req.Generation for _, location := range vs.store.Locations { - tName := util.Join(location.Directory, baseFileName) + // Try data directory with generation-aware naming + baseFileName := erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.Directory, int(req.VolumeId), generation) + tName := baseFileName + req.Ext if util.FileExists(tName) { fileName = tName + break } - tName = util.Join(location.IdxDirectory, baseFileName) + // Try index directory with generation-aware naming + baseFileName = erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.IdxDirectory, int(req.VolumeId), generation) + tName = baseFileName + req.Ext if util.FileExists(tName) { fileName = tName + break } } if fileName == "" { if req.IgnoreSourceFileNotFound { return nil } - return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId) + return fmt.Errorf("CopyFile not found ec volume id %d generation %d", req.VolumeId, generation) } } diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index c5dc2a951..a2606839b 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -187,36 +187,40 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv } } - dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) - indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId)) + // Generate target filenames with generation awareness + generation := req.Generation + dataBaseFileName := erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.Directory, int(req.VolumeId), generation) + indexBaseFileName := erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.IdxDirectory, int(req.VolumeId), generation) + + glog.V(1).Infof("VolumeEcShardsCopy: copying EC shards with generation %d: data=%s, index=%s", + generation, dataBaseFileName, indexBaseFileName) err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - // copy ec data slices + // copy ec data slices with generation awareness for _, shardId := range req.ShardIds { - if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false, nil); err != nil { + if _, err := vs.doCopyFileWithGeneration(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false, nil, generation); err != nil { return err } } if req.CopyEcxFile { - - // copy ecx file - if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil); err != nil { + // copy ecx file with generation awareness + if _, err := vs.doCopyFileWithGeneration(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil, generation); err != nil { return err } } if req.CopyEcjFile { - // copy ecj file - if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true, nil); err != nil { + // copy ecj file with generation awareness + if _, err := vs.doCopyFileWithGeneration(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true, nil, generation); err != nil { return err } } if req.CopyVifFile { - // copy vif file - if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true, nil); err != nil { + // copy vif file with generation awareness + if _, err := vs.doCopyFileWithGeneration(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true, nil, generation); err != nil { return err } } diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go index 7a7367aba..2451dc0e5 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go @@ -150,6 +150,7 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) { CopyEcjFile: true, CopyVifFile: true, SourceDataNode: string(sourceNode), + Generation: 1, // TODO: implement proper generation tracking in vacuum task }) if copyErr != nil { return fmt.Errorf("failed to copy shards %v from %s to %s: %w", needToCopyBits.ShardIds(), sourceNode, targetNode, copyErr) @@ -254,6 +255,7 @@ func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error CopyEcjFile: true, CopyVifFile: true, SourceDataNode: string(sourceNode), + Generation: 1, // TODO: implement proper generation tracking in vacuum task }) if copyErr != nil { return fmt.Errorf("failed to copy new shards %v from %s to %s: %w", needToDistributeBits.ShardIds(), sourceNode, targetNode, copyErr)