
fix: keep metadata subscriptions progressing (#8730) (#8746)

* fix: keep metadata subscriptions progressing (#8730)

* test: cancel slow metadata writers with parent context

* filer: ignore missing persisted log chunks
Chris Lu committed by GitHub · commit 6bf654c25c
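Taken together, the changes below target one failure mode: a subscriber whose read position fell behind the in-memory log buffer could stall forever, because an exhausted sealed buffer returned an empty read, the local buffer had no disk-read fallback wired in, and a missing persisted chunk aborted the whole subscription. A minimal, stdlib-only sketch of the catch-up shape the patches restore (all names illustrative, not the production API):

package main

import "fmt"

func main() {
    memory := map[int]string{3: "c", 4: "d"} // entries still buffered in memory
    disk := map[int]string{1: "a", 2: "b"}   // entries already flushed to disk
    pos := 1
    for pos <= 4 {
        if v, ok := memory[pos]; ok { // fast path: serve from memory
            fmt.Println("mem :", v)
            pos++
            continue
        }
        if v, ok := disk[pos]; ok { // fallback: resume from persisted data
            fmt.Println("disk:", v)
            pos++
            continue
        }
        break // nothing anywhere: a real subscriber would block for new data
    }
}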
  1. test/metadata_subscribe/metadata_subscribe_integration_test.go (208 changed lines)
  2. weed/filer/filer.go (2 changed lines)
  3. weed/filer/filer_notify.go (25 changed lines)
  4. weed/filer/meta_aggregator.go (12 changed lines)
  5. weed/server/filer_grpc_server_sub_meta.go (23 changed lines)
  6. weed/util/log_buffer/log_buffer.go (20 changed lines)
  7. weed/util/log_buffer/log_buffer_test.go (43 changed lines)
  8. weed/util/log_buffer/log_read.go (13 changed lines)
  9. weed/util/log_buffer/log_read_test.go (41 changed lines)
  10. weed/util/log_buffer/sealed_buffer.go (8 changed lines)

test/metadata_subscribe/metadata_subscribe_integration_test.go (208 changed lines)

@@ -3,6 +3,7 @@ package metadata_subscribe
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"mime/multipart"
@@ -27,6 +28,8 @@ import (
"google.golang.org/grpc/credentials/insecure"
)
const slowConsumerMetadataPayloadSize = 4096
// TestMetadataSubscribeBasic tests basic metadata subscription functionality
func TestMetadataSubscribeBasic(t *testing.T) {
if testing.Short() {
@@ -680,6 +683,108 @@ func TestMetadataSubscribeMillionUpdates(t *testing.T) {
})
}
func TestMetadataSubscribeSlowConsumerKeepsProgressing(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
testDir, err := os.MkdirTemp("", "seaweedfs_slow_consumer_test_")
require.NoError(t, err)
defer os.RemoveAll(testDir)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
cluster, err := startSeaweedFSCluster(ctx, testDir)
require.NoError(t, err)
defer cluster.Stop()
require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
t.Logf("Cluster started for slow consumer regression test")
t.Run("single_filer_slow_consumer", func(t *testing.T) {
var receivedCount int64
phaseOneEntries := 6000
phaseTwoEntries := 14000
totalEntries := phaseOneEntries + phaseTwoEntries
minExpected := int64(12000)
errChan := make(chan error, 1)
subCtx, subCancel := context.WithCancel(ctx)
defer subCancel()
go func() {
err := followMetadataSlowly(
subCtx,
"127.0.0.1:8888",
"/slow-consumer/",
time.Now().Add(-5*time.Second).UnixNano(),
time.Millisecond,
func(resp *filer_pb.SubscribeMetadataResponse) {
if resp.GetEventNotification() == nil {
return
}
entry := resp.GetEventNotification().GetNewEntry()
if entry == nil || entry.IsDirectory {
return
}
atomic.AddInt64(&receivedCount, 1)
},
)
if err != nil && !errors.Is(err, context.Canceled) {
errChan <- err
}
}()
time.Sleep(2 * time.Second)
payload := bytes.Repeat([]byte("x"), slowConsumerMetadataPayloadSize)
startTime := time.Now()
require.NoError(t, createMetadataEntries(ctx, "127.0.0.1:8888", 0, phaseOneEntries, payload))
t.Logf("Created phase 1 with %d entries in %v", phaseOneEntries, time.Since(startTime))
time.Sleep(2 * time.Second)
require.NoError(t, createMetadataEntries(ctx, "127.0.0.1:8888", phaseOneEntries, phaseTwoEntries, payload))
t.Logf("Created phase 2 with %d entries", phaseTwoEntries)
checkTicker := time.NewTicker(2 * time.Second)
defer checkTicker.Stop()
deadline := time.NewTimer(45 * time.Second)
defer deadline.Stop()
lastReceived := int64(-1)
stableChecks := 0
for {
select {
case err := <-errChan:
t.Fatalf("slow consumer subscription error: %v", err)
case <-deadline.C:
t.Fatalf("timed out waiting for slow consumer progress, received %d/%d", atomic.LoadInt64(&receivedCount), totalEntries)
case <-checkTicker.C:
received := atomic.LoadInt64(&receivedCount)
t.Logf("Slow consumer progress: %d/%d", received, totalEntries)
if received >= minExpected {
return
}
if received == lastReceived {
stableChecks++
if stableChecks >= 4 {
t.Fatalf("slow consumer stalled at %d/%d after writes completed", received, totalEntries)
}
} else {
stableChecks = 0
}
lastReceived = received
}
}
})
}
// Helper types and functions
type TestCluster struct {
@@ -915,3 +1020,106 @@ func subscribeToMetadataWithOptions(ctx context.Context, filerGrpcAddress, pathP
}
})
}
func followMetadataSlowly(
ctx context.Context,
filerGrpcAddress, pathPrefix string,
sinceNs int64,
delay time.Duration,
onEvent func(resp *filer_pb.SubscribeMetadataResponse),
) error {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
if grpcDialOption == nil {
grpcDialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
}
option := &pb.MetadataFollowOption{
ClientName: "slow_consumer_test",
ClientId: util.RandomInt32(),
ClientEpoch: int32(time.Now().Unix()),
PathPrefix: pathPrefix,
StartTsNs: sinceNs,
EventErrorType: pb.DontLogError,
}
return pb.FollowMetadata(pb.ServerAddress(filerGrpcAddress), grpcDialOption, option, func(resp *filer_pb.SubscribeMetadataResponse) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
onEvent(resp)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(delay):
return nil
}
})
}
func createMetadataEntries(ctx context.Context, filerGrpcAddress string, startIndex, total int, payload []byte) error {
const workers = 10
grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
errCh := make(chan error, workers)
var wg sync.WaitGroup
for workerID := 0; workerID < workers; workerID++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
err := pb.WithFilerClient(false, 0, pb.ServerAddress(filerGrpcAddress), grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
for idx := startIndex + workerID; idx < startIndex+total; idx += workers {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
dir := fmt.Sprintf("/slow-consumer/bucket-%02d", idx%6)
name := fmt.Sprintf("entry-%05d", idx)
_, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
Name: name,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
FileSize: uint64(len(payload)),
Mtime: time.Now().Unix(),
FileMode: 0644,
Uid: 1000,
Gid: 1000,
},
Extended: map[string][]byte{
"payload": payload,
},
},
})
if err != nil {
return err
}
}
return nil
})
if err != nil {
errCh <- err
}
}(workerID)
}
wg.Wait()
close(errCh)
for err := range errCh {
if err != nil {
return err
}
}
return nil
}
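Assuming the repository layout in the file list above, the new regression test can be run on its own; it skips itself under -short:

go test -run TestMetadataSubscribeSlowConsumerKeepsProgressing ./test/metadata_subscribe/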

weed/filer/filer.go (2 changed lines)

@@ -81,7 +81,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
f.UniqueFilerId = -f.UniqueFilerId
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, nil, notifyFn)
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, f.readPersistedLogBufferPosition, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
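This one-line change is the wiring for the fix: the local log buffer previously received nil as its disk-read callback, so a subscriber that fell behind the in-memory buffers had no way to resume from flushed data; it now receives f.readPersistedLogBufferPosition, defined in filer_notify.go below.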

weed/filer/filer_notify.go (25 changed lines)

@@ -2,8 +2,10 @@ package filer
import (
"context"
"errors"
"fmt"
"io"
nethttp "net/http"
"regexp"
"strings"
"time"
@@ -16,6 +18,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/notification"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32) {
@@ -174,6 +177,7 @@ func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTim
var (
volumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`)
chunkNotFoundPattern = regexp.MustCompile(`(urls not found|File Not Found)`)
httpNotFoundPattern = regexp.MustCompile(`404 Not Found: not found`)
)
// isChunkNotFoundError checks if the error indicates that a volume or chunk
@@ -183,8 +187,13 @@ func isChunkNotFoundError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, util_http.ErrNotFound) || errors.Is(err, nethttp.ErrMissingFile) {
return true
}
errMsg := err.Error()
return volumeNotFoundPattern.MatchString(errMsg) || chunkNotFoundPattern.MatchString(errMsg)
return volumeNotFoundPattern.MatchString(errMsg) ||
chunkNotFoundPattern.MatchString(errMsg) ||
httpNotFoundPattern.MatchString(errMsg)
}
func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) {
@@ -220,3 +229,17 @@ func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition,
return
}
func (f *Filer) readPersistedLogBufferPosition(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
lastReadPosition = startPosition
lastTsNs, isDone, err := f.ReadPersistedLogBuffer(startPosition, stopTsNs, eachLogEntryFn)
if err != nil {
return lastReadPosition, isDone, err
}
if lastTsNs != 0 {
lastReadPosition = log_buffer.NewMessagePosition(lastTsNs, 1)
}
return lastReadPosition, isDone, nil
}
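readPersistedLogBufferPosition adapts ReadPersistedLogBuffer to the MessagePosition-in, MessagePosition-out shape the log buffer expects from its disk fallback, and isChunkNotFoundError is widened so a missing persisted log chunk (a wrapped util_http.ErrNotFound, net/http's ErrMissingFile, or a plain "404 Not Found: not found" message) is skipped rather than aborting the subscription. A self-contained sketch of the widened classification; errNotFound stands in for util_http.ErrNotFound, which is assumed to behave as an ordinary sentinel error:

package main

import (
    "errors"
    "fmt"
    nethttp "net/http"
    "regexp"
)

// Illustrative copies of the patterns above, not the production code path.
var (
    volumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`)
    chunkNotFoundPattern  = regexp.MustCompile(`(urls not found|File Not Found)`)
    httpNotFoundPattern   = regexp.MustCompile(`404 Not Found: not found`)
)

var errNotFound = errors.New("not found") // stand-in for util_http.ErrNotFound

func isChunkNotFoundError(err error) bool {
    if err == nil {
        return false
    }
    if errors.Is(err, errNotFound) || errors.Is(err, nethttp.ErrMissingFile) {
        return true
    }
    msg := err.Error()
    return volumeNotFoundPattern.MatchString(msg) ||
        chunkNotFoundPattern.MatchString(msg) ||
        httpNotFoundPattern.MatchString(msg)
}

func main() {
    fmt.Println(isChunkNotFoundError(fmt.Errorf("read chunk: %w", errNotFound))) // true
    fmt.Println(isChunkNotFoundError(errors.New("volume 42 not found")))         // true
    fmt.Println(isChunkNotFoundError(errors.New("permission denied")))           // false
}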

weed/filer/meta_aggregator.go (12 changed lines)

@@ -74,6 +74,18 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, star
}
}
func (ma *MetaAggregator) HasRemotePeers() bool {
ma.peerChansLock.Lock()
defer ma.peerChansLock.Unlock()
for address := range ma.peerChans {
if address != ma.self {
return true
}
}
return false
}
func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) {
lastTsNs := startFrom.UnixNano()
for {
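HasRemotePeers gives callers a cheap way to ask whether this filer is aggregating anyone else's metadata; the subscribe handler below uses it to route single-filer clusters straight to the local log. A stdlib sketch of the same scan-under-lock pattern (types and names illustrative):

package main

import (
    "fmt"
    "sync"
)

type aggregator struct {
    mu    sync.Mutex
    self  string
    peers map[string]struct{} // peer address set; channels elided for brevity
}

// hasRemotePeers reports whether any tracked peer is not ourselves.
func (a *aggregator) hasRemotePeers() bool {
    a.mu.Lock()
    defer a.mu.Unlock()
    for addr := range a.peers {
        if addr != a.self {
            return true
        }
    }
    return false
}

func main() {
    a := &aggregator{self: "filer-a", peers: map[string]struct{}{"filer-a": {}}}
    fmt.Println(a.hasRemotePeers()) // false: only ourselves registered
    a.peers["filer-b"] = struct{}{}
    fmt.Println(a.hasRemotePeers()) // true
}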

weed/server/filer_grpc_server_sub_meta.go (23 changed lines)

@@ -24,6 +24,9 @@ const (
)
func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
if fs.filer.MetaAggregator == nil || !fs.filer.MetaAggregator.HasRemotePeers() {
return fs.SubscribeLocalMetadata(req, stream)
}
ctx := stream.Context()
peerAddress := findClientAddress(ctx, 0)
@@ -99,18 +102,11 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
return false
default:
}
fs.filer.MetaAggregator.ListenersLock.Lock()
atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, 1)
fs.filer.MetaAggregator.ListenersCond.Wait()
atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, -1)
fs.filer.MetaAggregator.ListenersLock.Unlock()
return fs.hasClient(req.ClientId, req.ClientEpoch)
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
@@ -237,23 +233,12 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
glog.V(3).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
return false
default:
}
fs.listenersLock.Lock()
atomic.AddInt64(&fs.listenersWaits, 1)
fs.listenersCond.Wait()
atomic.AddInt64(&fs.listenersWaits, -1)
fs.listenersLock.Unlock()
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
return false
}
return true
return fs.hasClient(req.ClientId, req.ClientEpoch)
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
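Two changes here: a filer with no remote peers now serves SubscribeMetadata directly via SubscribeLocalMetadata, skipping the aggregated buffer that would never receive data, and both in-memory loops drop the pre-wait ctx check, collapsing the liveness test to a single hasClient call after waking. The parked-listener wait they share, sketched with stdlib primitives (names illustrative):

package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    var waiters int64 // mirrors ListenersWaits: how many readers are parked

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        mu.Lock()
        atomic.AddInt64(&waiters, 1)
        cond.Wait() // releases mu while parked, reacquires on wakeup
        atomic.AddInt64(&waiters, -1)
        mu.Unlock()
        fmt.Println("woken; caller now re-checks hasClient()")
    }()

    for atomic.LoadInt64(&waiters) == 0 {
        runtime.Gosched() // wait until the subscriber is parked (demo only)
    }
    mu.Lock()
    cond.Broadcast() // what a writer does after appending new log data
    mu.Unlock()
    wg.Wait()
}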

weed/util/log_buffer/log_buffer.go (20 changed lines)

@@ -606,6 +606,22 @@ func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() {
func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
return logBuffer.startTime
}
func (logBuffer *LogBuffer) HasData() bool {
logBuffer.RLock()
defer logBuffer.RUnlock()
if logBuffer.pos > 0 {
return true
}
for _, buf := range logBuffer.prevBuffers.buffers {
if buf.size > 0 {
return true
}
}
return false
}
func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition {
return MessagePosition{
Time: logBuffer.startTime,
@@ -771,7 +787,9 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
glog.Errorf("ReadFromBuffer: buffer corruption in prevBuffer: %v", err)
return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
}
return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil
if pos < buf.size {
return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil
}
}
}
// If current buffer is not empty, return it
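Two additions: HasData reports whether anything is buffered in the current or previous buffers (the read loop in log_read.go below uses it to decide between waiting and re-raising ResumeFromDiskError), and ReadFromBuffer now only serves a sealed buffer when the located position is strictly inside it, falling through to newer buffers otherwise. A toy illustration of that boundary case:

package main

import "fmt"

func main() {
    buf := []byte("abcdef")
    size := len(buf)
    pos := size // reader has already consumed this sealed buffer
    if pos < size {
        fmt.Printf("serve %d bytes from the sealed buffer\n", size-pos)
    } else {
        // the old code returned the empty slice buf[pos:size] here,
        // leaving the reader stuck at the same position forever
        fmt.Println("sealed buffer exhausted; fall through to newer buffers")
    }
}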

weed/util/log_buffer/log_buffer_test.go (43 changed lines)

@@ -1,6 +1,7 @@
package log_buffer
import (
"bytes"
"crypto/rand"
"fmt"
"io"
@@ -67,6 +68,48 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
}
}
func TestReadFromBufferTimestampBased_AfterFlushReturnsNewerData(t *testing.T) {
lb := NewLogBuffer("test", time.Hour, nil, nil, nil)
defer lb.ShutdownLogBuffer()
payload := bytes.Repeat([]byte("x"), 4096)
var sealed *MemBuffer
for i := 0; i < 5000; i++ {
if err := lb.AddDataToBuffer([]byte("k"), payload, int64(i+1)); err != nil {
t.Fatalf("AddDataToBuffer(%d): %v", i, err)
}
candidate := lb.prevBuffers.buffers[len(lb.prevBuffers.buffers)-1]
if candidate.size > 0 {
sealed = &MemBuffer{
size: candidate.size,
startTime: candidate.startTime,
stopTime: candidate.stopTime,
offset: candidate.offset,
}
break
}
}
if sealed == nil {
t.Fatal("expected first buffer flush to produce a sealed buffer")
}
for i := 5000; i < 5100; i++ {
if err := lb.AddDataToBuffer([]byte("k"), payload, int64(i+1)); err != nil {
t.Fatalf("AddDataToBuffer(%d): %v", i, err)
}
}
buf, _, err := lb.ReadFromBuffer(NewMessagePosition(sealed.stopTime.UnixNano(), sealed.offset))
if err != nil {
t.Fatalf("ReadFromBuffer returned error: %v", err)
}
if buf == nil || buf.Len() == 0 {
t.Fatalf("expected newer data after the first sealed buffer, got %v", buf)
}
}
// TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError tests that requesting an old offset
// that has been flushed to disk properly returns ResumeFromDiskError instead of hanging forever.
// This reproduces the bug where Schema Registry couldn't read the _schemas topic.

weed/util/log_buffer/log_read.go (13 changed lines)

@@ -77,6 +77,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
if err == ResumeFromDiskError {
// Try to read from disk if readFromDiskFn is available
if logBuffer.ReadFromDiskFn != nil {
prevReadPosition := lastReadPosition
lastReadPosition, isDone, err = logBuffer.ReadFromDiskFn(lastReadPosition, stopTsNs, eachLogDataFn)
if err != nil {
return lastReadPosition, isDone, err
@@ -84,6 +85,11 @@
if isDone {
return lastReadPosition, isDone, nil
}
if lastReadPosition != prevReadPosition {
continue
}
} else if logBuffer.HasData() {
return lastReadPosition, isDone, ResumeFromDiskError
}
// CRITICAL: Check if client is still connected
@@ -261,6 +267,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
if err == ResumeFromDiskError {
// Try to read from disk if readFromDiskFn is available
if logBuffer.ReadFromDiskFn != nil {
prevReadPosition := lastReadPosition
// Wrap eachLogDataFn to match the expected signature
diskReadFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
return eachLogDataFn(logEntry, logEntry.Offset)
@@ -272,7 +279,11 @@
if isDone {
return lastReadPosition, isDone, nil
}
// Continue to next iteration after disk read
if lastReadPosition != prevReadPosition {
continue
}
} else if logBuffer.HasData() {
return lastReadPosition, isDone, ResumeFromDiskError
}
// CRITICAL: Check if client is still connected after disk read
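Both loops now remember the position before a disk read and only loop back when it actually advanced; a disk read that returns the same position falls through to the wait/HasData path instead of spinning. A stdlib sketch of that progress guard:

package main

import "fmt"

// drainFromDisk keeps re-reading while the position moves and stops as soon
// as a read makes no progress, mirroring the prevReadPosition check above.
func drainFromDisk(pos int64, readFromDisk func(int64) (int64, bool)) int64 {
    for {
        prev := pos
        var done bool
        pos, done = readFromDisk(pos)
        if done || pos == prev {
            return pos
        }
    }
}

func main() {
    positions := []int64{10, 20, 20} // third read makes no progress
    i := 0
    read := func(pos int64) (int64, bool) {
        if i >= len(positions) {
            return pos, true
        }
        next := positions[i]
        i++
        return next, false
    }
    fmt.Println(drainFromDisk(0, read)) // 20: stops once the position stalls
}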

weed/util/log_buffer/log_read_test.go (41 changed lines)

@@ -307,6 +307,47 @@ func TestLoopProcessLogDataWithOffset_StopTime(t *testing.T) {
t.Logf("Loop correctly exited for past stopTsNs in %v (waitForDataFn called %d times)", elapsed, callCount)
}
func TestLoopProcessLogData_SlowConsumerFallsBehind(t *testing.T) {
flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil)
defer logBuffer.ShutdownLogBuffer()
baseTime := time.Now()
for i := 0; i < 1000; i++ {
ts := baseTime.Add(time.Duration(i) * time.Millisecond)
if err := logBuffer.AddDataToBuffer([]byte("key"), []byte("value"), ts.UnixNano()); err != nil {
t.Fatalf("AddDataToBuffer(%d): %v", i, err)
}
}
oldPosition := NewMessagePosition(baseTime.Add(-10*time.Second).UnixNano(), 1)
waitForDataFn := func() bool {
t.Errorf("waitForDataFn should not be called for a slow consumer that has fallen behind")
return false
}
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
return false, nil
}
done := make(chan struct{})
var err error
go func() {
_, _, err = logBuffer.LoopProcessLogData("slow-consumer", oldPosition, 0, waitForDataFn, eachLogEntryFn)
close(done)
}()
select {
case <-done:
if err != ResumeFromDiskError {
t.Fatalf("expected ResumeFromDiskError, got %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("LoopProcessLogData blocked instead of returning ResumeFromDiskError")
}
}
// BenchmarkLoopProcessLogDataWithOffset_EmptyBuffer benchmarks the performance
// of the loop with an empty buffer to ensure no busy-waiting
func BenchmarkLoopProcessLogDataWithOffset_EmptyBuffer(b *testing.B) {

weed/util/log_buffer/sealed_buffer.go (8 changed lines)

@@ -32,7 +32,7 @@ func newSealedBuffers(size int) *SealedBuffers {
}
func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, startOffset int64, endOffset int64) (newBuf []byte) {
oldMemBuffer := sbs.buffers[0]
oldBuf := sbs.buffers[0].buf
size := len(sbs.buffers)
for i := 0; i < size-1; i++ {
sbs.buffers[i].buf = sbs.buffers[i+1].buf
@@ -48,12 +48,12 @@
sbs.buffers[size-1].stopTime = stopTime
sbs.buffers[size-1].startOffset = startOffset
sbs.buffers[size-1].offset = endOffset
return oldMemBuffer.buf
return oldBuf
}
func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int, err error) {
lastReadTs := lastReadTime.UnixNano()
for pos < len(mb.buf) {
for pos < mb.size {
size, t, readErr := readTs(mb.buf, pos)
if readErr != nil {
// Return error if buffer is corrupted
@@ -64,7 +64,7 @@ func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int, err error) {
}
pos += size + 4
}
return len(mb.buf), nil
return mb.size, nil
}
func (mb *MemBuffer) String() string {
