diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index d6513b2a2..aa3ffe29d 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -2,13 +2,20 @@ package broker
 
 import (
     "fmt"
+    "sync/atomic"
+    "time"
+
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/mq/topic"
     "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
-    "sync/atomic"
-    "time"
 )
 
+// LogBufferStart tracks the starting buffer index for a live log file
+// Buffer indexes are monotonically increasing, count = number of chunks
+type LogBufferStart struct {
+    StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks))
+}
+
 func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) log_buffer.LogFlushFuncType {
     partitionDir := topic.PartitionDir(t, p)
 
@@ -21,10 +28,11 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
 
         targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
 
-        // TODO append block with more metadata
+        // Get buffer index (now globally unique across restarts)
+        bufferIndex := logBuffer.GetBatchIndex()
 
         for {
-            if err := b.appendToFile(targetFile, buf); err != nil {
+            if err := b.appendToFileWithBufferIndex(targetFile, buf, bufferIndex); err != nil {
                 glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
                 time.Sleep(737 * time.Millisecond)
             } else {
@@ -40,6 +48,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
             localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
         }
 
-        glog.V(0).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf))
+        glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (index %d)", logBuffer.LastFlushTsNs, targetFile, len(buf), logBuffer.GetName(), bufferIndex)
     }
 }
diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go
index 9f3c7b50f..4f4ead23f 100644
--- a/weed/mq/broker/broker_write.go
+++ b/weed/mq/broker/broker_write.go
@@ -2,16 +2,27 @@ package broker
 
 import (
     "context"
+    "encoding/json"
     "fmt"
+    "os"
+    "time"
+
     "github.com/seaweedfs/seaweedfs/weed/filer"
     "github.com/seaweedfs/seaweedfs/weed/operation"
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
     "github.com/seaweedfs/seaweedfs/weed/util"
-    "os"
-    "time"
 )
 
+// LogBufferStart tracks the starting buffer index for a file
+type LogBufferStart struct {
+    StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks))
+}
+
 func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error {
+    return b.appendToFileWithBufferIndex(targetFile, data, 0)
+}
+
+func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferIndex int64) error {
 
     fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
     if err2 != nil {
@@ -35,10 +46,51 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error
                 Gid: uint32(os.Getgid()),
             },
         }
+
+        // Add buffer start index for deduplication tracking
+        if bufferIndex != 0 {
+            entry.Extended = make(map[string][]byte)
+            bufferStart := LogBufferStart{
+                StartIndex: bufferIndex,
+            }
+            startJson, _ := json.Marshal(bufferStart)
+            entry.Extended["buffer_start"] = startJson
+        }
     } else if err != nil {
         return fmt.Errorf("find %s: %v", fullpath, err)
     } else {
         offset = int64(filer.TotalSize(entry.GetChunks()))
+
+        // Verify buffer index continuity for existing files (append operations)
+        if bufferIndex != 0 {
+            if entry.Extended == nil {
+                entry.Extended = make(map[string][]byte)
+            }
+
+            // Check for existing buffer start
+            if existingData, exists := entry.Extended["buffer_start"]; exists {
+                var bufferStart LogBufferStart
+                json.Unmarshal(existingData, &bufferStart)
+
+                // Verify that the new buffer index is consecutive
+                // Expected index = start + number of existing chunks
+                expectedIndex := bufferStart.StartIndex + int64(len(entry.GetChunks()))
+                if bufferIndex != expectedIndex {
+                    // This shouldn't happen in normal operation
+                    // Log warning but continue (don't crash the system)
+                    fmt.Printf("Warning: non-consecutive buffer index. Expected %d, got %d\n",
+                        expectedIndex, bufferIndex)
+                }
+                // Note: We don't update the start index - it stays the same
+            } else {
+                // No existing buffer start, create new one (shouldn't happen for existing files)
+                bufferStart := LogBufferStart{
+                    StartIndex: bufferIndex,
+                }
+                startJson, _ := json.Marshal(bufferStart)
+                entry.Extended["buffer_start"] = startJson
+            }
+        }
     }
 
     // append to existing chunks
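Note (illustrative sketch, not part of the patch): the broker records a "buffer_start" key in the file's extended attributes on the first flush and, on later appends, expects the incoming buffer index to equal start plus the number of existing chunks. A minimal standalone version of that round-trip is sketched below, with a plain map standing in for entry.Extended and made-up index values.

// Standalone sketch of the buffer_start metadata round-trip; the map stands in
// for entry.Extended on a filer_pb.Entry, and all numbers are illustrative.
package main

import (
    "encoding/json"
    "fmt"
)

type LogBufferStart struct {
    StartIndex int64 `json:"start_index"`
}

func main() {
    extended := map[string][]byte{} // stands in for entry.Extended

    // First flush of a new file: record the starting buffer index.
    startJson, _ := json.Marshal(LogBufferStart{StartIndex: 1700000000000000000})
    extended["buffer_start"] = startJson

    // Later append: the expected index is start + number of existing chunks.
    var start LogBufferStart
    if err := json.Unmarshal(extended["buffer_start"], &start); err != nil {
        panic(err)
    }
    existingChunks := int64(3)
    expectedIndex := start.StartIndex + existingChunks
    incomingIndex := expectedIndex // a consecutive flush; anything else would only log a warning

    fmt.Println("expected:", expectedIndex, "incoming:", incomingIndex, "consecutive:", incomingIndex == expectedIndex)
}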
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 00ea04eee..dfe7c410f 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -3,6 +3,10 @@ package topic
 import (
     "context"
     "fmt"
+    "sync"
+    "sync/atomic"
+    "time"
+
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -10,9 +14,6 @@ import (
     "google.golang.org/grpc"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
-    "sync"
-    "sync/atomic"
-    "time"
 )
 
 type LocalPartition struct {
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index ab2aeb0f6..792f009a0 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -35,6 +35,12 @@ func withDebugMode(ctx context.Context) context.Context {
     return context.WithValue(ctx, debugModeKey{}, true)
 }
 
+// LogBufferStart tracks the starting buffer index for a file
+// Buffer indexes are monotonically increasing, count = len(chunks)
+type LogBufferStart struct {
+    StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks))
+}
+
 // SQLEngine provides SQL query execution capabilities for SeaweedFS
 // Assumptions:
 // 1. MQ namespaces map directly to SQL databases
@@ -1994,7 +2000,7 @@ func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map
     return sourceFiles
 }
 
-// countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet
+// countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet and duplicate log buffer data
 func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partitionPath string, parquetSourceFiles map[string]bool) (int64, error) {
     filerClient, err := e.catalog.brokerClient.GetFilerClient()
     if err != nil {
@@ -2009,9 +2015,23 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
         actualSourceFiles = parquetSourceFiles
     }
 
+    // Second, get duplicate files from log buffer metadata
+    logBufferDuplicates, err := e.buildLogBufferDeduplicationMap(ctx, partitionPath)
+    if err != nil {
+        if isDebugMode(ctx) {
+            fmt.Printf("Warning: failed to build log buffer deduplication map: %v\n", err)
+        }
+        logBufferDuplicates = make(map[string]bool)
+    }
+
     // Debug: Show deduplication status (only in explain mode)
-    if isDebugMode(ctx) && len(actualSourceFiles) > 0 {
-        fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
+    if isDebugMode(ctx) {
+        if len(actualSourceFiles) > 0 {
+            fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
+        }
+        if len(logBufferDuplicates) > 0 {
+            fmt.Printf("Excluding %d duplicate log buffer files from %s\n", len(logBufferDuplicates), partitionPath)
+        }
     }
 
     totalRows := int64(0)
@@ -2028,6 +2048,14 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
             return nil
         }
 
+        // Skip files that are duplicated due to log buffer metadata
+        if logBufferDuplicates[entry.Name] {
+            if isDebugMode(ctx) {
+                fmt.Printf("Skipping %s (duplicate log buffer data)\n", entry.Name)
+            }
+            return nil
+        }
+
         // Count rows in live log file
         rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
         if err != nil {
@@ -2070,6 +2098,96 @@ func (e *SQLEngine) getParquetSourceFilesFromMetadata(partitionPath string) (map
     return sourceFiles, err
 }
 
+// getLogBufferStartFromFile reads buffer start from file extended attributes
+func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBufferStart, error) {
+    if entry.Extended == nil {
+        return nil, nil
+    }
+
+    // Only support buffer_start format
+    if startJson, exists := entry.Extended["buffer_start"]; exists {
+        var bufferStart LogBufferStart
+        if err := json.Unmarshal(startJson, &bufferStart); err != nil {
+            return nil, fmt.Errorf("failed to parse buffer start: %v", err)
+        }
+        return &bufferStart, nil
+    }
+
+    return nil, nil
+}
+
+// buildLogBufferDeduplicationMap creates a map to track duplicate files based on buffer ranges (ultra-efficient)
+func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitionPath string) (map[string]bool, error) {
+    if e.catalog.brokerClient == nil {
+        return make(map[string]bool), nil
+    }
+
+    filerClient, err := e.catalog.brokerClient.GetFilerClient()
+    if err != nil {
+        return make(map[string]bool), nil // Don't fail the query, just skip deduplication
+    }
+
+    // Track buffer ranges instead of individual indexes (much more efficient)
+    type BufferRange struct {
+        start, end int64
+    }
+
+    processedRanges := make([]BufferRange, 0)
+    duplicateFiles := make(map[string]bool)
+
+    err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+        if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
+            return nil // Skip directories and parquet files
+        }
+
+        // Get buffer start for this file (most efficient)
+        bufferStart, err := e.getLogBufferStartFromFile(entry)
+        if err != nil || bufferStart == nil {
+            return nil // No buffer info, can't deduplicate
+        }
+
+        // Calculate range for this file: [start, start + chunkCount - 1]
+        chunkCount := int64(len(entry.GetChunks()))
+        if chunkCount == 0 {
+            return nil // Empty file, skip
+        }
+
+        fileRange := BufferRange{
+            start: bufferStart.StartIndex,
+            end:   bufferStart.StartIndex + chunkCount - 1,
+        }
+
+        // Check if this range overlaps with any processed range
+        isDuplicate := false
+        for _, processedRange := range processedRanges {
+            if fileRange.start <= processedRange.end && fileRange.end >= processedRange.start {
+                // Ranges overlap - this file contains duplicate buffer indexes
+                isDuplicate = true
+                if isDebugMode(ctx) {
+                    fmt.Printf("Marking %s as duplicate (buffer range [%d-%d] overlaps with [%d-%d])\n",
+                        entry.Name, fileRange.start, fileRange.end, processedRange.start, processedRange.end)
+                }
+                break
+            }
+        }
+
+        if isDuplicate {
+            duplicateFiles[entry.Name] = true
+        } else {
+            // Add this range to processed ranges
+            processedRanges = append(processedRanges, fileRange)
+        }
+
+        return nil
+    })
+
+    if err != nil {
+        return make(map[string]bool), nil // Don't fail the query
+    }
+
+    return duplicateFiles, nil
+}
+
 // countRowsInLogFile counts rows in a single log file using SeaweedFS patterns
 func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partitionPath string, entry *filer_pb.Entry) (int64, error) {
     lookupFileIdFn := filer.LookupFn(filerClient)
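Note (illustrative sketch, not part of the patch): the deduplication above treats each live log file as the closed interval [start, start + chunkCount - 1] and marks a file as duplicate when its interval overlaps one already seen. A standalone version of that interval check follows, with made-up file names and index values.

// Standalone sketch of the range-overlap deduplication: each file covers
// [start, start+chunkCount-1]; a file whose range overlaps an already-seen
// range is marked as a duplicate. Names and indexes are illustrative.
package main

import "fmt"

type bufferRange struct{ start, end int64 }

func main() {
    files := []struct {
        name       string
        start      int64
        chunkCount int64
    }{
        {"2024-01-01-10-00-00", 1700000000000000000, 3},
        {"2024-01-01-10-05-00", 1700000000000000003, 2},
        {"2024-01-01-10-00-00-copy", 1700000000000000001, 2}, // overlaps the first file
    }

    processed := []bufferRange{}
    duplicates := map[string]bool{}

    for _, f := range files {
        r := bufferRange{start: f.start, end: f.start + f.chunkCount - 1}
        overlaps := false
        for _, p := range processed {
            if r.start <= p.end && r.end >= p.start {
                overlaps = true
                break
            }
        }
        if overlaps {
            duplicates[f.name] = true
        } else {
            processed = append(processed, r)
        }
    }

    fmt.Println(duplicates) // map[2024-01-01-10-00-00-copy:true]
}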
diff --git a/weed/query/engine/engine_test.go b/weed/query/engine/engine_test.go
index 00084b6f6..1f0f844d3 100644
--- a/weed/query/engine/engine_test.go
+++ b/weed/query/engine/engine_test.go
@@ -2,6 +2,7 @@ package engine
 
 import (
     "context"
+    "encoding/json"
     "errors"
     "testing"
 
@@ -1136,3 +1137,120 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) {
     assert.Contains(t, result.Fields, SW_COLUMN_NAME_TS)
     assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
 }
+
+// Tests for log buffer deduplication functionality
+func TestSQLEngine_GetLogBufferStartFromFile_NewFormat(t *testing.T) {
+    engine := NewTestSQLEngine()
+
+    // Create sample buffer start (new ultra-efficient format)
+    bufferStart := LogBufferStart{StartIndex: 1609459100000000001}
+    startJson, _ := json.Marshal(bufferStart)
+
+    // Create file entry with buffer start + some chunks
+    entry := &filer_pb.Entry{
+        Name: "test-log-file",
+        Extended: map[string][]byte{
+            "buffer_start": startJson,
+        },
+        Chunks: []*filer_pb.FileChunk{
+            {FileId: "chunk1", Offset: 0, Size: 1000},
+            {FileId: "chunk2", Offset: 1000, Size: 1000},
+            {FileId: "chunk3", Offset: 2000, Size: 1000},
+        },
+    }
+
+    // Test extraction
+    result, err := engine.getLogBufferStartFromFile(entry)
+    assert.NoError(t, err)
+    assert.NotNil(t, result)
+    assert.Equal(t, int64(1609459100000000001), result.StartIndex)
+
+    // Test extraction works correctly with the new format
+}
+
+func TestSQLEngine_GetLogBufferStartFromFile_NoMetadata(t *testing.T) {
+    engine := NewTestSQLEngine()
+
+    // Create file entry without buffer start
+    entry := &filer_pb.Entry{
+        Name:     "test-log-file",
+        Extended: nil,
+    }
+
+    // Test extraction
+    result, err := engine.getLogBufferStartFromFile(entry)
+    assert.NoError(t, err)
+    assert.Nil(t, result)
+}
+
+func TestSQLEngine_GetLogBufferStartFromFile_InvalidData(t *testing.T) {
+    engine := NewTestSQLEngine()
+
+    // Create file entry with invalid buffer start
+    entry := &filer_pb.Entry{
+        Name: "test-log-file",
+        Extended: map[string][]byte{
+            "buffer_start": []byte("invalid-json"),
+        },
+    }
+
+    // Test extraction
+    result, err := engine.getLogBufferStartFromFile(entry)
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "failed to parse buffer start")
+    assert.Nil(t, result)
+}
+
+func TestSQLEngine_BuildLogBufferDeduplicationMap_NoBrokerClient(t *testing.T) {
+    engine := NewTestSQLEngine()
+    engine.catalog.brokerClient = nil // Simulate no broker client
+
+    ctx := context.Background()
+    result, err := engine.buildLogBufferDeduplicationMap(ctx, "/topics/test/test-topic")
+
+    assert.NoError(t, err)
+    assert.NotNil(t, result)
+    assert.Empty(t, result)
+}
+
+func TestSQLEngine_LogBufferDeduplication_ServerRestartScenario(t *testing.T) {
+    // Simulate scenario: Buffer indexes are now initialized with process start time
+    // This tests that buffer start indexes are globally unique across server restarts
+
+    // Before server restart: Process 1 buffer start (3 chunks)
+    beforeRestartStart := LogBufferStart{
+        StartIndex: 1609459100000000000, // Process 1 start time
+    }
+
+    // After server restart: Process 2 buffer start (3 chunks)
+    afterRestartStart := LogBufferStart{
+        StartIndex: 1609459300000000000, // Process 2 start time (DIFFERENT)
+    }
+
+    // Simulate 3 chunks for each file
+    chunkCount := int64(3)
+
+    // Calculate end indexes for range comparison
+    beforeEnd := beforeRestartStart.StartIndex + chunkCount - 1 // [start, start+2]
+    afterStart := afterRestartStart.StartIndex                  // [start, start+2]
+
+    // Test range overlap detection (should NOT overlap)
+    overlaps := beforeRestartStart.StartIndex <= (afterStart+chunkCount-1) && beforeEnd >= afterStart
+    assert.False(t, overlaps, "Buffer ranges after restart should not overlap")
+
+    // Verify the start indexes are globally unique
+    assert.NotEqual(t, beforeRestartStart.StartIndex, afterRestartStart.StartIndex, "Start indexes should be different")
+    assert.Less(t, beforeEnd, afterStart, "Ranges should be completely separate")
+
+    // Expected values:
+    // Before restart: [1609459100000000000, 1609459100000000002]
+    // After restart:  [1609459300000000000, 1609459300000000002]
+    expectedBeforeEnd := int64(1609459100000000002)
+    expectedAfterStart := int64(1609459300000000000)
+
+    assert.Equal(t, expectedBeforeEnd, beforeEnd)
+    assert.Equal(t, expectedAfterStart, afterStart)
+
+    // This demonstrates that buffer start indexes initialized with process start time
+    // prevent false positive duplicates across server restarts
+}
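Note (illustrative sketch, not part of the patch): the restart scenario above relies on the batchIndex seeding shown in the log_buffer.go hunk below. Each process seeds its index from time.Now().UnixNano(), and the continuity check in broker_write.go implies the index advances by one per flushed chunk, so ranges from different process lifetimes stay disjoint as long as the clock gap between restarts exceeds the number of flushes in between. A toy calculation under those assumptions, with made-up values:

// Toy sketch of the seeding assumption: two process lifetimes seed their buffer
// index from the wall clock, so their flush-index ranges stay disjoint as long
// as the nanosecond gap exceeds the number of flushes. Values are illustrative.
package main

import "fmt"

func main() {
    process1Start := int64(1609459100000000000) // first process boots
    process2Start := int64(1609459300000000000) // restart ~200s later

    flushesBeforeRestart := int64(5000) // far fewer than the ~2e11 ns clock gap

    p1Last := process1Start + flushesBeforeRestart - 1
    overlaps := process1Start <= process2Start && p1Last >= process2Start

    fmt.Println("process 1 range:", process1Start, "-", p1Last)
    fmt.Println("process 2 starts at:", process2Start)
    fmt.Println("ranges overlap:", overlaps) // false
}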
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 8683dfffc..f99f1a7dd 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -63,6 +63,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
         notifyFn:   notifyFn,
         flushChan:  make(chan *dataToFlush, 256),
         isStopping: new(atomic.Bool),
+        batchIndex: time.Now().UnixNano(), // Initialize with process start time for uniqueness
     }
     go lb.loopFlush()
     go lb.loopInterval()
@@ -343,6 +344,20 @@ func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
     bufferPool.Put(b)
 }
 
+// GetName returns the log buffer name for metadata tracking
+func (logBuffer *LogBuffer) GetName() string {
+    logBuffer.RLock()
+    defer logBuffer.RUnlock()
+    return logBuffer.name
+}
+
+// GetBatchIndex returns the current batch index for metadata tracking
+func (logBuffer *LogBuffer) GetBatchIndex() int64 {
+    logBuffer.RLock()
+    defer logBuffer.RUnlock()
+    return logBuffer.batchIndex
+}
+
 var bufferPool = sync.Pool{
     New: func() interface{} {
         return new(bytes.Buffer