From 24eacd9eee41fc125024fba17dc8893b0e82f7d3 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 18 Nov 2025 21:43:18 -0800 Subject: [PATCH] error handling for adding to log buffer --- weed/filer/filer_notify.go | 4 +- weed/filer/meta_aggregator.go | 5 +- weed/mq/broker/broker_grpc_pub_follow.go | 5 +- weed/mq/broker/broker_log_buffer_offset.go | 4 +- weed/mq/topic/local_partition.go | 4 +- weed/mq/topic/local_partition_offset.go | 4 +- weed/util/log_buffer/log_buffer.go | 68 ++++++++++++------- .../log_buffer/log_buffer_corruption_test.go | 4 +- .../log_buffer/log_buffer_flush_gap_test.go | 43 ++++++++---- .../log_buffer_queryability_test.go | 20 ++++-- weed/util/log_buffer/log_buffer_test.go | 36 ++++++---- .../log_buffer/log_read_integration_test.go | 18 +++-- .../log_buffer/log_read_stateless_test.go | 36 +++++++--- weed/util/log_buffer/log_read_test.go | 4 +- 14 files changed, 176 insertions(+), 79 deletions(-) diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 2921d709b..845a0678e 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -83,7 +83,9 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica return } - f.LocalMetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs) + if err := f.LocalMetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs); err != nil { + glog.Errorf("failed to add data to log buffer for %s: %v", dir, err) + } } diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 1ea334224..0fc64a947 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -172,7 +172,10 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, } dir := event.Directory // println("received meta change", dir, "size", len(data)) - ma.MetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs) + if err := ma.MetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs); err != nil { + glog.Errorf("failed to add data to log buffer for %s: %v", dir, err) + return err + } if maybeReplicateMetadataChange != nil { maybeReplicateMetadataChange(event) } diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 117dc4f87..ec8e4ecf1 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -53,7 +53,10 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi // TODO: change this to DataMessage // log the message - logBuffer.AddToBuffer(dataMessage) + if err := logBuffer.AddToBuffer(dataMessage); err != nil { + glog.Errorf("Failed to add message to log buffer: %v", err) + return fmt.Errorf("failed to add message to log buffer: %w", err) + } // send back the ack if err := stream.Send(&mq_pb.PublishFollowMeResponse{ diff --git a/weed/mq/broker/broker_log_buffer_offset.go b/weed/mq/broker/broker_log_buffer_offset.go index aeb8fad1b..565cb0aa9 100644 --- a/weed/mq/broker/broker_log_buffer_offset.go +++ b/weed/mq/broker/broker_log_buffer_offset.go @@ -73,7 +73,9 @@ func (b *MessageQueueBroker) addLogEntryToBuffer( // Use the new AddLogEntryToBuffer method to preserve offset information // This ensures the offset is maintained throughout the entire data flow - logBuffer.AddLogEntryToBuffer(logEntry) + if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil { + return err + } return nil } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 5f5c2278f..c8d1f119d 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -68,7 +68,9 @@ func NewLocalPartition(partition Partition, logFlushInterval int, logFlushFn log } func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error { - p.LogBuffer.AddToBuffer(message) + if err := p.LogBuffer.AddToBuffer(message); err != nil { + return fmt.Errorf("failed to add message to log buffer: %w", err) + } p.UpdateActivity() // Track publish activity for idle cleanup // maybe send to the follower diff --git a/weed/mq/topic/local_partition_offset.go b/weed/mq/topic/local_partition_offset.go index e15234ca0..ddbc132db 100644 --- a/weed/mq/topic/local_partition_offset.go +++ b/weed/mq/topic/local_partition_offset.go @@ -62,7 +62,9 @@ func (p *LocalPartition) addToBufferWithOffset(message *mq_pb.DataMessage, offse } // Add the entry to the buffer in a way that preserves offset on disk and in-memory - p.LogBuffer.AddLogEntryToBuffer(logEntry) + if err := p.LogBuffer.AddLogEntryToBuffer(logEntry); err != nil { + return fmt.Errorf("failed to add log entry to buffer: %w", err) + } return nil } diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 6c71a971e..22e69cc60 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -245,24 +245,28 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn return nil } -func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { - logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) +func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) error { + return logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) } // AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information -func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { +func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) error { var toFlush *dataToFlush + var marshalErr error logBuffer.Lock() defer func() { logBuffer.Unlock() if toFlush != nil { logBuffer.flushChan <- toFlush } - if logBuffer.notifyFn != nil { - logBuffer.notifyFn() + // Only notify if there was no error + if marshalErr == nil { + if logBuffer.notifyFn != nil { + logBuffer.notifyFn() + } + // Notify all registered subscribers instantly (<1ms latency) + logBuffer.notifySubscribers() } - // Notify all registered subscribers instantly (<1ms latency) - logBuffer.notifySubscribers() }() processingTsNs := logEntry.TsNs @@ -278,10 +282,11 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { logBuffer.LastTsNs.Store(processingTsNs) } - logEntryData, marshalErr := proto.Marshal(logEntry) - if marshalErr != nil { - glog.Errorf("Failed to marshal LogEntry: %v", marshalErr) - return + logEntryData, err := proto.Marshal(logEntry) + if err != nil { + marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err) + glog.Errorf("%v", marshalErr) + return marshalErr } size := len(logEntryData) @@ -316,7 +321,9 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { const maxBufferSize = 1 << 30 // 1 GiB practical limit // Ensure 2*size + 4 won't overflow int and stays within practical bounds if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { - return + marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size) + glog.Errorf("%v", marshalErr) + return marshalErr } // Safe to compute now that we've validated size is in valid range newSize := 2*size + 4 @@ -332,9 +339,10 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { logBuffer.pos += size + 4 logBuffer.offset++ + return nil } -func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { +func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) error { // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock var ts time.Time @@ -353,17 +361,21 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin } var toFlush *dataToFlush + var marshalErr error logBuffer.Lock() defer func() { logBuffer.Unlock() if toFlush != nil { logBuffer.flushChan <- toFlush } - if logBuffer.notifyFn != nil { - logBuffer.notifyFn() + // Only notify if there was no error + if marshalErr == nil { + if logBuffer.notifyFn != nil { + logBuffer.notifyFn() + } + // Notify all registered subscribers instantly (<1ms latency) + logBuffer.notifySubscribers() } - // Notify all registered subscribers instantly (<1ms latency) - logBuffer.notifySubscribers() }() // Handle timestamp collision inside lock (rare case) @@ -381,10 +393,11 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin logEntry.Offset = logBuffer.offset // Marshal with correct timestamp and offset - logEntryData, marshalErr := proto.Marshal(logEntry) - if marshalErr != nil { - glog.Errorf("Failed to marshal LogEntry: %v", marshalErr) - return + logEntryData, err := proto.Marshal(logEntry) + if err != nil { + marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err) + glog.Errorf("%v", marshalErr) + return marshalErr } size := len(logEntryData) @@ -418,7 +431,9 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin const maxBufferSize = 1 << 30 // 1 GiB practical limit // Ensure 2*size + 4 won't overflow int and stays within practical bounds if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { - return + marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size) + glog.Errorf("%v", marshalErr) + return marshalErr } // Safe to compute now that we've validated size is in valid range newSize := 2*size + 4 @@ -434,6 +449,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin logBuffer.pos += size + 4 logBuffer.offset++ + return nil } func (logBuffer *LogBuffer) IsStopping() bool { @@ -851,15 +867,15 @@ func copiedBytes(buf []byte) (copied *bytes.Buffer) { } func readTs(buf []byte, pos int) (size int, ts int64, err error) { - // Bounds check for size field - if pos+4 > len(buf) { + // Bounds check for size field (overflow-safe) + if pos < 0 || pos > len(buf)-4 { return 0, 0, fmt.Errorf("corrupted log buffer: cannot read size at pos %d, buffer length %d", pos, len(buf)) } size = int(util.BytesToUint32(buf[pos : pos+4])) - // Bounds check for entry data - if pos+4+size > len(buf) { + // Bounds check for entry data (overflow-safe, protects against negative size) + if size < 0 || size > len(buf)-pos-4 { return 0, 0, fmt.Errorf("corrupted log buffer: entry size %d at pos %d exceeds buffer length %d", size, pos, len(buf)) } diff --git a/weed/util/log_buffer/log_buffer_corruption_test.go b/weed/util/log_buffer/log_buffer_corruption_test.go index c0f0a33d8..c6f0b0591 100644 --- a/weed/util/log_buffer/log_buffer_corruption_test.go +++ b/weed/util/log_buffer/log_buffer_corruption_test.go @@ -102,7 +102,9 @@ func TestReadFromBufferCorruption(t *testing.T) { TsNs: 1000, Key: validKey, }) - lb.AddDataToBuffer(validKey, validData, 1000) + if err := lb.AddDataToBuffer(validKey, validData, 1000); err != nil { + t.Fatalf("Failed to add data to buffer: %v", err) + } // Manually corrupt the buffer by writing garbage // This simulates a corruption scenario diff --git a/weed/util/log_buffer/log_buffer_flush_gap_test.go b/weed/util/log_buffer/log_buffer_flush_gap_test.go index bc40ea6df..fc6ab27b4 100644 --- a/weed/util/log_buffer/log_buffer_flush_gap_test.go +++ b/weed/util/log_buffer/log_buffer_flush_gap_test.go @@ -69,11 +69,13 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { t.Logf("Sending %d messages...", messageCount) for i := 0; i < messageCount; i++ { - logBuffer.AddToBuffer(&mq_pb.DataMessage{ + if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{ Key: []byte(fmt.Sprintf("key-%d", i)), Value: []byte(fmt.Sprintf("message-%d", i)), TsNs: time.Now().UnixNano(), - }) + }); err != nil { + t.Fatalf("Failed to add buffer: %v", err) + } } // Force flush multiple times to simulate real workload @@ -82,11 +84,13 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) { // Add more messages after flush for i := messageCount; i < messageCount+50; i++ { - logBuffer.AddToBuffer(&mq_pb.DataMessage{ + if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{ Key: []byte(fmt.Sprintf("key-%d", i)), Value: []byte(fmt.Sprintf("message-%d", i)), TsNs: time.Now().UnixNano(), - }) + }); err != nil { + t.Fatalf("Failed to add buffer: %v", err) + } } // Force another flush @@ -209,11 +213,13 @@ func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) { // Send 20 messages for i := 0; i < 20; i++ { offset := int64(batch*20 + i) - logBuffer.AddToBuffer(&mq_pb.DataMessage{ + if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{ Key: []byte(fmt.Sprintf("key-%d", offset)), Value: []byte(fmt.Sprintf("message-%d", offset)), TsNs: time.Now().UnixNano(), - }) + }); err != nil { + t.Fatalf("Failed to add buffer: %v", err) + } } // Check state before flush @@ -285,11 +291,14 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) { go func() { defer wg.Done() for i := 0; i < 200; i++ { - logBuffer.AddToBuffer(&mq_pb.DataMessage{ + if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{ Key: []byte(fmt.Sprintf("key-%d", i)), Value: []byte(fmt.Sprintf("message-%d", i)), TsNs: time.Now().UnixNano(), - }) + }); err != nil { + t.Logf("Failed to add buffer: %v", err) + return + } if i%50 == 0 { time.Sleep(10 * time.Millisecond) } @@ -389,7 +398,9 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { TsNs: time.Now().UnixNano(), Offset: nextKafkaOffset, // Explicit Kafka offset } - logBuffer.AddLogEntryToBuffer(logEntry) + if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } nextKafkaOffset++ } @@ -422,7 +433,9 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) { TsNs: time.Now().UnixNano(), Offset: nextKafkaOffset, } - logBuffer.AddLogEntryToBuffer(logEntry) + if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } nextKafkaOffset++ } @@ -546,7 +559,9 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) { TsNs: time.Now().UnixNano(), Offset: i, } - logBuffer.AddLogEntryToBuffer(logEntry) + if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } } // Flush (moves data to disk) @@ -616,11 +631,13 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) { // Add 10 messages for i := 0; i < 10; i++ { - logBuffer.AddToBuffer(&mq_pb.DataMessage{ + if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{ Key: []byte(fmt.Sprintf("round-%d-msg-%d", round, i)), Value: []byte(fmt.Sprintf("data-%d-%d", round, i)), TsNs: time.Now().UnixNano(), - }) + }); err != nil { + t.Fatalf("Failed to add buffer: %v", err) + } } // Check state after adding diff --git a/weed/util/log_buffer/log_buffer_queryability_test.go b/weed/util/log_buffer/log_buffer_queryability_test.go index 16dd0f9b0..4774f25d8 100644 --- a/weed/util/log_buffer/log_buffer_queryability_test.go +++ b/weed/util/log_buffer/log_buffer_queryability_test.go @@ -39,7 +39,9 @@ func TestBufferQueryability(t *testing.T) { } // Add the entry to the buffer - logBuffer.AddLogEntryToBuffer(logEntry) + if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } // Verify the buffer has data if logBuffer.pos == 0 { @@ -122,7 +124,9 @@ func TestMultipleEntriesQueryability(t *testing.T) { Key: []byte("test-key-" + string(rune('0'+i))), Offset: int64(i), } - logBuffer.AddLogEntryToBuffer(logEntry) + if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } } // Read all entries @@ -197,7 +201,9 @@ func TestSchemaRegistryScenario(t *testing.T) { } // Add to buffer - logBuffer.AddLogEntryToBuffer(logEntry) + if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } // Simulate the SQL query scenario - read from offset 0 startPosition := NewMessagePosition(0, 0) @@ -255,7 +261,9 @@ func TestTimeBasedFirstReadBeforeEarliest(t *testing.T) { // Seed one entry so earliestTime is set baseTs := time.Now().Add(-time.Second) entry := &filer_pb.LogEntry{TsNs: baseTs.UnixNano(), Data: []byte("x"), Key: []byte("k"), Offset: 0} - logBuffer.AddLogEntryToBuffer(entry) + if err := logBuffer.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } _ = flushed // Start read 1ns before earliest memory, with offset sentinel (-2) @@ -280,7 +288,9 @@ func TestEarliestTimeExactRead(t *testing.T) { ts := time.Now() entry := &filer_pb.LogEntry{TsNs: ts.UnixNano(), Data: []byte("a"), Key: []byte("k"), Offset: 0} - logBuffer.AddLogEntryToBuffer(entry) + if err := logBuffer.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } startPos := NewMessagePosition(ts.UnixNano(), -2) buf, _, err := logBuffer.ReadFromBuffer(startPos) diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index 7b851de06..2c24a608c 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -52,11 +52,13 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { var buf = make([]byte, messageSize) for i := 0; i < messageCount; i++ { rand.Read(buf) - lb.AddToBuffer(&mq_pb.DataMessage{ + if err := lb.AddToBuffer(&mq_pb.DataMessage{ Key: nil, Value: buf, TsNs: 0, - }) + }); err != nil { + t.Fatalf("Failed to add buffer: %v", err) + } } wg.Wait() @@ -139,14 +141,16 @@ func TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError(t *testing.T) { // Add some data to the buffer if needed (at current offset position) if tt.hasData { - testData := []byte("test message") - // Use AddLogEntryToBuffer to preserve offset information - lb.AddLogEntryToBuffer(&filer_pb.LogEntry{ - TsNs: time.Now().UnixNano(), - Key: []byte("key"), - Data: testData, - Offset: tt.currentOffset, // Add data at current offset - }) + testData := []byte("test message") + // Use AddLogEntryToBuffer to preserve offset information + if err := lb.AddLogEntryToBuffer(&filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: testData, + Offset: tt.currentOffset, // Add data at current offset + }); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } } // Create an offset-based position for the requested offset @@ -365,11 +369,13 @@ func TestReadFromBuffer_InitializedFromDisk(t *testing.T) { lb.offset, lb.bufferStartOffset) // Now write a new message at offset 4 - lb.AddToBuffer(&mq_pb.DataMessage{ + if err := lb.AddToBuffer(&mq_pb.DataMessage{ Key: []byte("new-key"), Value: []byte("new-message-at-offset-4"), TsNs: time.Now().UnixNano(), - }) + }); err != nil { + t.Fatalf("Failed to add buffer: %v", err) + } // After AddToBuffer: offset=5, pos>0 // Schema Registry tries to read offset 0 (should be on disk) @@ -503,11 +509,13 @@ func TestLoopProcessLogDataWithOffset_DiskReadRetry(t *testing.T) { // Now add data and flush it t.Logf("➕ Adding message to buffer...") - logBuffer.AddToBuffer(&mq_pb.DataMessage{ + if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{ Key: []byte("key-0"), Value: []byte("message-0"), TsNs: time.Now().UnixNano(), - }) + }); err != nil { + t.Fatalf("Failed to add buffer: %v", err) + } // Force flush t.Logf("Force flushing...") diff --git a/weed/util/log_buffer/log_read_integration_test.go b/weed/util/log_buffer/log_read_integration_test.go index 38549b9f7..8970ca683 100644 --- a/weed/util/log_buffer/log_read_integration_test.go +++ b/weed/util/log_buffer/log_read_integration_test.go @@ -31,7 +31,10 @@ func TestConcurrentProducerConsumer(t *testing.T) { Data: []byte("value"), Offset: int64(i), } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Errorf("Failed to add log entry: %v", err) + return + } time.Sleep(1 * time.Millisecond) // Simulate production rate } producerDone <- true @@ -130,7 +133,10 @@ func TestBackwardSeeksWhileProducing(t *testing.T) { Data: []byte("value"), Offset: int64(i), } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Errorf("Failed to add log entry: %v", err) + return + } time.Sleep(1 * time.Millisecond) } producerDone <- true @@ -216,7 +222,9 @@ func TestHighConcurrencyReads(t *testing.T) { Data: []byte("value"), Offset: int64(i), } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } } // Start many concurrent readers at different offsets @@ -286,7 +294,9 @@ func TestRepeatedReadsAtSameOffset(t *testing.T) { Data: []byte("value"), Offset: int64(i), } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } } // Read the same offset multiple times concurrently diff --git a/weed/util/log_buffer/log_read_stateless_test.go b/weed/util/log_buffer/log_read_stateless_test.go index 948a929ba..6c9206eb4 100644 --- a/weed/util/log_buffer/log_read_stateless_test.go +++ b/weed/util/log_buffer/log_read_stateless_test.go @@ -45,7 +45,9 @@ func TestReadMessagesAtOffset_SingleMessage(t *testing.T) { Data: []byte("value1"), Offset: 0, } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry to buffer: %v", err) + } // Read from offset 0 messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(0, 10, 1024) @@ -82,7 +84,9 @@ func TestReadMessagesAtOffset_MultipleMessages(t *testing.T) { Data: []byte("value"), Offset: int64(i), } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry to buffer: %v", err) + } } // Read from offset 0, max 3 messages @@ -118,7 +122,9 @@ func TestReadMessagesAtOffset_StartFromMiddle(t *testing.T) { Data: []byte("value"), Offset: int64(i), } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry to buffer: %v", err) + } } // Read from offset 5 @@ -155,7 +161,9 @@ func TestReadMessagesAtOffset_MaxBytesLimit(t *testing.T) { Data: make([]byte, 100), // 100 bytes Offset: int64(i), } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry to buffer: %v", err) + } } // Request with max 250 bytes (should get ~2 messages) @@ -186,7 +194,9 @@ func TestReadMessagesAtOffset_ConcurrentReads(t *testing.T) { Data: []byte("value"), Offset: int64(i), } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry to buffer: %v", err) + } } // Start 10 concurrent readers at different offsets @@ -238,7 +248,9 @@ func TestReadMessagesAtOffset_FutureOffset(t *testing.T) { Data: []byte("value"), Offset: int64(i), } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry to buffer: %v", err) + } } // Try to read from offset 10 (future) @@ -269,7 +281,9 @@ func TestWaitForDataWithTimeout_DataAvailable(t *testing.T) { Data: []byte("value"), Offset: 0, } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry to buffer: %v", err) + } // Wait for data at offset 0 (should return immediately) dataAvailable := lb.WaitForDataWithTimeout(0, 100) @@ -321,7 +335,9 @@ func TestWaitForDataWithTimeout_DataArrives(t *testing.T) { Data: []byte("value"), Offset: 0, } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry to buffer: %v", err) + } // Wait for result <-done @@ -349,7 +365,9 @@ func TestGetHighWaterMark(t *testing.T) { Data: []byte("value"), Offset: int64(i), } - lb.AddLogEntryToBuffer(entry) + if err := lb.AddLogEntryToBuffer(entry); err != nil { + t.Fatalf("Failed to add log entry to buffer: %v", err) + } } // HWM should be 5 (next offset to write, not last written offset) diff --git a/weed/util/log_buffer/log_read_test.go b/weed/util/log_buffer/log_read_test.go index f01e2912a..802dcdacf 100644 --- a/weed/util/log_buffer/log_read_test.go +++ b/weed/util/log_buffer/log_read_test.go @@ -171,7 +171,9 @@ func TestLoopProcessLogDataWithOffset_WithData(t *testing.T) { } for _, msg := range testMessages { - logBuffer.AddToBuffer(msg) + if err := logBuffer.AddToBuffer(msg); err != nil { + t.Fatalf("Failed to add message to buffer: %v", err) + } } receivedCount := 0