From c602f53a6e7eca007319a8aab95f96e6fe0f73ae Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 16 Jun 2025 22:46:13 -0700 Subject: [PATCH] tail-volume-uses-the-source-volume-version --- weed/operation/tail_volume.go | 3 ++- weed/pb/volume_server.proto | 1 + weed/pb/volume_server_pb/volume_server.pb.go | 13 +++++++++++-- weed/server/volume_grpc_tail.go | 10 +++++++--- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index 67ee3e825..3ab0c73cc 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -54,6 +54,7 @@ func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.Dia needleHeader := resp.NeedleHeader needleBody := resp.NeedleBody + version := needle.Version(resp.Version) if len(needleHeader) == 0 { continue @@ -73,7 +74,7 @@ func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.Dia n := new(needle.Needle) n.ParseNeedleHeader(needleHeader) - err = n.ReadNeedleBodyBytes(needleBody, needle.GetCurrentVersion()) + err = n.ReadNeedleBodyBytes(needleBody, version) if err != nil { return err } diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index 616ed01be..79b1ba1d0 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -341,6 +341,7 @@ message VolumeTailSenderResponse { bytes needle_header = 1; bytes needle_body = 2; bool is_last_chunk = 3; + uint32 version = 4; } message VolumeTailReceiverRequest { diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 2eb514824..b4c5ec809 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -2535,6 +2535,7 @@ type VolumeTailSenderResponse struct { NeedleHeader []byte `protobuf:"bytes,1,opt,name=needle_header,json=needleHeader,proto3" json:"needle_header,omitempty"` NeedleBody []byte `protobuf:"bytes,2,opt,name=needle_body,json=needleBody,proto3" json:"needle_body,omitempty"` IsLastChunk bool `protobuf:"varint,3,opt,name=is_last_chunk,json=isLastChunk,proto3" json:"is_last_chunk,omitempty"` + Version uint32 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2590,6 +2591,13 @@ func (x *VolumeTailSenderResponse) GetIsLastChunk() bool { return false } +func (x *VolumeTailSenderResponse) GetVersion() uint32 { + if x != nil { + return x.Version + } + return 0 +} + type VolumeTailReceiverRequest struct { state protoimpl.MessageState `protogen:"open.v1"` VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"` @@ -5887,12 +5895,13 @@ const file_volume_server_proto_rawDesc = "" + "\x17VolumeTailSenderRequest\x12\x1b\n" + "\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x19\n" + "\bsince_ns\x18\x02 \x01(\x04R\asinceNs\x120\n" + - "\x14idle_timeout_seconds\x18\x03 \x01(\rR\x12idleTimeoutSeconds\"\x84\x01\n" + + "\x14idle_timeout_seconds\x18\x03 \x01(\rR\x12idleTimeoutSeconds\"\x9e\x01\n" + "\x18VolumeTailSenderResponse\x12#\n" + "\rneedle_header\x18\x01 \x01(\fR\fneedleHeader\x12\x1f\n" + "\vneedle_body\x18\x02 \x01(\fR\n" + "needleBody\x12\"\n" + - "\ris_last_chunk\x18\x03 \x01(\bR\visLastChunk\"\xb7\x01\n" + + "\ris_last_chunk\x18\x03 \x01(\bR\visLastChunk\x12\x18\n" + + "\aversion\x18\x04 \x01(\rR\aversion\"\xb7\x01\n" + "\x19VolumeTailReceiverRequest\x12\x1b\n" + "\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x19\n" + "\bsince_ns\x18\x02 \x01(\x04R\asinceNs\x120\n" + diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index b44d7d248..935635a83 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -3,9 +3,10 @@ package weed_server import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" "time" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" @@ -65,12 +66,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe if isLastOne { // need to heart beat to the client to ensure the connection health - sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true}) + sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true, Version: uint32(v.Version())}) return lastTimestampNs, sendErr } scanner := &VolumeFileScanner4Tailing{ - stream: stream, + stream: stream, + version: uint32(v.Version()), } err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToActualOffset(), scanner) @@ -101,6 +103,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv type VolumeFileScanner4Tailing struct { stream volume_server_pb.VolumeServer_VolumeTailSenderServer lastProcessedTimestampNs uint64 + version uint32 } func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error { @@ -126,6 +129,7 @@ func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset i NeedleHeader: needleHeader, NeedleBody: needleBody[i:stopOffset], IsLastChunk: isLastChunk, + Version: scanner.version, }) if sendErr != nil { return sendErr