diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 0d75c62e2..69ce269cf 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -390,11 +390,13 @@ message SubscribeMetadataRequest { int64 until_ns = 8; int32 client_epoch = 9; repeated string directories = 10; // exact directory to watch + bool client_supports_batching = 11; // client can unpack SubscribeMetadataResponse.events } message SubscribeMetadataResponse { string directory = 1; EventNotification event_notification = 2; int64 ts_ns = 3; + repeated SubscribeMetadataResponse events = 4; // batch of additional events (backlog catch-up) } message TraverseBfsMetadataRequest { diff --git a/weed/command/filer_sync_subscription_test.go b/weed/command/filer_sync_subscription_test.go new file mode 100644 index 000000000..3a19877fe --- /dev/null +++ b/weed/command/filer_sync_subscription_test.go @@ -0,0 +1,327 @@ +package command + +import ( + "fmt" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "google.golang.org/protobuf/proto" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" +) + +// createFileEvent creates a SubscribeMetadataResponse for a file creation. +func createFileEvent(dir, name string, tsNs int64) *filer_pb.SubscribeMetadataResponse { + return &filer_pb.SubscribeMetadataResponse{ + Directory: dir, + TsNs: tsNs, + EventNotification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: name, + IsDirectory: false, + }, + }, + } +} + +// partitionedEvents creates file creation events spread across numDirs directories. 
+func partitionedEvents(numDirs, filesPerDir int) (partitions [][]*filer_pb.SubscribeMetadataResponse, all []*filer_pb.SubscribeMetadataResponse) { + baseTs := time.Now().UnixNano() + partitions = make([][]*filer_pb.SubscribeMetadataResponse, numDirs) + for d := 0; d < numDirs; d++ { + dir := fmt.Sprintf("/bucket/dir%03d", d) + for f := 0; f < filesPerDir; f++ { + tsNs := baseTs + int64(d*filesPerDir+f) + 1 + event := createFileEvent(dir, fmt.Sprintf("file%06d.txt", f), tsNs) + partitions[d] = append(partitions[d], event) + all = append(all, event) + } + } + return +} + +// runSingleStream feeds all events through one MetadataProcessor with a per-event +// stream delivery delay (simulating a single gRPC SubscribeMetadata stream). +func runSingleStream(events []*filer_pb.SubscribeMetadataResponse, concurrency int, streamDelay, processDelay time.Duration) (processed int64, elapsed time.Duration) { + var wg sync.WaitGroup + + processFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + defer wg.Done() + time.Sleep(processDelay) + atomic.AddInt64(&processed, 1) + return nil + } + + processor := NewMetadataProcessor(processFn, concurrency, 0) + + start := time.Now() + for _, event := range events { + if streamDelay > 0 { + time.Sleep(streamDelay) + } + wg.Add(1) + processor.AddSyncJob(event) + } + wg.Wait() + elapsed = time.Since(start) + return +} + +// runParallelStreams feeds partitioned events through separate MetadataProcessors, +// each in its own goroutine (simulating parallel per-directory gRPC streams). 
+func runParallelStreams(partitions [][]*filer_pb.SubscribeMetadataResponse, concurrency int, streamDelay, processDelay time.Duration) (processed int64, elapsed time.Duration) { + var outerWg sync.WaitGroup + + start := time.Now() + for _, dirEvents := range partitions { + outerWg.Add(1) + go func(events []*filer_pb.SubscribeMetadataResponse) { + defer outerWg.Done() + + var wg sync.WaitGroup + processFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + defer wg.Done() + time.Sleep(processDelay) + atomic.AddInt64(&processed, 1) + return nil + } + + processor := NewMetadataProcessor(processFn, concurrency, 0) + for _, event := range events { + if streamDelay > 0 { + time.Sleep(streamDelay) + } + wg.Add(1) + processor.AddSyncJob(event) + } + wg.Wait() + }(dirEvents) + } + outerWg.Wait() + elapsed = time.Since(start) + return +} + +// TestStreamDeliveryBottleneck demonstrates that a single serial event stream +// is the primary throughput bottleneck, and N parallel streams achieve N× throughput. +// +// Reproduces discussion #8771: single filer.sync "/" achieves ~80 events/sec, +// while N parallel processes for individual directories achieve N × ~80 events/sec. +// +// The bottleneck is the serial gRPC metadata stream, NOT conflict detection or +// processing concurrency. +func TestStreamDeliveryBottleneck(t *testing.T) { + const ( + numDirs = 10 + filesPerDir = 200 + // Per-event stream delivery overhead (server-side log read + gRPC round-trip). + // Production: ~10-12ms giving ~80-100 events/sec. Scaled down for test speed. 
+ streamDelay = 50 * time.Microsecond + processDelay = 200 * time.Microsecond + ) + + partitions, allEvents := partitionedEvents(numDirs, filesPerDir) + + singleCount, singleElapsed := runSingleStream(allEvents, 256, streamDelay, processDelay) + singleRate := float64(singleCount) / singleElapsed.Seconds() + t.Logf("1 stream: %4d events %v %6.0f events/sec", + singleCount, singleElapsed.Round(time.Millisecond), singleRate) + + parallelCount, parallelElapsed := runParallelStreams(partitions, 256, streamDelay, processDelay) + parallelRate := float64(parallelCount) / parallelElapsed.Seconds() + t.Logf("%d streams: %4d events %v %6.0f events/sec", + numDirs, parallelCount, parallelElapsed.Round(time.Millisecond), parallelRate) + + speedup := parallelRate / singleRate + t.Logf("Speedup: %.1fx (%d parallel streams vs 1 stream)", speedup, numDirs) + + if singleCount != int64(numDirs*filesPerDir) { + t.Errorf("single: expected %d events, got %d", numDirs*filesPerDir, singleCount) + } + if parallelCount != int64(numDirs*filesPerDir) { + t.Errorf("parallel: expected %d events, got %d", numDirs*filesPerDir, parallelCount) + } + // Parallel should be significantly faster + if speedup < float64(numDirs)*0.4 { + t.Errorf("expected at least %.1fx speedup, got %.1fx", float64(numDirs)*0.4, speedup) + } +} + +// TestConcurrencyIneffectiveOnStreamBottleneck shows that increasing the +// -concurrency flag has no effect when the stream delivery rate is the bottleneck. +// +// Matches the user observation: "-concurrency=256 is little better than default +// but increasing it to 1024 doesn't do anything." 
+func TestConcurrencyIneffectiveOnStreamBottleneck(t *testing.T) { + const ( + numDirs = 10 + filesPerDir = 100 + streamDelay = 50 * time.Microsecond + processDelay = 200 * time.Microsecond + ) + + _, allEvents := partitionedEvents(numDirs, filesPerDir) + + var rates []float64 + for _, concurrency := range []int{32, 128, 512} { + count, elapsed := runSingleStream(allEvents, concurrency, streamDelay, processDelay) + rate := float64(count) / elapsed.Seconds() + rates = append(rates, rate) + t.Logf("concurrency=%3d: %d events %v %.0f events/sec", + concurrency, count, elapsed.Round(time.Millisecond), rate) + } + + if len(rates) >= 2 { + ratio := rates[len(rates)-1] / rates[0] + t.Logf("concurrency 512 vs 32: %.2fx (expected ~1.0x when stream-limited)", ratio) + // Should be within 50% — concurrency doesn't help a stream bottleneck + if ratio > 1.5 || ratio < 0.5 { + t.Errorf("unexpected ratio %.2f: concurrency should not affect stream-limited throughput", ratio) + } + } +} + +// TestLogBufferSubscriptionThroughput uses the real LogBuffer and LoopProcessLogData +// to demonstrate that a single subscriber's callback is called serially (blocking +// the event loop), while N parallel subscribers process events concurrently. +// +// This directly reproduces the server-side pipeline: SubscribeMetadata reads events +// from the LogBuffer via LoopProcessLogData, and for each event calls stream.Send() +// which blocks until the client acknowledges. A slow client stalls the entire +// event loop for that subscriber. 
+func TestLogBufferSubscriptionThroughput(t *testing.T) { + const ( + numDirs = 10 + filesPerDir = 200 + totalEvents = numDirs * filesPerDir + processDelay = 200 * time.Microsecond + ) + + lb := log_buffer.NewLogBuffer("test-subscription", time.Hour, nil, nil, func() {}) + + // Populate buffer with events across directories + baseTs := time.Now().UnixNano() + var firstTsNs, lastTsNs int64 + for d := 0; d < numDirs; d++ { + dir := fmt.Sprintf("/data/dir%03d", d) + for f := 0; f < filesPerDir; f++ { + tsNs := baseTs + int64(d*filesPerDir+f) + 1 + if firstTsNs == 0 { + firstTsNs = tsNs + } + lastTsNs = tsNs + event := createFileEvent(dir, fmt.Sprintf("file%06d.txt", f), tsNs) + data, err := proto.Marshal(event) + if err != nil { + t.Fatalf("marshal: %v", err) + } + if err := lb.AddDataToBuffer([]byte(dir), data, tsNs); err != nil { + t.Fatalf("add to buffer: %v", err) + } + } + } + + startPos := log_buffer.NewMessagePosition(firstTsNs-1, -2) + + // --- Single subscriber: all events go through one callback serially --- + var singleProcessed int64 + var singleRate float64 + t.Run("single_subscriber_root", func(t *testing.T) { + done := make(chan struct{}) + start := time.Now() + go func() { + defer close(done) + lb.LoopProcessLogData("single-root", startPos, lastTsNs, + func() bool { return true }, + func(logEntry *filer_pb.LogEntry) (bool, error) { + event := &filer_pb.SubscribeMetadataResponse{} + if err := proto.Unmarshal(logEntry.Data, event); err != nil { + return false, err + } + // All events match "/" — process all + time.Sleep(processDelay) + atomic.AddInt64(&singleProcessed, 1) + return false, nil + }) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + t.Fatal("timed out") + } + elapsed := time.Since(start) + singleRate = float64(singleProcessed) / elapsed.Seconds() + t.Logf("1 subscriber (/): %4d events %v %6.0f events/sec", + singleProcessed, elapsed.Round(time.Millisecond), singleRate) + + if singleProcessed != int64(totalEvents) { + 
t.Errorf("expected %d events, got %d", totalEvents, singleProcessed) + } + }) + + // --- N parallel subscribers, each filtering for one directory --- + var parallelProcessed int64 + var parallelRate float64 + t.Run("parallel_subscribers_per_dir", func(t *testing.T) { + var wg sync.WaitGroup + + start := time.Now() + for d := 0; d < numDirs; d++ { + wg.Add(1) + prefix := fmt.Sprintf("/data/dir%03d/", d) + name := fmt.Sprintf("parallel-dir%03d", d) + + go func(pfx, readerName string) { + defer wg.Done() + lb.LoopProcessLogData(readerName, startPos, lastTsNs, + func() bool { return true }, + func(logEntry *filer_pb.LogEntry) (bool, error) { + event := &filer_pb.SubscribeMetadataResponse{} + if err := proto.Unmarshal(logEntry.Data, event); err != nil { + return false, err + } + fullpath := event.Directory + if event.EventNotification != nil && event.EventNotification.NewEntry != nil { + fullpath += "/" + event.EventNotification.NewEntry.Name + } + if !strings.HasPrefix(fullpath, pfx) { + return false, nil // skip non-matching — no delay + } + time.Sleep(processDelay) + atomic.AddInt64(&parallelProcessed, 1) + return false, nil + }) + }(prefix, name) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + t.Fatal("timed out") + } + elapsed := time.Since(start) + parallelRate = float64(parallelProcessed) / elapsed.Seconds() + t.Logf("%d subscribers: %4d events %v %6.0f events/sec", + numDirs, parallelProcessed, elapsed.Round(time.Millisecond), parallelRate) + + if parallelProcessed != int64(totalEvents) { + t.Errorf("expected %d events, got %d", totalEvents, parallelProcessed) + } + }) + + if singleRate > 0 && parallelRate > 0 { + speedup := parallelRate / singleRate + t.Logf("LogBuffer speedup: %.1fx (%d parallel subscribers vs 1)", speedup, numDirs) + } +} diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 1fb61de81..16874855b 100644 ---
a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -206,22 +206,50 @@ func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, err = fmt.Errorf("reading from persisted logs: %w", visitErr) return } - var logEntry *filer_pb.LogEntry - for { - logEntry, visitErr = visitor.GetNext() - if visitErr != nil { - if visitErr == io.EOF { - break + + // Readahead: run the visitor in a background goroutine so volume server I/O + // for the next log file overlaps with event processing and gRPC delivery. + const readaheadSize = 1024 + type entryOrErr struct { + entry *filer_pb.LogEntry + err error + } + ch := make(chan entryOrErr, readaheadSize) + stopReadahead := make(chan struct{}) + go func() { + defer close(ch) + for { + entry, readErr := visitor.GetNext() + if readErr != nil { + if readErr != io.EOF { + select { + case ch <- entryOrErr{err: fmt.Errorf("read next from persisted logs: %w", readErr)}: + case <-stopReadahead: + } + } + return } - err = fmt.Errorf("read next from persisted logs: %w", visitErr) + select { + case ch <- entryOrErr{entry: entry}: + case <-stopReadahead: + return + } + } + }() + defer close(stopReadahead) + + for item := range ch { + if item.err != nil { + err = item.err return } - isDone, visitErr = eachLogEntryFn(logEntry) - if visitErr != nil { - err = fmt.Errorf("process persisted log entry: %w", visitErr) + var processErr error + isDone, processErr = eachLogEntryFn(item.entry) + if processErr != nil { + err = fmt.Errorf("process persisted log entry: %w", processErr) return } - lastTsNs = logEntry.TsNs + lastTsNs = item.entry.TsNs if isDone { return } diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index a37d4ab74..ab610bb6a 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -200,17 +200,28 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, defer cancel() atomic.AddInt32(&ma.filer.UniqueFilerEpoch, 1) stream, err := 
client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "filer:" + string(self), - PathPrefix: "/", - SinceNs: lastTsNs, - ClientId: ma.filer.UniqueFilerId, - ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch), + ClientName: "filer:" + string(self), + PathPrefix: "/", + SinceNs: lastTsNs, + ClientId: ma.filer.UniqueFilerId, + ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch), + ClientSupportsBatching: true, }) if err != nil { glog.V(0).Infof("SubscribeLocalMetadata %v: %v", peer, err) return fmt.Errorf("subscribe: %w", err) } + processOne := func(event *filer_pb.SubscribeMetadataResponse) error { + if err := processEventFn(event); err != nil { + glog.V(0).Infof("SubscribeLocalMetadata process %v: %v", event, err) + return fmt.Errorf("process %v: %w", event, err) + } + f.onMetadataChangeEvent(event) + lastTsNs = event.TsNs + return nil + } + for { resp, listenErr := stream.Recv() if listenErr == io.EOF { @@ -221,13 +232,15 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, return listenErr } - if err := processEventFn(resp); err != nil { - glog.V(0).Infof("SubscribeLocalMetadata process %v: %v", resp, err) - return fmt.Errorf("process %v: %w", resp, err) + if err := processOne(resp); err != nil { + return err + } + // Process any additional batched events + for _, batchedEvent := range resp.Events { + if err := processOne(batchedEvent); err != nil { + return err + } } - - f.onMetadataChangeEvent(resp) - lastTsNs = resp.TsNs } }) return lastTsNs, err diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index 0d75c62e2..69ce269cf 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -390,11 +390,13 @@ message SubscribeMetadataRequest { int64 until_ns = 8; int32 client_epoch = 9; repeated string directories = 10; // exact directory to watch + bool client_supports_batching = 11; // client can unpack SubscribeMetadataResponse.events } message SubscribeMetadataResponse { string 
directory = 1; EventNotification event_notification = 2; int64 ts_ns = 3; + repeated SubscribeMetadataResponse events = 4; // batch of additional events (backlog catch-up) } message TraverseBfsMetadataRequest { diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index b4c1bb71e..2a49f55cd 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -2899,18 +2899,19 @@ func (x *GetFilerConfigurationResponse) GetMinorVersion() int32 { } type SubscribeMetadataRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - ClientName string `protobuf:"bytes,1,opt,name=client_name,json=clientName,proto3" json:"client_name,omitempty"` - PathPrefix string `protobuf:"bytes,2,opt,name=path_prefix,json=pathPrefix,proto3" json:"path_prefix,omitempty"` - SinceNs int64 `protobuf:"varint,3,opt,name=since_ns,json=sinceNs,proto3" json:"since_ns,omitempty"` - Signature int32 `protobuf:"varint,4,opt,name=signature,proto3" json:"signature,omitempty"` - PathPrefixes []string `protobuf:"bytes,6,rep,name=path_prefixes,json=pathPrefixes,proto3" json:"path_prefixes,omitempty"` - ClientId int32 `protobuf:"varint,7,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` - UntilNs int64 `protobuf:"varint,8,opt,name=until_ns,json=untilNs,proto3" json:"until_ns,omitempty"` - ClientEpoch int32 `protobuf:"varint,9,opt,name=client_epoch,json=clientEpoch,proto3" json:"client_epoch,omitempty"` - Directories []string `protobuf:"bytes,10,rep,name=directories,proto3" json:"directories,omitempty"` // exact directory to watch - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + ClientName string `protobuf:"bytes,1,opt,name=client_name,json=clientName,proto3" json:"client_name,omitempty"` + PathPrefix string `protobuf:"bytes,2,opt,name=path_prefix,json=pathPrefix,proto3" json:"path_prefix,omitempty"` + SinceNs int64 
`protobuf:"varint,3,opt,name=since_ns,json=sinceNs,proto3" json:"since_ns,omitempty"` + Signature int32 `protobuf:"varint,4,opt,name=signature,proto3" json:"signature,omitempty"` + PathPrefixes []string `protobuf:"bytes,6,rep,name=path_prefixes,json=pathPrefixes,proto3" json:"path_prefixes,omitempty"` + ClientId int32 `protobuf:"varint,7,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` + UntilNs int64 `protobuf:"varint,8,opt,name=until_ns,json=untilNs,proto3" json:"until_ns,omitempty"` + ClientEpoch int32 `protobuf:"varint,9,opt,name=client_epoch,json=clientEpoch,proto3" json:"client_epoch,omitempty"` + Directories []string `protobuf:"bytes,10,rep,name=directories,proto3" json:"directories,omitempty"` // exact directory to watch + ClientSupportsBatching bool `protobuf:"varint,11,opt,name=client_supports_batching,json=clientSupportsBatching,proto3" json:"client_supports_batching,omitempty"` // client can unpack SubscribeMetadataResponse.events + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *SubscribeMetadataRequest) Reset() { @@ -3006,11 +3007,19 @@ func (x *SubscribeMetadataRequest) GetDirectories() []string { return nil } +func (x *SubscribeMetadataRequest) GetClientSupportsBatching() bool { + if x != nil { + return x.ClientSupportsBatching + } + return false +} + type SubscribeMetadataResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` - EventNotification *EventNotification `protobuf:"bytes,2,opt,name=event_notification,json=eventNotification,proto3" json:"event_notification,omitempty"` - TsNs int64 `protobuf:"varint,3,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` + EventNotification *EventNotification 
`protobuf:"bytes,2,opt,name=event_notification,json=eventNotification,proto3" json:"event_notification,omitempty"` + TsNs int64 `protobuf:"varint,3,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"` + Events []*SubscribeMetadataResponse `protobuf:"bytes,4,rep,name=events,proto3" json:"events,omitempty"` // batch of additional events (backlog catch-up) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -3066,6 +3075,13 @@ func (x *SubscribeMetadataResponse) GetTsNs() int64 { return 0 } +func (x *SubscribeMetadataResponse) GetEvents() []*SubscribeMetadataResponse { + if x != nil { + return x.Events + } + return nil +} + type TraverseBfsMetadataRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` @@ -5063,7 +5079,7 @@ const file_filer_proto_rawDesc = "" + "\vfiler_group\x18\r \x01(\tR\n" + "filerGroup\x12#\n" + "\rmajor_version\x18\x0e \x01(\x05R\fmajorVersion\x12#\n" + - "\rminor_version\x18\x0f \x01(\x05R\fminorVersion\"\xb7\x02\n" + + "\rminor_version\x18\x0f \x01(\x05R\fminorVersion\"\xf1\x02\n" + "\x18SubscribeMetadataRequest\x12\x1f\n" + "\vclient_name\x18\x01 \x01(\tR\n" + "clientName\x12\x1f\n" + @@ -5076,11 +5092,13 @@ const file_filer_proto_rawDesc = "" + "\buntil_ns\x18\b \x01(\x03R\auntilNs\x12!\n" + "\fclient_epoch\x18\t \x01(\x05R\vclientEpoch\x12 \n" + "\vdirectories\x18\n" + - " \x03(\tR\vdirectories\"\x9a\x01\n" + + " \x03(\tR\vdirectories\x128\n" + + "\x18client_supports_batching\x18\v \x01(\bR\x16clientSupportsBatching\"\xd7\x01\n" + "\x19SubscribeMetadataResponse\x12\x1c\n" + "\tdirectory\x18\x01 \x01(\tR\tdirectory\x12J\n" + "\x12event_notification\x18\x02 \x01(\v2\x1b.filer_pb.EventNotificationR\x11eventNotification\x12\x13\n" + - "\x05ts_ns\x18\x03 \x01(\x03R\x04tsNs\"g\n" + + "\x05ts_ns\x18\x03 \x01(\x03R\x04tsNs\x12;\n" + + "\x06events\x18\x04 \x03(\v2#.filer_pb.SubscribeMetadataResponseR\x06events\"g\n" + 
"\x1aTraverseBfsMetadataRequest\x12\x1c\n" + "\tdirectory\x18\x01 \x01(\tR\tdirectory\x12+\n" + "\x11excluded_prefixes\x18\x02 \x03(\tR\x10excludedPrefixes\"b\n" + @@ -5368,78 +5386,79 @@ var file_filer_proto_depIdxs = []int32{ 72, // 24: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry 32, // 25: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection 9, // 26: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification - 7, // 27: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry - 73, // 28: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource - 74, // 29: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf - 7, // 30: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry - 44, // 31: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse - 65, // 32: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock - 14, // 33: filer_pb.StreamMutateEntryRequest.create_request:type_name -> filer_pb.CreateEntryRequest - 16, // 34: filer_pb.StreamMutateEntryRequest.update_request:type_name -> filer_pb.UpdateEntryRequest - 20, // 35: filer_pb.StreamMutateEntryRequest.delete_request:type_name -> filer_pb.DeleteEntryRequest - 24, // 36: filer_pb.StreamMutateEntryRequest.rename_request:type_name -> filer_pb.StreamRenameEntryRequest - 15, // 37: filer_pb.StreamMutateEntryResponse.create_response:type_name -> filer_pb.CreateEntryResponse - 17, // 38: filer_pb.StreamMutateEntryResponse.update_response:type_name -> filer_pb.UpdateEntryResponse - 21, // 39: filer_pb.StreamMutateEntryResponse.delete_response:type_name -> filer_pb.DeleteEntryResponse - 25, // 40: filer_pb.StreamMutateEntryResponse.rename_response:type_name -> filer_pb.StreamRenameEntryResponse - 29, // 41: 
filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations - 2, // 42: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest - 4, // 43: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest - 14, // 44: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest - 16, // 45: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest - 18, // 46: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest - 20, // 47: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest - 22, // 48: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest - 24, // 49: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest - 68, // 50: filer_pb.SeaweedFiler.StreamMutateEntry:input_type -> filer_pb.StreamMutateEntryRequest - 26, // 51: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest - 28, // 52: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest - 33, // 53: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest - 35, // 54: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest - 37, // 55: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest - 39, // 56: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest - 41, // 57: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest - 45, // 58: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest - 43, // 59: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest - 43, // 60: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest - 52, // 61: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest - 54, // 62: 
filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest - 57, // 63: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest - 59, // 64: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest - 61, // 65: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest - 63, // 66: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest - 66, // 67: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest - 3, // 68: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse - 5, // 69: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse - 15, // 70: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse - 17, // 71: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse - 19, // 72: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse - 21, // 73: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse - 23, // 74: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse - 25, // 75: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse - 69, // 76: filer_pb.SeaweedFiler.StreamMutateEntry:output_type -> filer_pb.StreamMutateEntryResponse - 27, // 77: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse - 31, // 78: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse - 34, // 79: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse - 36, // 80: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse - 38, // 81: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse - 40, // 82: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse - 42, // 83: 
filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse - 46, // 84: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse - 44, // 85: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse - 44, // 86: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse - 53, // 87: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse - 55, // 88: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse - 58, // 89: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse - 60, // 90: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse - 62, // 91: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse - 64, // 92: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse - 67, // 93: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse - 68, // [68:94] is the sub-list for method output_type - 42, // [42:68] is the sub-list for method input_type - 42, // [42:42] is the sub-list for extension type_name - 42, // [42:42] is the sub-list for extension extendee - 0, // [0:42] is the sub-list for field type_name + 44, // 27: filer_pb.SubscribeMetadataResponse.events:type_name -> filer_pb.SubscribeMetadataResponse + 7, // 28: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry + 73, // 29: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource + 74, // 30: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf + 7, // 31: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry + 44, // 32: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse + 65, // 33: 
filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock + 14, // 34: filer_pb.StreamMutateEntryRequest.create_request:type_name -> filer_pb.CreateEntryRequest + 16, // 35: filer_pb.StreamMutateEntryRequest.update_request:type_name -> filer_pb.UpdateEntryRequest + 20, // 36: filer_pb.StreamMutateEntryRequest.delete_request:type_name -> filer_pb.DeleteEntryRequest + 24, // 37: filer_pb.StreamMutateEntryRequest.rename_request:type_name -> filer_pb.StreamRenameEntryRequest + 15, // 38: filer_pb.StreamMutateEntryResponse.create_response:type_name -> filer_pb.CreateEntryResponse + 17, // 39: filer_pb.StreamMutateEntryResponse.update_response:type_name -> filer_pb.UpdateEntryResponse + 21, // 40: filer_pb.StreamMutateEntryResponse.delete_response:type_name -> filer_pb.DeleteEntryResponse + 25, // 41: filer_pb.StreamMutateEntryResponse.rename_response:type_name -> filer_pb.StreamRenameEntryResponse + 29, // 42: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations + 2, // 43: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest + 4, // 44: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest + 14, // 45: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest + 16, // 46: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest + 18, // 47: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest + 20, // 48: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest + 22, // 49: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest + 24, // 50: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest + 68, // 51: filer_pb.SeaweedFiler.StreamMutateEntry:input_type -> filer_pb.StreamMutateEntryRequest + 26, // 52: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest + 28, // 53: 
filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest + 33, // 54: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest + 35, // 55: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest + 37, // 56: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest + 39, // 57: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest + 41, // 58: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest + 45, // 59: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest + 43, // 60: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest + 43, // 61: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest + 52, // 62: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest + 54, // 63: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest + 57, // 64: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest + 59, // 65: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest + 61, // 66: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest + 63, // 67: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest + 66, // 68: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest + 3, // 69: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse + 5, // 70: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse + 15, // 71: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse + 17, // 72: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse + 19, // 73: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse + 21, // 74: 
filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse + 23, // 75: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse + 25, // 76: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse + 69, // 77: filer_pb.SeaweedFiler.StreamMutateEntry:output_type -> filer_pb.StreamMutateEntryResponse + 27, // 78: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse + 31, // 79: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse + 34, // 80: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse + 36, // 81: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse + 38, // 82: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse + 40, // 83: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse + 42, // 84: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse + 46, // 85: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse + 44, // 86: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse + 44, // 87: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse + 53, // 88: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse + 55, // 89: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse + 58, // 90: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse + 60, // 91: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse + 62, // 92: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse + 64, // 93: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse + 67, // 94: filer_pb.SeaweedFiler.TransferLocks:output_type -> 
filer_pb.TransferLocksResponse + 69, // [69:95] is the sub-list for method output_type + 43, // [43:69] is the sub-list for method input_type + 43, // [43:43] is the sub-list for extension type_name + 43, // [43:43] is the sub-list for extension extendee + 0, // [0:43] is the sub-list for field type_name } func init() { file_filer_proto_init() } diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index d61f0abb8..1c4af2b5c 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -62,20 +62,41 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: option.ClientName, - PathPrefix: option.PathPrefix, - PathPrefixes: option.AdditionalPathPrefixes, - Directories: option.DirectoriesToWatch, - SinceNs: option.StartTsNs, - Signature: option.SelfSignature, - ClientId: option.ClientId, - ClientEpoch: option.ClientEpoch, - UntilNs: option.StopTsNs, + ClientName: option.ClientName, + PathPrefix: option.PathPrefix, + PathPrefixes: option.AdditionalPathPrefixes, + Directories: option.DirectoriesToWatch, + SinceNs: option.StartTsNs, + Signature: option.SelfSignature, + ClientId: option.ClientId, + ClientEpoch: option.ClientEpoch, + UntilNs: option.StopTsNs, + ClientSupportsBatching: true, }) if err != nil { return fmt.Errorf("subscribe: %w", err) } + handleErr := func(resp *filer_pb.SubscribeMetadataResponse, err error) { + switch option.EventErrorType { + case TrivialOnError: + glog.Errorf("process %v: %v", resp, err) + case FatalOnError: + glog.Fatalf("process %v: %v", resp, err) + case RetryForeverOnError: + util.RetryUntil("followMetaUpdates", func() error { + return processEventFn(resp) + }, func(err error) bool { + glog.Errorf("process %v: %v", resp, err) + return true + }) + case DontLogError: + // pass + default: + glog.Errorf("process %v: %v", resp, 
err) + } + } + for { resp, listenErr := stream.Recv() if listenErr == io.EOF { @@ -85,26 +106,19 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc return listenErr } + // Process the first event (always present in top-level fields) if err := processEventFn(resp); err != nil { - switch option.EventErrorType { - case TrivialOnError: - glog.Errorf("process %v: %v", resp, err) - case FatalOnError: - glog.Fatalf("process %v: %v", resp, err) - case RetryForeverOnError: - util.RetryUntil("followMetaUpdates", func() error { - return processEventFn(resp) - }, func(err error) bool { - glog.Errorf("process %v: %v", resp, err) - return true - }) - case DontLogError: - // pass - default: - glog.Errorf("process %v: %v", resp, err) - } + handleErr(resp, err) } option.StartTsNs = resp.TsNs + + // Process any additional batched events + for _, batchedEvent := range resp.Events { + if err := processEventFn(batchedEvent); err != nil { + handleErr(batchedEvent, err) + } + option.StartTsNs = batchedEvent.TsNs + } } } } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 45a82bc29..794d9c87c 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -23,6 +23,133 @@ const ( MaxUnsyncedEvents = 1e3 ) +// metadataStreamSender is satisfied by both gRPC stream types and pipelinedSender. +type metadataStreamSender interface { + Send(*filer_pb.SubscribeMetadataResponse) error +} + +const ( + // batchBehindThreshold: when an event's timestamp is older than this + // relative to wall clock, the sender switches to batch mode for throughput. + // When events are closer to current time, they are sent one-by-one for + // low latency. + batchBehindThreshold = 2 * time.Minute + maxBatchSize = 256 +) + +// pipelinedSender decouples event reading from gRPC delivery by buffering +// messages in a channel. 
A dedicated goroutine handles stream.Send(), allowing +// the reader to continue reading ahead without waiting for the client to +// acknowledge each event. +// +// When the client declares support for batching AND events are far behind +// current time (backlog catch-up), multiple events are packed into a single +// stream.Send() using the Events field. Otherwise events are sent one-by-one. +type pipelinedSender struct { + sendCh chan *filer_pb.SubscribeMetadataResponse + errCh chan error + done chan struct{} + canBatch bool // true only if client set ClientSupportsBatching +} + +func newPipelinedSender(stream metadataStreamSender, bufSize int, clientSupportsBatching bool) *pipelinedSender { + s := &pipelinedSender{ + sendCh: make(chan *filer_pb.SubscribeMetadataResponse, bufSize), + errCh: make(chan error, 1), + done: make(chan struct{}), + canBatch: clientSupportsBatching, + } + go s.sendLoop(stream) + return s +} + +func (s *pipelinedSender) sendLoop(stream metadataStreamSender) { + defer close(s.done) + for msg := range s.sendCh { + shouldBatch := s.canBatch && time.Now().UnixNano()-msg.TsNs > int64(batchBehindThreshold) + + if !shouldBatch { + // Real-time: send immediately for low latency + if err := stream.Send(msg); err != nil { + s.reportErr(err) + return + } + continue + } + + // Backlog: batch multiple events into one Send for throughput. + // The first event goes in the top-level fields; additional events + // go in the Events slice. Old clients ignore the Events field. 
+ batch := make([]*filer_pb.SubscribeMetadataResponse, 0, maxBatchSize) + batch = append(batch, msg) + drain: + for len(batch) < maxBatchSize { + select { + case next, ok := <-s.sendCh: + if !ok { + break drain + } + batch = append(batch, next) + default: + break drain + } + } + + var toSend *filer_pb.SubscribeMetadataResponse + if len(batch) == 1 { + toSend = batch[0] + } else { + // Pack batch: first event is the envelope, rest go in Events + toSend = batch[0] + toSend.Events = batch[1:] + } + if err := stream.Send(toSend); err != nil { + s.reportErr(err) + return + } + if toSend.Events != nil { + toSend.Events = nil + } + } +} + +func (s *pipelinedSender) reportErr(err error) { + select { + case s.errCh <- err: + default: + } + // Don't drain sendCh here — Send() detects the exit via <-s.done + // and the deferred close(s.done) in sendLoop will fire after this returns. +} + +func (s *pipelinedSender) Send(msg *filer_pb.SubscribeMetadataResponse) error { + select { + case s.sendCh <- msg: + return nil + case err := <-s.errCh: + return err + case <-s.done: + // Sender goroutine exited (stream error or shutdown). 
+ select { + case err := <-s.errCh: + return err + default: + return fmt.Errorf("pipelined sender closed") + } + } +} + +func (s *pipelinedSender) Close() error { + close(s.sendCh) + <-s.done + select { + case err := <-s.errCh: + return err + default: + return nil + } +} + func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error { if fs.filer.MetaAggregator == nil || !fs.filer.MetaAggregator.HasRemotePeers() { return fs.SubscribeLocalMetadata(req, stream) @@ -47,7 +174,16 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) + sender := newPipelinedSender(stream, 1024, req.ClientSupportsBatching) + defer sender.Close() + + // Register for instant notification when new data arrives in the aggregated log buffer. + // Used to replace the 1127ms sleep with event-driven wake-up. + aggNotifyName := "aggSubscribe:" + clientName + aggNotifyChan := fs.filer.MetaAggregator.MetaLogBuffer.RegisterSubscriber(aggNotifyName) + defer fs.filer.MetaAggregator.MetaLogBuffer.UnregisterSubscriber(aggNotifyName) + + eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -128,7 +264,17 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return nil } - time.Sleep(1127 * time.Millisecond) + // Wait for new data (event-driven instead of 1127ms polling). + // Drain any stale notification first to avoid a spurious wake-up. 
+ select { + case <-aggNotifyChan: + default: + } + select { + case <-aggNotifyChan: + case <-ctx.Done(): + return nil + } } return readInMemoryLogErr @@ -158,7 +304,10 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) glog.V(0).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId) - eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) + sender := newPipelinedSender(stream, 1024, req.ClientSupportsBatching) + defer sender.Close() + + eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -210,8 +359,12 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2) readInMemoryLogErr = nil // Clear the error since we're skipping forward } else { - // No memory data yet, just wait - time.Sleep(1127 * time.Millisecond) + // No memory data yet, wait for new data (event-driven) + fs.listenersLock.Lock() + atomic.AddInt64(&fs.listenersWaits, 1) + fs.listenersCond.Wait() + atomic.AddInt64(&fs.listenersWaits, -1) + fs.listenersLock.Unlock() continue } } else { @@ -294,13 +447,13 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati } } -func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { +func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, sender metadataStreamSender, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { filtered := 0 return func(dirPath string, eventNotification *filer_pb.EventNotification, 
tsNs int64) error { defer func() { if filtered > MaxUnsyncedEvents { - if err := stream.Send(&filer_pb.SubscribeMetadataResponse{ + if err := sender.Send(&filer_pb.SubscribeMetadataResponse{ EventNotification: &filer_pb.EventNotification{}, TsNs: tsNs, }); err == nil { @@ -364,7 +517,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe TsNs: tsNs, } // println("sending", dirPath, entryName) - if err := stream.Send(message); err != nil { + if err := sender.Send(message); err != nil { glog.V(0).Infof("=> client %v: %+v", clientName, err) return err } diff --git a/weed/server/filer_grpc_server_sub_meta_test.go b/weed/server/filer_grpc_server_sub_meta_test.go new file mode 100644 index 000000000..607dec278 --- /dev/null +++ b/weed/server/filer_grpc_server_sub_meta_test.go @@ -0,0 +1,334 @@ +package weed_server + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// slowStream simulates a gRPC stream with configurable per-Send latency. +// It counts individual events including those packed inside batches. +type slowStream struct { + sendDelay time.Duration + sends int64 // number of stream.Send() calls + eventsSent int64 // total events (1 + len(Events) per Send) +} + +func (s *slowStream) Send(msg *filer_pb.SubscribeMetadataResponse) error { + time.Sleep(s.sendDelay) + atomic.AddInt64(&s.sends, 1) + atomic.AddInt64(&s.eventsSent, 1+int64(len(msg.Events))) + return nil +} + +func makeEvent(dir, name string, tsNs int64) *filer_pb.SubscribeMetadataResponse { + return &filer_pb.SubscribeMetadataResponse{ + Directory: dir, + TsNs: tsNs, + EventNotification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: name, + IsDirectory: false, + }, + }, + } +} + +// makeOldEvents creates events with timestamps far in the past (triggers batch mode). 
+func makeOldEvents(n int) []*filer_pb.SubscribeMetadataResponse { + baseTs := time.Now().Add(-time.Hour).UnixNano() // 1 hour ago → well past batchBehindThreshold + events := make([]*filer_pb.SubscribeMetadataResponse, n) + for i := range events { + events[i] = makeEvent("/bucket/dir", fmt.Sprintf("file%06d.txt", i), baseTs+int64(i)) + } + return events +} + +// makeRecentEvents creates events with timestamps close to now (sends one-by-one). +func makeRecentEvents(n int) []*filer_pb.SubscribeMetadataResponse { + baseTs := time.Now().UnixNano() + events := make([]*filer_pb.SubscribeMetadataResponse, n) + for i := range events { + events[i] = makeEvent("/bucket/dir", fmt.Sprintf("file%06d.txt", i), baseTs+int64(i)) + } + return events +} + +// TestPipelinedSenderThroughput compares direct (blocking) stream.Send with +// the pipelinedSender with adaptive batching. +// +// Simulates realistic backlog catch-up: the reader loads one log file at a time +// from a volume server (fileReadDelay per file), producing a burst of ~300 +// events. The sender has per-Send gRPC overhead (sendDelay). 
+// +// - Direct: serial — each event: send one-by-one between file reads +// - Pipelined+batched: file I/O overlaps with batched sending +func TestPipelinedSenderThroughput(t *testing.T) { + const ( + eventsPerFile = 300 // events in one minute-log file + numFiles = 7 // files to process + totalEvents = eventsPerFile * numFiles // 2100 + fileReadDelay = 5 * time.Millisecond // volume server read per log file + sendDelay = 50 * time.Microsecond // gRPC round-trip per Send() + ) + + // Partition old events into file-sized bursts + files := make([][]*filer_pb.SubscribeMetadataResponse, numFiles) + baseTs := time.Now().Add(-time.Hour).UnixNano() + for f := 0; f < numFiles; f++ { + files[f] = make([]*filer_pb.SubscribeMetadataResponse, eventsPerFile) + for i := 0; i < eventsPerFile; i++ { + idx := f*eventsPerFile + i + files[f][i] = makeEvent("/bucket/dir", fmt.Sprintf("file%06d.txt", idx), baseTs+int64(idx)) + } + } + + // --- Direct (old behavior): read file, send events one-by-one, repeat --- + var directRate float64 + t.Run("direct_send", func(t *testing.T) { + stream := &slowStream{sendDelay: sendDelay} + + start := time.Now() + for _, file := range files { + time.Sleep(fileReadDelay) // read log file from volume server + for _, ev := range file { + if err := stream.Send(ev); err != nil { + t.Fatalf("send error: %v", err) + } + } + } + elapsed := time.Since(start) + + directRate = float64(stream.eventsSent) / elapsed.Seconds() + t.Logf("direct: %d events %4d sends %v %6.0f events/sec", + stream.eventsSent, stream.sends, elapsed.Round(time.Millisecond), directRate) + }) + + // --- Pipelined + batched (new behavior): file reads overlap with batched sends --- + var batchedRate float64 + t.Run("pipelined_batched_send", func(t *testing.T) { + stream := &slowStream{sendDelay: sendDelay} + sender := newPipelinedSender(stream, 1024, true) + + start := time.Now() + for _, file := range files { + time.Sleep(fileReadDelay) // read log file from volume server + for _, ev := 
range file { + if err := sender.Send(ev); err != nil { + t.Fatalf("send error: %v", err) + } + } + } + if err := sender.Close(); err != nil { + t.Fatalf("close error: %v", err) + } + elapsed := time.Since(start) + + batchedRate = float64(stream.eventsSent) / elapsed.Seconds() + t.Logf("pipelined+batch: %d events %4d sends %v %6.0f events/sec", + stream.eventsSent, stream.sends, elapsed.Round(time.Millisecond), batchedRate) + }) + + if directRate > 0 { + t.Logf("Speedup: %.1fx (pipelined+batched vs direct)", batchedRate/directRate) + } +} + +// TestBatchingAdaptive verifies the adaptive behavior: old events are batched, +// recent events are sent one-by-one. +func TestBatchingAdaptive(t *testing.T) { + const numEvents = 500 + + t.Run("old_events_are_batched", func(t *testing.T) { + stream := &slowStream{sendDelay: 10 * time.Microsecond} + sender := newPipelinedSender(stream, 1024, true) + + // Push all events at once (no read delay) so the sender can batch aggressively + for _, ev := range makeOldEvents(numEvents) { + sender.Send(ev) + } + sender.Close() + + t.Logf("old events: %d events in %d sends (avg batch size: %.1f)", + stream.eventsSent, stream.sends, float64(stream.eventsSent)/float64(stream.sends)) + + if stream.sends >= int64(numEvents) { + t.Errorf("expected batching to reduce sends below %d, got %d", numEvents, stream.sends) + } + }) + + t.Run("recent_events_sent_individually", func(t *testing.T) { + stream := &slowStream{sendDelay: 10 * time.Microsecond} + sender := newPipelinedSender(stream, 1024, true) + + for _, ev := range makeRecentEvents(numEvents) { + sender.Send(ev) + } + sender.Close() + + t.Logf("recent events: %d events in %d sends (avg batch size: %.1f)", + stream.eventsSent, stream.sends, float64(stream.eventsSent)/float64(stream.sends)) + + if stream.sends != int64(numEvents) { + t.Errorf("expected 1:1 sends for recent events, got %d sends for %d events", stream.sends, numEvents) + } + }) +} + +// errorStreamImpl is a metadataStreamSender 
that returns an error after N sends. +type errorStreamImpl struct { + failAfter int + err error + count int64 +} + +func (s *errorStreamImpl) Send(msg *filer_pb.SubscribeMetadataResponse) error { + n := atomic.AddInt64(&s.count, 1) + if int(n) > s.failAfter { + return s.err + } + return nil +} + +// TestPipelinedSenderErrorPropagation verifies that when stream.Send fails, +// the error propagates to pipelinedSender.Send callers and Close. +func TestPipelinedSenderErrorPropagation(t *testing.T) { + sendErr := fmt.Errorf("connection reset") + + t.Run("send_returns_error", func(t *testing.T) { + // Stream fails after 5 successful sends + stream := &errorStreamImpl{failAfter: 5, err: sendErr} + sender := newPipelinedSender(stream, 4, true) + + var lastErr error + for i := 0; i < 100; i++ { + ev := makeOldEvents(1)[0] + if err := sender.Send(ev); err != nil { + lastErr = err + break + } + } + + if lastErr == nil { + t.Fatal("expected Send to return an error after stream failure") + } + t.Logf("Send returned error after stream failure: %v", lastErr) + }) + + t.Run("close_returns_error_if_not_consumed", func(t *testing.T) { + // Stream fails on the very first send — error surfaces via Close + // since Send may have already returned before the sender goroutine + // processes the message. + stream := &errorStreamImpl{failAfter: 0, err: sendErr} + sender := newPipelinedSender(stream, 1024, true) + + ev := makeOldEvents(1)[0] + sender.Send(ev) + + closeErr := sender.Close() + if closeErr == nil { + t.Log("Close returned nil (error was consumed by Send)") + } else { + t.Logf("Close returned error: %v", closeErr) + } + }) +} + +// TestPipelinedSingleVsParallelStreams shows 1 pipelined+batched stream vs +// N parallel pipelined+batched streams, using the realistic burst-read pattern. 
+func TestPipelinedSingleVsParallelStreams(t *testing.T) { + const ( + numDirs = 10 + filesPerDir = 7 // log files per directory + eventsPerFile = 300 // events per log file + totalEvents = numDirs * filesPerDir * eventsPerFile // 21000 + fileReadDelay = 5 * time.Millisecond + sendDelay = 50 * time.Microsecond + ) + + // Generate partitioned OLD events grouped into file-sized bursts + baseTs := time.Now().Add(-time.Hour).UnixNano() + type logFile []*filer_pb.SubscribeMetadataResponse + // partitions[dir][file][event] + partitions := make([][]logFile, numDirs) + var allFiles []logFile + idx := 0 + for d := 0; d < numDirs; d++ { + dir := fmt.Sprintf("/bucket/dir%03d", d) + for f := 0; f < filesPerDir; f++ { + file := make(logFile, eventsPerFile) + for i := 0; i < eventsPerFile; i++ { + file[i] = makeEvent(dir, fmt.Sprintf("file%06d.txt", idx), baseTs+int64(idx)) + idx++ + } + partitions[d] = append(partitions[d], file) + allFiles = append(allFiles, file) + } + } + + // simulatePipeline: read files with I/O delay, push events, send via pipelinedSender + simulatePipeline := func(files []logFile) (eventsSent, sends int64, elapsed time.Duration, err error) { + stream := &slowStream{sendDelay: sendDelay} + sender := newPipelinedSender(stream, 1024, true) + + start := time.Now() + outer: + for _, file := range files { + time.Sleep(fileReadDelay) // volume server read + for _, ev := range file { + if err = sender.Send(ev); err != nil { + break outer + } + } + } + if closeErr := sender.Close(); closeErr != nil && err == nil { + err = closeErr + } + elapsed = time.Since(start) + eventsSent = atomic.LoadInt64(&stream.eventsSent) + sends = atomic.LoadInt64(&stream.sends) + return + } + + var singleRate float64 + t.Run("1_pipelined_stream", func(t *testing.T) { + eventsSent, sends, elapsed, err := simulatePipeline(allFiles) + if err != nil { + t.Fatalf("pipeline error: %v", err) + } + singleRate = float64(eventsSent) / elapsed.Seconds() + t.Logf("1 stream: %5d events %4d sends 
%v %7.0f events/sec", + eventsSent, sends, elapsed.Round(time.Millisecond), singleRate) + }) + + var parallelRate float64 + t.Run("10_pipelined_streams", func(t *testing.T) { + var totalEventsSent, totalSends int64 + var wg sync.WaitGroup + + start := time.Now() + for d := 0; d < numDirs; d++ { + wg.Add(1) + go func(files []logFile) { + defer wg.Done() + eventsSent, sends, _, _ := simulatePipeline(files) + atomic.AddInt64(&totalEventsSent, eventsSent) + atomic.AddInt64(&totalSends, sends) + }(partitions[d]) + } + wg.Wait() + elapsed := time.Since(start) + + parallelRate = float64(totalEventsSent) / elapsed.Seconds() + t.Logf("%d streams: %5d events %4d sends %v %7.0f events/sec", + numDirs, totalEventsSent, totalSends, elapsed.Round(time.Millisecond), parallelRate) + }) + + if singleRate > 0 && parallelRate > 0 { + t.Logf("Speedup: %.1fx (%d parallel pipelined streams vs 1)", parallelRate/singleRate, numDirs) + } +}