diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 69ce269cf..47f50a385 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -391,12 +391,22 @@ message SubscribeMetadataRequest { int32 client_epoch = 9; repeated string directories = 10; // exact directory to watch bool client_supports_batching = 11; // client can unpack SubscribeMetadataResponse.events + bool client_supports_metadata_chunks = 12; // client can read log file chunks from volume servers } message SubscribeMetadataResponse { string directory = 1; EventNotification event_notification = 2; int64 ts_ns = 3; repeated SubscribeMetadataResponse events = 4; // batch of additional events (backlog catch-up) + repeated LogFileChunkRef log_file_refs = 5; // log file chunk refs for client direct-read +} +// A persisted log file that the client can read directly from volume servers. +// The file format is: [4-byte size | protobuf LogEntry] repeated. +// Each LogEntry.Data contains a marshaled SubscribeMetadataResponse. +message LogFileChunkRef { + repeated FileChunk chunks = 1; // chunk references (fids) to read from volume servers + int64 file_ts_ns = 2; // minute-level timestamp of the log file + string filer_id = 3; // filer signature suffix from log filename } message TraverseBfsMetadataRequest { diff --git a/weed/filer/filer_notify_read.go b/weed/filer/filer_notify_read.go index a46f3fb71..0cf71efe1 100644 --- a/weed/filer/filer_notify_read.go +++ b/weed/filer/filer_notify_read.go @@ -40,6 +40,74 @@ func (f *Filer) collectPersistedLogBuffer(startPosition log_buffer.MessagePositi } +// CollectLogFileRefs lists persisted log files and returns their chunk references +// without reading any data from volume servers. The client can use the returned +// fids to read log file data directly from volume servers in parallel. 
+func (f *Filer) CollectLogFileRefs(ctx context.Context, startPosition log_buffer.MessagePosition, stopTsNs int64) (refs []*filer_pb.LogFileChunkRef, lastTsNs int64, err error) { + if stopTsNs != 0 && startPosition.Time.UnixNano() > stopTsNs { + return nil, 0, nil + } + + startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Time.Year(), startPosition.Time.Month(), startPosition.Time.Day()) + startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Time.Hour(), startPosition.Time.Minute()) + var stopDate, stopHourMinute string + if stopTsNs != 0 { + stopTime := time.Unix(0, stopTsNs).UTC() + stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day()) + stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute()) + } + + dayEntries, _, listDayErr := f.ListDirectoryEntries(ctx, SystemLogDir, startDate, true, math.MaxInt32, "", "", "") + if listDayErr != nil { + return nil, 0, fmt.Errorf("fail to list log by day: %w", listDayErr) + } + + for _, dayEntry := range dayEntries { + if stopDate != "" && strings.Compare(dayEntry.Name(), stopDate) > 0 { + break + } + + hourMinuteEntries, _, listErr := f.ListDirectoryEntries(ctx, util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "") + if listErr != nil { + return nil, 0, fmt.Errorf("fail to list log %s: %w", dayEntry.Name(), listErr) + } + + for _, hmEntry := range hourMinuteEntries { + hourMinute := util.FileNameBase(hmEntry.Name()) + if dayEntry.Name() == startDate && strings.Compare(hourMinute, startHourMinute) < 0 { + continue + } + if dayEntry.Name() == stopDate && stopHourMinute != "" && strings.Compare(hourMinute, stopHourMinute) > 0 { + break + } + + tsMinute := fmt.Sprintf("%s-%s", dayEntry.Name(), hourMinute) + t, parseErr := time.Parse("2006-01-02-15-04", tsMinute) + if parseErr != nil { + glog.Errorf("failed to parse %s: %v", tsMinute, parseErr) + continue + } + filerId := getFilerId(hmEntry.Name()) + if filerId == "" { + continue 
+ } + + chunks := hmEntry.GetChunks() + if len(chunks) == 0 { + continue + } + + refs = append(refs, &filer_pb.LogFileChunkRef{ + Chunks: chunks, + FileTsNs: t.UnixNano(), + FilerId: filerId, + }) + lastTsNs = t.UnixNano() + } + } + return +} + func (f *Filer) HasPersistedLogFiles(startPosition log_buffer.MessagePosition) (bool, error) { startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Time.Year(), startPosition.Time.Month(), startPosition.Time.Day()) dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 1, "", "", "") diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index ab610bb6a..2a922f438 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -199,13 +199,20 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, ctx, cancel := context.WithCancel(context.Background()) defer cancel() atomic.AddInt32(&ma.filer.UniqueFilerEpoch, 1) + // Construct a log file reader that reads chunks via the peer filer's LookupVolume. 
+ lookupFn := LookupFn(filerClient{client}) + logFileReaderFn := func(chunks []*filer_pb.FileChunk) (io.ReadCloser, error) { + return NewChunkStreamReaderFromLookup(ctx, lookupFn, chunks), nil + } + stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "filer:" + string(self), - PathPrefix: "/", - SinceNs: lastTsNs, - ClientId: ma.filer.UniqueFilerId, - ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch), - ClientSupportsBatching: true, + ClientName: "filer:" + string(self), + PathPrefix: "/", + SinceNs: lastTsNs, + ClientId: ma.filer.UniqueFilerId, + ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch), + ClientSupportsBatching: true, + ClientSupportsMetadataChunks: true, }) if err != nil { glog.V(0).Infof("SubscribeLocalMetadata %v: %v", peer, err) @@ -222,6 +229,8 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, return nil } + var pendingRefs []*filer_pb.LogFileChunkRef + for { resp, listenErr := stream.Recv() if listenErr == io.EOF { @@ -232,8 +241,32 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, return listenErr } - if err := processOne(resp); err != nil { - return err + // Accumulate log file chunk references + if len(resp.LogFileRefs) > 0 { + pendingRefs = append(pendingRefs, resp.LogFileRefs...) 
+ continue + } + + // Process accumulated refs (transition from disk to in-memory) + if len(pendingRefs) > 0 { + lastTs, readErr := pb.ReadLogFileRefs(pendingRefs, logFileReaderFn, + lastTsNs, 0, pb.PathFilter{PathPrefix: "/"}, + func(event *filer_pb.SubscribeMetadataResponse) error { + return processOne(event) + }) + if readErr != nil { + return fmt.Errorf("read log file refs from %s: %w", peer, readErr) + } + if lastTs > 0 { + lastTsNs = lastTs + } + pendingRefs = nil + } + + if resp.EventNotification != nil { + if err := processOne(resp); err != nil { + return err + } } // Process any additional batched events for _, batchedEvent := range resp.Events { @@ -302,3 +335,22 @@ func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSign return } + +// filerClient adapts a SeaweedFilerClient to the FilerClient interface +// for use with LookupFn. Used by MetaAggregator to resolve volume IDs +// on peer filers. +type filerClient struct { + client filer_pb.SeaweedFilerClient +} + +func (fc filerClient) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return fn(fc.client) +} + +func (fc filerClient) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} + +func (fc filerClient) GetDataCenter() string { + return "" +} diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 6cde0d776..511deb696 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -329,6 +329,12 @@ func NewChunkStreamReaderFromFiler(ctx context.Context, masterClient *wdclient.M return doNewChunkStreamReader(ctx, lookupFileIdFn, chunks) } +// NewChunkStreamReaderFromLookup creates a ChunkStreamReader from a lookup function. +// Used by clients that already have a LookupFileIdFunctionType (e.g., from FilerSource). 
+func NewChunkStreamReaderFromLookup(ctx context.Context, lookupFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { + return doNewChunkStreamReader(ctx, lookupFn, chunks) +} + func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { lookupFileIdFn := LookupFn(filerClient) diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index 69ce269cf..47f50a385 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -391,12 +391,22 @@ message SubscribeMetadataRequest { int32 client_epoch = 9; repeated string directories = 10; // exact directory to watch bool client_supports_batching = 11; // client can unpack SubscribeMetadataResponse.events + bool client_supports_metadata_chunks = 12; // client can read log file chunks from volume servers } message SubscribeMetadataResponse { string directory = 1; EventNotification event_notification = 2; int64 ts_ns = 3; repeated SubscribeMetadataResponse events = 4; // batch of additional events (backlog catch-up) + repeated LogFileChunkRef log_file_refs = 5; // log file chunk refs for client direct-read +} +// A persisted log file that the client can read directly from volume servers. +// The file format is: [4-byte size | protobuf LogEntry] repeated. +// Each LogEntry.Data contains a marshaled SubscribeMetadataResponse. 
+message LogFileChunkRef { + repeated FileChunk chunks = 1; // chunk references (fids) to read from volume servers + int64 file_ts_ns = 2; // minute-level timestamp of the log file + string filer_id = 3; // filer signature suffix from log filename } message TraverseBfsMetadataRequest { diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index 2a49f55cd..d8399fec4 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -2899,19 +2899,20 @@ func (x *GetFilerConfigurationResponse) GetMinorVersion() int32 { } type SubscribeMetadataRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - ClientName string `protobuf:"bytes,1,opt,name=client_name,json=clientName,proto3" json:"client_name,omitempty"` - PathPrefix string `protobuf:"bytes,2,opt,name=path_prefix,json=pathPrefix,proto3" json:"path_prefix,omitempty"` - SinceNs int64 `protobuf:"varint,3,opt,name=since_ns,json=sinceNs,proto3" json:"since_ns,omitempty"` - Signature int32 `protobuf:"varint,4,opt,name=signature,proto3" json:"signature,omitempty"` - PathPrefixes []string `protobuf:"bytes,6,rep,name=path_prefixes,json=pathPrefixes,proto3" json:"path_prefixes,omitempty"` - ClientId int32 `protobuf:"varint,7,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` - UntilNs int64 `protobuf:"varint,8,opt,name=until_ns,json=untilNs,proto3" json:"until_ns,omitempty"` - ClientEpoch int32 `protobuf:"varint,9,opt,name=client_epoch,json=clientEpoch,proto3" json:"client_epoch,omitempty"` - Directories []string `protobuf:"bytes,10,rep,name=directories,proto3" json:"directories,omitempty"` // exact directory to watch - ClientSupportsBatching bool `protobuf:"varint,11,opt,name=client_supports_batching,json=clientSupportsBatching,proto3" json:"client_supports_batching,omitempty"` // client can unpack SubscribeMetadataResponse.events - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + 
ClientName string `protobuf:"bytes,1,opt,name=client_name,json=clientName,proto3" json:"client_name,omitempty"` + PathPrefix string `protobuf:"bytes,2,opt,name=path_prefix,json=pathPrefix,proto3" json:"path_prefix,omitempty"` + SinceNs int64 `protobuf:"varint,3,opt,name=since_ns,json=sinceNs,proto3" json:"since_ns,omitempty"` + Signature int32 `protobuf:"varint,4,opt,name=signature,proto3" json:"signature,omitempty"` + PathPrefixes []string `protobuf:"bytes,6,rep,name=path_prefixes,json=pathPrefixes,proto3" json:"path_prefixes,omitempty"` + ClientId int32 `protobuf:"varint,7,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` + UntilNs int64 `protobuf:"varint,8,opt,name=until_ns,json=untilNs,proto3" json:"until_ns,omitempty"` + ClientEpoch int32 `protobuf:"varint,9,opt,name=client_epoch,json=clientEpoch,proto3" json:"client_epoch,omitempty"` + Directories []string `protobuf:"bytes,10,rep,name=directories,proto3" json:"directories,omitempty"` // exact directory to watch + ClientSupportsBatching bool `protobuf:"varint,11,opt,name=client_supports_batching,json=clientSupportsBatching,proto3" json:"client_supports_batching,omitempty"` // client can unpack SubscribeMetadataResponse.events + ClientSupportsMetadataChunks bool `protobuf:"varint,12,opt,name=client_supports_metadata_chunks,json=clientSupportsMetadataChunks,proto3" json:"client_supports_metadata_chunks,omitempty"` // client can read log file chunks from volume servers + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *SubscribeMetadataRequest) Reset() { @@ -3014,12 +3015,20 @@ func (x *SubscribeMetadataRequest) GetClientSupportsBatching() bool { return false } +func (x *SubscribeMetadataRequest) GetClientSupportsMetadataChunks() bool { + if x != nil { + return x.ClientSupportsMetadataChunks + } + return false +} + type SubscribeMetadataResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Directory string 
`protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` EventNotification *EventNotification `protobuf:"bytes,2,opt,name=event_notification,json=eventNotification,proto3" json:"event_notification,omitempty"` TsNs int64 `protobuf:"varint,3,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"` - Events []*SubscribeMetadataResponse `protobuf:"bytes,4,rep,name=events,proto3" json:"events,omitempty"` // batch of additional events (backlog catch-up) + Events []*SubscribeMetadataResponse `protobuf:"bytes,4,rep,name=events,proto3" json:"events,omitempty"` // batch of additional events (backlog catch-up) + LogFileRefs []*LogFileChunkRef `protobuf:"bytes,5,rep,name=log_file_refs,json=logFileRefs,proto3" json:"log_file_refs,omitempty"` // log file chunk refs for client direct-read unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -3082,6 +3091,76 @@ func (x *SubscribeMetadataResponse) GetEvents() []*SubscribeMetadataResponse { return nil } +func (x *SubscribeMetadataResponse) GetLogFileRefs() []*LogFileChunkRef { + if x != nil { + return x.LogFileRefs + } + return nil +} + +// A persisted log file that the client can read directly from volume servers. +// The file format is: [4-byte size | protobuf LogEntry] repeated. +// Each LogEntry.Data contains a marshaled SubscribeMetadataResponse. 
+type LogFileChunkRef struct { + state protoimpl.MessageState `protogen:"open.v1"` + Chunks []*FileChunk `protobuf:"bytes,1,rep,name=chunks,proto3" json:"chunks,omitempty"` // chunk references (fids) to read from volume servers + FileTsNs int64 `protobuf:"varint,2,opt,name=file_ts_ns,json=fileTsNs,proto3" json:"file_ts_ns,omitempty"` // minute-level timestamp of the log file + FilerId string `protobuf:"bytes,3,opt,name=filer_id,json=filerId,proto3" json:"filer_id,omitempty"` // filer signature suffix from log filename + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LogFileChunkRef) Reset() { + *x = LogFileChunkRef{} + mi := &file_filer_proto_msgTypes[43] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LogFileChunkRef) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogFileChunkRef) ProtoMessage() {} + +func (x *LogFileChunkRef) ProtoReflect() protoreflect.Message { + mi := &file_filer_proto_msgTypes[43] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogFileChunkRef.ProtoReflect.Descriptor instead. 
+func (*LogFileChunkRef) Descriptor() ([]byte, []int) { + return file_filer_proto_rawDescGZIP(), []int{43} +} + +func (x *LogFileChunkRef) GetChunks() []*FileChunk { + if x != nil { + return x.Chunks + } + return nil +} + +func (x *LogFileChunkRef) GetFileTsNs() int64 { + if x != nil { + return x.FileTsNs + } + return 0 +} + +func (x *LogFileChunkRef) GetFilerId() string { + if x != nil { + return x.FilerId + } + return "" +} + type TraverseBfsMetadataRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` @@ -3092,7 +3171,7 @@ type TraverseBfsMetadataRequest struct { func (x *TraverseBfsMetadataRequest) Reset() { *x = TraverseBfsMetadataRequest{} - mi := &file_filer_proto_msgTypes[43] + mi := &file_filer_proto_msgTypes[44] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3104,7 +3183,7 @@ func (x *TraverseBfsMetadataRequest) String() string { func (*TraverseBfsMetadataRequest) ProtoMessage() {} func (x *TraverseBfsMetadataRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[43] + mi := &file_filer_proto_msgTypes[44] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3117,7 +3196,7 @@ func (x *TraverseBfsMetadataRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use TraverseBfsMetadataRequest.ProtoReflect.Descriptor instead. 
func (*TraverseBfsMetadataRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{43} + return file_filer_proto_rawDescGZIP(), []int{44} } func (x *TraverseBfsMetadataRequest) GetDirectory() string { @@ -3144,7 +3223,7 @@ type TraverseBfsMetadataResponse struct { func (x *TraverseBfsMetadataResponse) Reset() { *x = TraverseBfsMetadataResponse{} - mi := &file_filer_proto_msgTypes[44] + mi := &file_filer_proto_msgTypes[45] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3156,7 +3235,7 @@ func (x *TraverseBfsMetadataResponse) String() string { func (*TraverseBfsMetadataResponse) ProtoMessage() {} func (x *TraverseBfsMetadataResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[44] + mi := &file_filer_proto_msgTypes[45] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3169,7 +3248,7 @@ func (x *TraverseBfsMetadataResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use TraverseBfsMetadataResponse.ProtoReflect.Descriptor instead. 
func (*TraverseBfsMetadataResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{44} + return file_filer_proto_rawDescGZIP(), []int{45} } func (x *TraverseBfsMetadataResponse) GetDirectory() string { @@ -3199,7 +3278,7 @@ type LogEntry struct { func (x *LogEntry) Reset() { *x = LogEntry{} - mi := &file_filer_proto_msgTypes[45] + mi := &file_filer_proto_msgTypes[46] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3211,7 +3290,7 @@ func (x *LogEntry) String() string { func (*LogEntry) ProtoMessage() {} func (x *LogEntry) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[45] + mi := &file_filer_proto_msgTypes[46] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3224,7 +3303,7 @@ func (x *LogEntry) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry.ProtoReflect.Descriptor instead. func (*LogEntry) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{45} + return file_filer_proto_rawDescGZIP(), []int{46} } func (x *LogEntry) GetTsNs() int64 { @@ -3273,7 +3352,7 @@ type KeepConnectedRequest struct { func (x *KeepConnectedRequest) Reset() { *x = KeepConnectedRequest{} - mi := &file_filer_proto_msgTypes[46] + mi := &file_filer_proto_msgTypes[47] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3285,7 +3364,7 @@ func (x *KeepConnectedRequest) String() string { func (*KeepConnectedRequest) ProtoMessage() {} func (x *KeepConnectedRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[46] + mi := &file_filer_proto_msgTypes[47] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3298,7 +3377,7 @@ func (x *KeepConnectedRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use KeepConnectedRequest.ProtoReflect.Descriptor instead. 
func (*KeepConnectedRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{46} + return file_filer_proto_rawDescGZIP(), []int{47} } func (x *KeepConnectedRequest) GetName() string { @@ -3330,7 +3409,7 @@ type KeepConnectedResponse struct { func (x *KeepConnectedResponse) Reset() { *x = KeepConnectedResponse{} - mi := &file_filer_proto_msgTypes[47] + mi := &file_filer_proto_msgTypes[48] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3342,7 +3421,7 @@ func (x *KeepConnectedResponse) String() string { func (*KeepConnectedResponse) ProtoMessage() {} func (x *KeepConnectedResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[47] + mi := &file_filer_proto_msgTypes[48] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3355,7 +3434,7 @@ func (x *KeepConnectedResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use KeepConnectedResponse.ProtoReflect.Descriptor instead. 
func (*KeepConnectedResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{47} + return file_filer_proto_rawDescGZIP(), []int{48} } type LocateBrokerRequest struct { @@ -3367,7 +3446,7 @@ type LocateBrokerRequest struct { func (x *LocateBrokerRequest) Reset() { *x = LocateBrokerRequest{} - mi := &file_filer_proto_msgTypes[48] + mi := &file_filer_proto_msgTypes[49] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3379,7 +3458,7 @@ func (x *LocateBrokerRequest) String() string { func (*LocateBrokerRequest) ProtoMessage() {} func (x *LocateBrokerRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[48] + mi := &file_filer_proto_msgTypes[49] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3392,7 +3471,7 @@ func (x *LocateBrokerRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use LocateBrokerRequest.ProtoReflect.Descriptor instead. 
func (*LocateBrokerRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{48} + return file_filer_proto_rawDescGZIP(), []int{49} } func (x *LocateBrokerRequest) GetResource() string { @@ -3412,7 +3491,7 @@ type LocateBrokerResponse struct { func (x *LocateBrokerResponse) Reset() { *x = LocateBrokerResponse{} - mi := &file_filer_proto_msgTypes[49] + mi := &file_filer_proto_msgTypes[50] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3424,7 +3503,7 @@ func (x *LocateBrokerResponse) String() string { func (*LocateBrokerResponse) ProtoMessage() {} func (x *LocateBrokerResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[49] + mi := &file_filer_proto_msgTypes[50] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3437,7 +3516,7 @@ func (x *LocateBrokerResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use LocateBrokerResponse.ProtoReflect.Descriptor instead. func (*LocateBrokerResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{49} + return file_filer_proto_rawDescGZIP(), []int{50} } func (x *LocateBrokerResponse) GetFound() bool { @@ -3466,7 +3545,7 @@ type KvGetRequest struct { func (x *KvGetRequest) Reset() { *x = KvGetRequest{} - mi := &file_filer_proto_msgTypes[50] + mi := &file_filer_proto_msgTypes[51] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3478,7 +3557,7 @@ func (x *KvGetRequest) String() string { func (*KvGetRequest) ProtoMessage() {} func (x *KvGetRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[50] + mi := &file_filer_proto_msgTypes[51] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3491,7 +3570,7 @@ func (x *KvGetRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use KvGetRequest.ProtoReflect.Descriptor instead. 
func (*KvGetRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{50} + return file_filer_proto_rawDescGZIP(), []int{51} } func (x *KvGetRequest) GetKey() []byte { @@ -3511,7 +3590,7 @@ type KvGetResponse struct { func (x *KvGetResponse) Reset() { *x = KvGetResponse{} - mi := &file_filer_proto_msgTypes[51] + mi := &file_filer_proto_msgTypes[52] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3523,7 +3602,7 @@ func (x *KvGetResponse) String() string { func (*KvGetResponse) ProtoMessage() {} func (x *KvGetResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[51] + mi := &file_filer_proto_msgTypes[52] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3536,7 +3615,7 @@ func (x *KvGetResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use KvGetResponse.ProtoReflect.Descriptor instead. func (*KvGetResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{51} + return file_filer_proto_rawDescGZIP(), []int{52} } func (x *KvGetResponse) GetValue() []byte { @@ -3563,7 +3642,7 @@ type KvPutRequest struct { func (x *KvPutRequest) Reset() { *x = KvPutRequest{} - mi := &file_filer_proto_msgTypes[52] + mi := &file_filer_proto_msgTypes[53] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3575,7 +3654,7 @@ func (x *KvPutRequest) String() string { func (*KvPutRequest) ProtoMessage() {} func (x *KvPutRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[52] + mi := &file_filer_proto_msgTypes[53] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3588,7 +3667,7 @@ func (x *KvPutRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use KvPutRequest.ProtoReflect.Descriptor instead. 
func (*KvPutRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{52} + return file_filer_proto_rawDescGZIP(), []int{53} } func (x *KvPutRequest) GetKey() []byte { @@ -3614,7 +3693,7 @@ type KvPutResponse struct { func (x *KvPutResponse) Reset() { *x = KvPutResponse{} - mi := &file_filer_proto_msgTypes[53] + mi := &file_filer_proto_msgTypes[54] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3626,7 +3705,7 @@ func (x *KvPutResponse) String() string { func (*KvPutResponse) ProtoMessage() {} func (x *KvPutResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[53] + mi := &file_filer_proto_msgTypes[54] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3639,7 +3718,7 @@ func (x *KvPutResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use KvPutResponse.ProtoReflect.Descriptor instead. func (*KvPutResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{53} + return file_filer_proto_rawDescGZIP(), []int{54} } func (x *KvPutResponse) GetError() string { @@ -3662,7 +3741,7 @@ type FilerConf struct { func (x *FilerConf) Reset() { *x = FilerConf{} - mi := &file_filer_proto_msgTypes[54] + mi := &file_filer_proto_msgTypes[55] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3674,7 +3753,7 @@ func (x *FilerConf) String() string { func (*FilerConf) ProtoMessage() {} func (x *FilerConf) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[54] + mi := &file_filer_proto_msgTypes[55] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3687,7 +3766,7 @@ func (x *FilerConf) ProtoReflect() protoreflect.Message { // Deprecated: Use FilerConf.ProtoReflect.Descriptor instead. 
func (*FilerConf) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{54} + return file_filer_proto_rawDescGZIP(), []int{55} } func (x *FilerConf) GetVersion() int32 { @@ -3719,7 +3798,7 @@ type CacheRemoteObjectToLocalClusterRequest struct { func (x *CacheRemoteObjectToLocalClusterRequest) Reset() { *x = CacheRemoteObjectToLocalClusterRequest{} - mi := &file_filer_proto_msgTypes[55] + mi := &file_filer_proto_msgTypes[56] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3731,7 +3810,7 @@ func (x *CacheRemoteObjectToLocalClusterRequest) String() string { func (*CacheRemoteObjectToLocalClusterRequest) ProtoMessage() {} func (x *CacheRemoteObjectToLocalClusterRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[55] + mi := &file_filer_proto_msgTypes[56] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3744,7 +3823,7 @@ func (x *CacheRemoteObjectToLocalClusterRequest) ProtoReflect() protoreflect.Mes // Deprecated: Use CacheRemoteObjectToLocalClusterRequest.ProtoReflect.Descriptor instead. 
func (*CacheRemoteObjectToLocalClusterRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{55} + return file_filer_proto_rawDescGZIP(), []int{56} } func (x *CacheRemoteObjectToLocalClusterRequest) GetDirectory() string { @@ -3785,7 +3864,7 @@ type CacheRemoteObjectToLocalClusterResponse struct { func (x *CacheRemoteObjectToLocalClusterResponse) Reset() { *x = CacheRemoteObjectToLocalClusterResponse{} - mi := &file_filer_proto_msgTypes[56] + mi := &file_filer_proto_msgTypes[57] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3797,7 +3876,7 @@ func (x *CacheRemoteObjectToLocalClusterResponse) String() string { func (*CacheRemoteObjectToLocalClusterResponse) ProtoMessage() {} func (x *CacheRemoteObjectToLocalClusterResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[56] + mi := &file_filer_proto_msgTypes[57] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3810,7 +3889,7 @@ func (x *CacheRemoteObjectToLocalClusterResponse) ProtoReflect() protoreflect.Me // Deprecated: Use CacheRemoteObjectToLocalClusterResponse.ProtoReflect.Descriptor instead. 
func (*CacheRemoteObjectToLocalClusterResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{56} + return file_filer_proto_rawDescGZIP(), []int{57} } func (x *CacheRemoteObjectToLocalClusterResponse) GetEntry() *Entry { @@ -3843,7 +3922,7 @@ type LockRequest struct { func (x *LockRequest) Reset() { *x = LockRequest{} - mi := &file_filer_proto_msgTypes[57] + mi := &file_filer_proto_msgTypes[58] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3855,7 +3934,7 @@ func (x *LockRequest) String() string { func (*LockRequest) ProtoMessage() {} func (x *LockRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[57] + mi := &file_filer_proto_msgTypes[58] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3868,7 +3947,7 @@ func (x *LockRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use LockRequest.ProtoReflect.Descriptor instead. func (*LockRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{57} + return file_filer_proto_rawDescGZIP(), []int{58} } func (x *LockRequest) GetName() string { @@ -3918,7 +3997,7 @@ type LockResponse struct { func (x *LockResponse) Reset() { *x = LockResponse{} - mi := &file_filer_proto_msgTypes[58] + mi := &file_filer_proto_msgTypes[59] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3930,7 +4009,7 @@ func (x *LockResponse) String() string { func (*LockResponse) ProtoMessage() {} func (x *LockResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[58] + mi := &file_filer_proto_msgTypes[59] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3943,7 +4022,7 @@ func (x *LockResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use LockResponse.ProtoReflect.Descriptor instead. 
func (*LockResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{58} + return file_filer_proto_rawDescGZIP(), []int{59} } func (x *LockResponse) GetRenewToken() string { @@ -3985,7 +4064,7 @@ type UnlockRequest struct { func (x *UnlockRequest) Reset() { *x = UnlockRequest{} - mi := &file_filer_proto_msgTypes[59] + mi := &file_filer_proto_msgTypes[60] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3997,7 +4076,7 @@ func (x *UnlockRequest) String() string { func (*UnlockRequest) ProtoMessage() {} func (x *UnlockRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[59] + mi := &file_filer_proto_msgTypes[60] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4010,7 +4089,7 @@ func (x *UnlockRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use UnlockRequest.ProtoReflect.Descriptor instead. func (*UnlockRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{59} + return file_filer_proto_rawDescGZIP(), []int{60} } func (x *UnlockRequest) GetName() string { @@ -4044,7 +4123,7 @@ type UnlockResponse struct { func (x *UnlockResponse) Reset() { *x = UnlockResponse{} - mi := &file_filer_proto_msgTypes[60] + mi := &file_filer_proto_msgTypes[61] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4056,7 +4135,7 @@ func (x *UnlockResponse) String() string { func (*UnlockResponse) ProtoMessage() {} func (x *UnlockResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[60] + mi := &file_filer_proto_msgTypes[61] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4069,7 +4148,7 @@ func (x *UnlockResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use UnlockResponse.ProtoReflect.Descriptor instead. 
func (*UnlockResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{60} + return file_filer_proto_rawDescGZIP(), []int{61} } func (x *UnlockResponse) GetError() string { @@ -4096,7 +4175,7 @@ type FindLockOwnerRequest struct { func (x *FindLockOwnerRequest) Reset() { *x = FindLockOwnerRequest{} - mi := &file_filer_proto_msgTypes[61] + mi := &file_filer_proto_msgTypes[62] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4108,7 +4187,7 @@ func (x *FindLockOwnerRequest) String() string { func (*FindLockOwnerRequest) ProtoMessage() {} func (x *FindLockOwnerRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[61] + mi := &file_filer_proto_msgTypes[62] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4121,7 +4200,7 @@ func (x *FindLockOwnerRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use FindLockOwnerRequest.ProtoReflect.Descriptor instead. 
func (*FindLockOwnerRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{61} + return file_filer_proto_rawDescGZIP(), []int{62} } func (x *FindLockOwnerRequest) GetName() string { @@ -4147,7 +4226,7 @@ type FindLockOwnerResponse struct { func (x *FindLockOwnerResponse) Reset() { *x = FindLockOwnerResponse{} - mi := &file_filer_proto_msgTypes[62] + mi := &file_filer_proto_msgTypes[63] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4159,7 +4238,7 @@ func (x *FindLockOwnerResponse) String() string { func (*FindLockOwnerResponse) ProtoMessage() {} func (x *FindLockOwnerResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[62] + mi := &file_filer_proto_msgTypes[63] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4172,7 +4251,7 @@ func (x *FindLockOwnerResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use FindLockOwnerResponse.ProtoReflect.Descriptor instead. func (*FindLockOwnerResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{62} + return file_filer_proto_rawDescGZIP(), []int{63} } func (x *FindLockOwnerResponse) GetOwner() string { @@ -4194,7 +4273,7 @@ type Lock struct { func (x *Lock) Reset() { *x = Lock{} - mi := &file_filer_proto_msgTypes[63] + mi := &file_filer_proto_msgTypes[64] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4206,7 +4285,7 @@ func (x *Lock) String() string { func (*Lock) ProtoMessage() {} func (x *Lock) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[63] + mi := &file_filer_proto_msgTypes[64] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4219,7 +4298,7 @@ func (x *Lock) ProtoReflect() protoreflect.Message { // Deprecated: Use Lock.ProtoReflect.Descriptor instead. 
func (*Lock) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{63} + return file_filer_proto_rawDescGZIP(), []int{64} } func (x *Lock) GetName() string { @@ -4259,7 +4338,7 @@ type TransferLocksRequest struct { func (x *TransferLocksRequest) Reset() { *x = TransferLocksRequest{} - mi := &file_filer_proto_msgTypes[64] + mi := &file_filer_proto_msgTypes[65] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4271,7 +4350,7 @@ func (x *TransferLocksRequest) String() string { func (*TransferLocksRequest) ProtoMessage() {} func (x *TransferLocksRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[64] + mi := &file_filer_proto_msgTypes[65] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4284,7 +4363,7 @@ func (x *TransferLocksRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use TransferLocksRequest.ProtoReflect.Descriptor instead. func (*TransferLocksRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{64} + return file_filer_proto_rawDescGZIP(), []int{65} } func (x *TransferLocksRequest) GetLocks() []*Lock { @@ -4302,7 +4381,7 @@ type TransferLocksResponse struct { func (x *TransferLocksResponse) Reset() { *x = TransferLocksResponse{} - mi := &file_filer_proto_msgTypes[65] + mi := &file_filer_proto_msgTypes[66] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4314,7 +4393,7 @@ func (x *TransferLocksResponse) String() string { func (*TransferLocksResponse) ProtoMessage() {} func (x *TransferLocksResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[65] + mi := &file_filer_proto_msgTypes[66] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4327,7 +4406,7 @@ func (x *TransferLocksResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use 
TransferLocksResponse.ProtoReflect.Descriptor instead. func (*TransferLocksResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{65} + return file_filer_proto_rawDescGZIP(), []int{66} } type StreamMutateEntryRequest struct { @@ -4346,7 +4425,7 @@ type StreamMutateEntryRequest struct { func (x *StreamMutateEntryRequest) Reset() { *x = StreamMutateEntryRequest{} - mi := &file_filer_proto_msgTypes[66] + mi := &file_filer_proto_msgTypes[67] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4358,7 +4437,7 @@ func (x *StreamMutateEntryRequest) String() string { func (*StreamMutateEntryRequest) ProtoMessage() {} func (x *StreamMutateEntryRequest) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[66] + mi := &file_filer_proto_msgTypes[67] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4371,7 +4450,7 @@ func (x *StreamMutateEntryRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamMutateEntryRequest.ProtoReflect.Descriptor instead. 
func (*StreamMutateEntryRequest) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{66} + return file_filer_proto_rawDescGZIP(), []int{67} } func (x *StreamMutateEntryRequest) GetRequestId() uint64 { @@ -4471,7 +4550,7 @@ type StreamMutateEntryResponse struct { func (x *StreamMutateEntryResponse) Reset() { *x = StreamMutateEntryResponse{} - mi := &file_filer_proto_msgTypes[67] + mi := &file_filer_proto_msgTypes[68] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4483,7 +4562,7 @@ func (x *StreamMutateEntryResponse) String() string { func (*StreamMutateEntryResponse) ProtoMessage() {} func (x *StreamMutateEntryResponse) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[67] + mi := &file_filer_proto_msgTypes[68] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4496,7 +4575,7 @@ func (x *StreamMutateEntryResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamMutateEntryResponse.ProtoReflect.Descriptor instead. 
func (*StreamMutateEntryResponse) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{67} + return file_filer_proto_rawDescGZIP(), []int{68} } func (x *StreamMutateEntryResponse) GetRequestId() uint64 { @@ -4610,7 +4689,7 @@ type LocateBrokerResponse_Resource struct { func (x *LocateBrokerResponse_Resource) Reset() { *x = LocateBrokerResponse_Resource{} - mi := &file_filer_proto_msgTypes[71] + mi := &file_filer_proto_msgTypes[72] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4622,7 +4701,7 @@ func (x *LocateBrokerResponse_Resource) String() string { func (*LocateBrokerResponse_Resource) ProtoMessage() {} func (x *LocateBrokerResponse_Resource) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[71] + mi := &file_filer_proto_msgTypes[72] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4635,7 +4714,7 @@ func (x *LocateBrokerResponse_Resource) ProtoReflect() protoreflect.Message { // Deprecated: Use LocateBrokerResponse_Resource.ProtoReflect.Descriptor instead. 
func (*LocateBrokerResponse_Resource) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{49, 0} + return file_filer_proto_rawDescGZIP(), []int{50, 0} } func (x *LocateBrokerResponse_Resource) GetGrpcAddresses() string { @@ -4676,7 +4755,7 @@ type FilerConf_PathConf struct { func (x *FilerConf_PathConf) Reset() { *x = FilerConf_PathConf{} - mi := &file_filer_proto_msgTypes[72] + mi := &file_filer_proto_msgTypes[73] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4688,7 +4767,7 @@ func (x *FilerConf_PathConf) String() string { func (*FilerConf_PathConf) ProtoMessage() {} func (x *FilerConf_PathConf) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[72] + mi := &file_filer_proto_msgTypes[73] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4701,7 +4780,7 @@ func (x *FilerConf_PathConf) ProtoReflect() protoreflect.Message { // Deprecated: Use FilerConf_PathConf.ProtoReflect.Descriptor instead. 
func (*FilerConf_PathConf) Descriptor() ([]byte, []int) { - return file_filer_proto_rawDescGZIP(), []int{54, 0} + return file_filer_proto_rawDescGZIP(), []int{55, 0} } func (x *FilerConf_PathConf) GetLocationPrefix() string { @@ -5079,7 +5158,7 @@ const file_filer_proto_rawDesc = "" + "\vfiler_group\x18\r \x01(\tR\n" + "filerGroup\x12#\n" + "\rmajor_version\x18\x0e \x01(\x05R\fmajorVersion\x12#\n" + - "\rminor_version\x18\x0f \x01(\x05R\fminorVersion\"\xf1\x02\n" + + "\rminor_version\x18\x0f \x01(\x05R\fminorVersion\"\xb8\x03\n" + "\x18SubscribeMetadataRequest\x12\x1f\n" + "\vclient_name\x18\x01 \x01(\tR\n" + "clientName\x12\x1f\n" + @@ -5093,12 +5172,19 @@ const file_filer_proto_rawDesc = "" + "\fclient_epoch\x18\t \x01(\x05R\vclientEpoch\x12 \n" + "\vdirectories\x18\n" + " \x03(\tR\vdirectories\x128\n" + - "\x18client_supports_batching\x18\v \x01(\bR\x16clientSupportsBatching\"\xd7\x01\n" + + "\x18client_supports_batching\x18\v \x01(\bR\x16clientSupportsBatching\x12E\n" + + "\x1fclient_supports_metadata_chunks\x18\f \x01(\bR\x1cclientSupportsMetadataChunks\"\x96\x02\n" + "\x19SubscribeMetadataResponse\x12\x1c\n" + "\tdirectory\x18\x01 \x01(\tR\tdirectory\x12J\n" + "\x12event_notification\x18\x02 \x01(\v2\x1b.filer_pb.EventNotificationR\x11eventNotification\x12\x13\n" + "\x05ts_ns\x18\x03 \x01(\x03R\x04tsNs\x12;\n" + - "\x06events\x18\x04 \x03(\v2#.filer_pb.SubscribeMetadataResponseR\x06events\"g\n" + + "\x06events\x18\x04 \x03(\v2#.filer_pb.SubscribeMetadataResponseR\x06events\x12=\n" + + "\rlog_file_refs\x18\x05 \x03(\v2\x19.filer_pb.LogFileChunkRefR\vlogFileRefs\"w\n" + + "\x0fLogFileChunkRef\x12+\n" + + "\x06chunks\x18\x01 \x03(\v2\x13.filer_pb.FileChunkR\x06chunks\x12\x1c\n" + + "\n" + + "file_ts_ns\x18\x02 \x01(\x03R\bfileTsNs\x12\x19\n" + + "\bfiler_id\x18\x03 \x01(\tR\afilerId\"g\n" + "\x1aTraverseBfsMetadataRequest\x12\x1c\n" + "\tdirectory\x18\x01 \x01(\tR\tdirectory\x12+\n" + "\x11excluded_prefixes\x18\x02 \x03(\tR\x10excludedPrefixes\"b\n" + @@ -5280,7 
+5366,7 @@ func file_filer_proto_rawDescGZIP() []byte { } var file_filer_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_filer_proto_msgTypes = make([]protoimpl.MessageInfo, 73) +var file_filer_proto_msgTypes = make([]protoimpl.MessageInfo, 74) var file_filer_proto_goTypes = []any{ (SSEType)(0), // 0: filer_pb.SSEType (FilerError)(0), // 1: filer_pb.FilerError @@ -5327,43 +5413,44 @@ var file_filer_proto_goTypes = []any{ (*GetFilerConfigurationResponse)(nil), // 42: filer_pb.GetFilerConfigurationResponse (*SubscribeMetadataRequest)(nil), // 43: filer_pb.SubscribeMetadataRequest (*SubscribeMetadataResponse)(nil), // 44: filer_pb.SubscribeMetadataResponse - (*TraverseBfsMetadataRequest)(nil), // 45: filer_pb.TraverseBfsMetadataRequest - (*TraverseBfsMetadataResponse)(nil), // 46: filer_pb.TraverseBfsMetadataResponse - (*LogEntry)(nil), // 47: filer_pb.LogEntry - (*KeepConnectedRequest)(nil), // 48: filer_pb.KeepConnectedRequest - (*KeepConnectedResponse)(nil), // 49: filer_pb.KeepConnectedResponse - (*LocateBrokerRequest)(nil), // 50: filer_pb.LocateBrokerRequest - (*LocateBrokerResponse)(nil), // 51: filer_pb.LocateBrokerResponse - (*KvGetRequest)(nil), // 52: filer_pb.KvGetRequest - (*KvGetResponse)(nil), // 53: filer_pb.KvGetResponse - (*KvPutRequest)(nil), // 54: filer_pb.KvPutRequest - (*KvPutResponse)(nil), // 55: filer_pb.KvPutResponse - (*FilerConf)(nil), // 56: filer_pb.FilerConf - (*CacheRemoteObjectToLocalClusterRequest)(nil), // 57: filer_pb.CacheRemoteObjectToLocalClusterRequest - (*CacheRemoteObjectToLocalClusterResponse)(nil), // 58: filer_pb.CacheRemoteObjectToLocalClusterResponse - (*LockRequest)(nil), // 59: filer_pb.LockRequest - (*LockResponse)(nil), // 60: filer_pb.LockResponse - (*UnlockRequest)(nil), // 61: filer_pb.UnlockRequest - (*UnlockResponse)(nil), // 62: filer_pb.UnlockResponse - (*FindLockOwnerRequest)(nil), // 63: filer_pb.FindLockOwnerRequest - (*FindLockOwnerResponse)(nil), // 64: filer_pb.FindLockOwnerResponse - 
(*Lock)(nil), // 65: filer_pb.Lock - (*TransferLocksRequest)(nil), // 66: filer_pb.TransferLocksRequest - (*TransferLocksResponse)(nil), // 67: filer_pb.TransferLocksResponse - (*StreamMutateEntryRequest)(nil), // 68: filer_pb.StreamMutateEntryRequest - (*StreamMutateEntryResponse)(nil), // 69: filer_pb.StreamMutateEntryResponse - nil, // 70: filer_pb.Entry.ExtendedEntry - nil, // 71: filer_pb.UpdateEntryRequest.ExpectedExtendedEntry - nil, // 72: filer_pb.LookupVolumeResponse.LocationsMapEntry - (*LocateBrokerResponse_Resource)(nil), // 73: filer_pb.LocateBrokerResponse.Resource - (*FilerConf_PathConf)(nil), // 74: filer_pb.FilerConf.PathConf + (*LogFileChunkRef)(nil), // 45: filer_pb.LogFileChunkRef + (*TraverseBfsMetadataRequest)(nil), // 46: filer_pb.TraverseBfsMetadataRequest + (*TraverseBfsMetadataResponse)(nil), // 47: filer_pb.TraverseBfsMetadataResponse + (*LogEntry)(nil), // 48: filer_pb.LogEntry + (*KeepConnectedRequest)(nil), // 49: filer_pb.KeepConnectedRequest + (*KeepConnectedResponse)(nil), // 50: filer_pb.KeepConnectedResponse + (*LocateBrokerRequest)(nil), // 51: filer_pb.LocateBrokerRequest + (*LocateBrokerResponse)(nil), // 52: filer_pb.LocateBrokerResponse + (*KvGetRequest)(nil), // 53: filer_pb.KvGetRequest + (*KvGetResponse)(nil), // 54: filer_pb.KvGetResponse + (*KvPutRequest)(nil), // 55: filer_pb.KvPutRequest + (*KvPutResponse)(nil), // 56: filer_pb.KvPutResponse + (*FilerConf)(nil), // 57: filer_pb.FilerConf + (*CacheRemoteObjectToLocalClusterRequest)(nil), // 58: filer_pb.CacheRemoteObjectToLocalClusterRequest + (*CacheRemoteObjectToLocalClusterResponse)(nil), // 59: filer_pb.CacheRemoteObjectToLocalClusterResponse + (*LockRequest)(nil), // 60: filer_pb.LockRequest + (*LockResponse)(nil), // 61: filer_pb.LockResponse + (*UnlockRequest)(nil), // 62: filer_pb.UnlockRequest + (*UnlockResponse)(nil), // 63: filer_pb.UnlockResponse + (*FindLockOwnerRequest)(nil), // 64: filer_pb.FindLockOwnerRequest + (*FindLockOwnerResponse)(nil), // 65: 
filer_pb.FindLockOwnerResponse + (*Lock)(nil), // 66: filer_pb.Lock + (*TransferLocksRequest)(nil), // 67: filer_pb.TransferLocksRequest + (*TransferLocksResponse)(nil), // 68: filer_pb.TransferLocksResponse + (*StreamMutateEntryRequest)(nil), // 69: filer_pb.StreamMutateEntryRequest + (*StreamMutateEntryResponse)(nil), // 70: filer_pb.StreamMutateEntryResponse + nil, // 71: filer_pb.Entry.ExtendedEntry + nil, // 72: filer_pb.UpdateEntryRequest.ExpectedExtendedEntry + nil, // 73: filer_pb.LookupVolumeResponse.LocationsMapEntry + (*LocateBrokerResponse_Resource)(nil), // 74: filer_pb.LocateBrokerResponse.Resource + (*FilerConf_PathConf)(nil), // 75: filer_pb.FilerConf.PathConf } var file_filer_proto_depIdxs = []int32{ 7, // 0: filer_pb.LookupDirectoryEntryResponse.entry:type_name -> filer_pb.Entry 7, // 1: filer_pb.ListEntriesResponse.entry:type_name -> filer_pb.Entry 10, // 2: filer_pb.Entry.chunks:type_name -> filer_pb.FileChunk 13, // 3: filer_pb.Entry.attributes:type_name -> filer_pb.FuseAttributes - 70, // 4: filer_pb.Entry.extended:type_name -> filer_pb.Entry.ExtendedEntry + 71, // 4: filer_pb.Entry.extended:type_name -> filer_pb.Entry.ExtendedEntry 6, // 5: filer_pb.Entry.remote_entry:type_name -> filer_pb.RemoteEntry 7, // 6: filer_pb.FullEntry.entry:type_name -> filer_pb.Entry 7, // 7: filer_pb.EventNotification.old_entry:type_name -> filer_pb.Entry @@ -5376,89 +5463,91 @@ var file_filer_proto_depIdxs = []int32{ 44, // 14: filer_pb.CreateEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse 1, // 15: filer_pb.CreateEntryResponse.error_code:type_name -> filer_pb.FilerError 7, // 16: filer_pb.UpdateEntryRequest.entry:type_name -> filer_pb.Entry - 71, // 17: filer_pb.UpdateEntryRequest.expected_extended:type_name -> filer_pb.UpdateEntryRequest.ExpectedExtendedEntry + 72, // 17: filer_pb.UpdateEntryRequest.expected_extended:type_name -> filer_pb.UpdateEntryRequest.ExpectedExtendedEntry 44, // 18: 
filer_pb.UpdateEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse 10, // 19: filer_pb.AppendToEntryRequest.chunks:type_name -> filer_pb.FileChunk 44, // 20: filer_pb.DeleteEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse 9, // 21: filer_pb.StreamRenameEntryResponse.event_notification:type_name -> filer_pb.EventNotification 30, // 22: filer_pb.AssignVolumeResponse.location:type_name -> filer_pb.Location 30, // 23: filer_pb.Locations.locations:type_name -> filer_pb.Location - 72, // 24: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry + 73, // 24: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry 32, // 25: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection 9, // 26: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification 44, // 27: filer_pb.SubscribeMetadataResponse.events:type_name -> filer_pb.SubscribeMetadataResponse - 7, // 28: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry - 73, // 29: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource - 74, // 30: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf - 7, // 31: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry - 44, // 32: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse - 65, // 33: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock - 14, // 34: filer_pb.StreamMutateEntryRequest.create_request:type_name -> filer_pb.CreateEntryRequest - 16, // 35: filer_pb.StreamMutateEntryRequest.update_request:type_name -> filer_pb.UpdateEntryRequest - 20, // 36: filer_pb.StreamMutateEntryRequest.delete_request:type_name -> filer_pb.DeleteEntryRequest - 24, // 37: 
filer_pb.StreamMutateEntryRequest.rename_request:type_name -> filer_pb.StreamRenameEntryRequest - 15, // 38: filer_pb.StreamMutateEntryResponse.create_response:type_name -> filer_pb.CreateEntryResponse - 17, // 39: filer_pb.StreamMutateEntryResponse.update_response:type_name -> filer_pb.UpdateEntryResponse - 21, // 40: filer_pb.StreamMutateEntryResponse.delete_response:type_name -> filer_pb.DeleteEntryResponse - 25, // 41: filer_pb.StreamMutateEntryResponse.rename_response:type_name -> filer_pb.StreamRenameEntryResponse - 29, // 42: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations - 2, // 43: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest - 4, // 44: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest - 14, // 45: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest - 16, // 46: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest - 18, // 47: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest - 20, // 48: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest - 22, // 49: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest - 24, // 50: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest - 68, // 51: filer_pb.SeaweedFiler.StreamMutateEntry:input_type -> filer_pb.StreamMutateEntryRequest - 26, // 52: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest - 28, // 53: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest - 33, // 54: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest - 35, // 55: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest - 37, // 56: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest - 39, // 57: filer_pb.SeaweedFiler.Ping:input_type -> 
filer_pb.PingRequest - 41, // 58: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest - 45, // 59: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest - 43, // 60: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest - 43, // 61: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest - 52, // 62: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest - 54, // 63: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest - 57, // 64: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest - 59, // 65: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest - 61, // 66: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest - 63, // 67: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest - 66, // 68: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest - 3, // 69: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse - 5, // 70: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse - 15, // 71: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse - 17, // 72: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse - 19, // 73: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse - 21, // 74: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse - 23, // 75: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse - 25, // 76: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse - 69, // 77: filer_pb.SeaweedFiler.StreamMutateEntry:output_type -> filer_pb.StreamMutateEntryResponse - 27, // 78: 
filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse - 31, // 79: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse - 34, // 80: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse - 36, // 81: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse - 38, // 82: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse - 40, // 83: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse - 42, // 84: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse - 46, // 85: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse - 44, // 86: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse - 44, // 87: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse - 53, // 88: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse - 55, // 89: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse - 58, // 90: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse - 60, // 91: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse - 62, // 92: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse - 64, // 93: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse - 67, // 94: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse - 69, // [69:95] is the sub-list for method output_type - 43, // [43:69] is the sub-list for method input_type - 43, // [43:43] is the sub-list for extension type_name - 43, // [43:43] is the sub-list for extension extendee - 0, // [0:43] is the sub-list for field type_name + 45, // 28: filer_pb.SubscribeMetadataResponse.log_file_refs:type_name -> filer_pb.LogFileChunkRef 
+ 10, // 29: filer_pb.LogFileChunkRef.chunks:type_name -> filer_pb.FileChunk + 7, // 30: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry + 74, // 31: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource + 75, // 32: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf + 7, // 33: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry + 44, // 34: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse + 66, // 35: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock + 14, // 36: filer_pb.StreamMutateEntryRequest.create_request:type_name -> filer_pb.CreateEntryRequest + 16, // 37: filer_pb.StreamMutateEntryRequest.update_request:type_name -> filer_pb.UpdateEntryRequest + 20, // 38: filer_pb.StreamMutateEntryRequest.delete_request:type_name -> filer_pb.DeleteEntryRequest + 24, // 39: filer_pb.StreamMutateEntryRequest.rename_request:type_name -> filer_pb.StreamRenameEntryRequest + 15, // 40: filer_pb.StreamMutateEntryResponse.create_response:type_name -> filer_pb.CreateEntryResponse + 17, // 41: filer_pb.StreamMutateEntryResponse.update_response:type_name -> filer_pb.UpdateEntryResponse + 21, // 42: filer_pb.StreamMutateEntryResponse.delete_response:type_name -> filer_pb.DeleteEntryResponse + 25, // 43: filer_pb.StreamMutateEntryResponse.rename_response:type_name -> filer_pb.StreamRenameEntryResponse + 29, // 44: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations + 2, // 45: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest + 4, // 46: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest + 14, // 47: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest + 16, // 48: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest + 18, // 49: 
filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest + 20, // 50: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest + 22, // 51: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest + 24, // 52: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest + 69, // 53: filer_pb.SeaweedFiler.StreamMutateEntry:input_type -> filer_pb.StreamMutateEntryRequest + 26, // 54: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest + 28, // 55: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest + 33, // 56: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest + 35, // 57: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest + 37, // 58: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest + 39, // 59: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest + 41, // 60: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest + 46, // 61: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest + 43, // 62: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest + 43, // 63: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest + 53, // 64: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest + 55, // 65: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest + 58, // 66: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest + 60, // 67: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest + 62, // 68: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest + 64, // 69: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest + 67, // 70: 
filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest + 3, // 71: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse + 5, // 72: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse + 15, // 73: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse + 17, // 74: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse + 19, // 75: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse + 21, // 76: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse + 23, // 77: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse + 25, // 78: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse + 70, // 79: filer_pb.SeaweedFiler.StreamMutateEntry:output_type -> filer_pb.StreamMutateEntryResponse + 27, // 80: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse + 31, // 81: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse + 34, // 82: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse + 36, // 83: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse + 38, // 84: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse + 40, // 85: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse + 42, // 86: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse + 47, // 87: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse + 44, // 88: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse + 44, // 89: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse + 54, // 90: filer_pb.SeaweedFiler.KvGet:output_type -> 
filer_pb.KvGetResponse + 56, // 91: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse + 59, // 92: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse + 61, // 93: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse + 63, // 94: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse + 65, // 95: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse + 68, // 96: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse + 71, // [71:97] is the sub-list for method output_type + 45, // [45:71] is the sub-list for method input_type + 45, // [45:45] is the sub-list for extension type_name + 45, // [45:45] is the sub-list for extension extendee + 0, // [0:45] is the sub-list for field type_name } func init() { file_filer_proto_init() } @@ -5466,13 +5555,13 @@ func file_filer_proto_init() { if File_filer_proto != nil { return } - file_filer_proto_msgTypes[66].OneofWrappers = []any{ + file_filer_proto_msgTypes[67].OneofWrappers = []any{ (*StreamMutateEntryRequest_CreateRequest)(nil), (*StreamMutateEntryRequest_UpdateRequest)(nil), (*StreamMutateEntryRequest_DeleteRequest)(nil), (*StreamMutateEntryRequest_RenameRequest)(nil), } - file_filer_proto_msgTypes[67].OneofWrappers = []any{ + file_filer_proto_msgTypes[68].OneofWrappers = []any{ (*StreamMutateEntryResponse_CreateResponse)(nil), (*StreamMutateEntryResponse_UpdateResponse)(nil), (*StreamMutateEntryResponse_DeleteResponse)(nil), @@ -5484,7 +5573,7 @@ func file_filer_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_filer_proto_rawDesc), len(file_filer_proto_rawDesc)), NumEnums: 2, - NumMessages: 73, + NumMessages: 74, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/filer_pb_direct_read.go b/weed/pb/filer_pb_direct_read.go new file mode 100644 index 000000000..7da56a215 --- 
/dev/null +++ b/weed/pb/filer_pb_direct_read.go @@ -0,0 +1,382 @@ +package pb + +import ( + "container/heap" + "fmt" + "io" + "strings" + "sync" + + "google.golang.org/protobuf/proto" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// LogFileReaderFn creates an io.ReadCloser for a set of file chunks. +type LogFileReaderFn func(chunks []*filer_pb.FileChunk) (io.ReadCloser, error) + +// PathFilter holds subscription path filtering parameters, matching the +// server-side eachEventNotificationFn filtering logic. +type PathFilter struct { + PathPrefix string + AdditionalPathPrefixes []string + DirectoriesToWatch []string +} + +// ReadLogFileRefs reads log file data directly from volume servers using the +// chunk references, merges entries from multiple filers in timestamp order +// (same algorithm as the server's OrderedLogVisitor), applies path filtering, +// and invokes processEventFn for each matching event. +// +// Filers are read in parallel (one goroutine per filer). Within each filer, +// the next file is prefetched while the current file's entries are consumed. +func ReadLogFileRefs( + refs []*filer_pb.LogFileChunkRef, + newReader LogFileReaderFn, + startTsNs, stopTsNs int64, + filter PathFilter, + processEventFn ProcessMetadataFunc, +) (lastTsNs int64, err error) { + + if len(refs) == 0 { + return + } + + // Group refs by filer ID, preserving order within each filer. + perFiler := make(map[string][]*filer_pb.LogFileChunkRef) + var filerOrder []string + for _, ref := range refs { + if len(ref.Chunks) == 0 { + continue + } + if _, seen := perFiler[ref.FilerId]; !seen { + filerOrder = append(filerOrder, ref.FilerId) + } + perFiler[ref.FilerId] = append(perFiler[ref.FilerId], ref) + } + + if len(filerOrder) == 0 { + return + } + + // Single filer fast path: no merge heap needed. 
+ if len(filerOrder) == 1 { + return readFilerFilesWithPrefetch(perFiler[filerOrder[0]], newReader, startTsNs, stopTsNs, filter, processEventFn) + } + + // Multiple filers: read each in parallel with prefetching, merge via min-heap. + return readMultiFilersMerged(filerOrder, perFiler, newReader, startTsNs, stopTsNs, filter, processEventFn) +} + +// readFilerFilesWithPrefetch reads files for a single filer, prefetching the +// next file while processing entries from the current one. +func readFilerFilesWithPrefetch( + refs []*filer_pb.LogFileChunkRef, + newReader LogFileReaderFn, + startTsNs, stopTsNs int64, + filter PathFilter, + processEventFn ProcessMetadataFunc, +) (lastTsNs int64, err error) { + + type prefetchResult struct { + entries []*filer_pb.LogEntry + err error + } + + startPrefetch := func(ref *filer_pb.LogFileChunkRef) chan prefetchResult { + ch := make(chan prefetchResult, 1) + go func() { + entries, readErr := readLogFileEntries(newReader, ref.Chunks, startTsNs, stopTsNs) + ch <- prefetchResult{entries, readErr} + }() + return ch + } + + var pendingCh chan prefetchResult + if len(refs) > 0 { + pendingCh = startPrefetch(refs[0]) + } + + for i, ref := range refs { + result := <-pendingCh + + // Start prefetching next file while we process current + if i+1 < len(refs) { + pendingCh = startPrefetch(refs[i+1]) + } + + if result.err != nil { + if isChunkNotFound(result.err) { + glog.V(0).Infof("skip log file filer=%s ts=%d: %v", ref.FilerId, ref.FileTsNs, result.err) + continue + } + return lastTsNs, fmt.Errorf("read log file filer=%s ts=%d: %w", ref.FilerId, ref.FileTsNs, result.err) + } + + for _, logEntry := range result.entries { + lastTsNs, err = processOneLogEntry(logEntry, filter, processEventFn) + if err != nil { + return + } + } + } + return +} + +// readMultiFilersMerged reads files from multiple filers in parallel (one goroutine +// per filer with prefetching), then merges entries in timestamp order via min-heap. 
+func readMultiFilersMerged( + filerOrder []string, + perFiler map[string][]*filer_pb.LogFileChunkRef, + newReader LogFileReaderFn, + startTsNs, stopTsNs int64, + filter PathFilter, + processEventFn ProcessMetadataFunc, +) (lastTsNs int64, err error) { + + type filerStream struct { + filerId string + entryCh chan *filer_pb.LogEntry + } + + streams := make([]filerStream, len(filerOrder)) + var wg sync.WaitGroup + + for i, filerId := range filerOrder { + entryCh := make(chan *filer_pb.LogEntry, 512) + streams[i] = filerStream{filerId: filerId, entryCh: entryCh} + + wg.Add(1) + go func(refs []*filer_pb.LogFileChunkRef, ch chan *filer_pb.LogEntry) { + defer wg.Done() + defer close(ch) + readFilerFilesToChannel(refs, newReader, startTsNs, stopTsNs, ch) + }(perFiler[filerId], entryCh) + } + + // Seed the min-heap with the first entry from each filer + pq := &logEntryHeap{} + heap.Init(pq) + for i := range streams { + if entry, ok := <-streams[i].entryCh; ok { + heap.Push(pq, &logEntryHeapItem{entry: entry, filerIdx: i}) + } + } + + // Merge loop + for pq.Len() > 0 { + item := heap.Pop(pq).(*logEntryHeapItem) + + lastTsNs, err = processOneLogEntry(item.entry, filter, processEventFn) + if err != nil { + for i := range streams { + for range streams[i].entryCh { + } + } + wg.Wait() + return + } + + if entry, ok := <-streams[item.filerIdx].entryCh; ok { + heap.Push(pq, &logEntryHeapItem{entry: entry, filerIdx: item.filerIdx}) + } + } + + wg.Wait() + return +} + +func readFilerFilesToChannel( + refs []*filer_pb.LogFileChunkRef, + newReader LogFileReaderFn, + startTsNs, stopTsNs int64, + ch chan *filer_pb.LogEntry, +) { + type prefetchResult struct { + entries []*filer_pb.LogEntry + err error + } + + startPrefetch := func(ref *filer_pb.LogFileChunkRef) chan prefetchResult { + resultCh := make(chan prefetchResult, 1) + go func() { + entries, err := readLogFileEntries(newReader, ref.Chunks, startTsNs, stopTsNs) + resultCh <- prefetchResult{entries, err} + }() + return resultCh + 
} + + var pendingCh chan prefetchResult + if len(refs) > 0 { + pendingCh = startPrefetch(refs[0]) + } + + for i, ref := range refs { + result := <-pendingCh + + if i+1 < len(refs) { + pendingCh = startPrefetch(refs[i+1]) + } + + if result.err != nil { + if isChunkNotFound(result.err) { + glog.V(0).Infof("skip log file filer=%s ts=%d: %v", ref.FilerId, ref.FileTsNs, result.err) + } else { + glog.Errorf("read log file filer=%s ts=%d: %v", ref.FilerId, ref.FileTsNs, result.err) + } + continue + } + + for _, entry := range result.entries { + ch <- entry + } + } +} + +func processOneLogEntry(logEntry *filer_pb.LogEntry, filter PathFilter, processEventFn ProcessMetadataFunc) (int64, error) { + event := &filer_pb.SubscribeMetadataResponse{} + if err := proto.Unmarshal(logEntry.Data, event); err != nil { + glog.Errorf("unmarshal log entry: %v", err) + return 0, nil // skip corrupt entries + } + if !matchesFilter(event, filter) { + return event.TsNs, nil + } + if err := processEventFn(event); err != nil { + return event.TsNs, fmt.Errorf("process event: %w", err) + } + return event.TsNs, nil +} + +// --- path filtering (mirrors server-side eachEventNotificationFn logic) --- + +const systemLogDir = "/topics/.system/log" + +func matchesFilter(resp *filer_pb.SubscribeMetadataResponse, filter PathFilter) bool { + var entryName string + if resp.EventNotification != nil { + if resp.EventNotification.OldEntry != nil { + entryName = resp.EventNotification.OldEntry.Name + } else if resp.EventNotification.NewEntry != nil { + entryName = resp.EventNotification.NewEntry.Name + } + } + + fullpath := util.Join(resp.Directory, entryName) + + // Skip internal meta log entries + if strings.HasPrefix(fullpath, systemLogDir) { + return false + } + + // Check AdditionalPathPrefixes + for _, p := range filter.AdditionalPathPrefixes { + if strings.HasPrefix(fullpath, p) { + return true + } + } + + // Check DirectoriesToWatch (exact directory match) + for _, dir := range filter.DirectoriesToWatch 
{
		if resp.Directory == dir {
			return true
		}
	}

	// Check primary PathPrefix
	if filter.PathPrefix == "" || filter.PathPrefix == "/" {
		return true
	}
	if strings.HasPrefix(fullpath, filter.PathPrefix) {
		return true
	}

	// Check rename target: a rename whose destination falls under the watched
	// prefix must still be delivered even if the source path did not match.
	if resp.EventNotification != nil && resp.EventNotification.NewParentPath != "" {
		newFullPath := util.Join(resp.EventNotification.NewParentPath, entryName)
		if strings.HasPrefix(newFullPath, filter.PathPrefix) {
			return true
		}
	}

	return false
}

// isChunkNotFound checks if an error indicates a missing volume chunk.
// Matches the server-side isChunkNotFoundError logic.
// NOTE(review): substring matching is brittle — confirm these strings stay in
// sync with the volume server's error wording.
func isChunkNotFound(err error) bool {
	if err == nil {
		return false
	}
	s := err.Error()
	return strings.Contains(s, "not found") || strings.Contains(s, "status 404")
}

// --- min-heap for merging entries across filers ---

// logEntryHeapItem pairs an entry with the index of the stream that produced
// it, so the merge loop knows which stream to refill after popping.
type logEntryHeapItem struct {
	entry    *filer_pb.LogEntry
	filerIdx int
}

// logEntryHeap orders items by LogEntry.TsNs ascending (min-heap).
type logEntryHeap []*logEntryHeapItem

func (h logEntryHeap) Len() int           { return len(h) }
func (h logEntryHeap) Less(i, j int) bool { return h[i].entry.TsNs < h[j].entry.TsNs }
func (h logEntryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *logEntryHeap) Push(x any)        { *h = append(*h, x.(*logEntryHeapItem)) }
func (h *logEntryHeap) Pop() any {
	old := *h
	n := len(old)
	item := old[n-1]
	old[n-1] = nil // clear the slot so the backing array does not retain the pointer
	*h = old[:n-1]
	return item
}

// --- log file parsing (uses io.ReadFull for correct partial-read handling) ---

// readLogFileEntries reads one persisted log file — repeated
// [4-byte size | protobuf LogEntry] frames — and returns the entries with
// startTsNs < TsNs (and TsNs <= stopTsNs when stopTsNs != 0).
// NOTE(review): entries with TsNs == startTsNs are skipped (exclusive start);
// confirm this matches the server's ReadPersistedLogBuffer semantics.
func readLogFileEntries(newReader LogFileReaderFn, chunks []*filer_pb.FileChunk, startTsNs, stopTsNs int64) ([]*filer_pb.LogEntry, error) {
	reader, err := newReader(chunks)
	if err != nil {
		return nil, fmt.Errorf("create reader: %w", err)
	}
	defer reader.Close()

	sizeBuf := make([]byte, 4)
	var entries []*filer_pb.LogEntry

	for {
		_, err := io.ReadFull(reader, sizeBuf)
		if err != nil {
			// EOF at a frame boundary — or a truncated trailing frame
			// (ErrUnexpectedEOF) — simply ends the file.
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				break
			}
			return entries, err
		}

		size := util.BytesToUint32(sizeBuf)
		entryData := make([]byte, size)
		_, err = io.ReadFull(reader, entryData)
		if err != nil {
			return entries, err
		}

		logEntry := &filer_pb.LogEntry{}
		if err = proto.Unmarshal(entryData, logEntry); err != nil {
			return entries, err
		}

		if logEntry.TsNs <= startTsNs {
			continue
		}
		if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
			break
		}

		entries = append(entries, logEntry)
	}
	return entries, nil
}

// ---- file boundary: weed/pb/filer_pb_direct_read_test.go (new file) ----

package pb

import (
	"bytes"
	"fmt"
	"io"
	"sync/atomic"
	"testing"
	"time"

	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// buildLogFileData creates the on-disk log file format:
// [4-byte size | protobuf LogEntry] repeated.
func buildLogFileData(events []*filer_pb.SubscribeMetadataResponse) []byte {
	var buf bytes.Buffer
	for _, event := range events {
		eventData, _ := proto.Marshal(event) // test helper: inputs are always well-formed
		logEntry := &filer_pb.LogEntry{
			TsNs: event.TsNs,
			Data: eventData,
			Key:  []byte(event.Directory),
		}
		entryData, _ := proto.Marshal(logEntry)
		sizeBuf := make([]byte, 4)
		util.Uint32toBytes(sizeBuf, uint32(len(entryData)))
		buf.Write(sizeBuf)
		buf.Write(entryData)
	}
	return buf.Bytes()
}

// makeSubEvent builds a minimal create-file metadata event for tests.
func makeSubEvent(dir, name string, tsNs int64) *filer_pb.SubscribeMetadataResponse {
	return &filer_pb.SubscribeMetadataResponse{
		Directory: dir,
		TsNs:      tsNs,
		EventNotification: &filer_pb.EventNotification{
			NewEntry: &filer_pb.Entry{
				Name:        name,
				IsDirectory: false,
			},
		},
	}
}

// delayedReader wraps data with a per-open latency to simulate volume server I/O.
+type delayedReader struct { + data []byte + delay time.Duration + openedAt time.Time +} + +func (r *delayedReader) Read(p []byte) (int, error) { + if r.openedAt.IsZero() { + r.openedAt = time.Now() + time.Sleep(r.delay) + } + if len(r.data) == 0 { + return 0, io.EOF + } + n := copy(p, r.data) + r.data = r.data[n:] + return n, nil +} + +func (r *delayedReader) Close() error { return nil } + +type testLogFiles struct { + refs []*filer_pb.LogFileChunkRef + fileData map[string][]byte // key: "filerId:fileTsNs" → raw log file bytes + fileDelay time.Duration +} + +func newTestLogFiles(numFilers, filesPerFiler, eventsPerFile int, fileDelay time.Duration) *testLogFiles { + t := &testLogFiles{ + fileData: make(map[string][]byte), + fileDelay: fileDelay, + } + + baseTs := time.Now().Add(-time.Hour).UnixNano() + tsCounter := int64(0) + + for f := 0; f < numFilers; f++ { + filerId := fmt.Sprintf("filer%02d", f) + for file := 0; file < filesPerFiler; file++ { + fileTsNs := baseTs + int64(file)*int64(time.Minute) + + events := make([]*filer_pb.SubscribeMetadataResponse, eventsPerFile) + for i := 0; i < eventsPerFile; i++ { + tsCounter++ + ts := baseTs + tsCounter + events[i] = makeSubEvent( + fmt.Sprintf("/data/%s/dir%02d", filerId, file), + fmt.Sprintf("file%04d.txt", i), + ts, + ) + } + + data := buildLogFileData(events) + key := fmt.Sprintf("%s:%d", filerId, fileTsNs) + t.fileData[key] = data + + t.refs = append(t.refs, &filer_pb.LogFileChunkRef{ + Chunks: []*filer_pb.FileChunk{{ + FileId: key, + }}, + FileTsNs: fileTsNs, + FilerId: filerId, + }) + } + } + return t +} + +func (t *testLogFiles) readerFn() LogFileReaderFn { + return func(chunks []*filer_pb.FileChunk) (io.ReadCloser, error) { + if len(chunks) == 0 { + return nil, fmt.Errorf("no chunks") + } + key := chunks[0].FileId + data, ok := t.fileData[key] + if !ok { + return nil, fmt.Errorf("file not found: %s", key) + } + dataCopy := make([]byte, len(data)) + copy(dataCopy, data) + return &delayedReader{data: dataCopy, 
delay: t.fileDelay}, nil + } +} + +func (t *testLogFiles) totalEvents() int { + total := 0 + for _, data := range t.fileData { + pos := 0 + for pos+4 <= len(data) { + size := int(util.BytesToUint32(data[pos : pos+4])) + pos += 4 + size + total++ + } + } + return total +} + +// TestReadLogFileRefsMergeOrder verifies that entries from multiple filers are +// delivered in correct timestamp order. +func TestReadLogFileRefsMergeOrder(t *testing.T) { + files := newTestLogFiles(3, 2, 50, 0) + + var timestamps []int64 + _, err := ReadLogFileRefs(files.refs, files.readerFn(), 0, 0, + PathFilter{PathPrefix: "/"}, + func(resp *filer_pb.SubscribeMetadataResponse) error { + timestamps = append(timestamps, resp.TsNs) + return nil + }) + if err != nil { + t.Fatalf("ReadLogFileRefs: %v", err) + } + + expected := files.totalEvents() + if len(timestamps) != expected { + t.Fatalf("expected %d events, got %d", expected, len(timestamps)) + } + + for i := 1; i < len(timestamps); i++ { + if timestamps[i] < timestamps[i-1] { + t.Errorf("out of order at index %d: ts[%d]=%d > ts[%d]=%d", + i, i-1, timestamps[i-1], i, timestamps[i]) + break + } + } + + t.Logf("Verified %d events from 3 filers in correct timestamp order", len(timestamps)) +} + +// TestReadLogFileRefsPathFilter verifies path filtering including system log exclusion. 
func TestReadLogFileRefsPathFilter(t *testing.T) {
	files := newTestLogFiles(2, 2, 50, 0)
	total := files.totalEvents()

	// Plain (non-atomic) counters are safe: the callback runs sequentially on
	// the caller's goroutine inside ReadLogFileRefs' merge loop.
	var allCount, filteredCount int64
	_, err := ReadLogFileRefs(files.refs, files.readerFn(), 0, 0,
		PathFilter{PathPrefix: "/"},
		func(resp *filer_pb.SubscribeMetadataResponse) error {
			allCount++
			return nil
		})
	if err != nil {
		t.Fatalf("ReadLogFileRefs (all): %v", err)
	}

	_, err = ReadLogFileRefs(files.refs, files.readerFn(), 0, 0,
		PathFilter{PathPrefix: "/data/filer00/"},
		func(resp *filer_pb.SubscribeMetadataResponse) error {
			filteredCount++
			return nil
		})
	if err != nil {
		t.Fatalf("ReadLogFileRefs (filtered): %v", err)
	}

	t.Logf("Total events: %d, matching /data/filer00/: %d", allCount, filteredCount)

	if allCount != int64(total) {
		t.Errorf("expected %d total events, got %d", total, allCount)
	}
	if filteredCount >= allCount {
		t.Errorf("filter should reduce events: all=%d filtered=%d", allCount, filteredCount)
	}
	if filteredCount == 0 {
		t.Errorf("filter matched zero events")
	}
}

// TestDirectReadVsServerSideThroughput compares:
//   - Server-side: sequential file read → gRPC send per event
//   - Client direct-read: parallel filers + prefetching + no gRPC
//
// NOTE(review): this is a timing-based comparison with simulated latencies; it
// only logs rates and asserts nothing, so it cannot flake on slow machines.
func TestDirectReadVsServerSideThroughput(t *testing.T) {
	const (
		numFilers     = 3
		filesPerFiler = 7
		eventsPerFile = 300
		fileReadDelay = 2 * time.Millisecond  // simulated per-file open cost
		sendDelay     = 20 * time.Microsecond // simulated per-event gRPC send cost
	)

	files := newTestLogFiles(numFilers, filesPerFiler, eventsPerFile, fileReadDelay)

	var serverRate float64
	t.Run("server_side_sequential", func(t *testing.T) {
		var processed int64
		start := time.Now()

		// Model the legacy path: every file opened in sequence, every event
		// paying a per-send delay.
		for _, ref := range files.refs {
			time.Sleep(fileReadDelay)
			key := ref.Chunks[0].FileId
			data := files.fileData[key]
			pos := 0
			for pos+4 <= len(data) {
				size := int(util.BytesToUint32(data[pos : pos+4]))
				pos += 4 + size
				time.Sleep(sendDelay)
				atomic.AddInt64(&processed, 1)
			}
		}
		elapsed := time.Since(start)
		serverRate = float64(processed) / elapsed.Seconds()
		t.Logf("server-side: %d events %v %6.0f events/sec (%d files sequential + %v send/event)",
			processed, elapsed.Round(time.Millisecond), serverRate,
			numFilers*filesPerFiler, sendDelay)
	})

	var directRate float64
	t.Run("client_direct_read_parallel_prefetch", func(t *testing.T) {
		var processed int64
		start := time.Now()

		_, err := ReadLogFileRefs(files.refs, files.readerFn(), 0, 0,
			PathFilter{PathPrefix: "/"},
			func(resp *filer_pb.SubscribeMetadataResponse) error {
				atomic.AddInt64(&processed, 1)
				return nil
			})
		if err != nil {
			t.Fatalf("ReadLogFileRefs: %v", err)
		}
		elapsed := time.Since(start)
		directRate = float64(processed) / elapsed.Seconds()
		t.Logf("direct-read: %d events %v %6.0f events/sec (%d filers parallel + prefetch, no gRPC)",
			processed, elapsed.Round(time.Millisecond), directRate, numFilers)
	})

	if serverRate > 0 {
		t.Logf("Speedup: %.1fx (parallel + prefetch + no gRPC vs server-side sequential)", directRate/serverRate)
	}
}

// ---- file boundary: weed/pb/filer_pb_tail.go (modified, fragment) ----

	StartTsNs      int64
	StopTsNs       int64
	EventErrorType EventErrorType
	// LogFileReaderFn, when non-nil, enables metadata chunks mode:
	// the server sends log file chunk fids instead of streaming events,
	// and the client reads directly from volume servers.
	LogFileReaderFn LogFileReaderFn
}

type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error

// …(unchanged lines elided in this view)… makeSubscribeMetadataFunc:

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
			ClientName:             option.ClientName,
			PathPrefix:             option.PathPrefix,
			PathPrefixes:           option.AdditionalPathPrefixes,
			Directories:            option.DirectoriesToWatch,
			SinceNs:                option.StartTsNs,
			Signature:              option.SelfSignature,
			ClientId:               option.ClientId,
			ClientEpoch:            option.ClientEpoch,
			UntilNs:                option.StopTsNs,
			ClientSupportsBatching: true,
			// Opt in to metadata-chunks mode only when the caller supplied a reader.
			ClientSupportsMetadataChunks: option.LogFileReaderFn != nil,
		})
		if err != nil {
			return fmt.Errorf("subscribe: %w", err)
		}

// …(unchanged lines elided in this view)…
			}
		}

		var pendingRefs []*filer_pb.LogFileChunkRef

		for {
			resp, listenErr := stream.Recv()
			if listenErr == io.EOF {
// …(unchanged lines elided in this view)…
// NOTE(review): if the stream ends (io.EOF) while pendingRefs is non-empty,
// the accumulated refs appear to never be read — confirm the EOF path
// flushes pendingRefs before returning.
				return listenErr
			}

			// Accumulate log file chunk references (metadata chunks mode)
			if len(resp.LogFileRefs) > 0 {
				pendingRefs = append(pendingRefs, resp.LogFileRefs...)
				continue
			}

			// Process accumulated refs before handling normal events (transition point)
			if len(pendingRefs) > 0 && option.LogFileReaderFn != nil {
				lastTs, readErr := ReadLogFileRefs(pendingRefs, option.LogFileReaderFn,
					option.StartTsNs, option.StopTsNs,
					PathFilter{
						PathPrefix:             option.PathPrefix,
						AdditionalPathPrefixes: option.AdditionalPathPrefixes,
						DirectoriesToWatch:     option.DirectoriesToWatch,
					},
					processEventFn)
				if readErr != nil {
					return fmt.Errorf("read log file refs: %w", readErr)
				}
				if lastTs > 0 {
					option.StartTsNs = lastTs
				}
				pendingRefs = nil
			}

			// Process the first event (always present in top-level fields)
			// NOTE(review): StartTsNs now only advances when EventNotification is
			// set (it previously advanced unconditionally) — confirm responses
			// carrying only a TsNs need no progress update.
			if resp.EventNotification != nil {
				if err := processEventFn(resp); err != nil {
					handleErr(resp, err)
				}
				option.StartTsNs = resp.TsNs
			}

			// Process any additional batched events
			for _, batchedEvent := range resp.Events {

// ---- file boundary: weed/server/filer_grpc_server_sub_meta.go (modified, fragment) ----

package weed_server

import (
	"context"
	"errors"
	"fmt"
	"strings"

// …(unchanged lines elided in this view)… SubscribeMetadata:

	glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)

	// Metadata-chunks mode: send chunk refs for client direct-read instead of
	// replaying every persisted event through the filer.
	if req.ClientSupportsMetadataChunks {
		processedTsNs, isDone, readPersistedLogErr = fs.sendLogFileRefs(ctx, stream, lastReadTime, req.UntilNs)
	} else {
		processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
	}
	if readPersistedLogErr != nil {
		return fmt.Errorf("reading from persisted
logs: %w", readPersistedLogErr)
	}

// …(unchanged lines elided in this view)… SubscribeLocalMetadata:

	// Record the position we are about to read from
	lastDiskReadTsNs = currentReadTsNs
	glog.V(4).Infof("read on disk %v local subscribe %s from %+v (lastFlushed: %v)", clientName, req.PathPrefix, lastReadTime, time.Unix(0, currentFlushTsNs))
	// Same fork as SubscribeMetadata: chunk refs for direct-read clients,
	// server-side replay for everyone else.
	if req.ClientSupportsMetadataChunks {
		processedTsNs, isDone, readPersistedLogErr = fs.sendLogFileRefs(ctx, stream, lastReadTime, req.UntilNs)
	} else {
		processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
	}
	if readPersistedLogErr != nil {
		glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
		return fmt.Errorf("reading from persisted logs: %w", readPersistedLogErr)
	}

// …(unchanged lines elided in this view)… eachLogEntryFn:
	}
}

// sendLogFileRefs collects persisted log file chunk references and sends them
// to the client so it can read the data directly from volume servers.
// This does zero volume server I/O — it only lists filer store directory entries.
// Sends directly on the gRPC stream (bypasses pipelinedSender) because ref
// messages have TsNs=0 and must not be batched into Events by the sender.
//
// lastTsNs forwards whatever progress position CollectLogFileRefs reports.
// NOTE(review): isDone is always false here — the client, not the server,
// decides when the backlog is consumed; confirm both callers tolerate that.
func (fs *FilerServer) sendLogFileRefs(ctx context.Context, stream metadataStreamSender, startPosition log_buffer.MessagePosition, stopTsNs int64) (lastTsNs int64, isDone bool, err error) {
	refs, lastTsNs, err := fs.filer.CollectLogFileRefs(ctx, startPosition, stopTsNs)
	if err != nil {
		return 0, false, err
	}
	if len(refs) == 0 {
		return 0, false, nil
	}

	// Chunk the refs so no single response grows unbounded with the backlog.
	const maxRefsPerMessage = 64
	for i := 0; i < len(refs); i += maxRefsPerMessage {
		end := i + maxRefsPerMessage
		if end > len(refs) {
			end = len(refs)
		}
		if err := stream.Send(&filer_pb.SubscribeMetadataResponse{
			LogFileRefs: refs[i:end],
		}); err != nil {
			return lastTsNs, false, err
		}
	}
	return lastTsNs, false, nil
}

func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, sender metadataStreamSender, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
	filtered := 0