diff --git a/test/metadata_subscribe/metadata_subscribe_integration_test.go b/test/metadata_subscribe/metadata_subscribe_integration_test.go index 9d8b7c39b..54bebbb90 100644 --- a/test/metadata_subscribe/metadata_subscribe_integration_test.go +++ b/test/metadata_subscribe/metadata_subscribe_integration_test.go @@ -3,6 +3,7 @@ package metadata_subscribe import ( "bytes" "context" + "errors" "fmt" "io" "mime/multipart" @@ -27,6 +28,8 @@ import ( "google.golang.org/grpc/credentials/insecure" ) +const slowConsumerMetadataPayloadSize = 4096 + // TestMetadataSubscribeBasic tests basic metadata subscription functionality func TestMetadataSubscribeBasic(t *testing.T) { if testing.Short() { @@ -680,6 +683,108 @@ func TestMetadataSubscribeMillionUpdates(t *testing.T) { }) } +func TestMetadataSubscribeSlowConsumerKeepsProgressing(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + testDir, err := os.MkdirTemp("", "seaweedfs_slow_consumer_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + cluster, err := startSeaweedFSCluster(ctx, testDir) + require.NoError(t, err) + defer cluster.Stop() + + require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second)) + require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second)) + require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second)) + + t.Logf("Cluster started for slow consumer regression test") + + t.Run("single_filer_slow_consumer", func(t *testing.T) { + var receivedCount int64 + phaseOneEntries := 6000 + phaseTwoEntries := 14000 + totalEntries := phaseOneEntries + phaseTwoEntries + minExpected := int64(12000) + errChan := make(chan error, 1) + + subCtx, subCancel := context.WithCancel(ctx) + defer subCancel() + + go func() { + err := followMetadataSlowly( + subCtx, + "127.0.0.1:8888", + "/slow-consumer/", + time.Now().Add(-5*time.Second).UnixNano(), + time.Millisecond, + func(resp *filer_pb.SubscribeMetadataResponse) { + if resp.GetEventNotification() == nil { + return + } + entry := resp.GetEventNotification().GetNewEntry() + if entry == nil || entry.IsDirectory { + return + } + atomic.AddInt64(&receivedCount, 1) + }, + ) + if err != nil && !errors.Is(err, context.Canceled) { + errChan <- err + } + }() + + time.Sleep(2 * time.Second) + + payload := bytes.Repeat([]byte("x"), slowConsumerMetadataPayloadSize) + startTime := time.Now() + require.NoError(t, createMetadataEntries(ctx, "127.0.0.1:8888", 0, phaseOneEntries, payload)) + t.Logf("Created phase 1 with %d entries in %v", phaseOneEntries, time.Since(startTime)) + + time.Sleep(2 * time.Second) + + require.NoError(t, createMetadataEntries(ctx, "127.0.0.1:8888", phaseOneEntries, phaseTwoEntries, payload)) + t.Logf("Created phase 2 with %d entries", phaseTwoEntries) + + checkTicker := time.NewTicker(2 * time.Second) + defer checkTicker.Stop() + deadline := time.NewTimer(45 * time.Second) + defer deadline.Stop() + + lastReceived := int64(-1) + stableChecks := 0 + + for { + select { + case err := <-errChan: + t.Fatalf("slow consumer subscription error: %v", err) + case <-deadline.C: + t.Fatalf("timed out waiting for slow consumer progress, received %d/%d", atomic.LoadInt64(&receivedCount), totalEntries) + case <-checkTicker.C: + received := atomic.LoadInt64(&receivedCount) + t.Logf("Slow consumer progress: %d/%d", received, totalEntries) + if received >= minExpected { + return + } + if received == lastReceived { + stableChecks++ + if stableChecks >= 4 { + t.Fatalf("slow consumer stalled at %d/%d after writes completed", received, totalEntries) + } + } else { + stableChecks = 0 + } + lastReceived = received + } + } + }) +} + // Helper types and functions type TestCluster struct { @@ -915,3 +1020,106 @@ func subscribeToMetadataWithOptions(ctx context.Context, filerGrpcAddress, pathP } }) } + +func followMetadataSlowly( + ctx context.Context, + filerGrpcAddress, pathPrefix string, + sinceNs int64, + delay time.Duration, + onEvent func(resp *filer_pb.SubscribeMetadataResponse), +) error { + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + if grpcDialOption == nil { + grpcDialOption = grpc.WithTransportCredentials(insecure.NewCredentials()) + } + + option := &pb.MetadataFollowOption{ + ClientName: "slow_consumer_test", + ClientId: util.RandomInt32(), + ClientEpoch: int32(time.Now().Unix()), + PathPrefix: pathPrefix, + StartTsNs: sinceNs, + EventErrorType: pb.DontLogError, + } + + return pb.FollowMetadata(pb.ServerAddress(filerGrpcAddress), grpcDialOption, option, func(resp *filer_pb.SubscribeMetadataResponse) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + onEvent(resp) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + return nil + } + }) +} + +func createMetadataEntries(ctx context.Context, filerGrpcAddress string, startIndex, total int, payload []byte) error { + const workers = 10 + + grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + errCh := make(chan error, workers) + var wg sync.WaitGroup + + for workerID := 0; workerID < workers; workerID++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + err := pb.WithFilerClient(false, 0, pb.ServerAddress(filerGrpcAddress), grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + for idx := startIndex + workerID; idx < startIndex+total; idx += workers { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + dir := fmt.Sprintf("/slow-consumer/bucket-%02d", idx%6) + name := fmt.Sprintf("entry-%05d", idx) + + _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: false, + Attributes: &filer_pb.FuseAttributes{ + FileSize: uint64(len(payload)), + Mtime: time.Now().Unix(), + FileMode: 0644, + Uid: 1000, + Gid: 1000, + }, + Extended: map[string][]byte{ + "payload": payload, + }, + }, + }) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + errCh <- err + } + }(workerID) + } + + wg.Wait() + close(errCh) + + for err := range errCh { + if err != nil { + return err + } + } + + return nil +} diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 63aae9612..537483fd4 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -81,7 +81,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH f.UniqueFilerId = -f.UniqueFilerId } - f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, nil, notifyFn) + f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, f.readPersistedLogBufferPosition, notifyFn) f.metaLogCollection = collection f.metaLogReplication = replication diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 48e1b163c..1fb61de81 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -2,8 +2,10 @@ package filer import ( "context" + "errors" "fmt" "io" + nethttp "net/http" "regexp" "strings" "time" @@ -16,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/notification" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32) { @@ -174,6 +177,7 @@ func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTim var ( volumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`) chunkNotFoundPattern = regexp.MustCompile(`(urls not found|File Not Found)`) + httpNotFoundPattern = regexp.MustCompile(`404 Not Found: not found`) ) // isChunkNotFoundError checks if the error indicates that a volume or chunk @@ -183,8 +187,13 @@ func isChunkNotFoundError(err error) bool { if err == nil { return false } + if errors.Is(err, util_http.ErrNotFound) || errors.Is(err, nethttp.ErrMissingFile) { + return true + } errMsg := err.Error() - return volumeNotFoundPattern.MatchString(errMsg) || chunkNotFoundPattern.MatchString(errMsg) + return volumeNotFoundPattern.MatchString(errMsg) || + chunkNotFoundPattern.MatchString(errMsg) || + httpNotFoundPattern.MatchString(errMsg) } func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) { @@ -220,3 +229,17 @@ func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, return } + +func (f *Filer) readPersistedLogBufferPosition(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) { + lastReadPosition = startPosition + + lastTsNs, isDone, err := f.ReadPersistedLogBuffer(startPosition, stopTsNs, eachLogEntryFn) + if err != nil { + return lastReadPosition, isDone, err + } + if lastTsNs != 0 { + lastReadPosition = log_buffer.NewMessagePosition(lastTsNs, 1) + } + + return lastReadPosition, isDone, nil +} diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 555791a58..a37d4ab74 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -74,6 +74,18 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, star } } +func (ma *MetaAggregator) HasRemotePeers() bool { + ma.peerChansLock.Lock() + defer ma.peerChansLock.Unlock() + + for address := range ma.peerChans { + if address != ma.self { + return true + } + } + return false +} + func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) { lastTsNs := startFrom.UnixNano() for { diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 29f71edc7..45a82bc29 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -24,6 +24,9 @@ const ( ) func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error { + if fs.filer.MetaAggregator == nil || !fs.filer.MetaAggregator.HasRemotePeers() { + return fs.SubscribeLocalMetadata(req, stream) + } ctx := stream.Context() peerAddress := findClientAddress(ctx, 0) @@ -99,18 +102,11 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { - // Check if the client has disconnected by monitoring the context select { case <-ctx.Done(): return false default: } - - fs.filer.MetaAggregator.ListenersLock.Lock() - atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, 1) - fs.filer.MetaAggregator.ListenersCond.Wait() - atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, -1) - fs.filer.MetaAggregator.ListenersLock.Unlock() return fs.hasClient(req.ClientId, req.ClientEpoch) }, eachLogEntryFn) if readInMemoryLogErr != nil { @@ -237,23 +233,12 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq glog.V(3).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { - - // Check if the client has disconnected by monitoring the context select { case <-ctx.Done(): return false default: } - - fs.listenersLock.Lock() - atomic.AddInt64(&fs.listenersWaits, 1) - fs.listenersCond.Wait() - atomic.AddInt64(&fs.listenersWaits, -1) - fs.listenersLock.Unlock() - if !fs.hasClient(req.ClientId, req.ClientEpoch) { - return false - } - return true + return fs.hasClient(req.ClientId, req.ClientEpoch) }, eachLogEntryFn) if readInMemoryLogErr != nil { if readInMemoryLogErr == log_buffer.ResumeFromDiskError { diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 60c0f35d6..471d3e140 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -606,6 +606,22 @@ func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() { func (logBuffer *LogBuffer) GetEarliestTime() time.Time { return logBuffer.startTime } + +func (logBuffer *LogBuffer) HasData() bool { + logBuffer.RLock() + defer logBuffer.RUnlock() + + if logBuffer.pos > 0 { + return true + } + for _, buf := range logBuffer.prevBuffers.buffers { + if buf.size > 0 { + return true + } + } + return false +} + func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition { return MessagePosition{ Time: logBuffer.startTime, @@ -771,7 +787,9 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu glog.Errorf("ReadFromBuffer: buffer corruption in prevBuffer: %v", err) return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err) } - return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil + if pos < buf.size { + return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil + } } } // If current buffer is not empty, return it diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index d99a8f20c..78a5d74f8 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -1,6 +1,7 @@ package log_buffer import ( + "bytes" "crypto/rand" "fmt" "io" @@ -67,6 +68,48 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { } } +func TestReadFromBufferTimestampBased_AfterFlushReturnsNewerData(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, nil) + defer lb.ShutdownLogBuffer() + + payload := bytes.Repeat([]byte("x"), 4096) + var sealed *MemBuffer + + for i := 0; i < 5000; i++ { + if err := lb.AddDataToBuffer([]byte("k"), payload, int64(i+1)); err != nil { + t.Fatalf("AddDataToBuffer(%d): %v", i, err) + } + candidate := lb.prevBuffers.buffers[len(lb.prevBuffers.buffers)-1] + if candidate.size > 0 { + sealed = &MemBuffer{ + size: candidate.size, + startTime: candidate.startTime, + stopTime: candidate.stopTime, + offset: candidate.offset, + } + break + } + } + + if sealed == nil { + t.Fatal("expected first buffer flush to produce a sealed buffer") + } + + for i := 5000; i < 5100; i++ { + if err := lb.AddDataToBuffer([]byte("k"), payload, int64(i+1)); err != nil { + t.Fatalf("AddDataToBuffer(%d): %v", i, err) + } + } + + buf, _, err := lb.ReadFromBuffer(NewMessagePosition(sealed.stopTime.UnixNano(), sealed.offset)) + if err != nil { + t.Fatalf("ReadFromBuffer returned error: %v", err) + } + if buf == nil || buf.Len() == 0 { + t.Fatalf("expected newer data after the first sealed buffer, got %v", buf) + } +} + // TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError tests that requesting an old offset // that has been flushed to disk properly returns ResumeFromDiskError instead of hanging forever. // This reproduces the bug where Schema Registry couldn't read the _schemas topic. diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index dfc91cb25..11cda011f 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -77,6 +77,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition if err == ResumeFromDiskError { // Try to read from disk if readFromDiskFn is available if logBuffer.ReadFromDiskFn != nil { + prevReadPosition := lastReadPosition lastReadPosition, isDone, err = logBuffer.ReadFromDiskFn(lastReadPosition, stopTsNs, eachLogDataFn) if err != nil { return lastReadPosition, isDone, err @@ -84,6 +85,11 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition if isDone { return lastReadPosition, isDone, nil } + if lastReadPosition != prevReadPosition { + continue + } + } else if logBuffer.HasData() { + return lastReadPosition, isDone, ResumeFromDiskError } // CRITICAL: Check if client is still connected @@ -261,6 +267,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star if err == ResumeFromDiskError { // Try to read from disk if readFromDiskFn is available if logBuffer.ReadFromDiskFn != nil { + prevReadPosition := lastReadPosition // Wrap eachLogDataFn to match the expected signature diskReadFn := func(logEntry *filer_pb.LogEntry) (bool, error) { return eachLogDataFn(logEntry, logEntry.Offset) @@ -272,7 +279,11 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star if isDone { return lastReadPosition, isDone, nil } - // Continue to next iteration after disk read + if lastReadPosition != prevReadPosition { + continue + } + } else if logBuffer.HasData() { + return lastReadPosition, isDone, ResumeFromDiskError } // CRITICAL: Check if client is still connected after disk read diff --git a/weed/util/log_buffer/log_read_test.go b/weed/util/log_buffer/log_read_test.go index 802dcdacf..c46d31287 100644 --- a/weed/util/log_buffer/log_read_test.go +++ b/weed/util/log_buffer/log_read_test.go @@ -307,6 +307,47 @@ func TestLoopProcessLogDataWithOffset_StopTime(t *testing.T) { t.Logf("Loop correctly exited for past stopTsNs in %v (waitForDataFn called %d times)", elapsed, callCount) } +func TestLoopProcessLogData_SlowConsumerFallsBehind(t *testing.T) { + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {} + logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + baseTime := time.Now() + for i := 0; i < 1000; i++ { + ts := baseTime.Add(time.Duration(i) * time.Millisecond) + if err := logBuffer.AddDataToBuffer([]byte("key"), []byte("value"), ts.UnixNano()); err != nil { + t.Fatalf("AddDataToBuffer(%d): %v", i, err) + } + } + + oldPosition := NewMessagePosition(baseTime.Add(-10*time.Second).UnixNano(), 1) + + waitForDataFn := func() bool { + t.Errorf("waitForDataFn should not be called for a slow consumer that has fallen behind") + return false + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (bool, error) { + return false, nil + } + + done := make(chan struct{}) + var err error + go func() { + _, _, err = logBuffer.LoopProcessLogData("slow-consumer", oldPosition, 0, waitForDataFn, eachLogEntryFn) + close(done) + }() + + select { + case <-done: + if err != ResumeFromDiskError { + t.Fatalf("expected ResumeFromDiskError, got %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("LoopProcessLogData blocked instead of returning ResumeFromDiskError") + } +} + // BenchmarkLoopProcessLogDataWithOffset_EmptyBuffer benchmarks the performance // of the loop with an empty buffer to ensure no busy-waiting func BenchmarkLoopProcessLogDataWithOffset_EmptyBuffer(b *testing.B) { diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go index 109cb3862..e00148500 100644 --- a/weed/util/log_buffer/sealed_buffer.go +++ b/weed/util/log_buffer/sealed_buffer.go @@ -32,7 +32,7 @@ func newSealedBuffers(size int) *SealedBuffers { } func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, startOffset int64, endOffset int64) (newBuf []byte) { - oldMemBuffer := sbs.buffers[0] + oldBuf := sbs.buffers[0].buf size := len(sbs.buffers) for i := 0; i < size-1; i++ { sbs.buffers[i].buf = sbs.buffers[i+1].buf @@ -48,12 +48,12 @@ func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, sbs.buffers[size-1].stopTime = stopTime sbs.buffers[size-1].startOffset = startOffset sbs.buffers[size-1].offset = endOffset - return oldMemBuffer.buf + return oldBuf } func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int, err error) { lastReadTs := lastReadTime.UnixNano() - for pos < len(mb.buf) { + for pos < mb.size { size, t, readErr := readTs(mb.buf, pos) if readErr != nil { // Return error if buffer is corrupted @@ -64,7 +64,7 @@ func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int, err error) { } pos += size + 4 } - return len(mb.buf), nil + return mb.size, nil } func (mb *MemBuffer) String() string {