diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index daac30794..1e52becb3 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -518,7 +518,13 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi } // getEarliestBufferStart finds the earliest buffer_start index from disk files in the partition -// This checks both live log files and Parquet files for the most precise deduplication +// +// This method handles three scenarios for seamless broker querying: +// 1. Live log files exist: Uses their buffer_start metadata (most recent boundaries) +// 2. Only Parquet files exist: Uses Parquet buffer_start metadata (preserved from archived sources) +// 3. Mixed files: Uses earliest buffer_start from all sources for comprehensive coverage +// +// This ensures continuous real-time querying capability even after log file compaction/archival func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath string) (int64, error) { filerClient, err := c.GetFilerClient() if err != nil { @@ -526,6 +532,8 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath } var earliestBufferIndex int64 = -1 // -1 means no buffer_start found + var logFileCount, parquetFileCount int + var bufferStartSources []string // Track which files provide buffer_start err = filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { // Skip directories @@ -533,17 +541,38 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath return nil } + // Count file types for scenario detection + if strings.HasSuffix(entry.Name, ".parquet") { + parquetFileCount++ + } else { + logFileCount++ + } + // Extract buffer_start from file extended attributes (both log files and parquet files) bufferStart := c.getBufferStartFromEntry(entry) if bufferStart != nil && bufferStart.StartIndex > 0 { if earliestBufferIndex == -1 || bufferStart.StartIndex < earliestBufferIndex { earliestBufferIndex = bufferStart.StartIndex } + bufferStartSources = append(bufferStartSources, entry.Name) } return nil }) + // Debug: Show buffer_start determination logic in EXPLAIN mode + if isDebugMode(ctx) && len(bufferStartSources) > 0 { + if logFileCount == 0 && parquetFileCount > 0 { + fmt.Printf("Debug: Using Parquet buffer_start metadata (no log files) - sources: %v\n", bufferStartSources) + } else if logFileCount > 0 && parquetFileCount > 0 { + fmt.Printf("Debug: Using mixed sources for buffer_start - log files: %d, Parquet files: %d, sources: %v\n", + logFileCount, parquetFileCount, bufferStartSources) + } else { + fmt.Printf("Debug: Using log file buffer_start metadata - sources: %v\n", bufferStartSources) + } + fmt.Printf("Debug: Earliest buffer_start index: %d\n", earliestBufferIndex) + } + if err != nil { return 0, fmt.Errorf("failed to scan partition directory: %v", err) } diff --git a/weed/query/engine/engine_test.go b/weed/query/engine/engine_test.go index 1f0f844d3..505d53699 100644 --- a/weed/query/engine/engine_test.go +++ b/weed/query/engine/engine_test.go @@ -2,6 +2,7 @@ package engine import ( "context" + "encoding/binary" "encoding/json" "errors" "testing" @@ -1254,3 +1255,50 @@ func TestSQLEngine_LogBufferDeduplication_ServerRestartScenario(t *testing.T) { // This demonstrates that buffer start indexes initialized with process start time // prevent false positive duplicates across server restarts } + +func TestBrokerClient_ParquetBufferStartForBrokerQuery(t *testing.T) { + // Test scenario: getBufferStartFromEntry should handle both JSON and binary formats + // This tests the dual format support for buffer_start metadata + realBrokerClient := &BrokerClient{} + + // Test binary format (Parquet files) + parquetEntry := &filer_pb.Entry{ + Name: "2025-01-07-14-30.parquet", + IsDirectory: false, + Extended: map[string][]byte{ + "buffer_start": func() []byte { + // Binary format: 8-byte BigEndian + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(2000001)) + return buf + }(), + }, + } + + bufferStart := realBrokerClient.getBufferStartFromEntry(parquetEntry) + assert.NotNil(t, bufferStart) + assert.Equal(t, int64(2000001), bufferStart.StartIndex, "Should parse binary buffer_start from Parquet file") + + // Test JSON format (log files) + logEntry := &filer_pb.Entry{ + Name: "2025-01-07-14-30-45", + IsDirectory: false, + Extended: map[string][]byte{ + "buffer_start": []byte(`{"start_index": 1500001}`), + }, + } + + bufferStart = realBrokerClient.getBufferStartFromEntry(logEntry) + assert.NotNil(t, bufferStart) + assert.Equal(t, int64(1500001), bufferStart.StartIndex, "Should parse JSON buffer_start from log file") + + // Test missing metadata + emptyEntry := &filer_pb.Entry{ + Name: "no-metadata", + IsDirectory: false, + Extended: nil, + } + + bufferStart = realBrokerClient.getBufferStartFromEntry(emptyEntry) + assert.Nil(t, bufferStart, "Should return nil for entry without buffer_start metadata") +}