From 6fb88a8edbc114d2ba8f7e3a728d287af5491414 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 2 Sep 2025 00:57:48 -0700 Subject: [PATCH] buffer start stored as 8 bytes --- weed/mq/broker/broker_grpc_query.go | 17 ++--- .../broker_topic_partition_read_write.go | 3 +- weed/mq/broker/broker_write.go | 47 +++++++------- weed/mq/logstore/log_to_parquet.go | 16 +++-- weed/query/engine/broker_client.go | 17 ++--- weed/query/engine/engine.go | 16 +++-- weed/query/engine/engine_test.go | 63 ++++++++++++------- 7 files changed, 95 insertions(+), 84 deletions(-) diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go index 1c0b3a072..701133e38 100644 --- a/weed/mq/broker/broker_grpc_query.go +++ b/weed/mq/broker/broker_grpc_query.go @@ -2,7 +2,7 @@ package broker import ( "context" - "encoding/json" + "encoding/binary" "fmt" "strings" @@ -186,13 +186,16 @@ func (b *MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (* return nil, nil } - // Only support buffer_start format - if startJson, exists := entry.Extended["buffer_start"]; exists { - var bufferStart LogBufferStart - if err := json.Unmarshal(startJson, &bufferStart); err != nil { - return nil, fmt.Errorf("failed to parse buffer start: %v", err) + // Only support binary buffer_start format + if startData, exists := entry.Extended["buffer_start"]; exists { + if len(startData) == 8 { + startIndex := int64(binary.BigEndian.Uint64(startData)) + if startIndex > 0 { + return &LogBufferStart{StartIndex: startIndex}, nil + } + } else { + return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData)) } - return &bufferStart, nil } return nil, nil diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index aa3ffe29d..4b0a95217 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -12,8 +12,9 @@ import ( // LogBufferStart tracks the starting buffer index for a live log file // Buffer indexes are monotonically increasing, count = number of chunks +// Now stored in binary format for efficiency type LogBufferStart struct { - StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks)) + StartIndex int64 // Starting buffer index (count = len(chunks)) } func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) log_buffer.LogFlushFuncType { diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go index c1b22bff9..71399de78 100644 --- a/weed/mq/broker/broker_write.go +++ b/weed/mq/broker/broker_write.go @@ -2,7 +2,7 @@ package broker import ( "context" - "encoding/json" + "encoding/binary" "fmt" "os" "time" @@ -42,14 +42,12 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data }, } - // Add buffer start index for deduplication tracking + // Add buffer start index for deduplication tracking (binary format) if bufferIndex != 0 { entry.Extended = make(map[string][]byte) - bufferStart := LogBufferStart{ - StartIndex: bufferIndex, - } - startJson, _ := json.Marshal(bufferStart) - entry.Extended["buffer_start"] = startJson + bufferStartBytes := make([]byte, 8) + binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex)) + entry.Extended["buffer_start"] = bufferStartBytes } } else if err != nil { return fmt.Errorf("find %s: %v", fullpath, err) @@ -62,28 +60,27 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data entry.Extended = make(map[string][]byte) } - // Check for existing buffer start + // Check for existing buffer start (binary format) if existingData, exists := entry.Extended["buffer_start"]; exists { - var bufferStart LogBufferStart - json.Unmarshal(existingData, &bufferStart) - - // Verify that the new buffer index is consecutive - // Expected index = start + number of existing chunks - expectedIndex := bufferStart.StartIndex + int64(len(entry.GetChunks())) - if bufferIndex != expectedIndex { - // This shouldn't happen in normal operation - // Log warning but continue (don't crash the system) - fmt.Printf("Warning: non-consecutive buffer index. Expected %d, got %d\n", - expectedIndex, bufferIndex) + if len(existingData) == 8 { + existingStartIndex := int64(binary.BigEndian.Uint64(existingData)) + + // Verify that the new buffer index is consecutive + // Expected index = start + number of existing chunks + expectedIndex := existingStartIndex + int64(len(entry.GetChunks())) + if bufferIndex != expectedIndex { + // This shouldn't happen in normal operation + // Log warning but continue (don't crash the system) + fmt.Printf("Warning: non-consecutive buffer index. Expected %d, got %d\n", + expectedIndex, bufferIndex) + } + // Note: We don't update the start index - it stays the same } - // Note: We don't update the start index - it stays the same } else { // No existing buffer start, create new one (shouldn't happen for existing files) - bufferStart := LogBufferStart{ - StartIndex: bufferIndex, - } - startJson, _ := json.Marshal(bufferStart) - entry.Extended["buffer_start"] = startJson + bufferStartBytes := make([]byte, 8) + binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex)) + entry.Extended["buffer_start"] = bufferStartBytes } } } diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go index f0f62fa26..e8994d9fa 100644 --- a/weed/mq/logstore/log_to_parquet.go +++ b/weed/mq/logstore/log_to_parquet.go @@ -494,15 +494,13 @@ func getBufferStartFromLogFile(logFile *filer_pb.Entry) int64 { return 0 } - // Parse buffer_start format (same as used in query engine) - if startJson, exists := logFile.Extended["buffer_start"]; exists { - // LogBufferStart struct (JSON format) - type LogBufferStart struct { - StartIndex int64 `json:"start_index"` - } - var bufferStart LogBufferStart - if err := json.Unmarshal(startJson, &bufferStart); err == nil { - return bufferStart.StartIndex + // Parse buffer_start binary format + if startData, exists := logFile.Extended["buffer_start"]; exists { + if len(startData) == 8 { + startIndex := int64(binary.BigEndian.Uint64(startData)) + if startIndex > 0 { + return startIndex + } } } diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index 1e52becb3..bdf1ebecf 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -3,7 +3,6 @@ package engine import ( "context" "encoding/binary" - "encoding/json" "fmt" "io" "strconv" @@ -563,12 +562,12 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath // Debug: Show buffer_start determination logic in EXPLAIN mode if isDebugMode(ctx) && len(bufferStartSources) > 0 { if logFileCount == 0 && parquetFileCount > 0 { - fmt.Printf("Debug: Using Parquet buffer_start metadata (no log files) - sources: %v\n", bufferStartSources) + fmt.Printf("Debug: Using Parquet buffer_start metadata (binary format, no log files) - sources: %v\n", bufferStartSources) } else if logFileCount > 0 && parquetFileCount > 0 { - fmt.Printf("Debug: Using mixed sources for buffer_start - log files: %d, Parquet files: %d, sources: %v\n", + fmt.Printf("Debug: Using mixed sources for buffer_start (binary format) - log files: %d, Parquet files: %d, sources: %v\n", logFileCount, parquetFileCount, bufferStartSources) } else { - fmt.Printf("Debug: Using log file buffer_start metadata - sources: %v\n", bufferStartSources) + fmt.Printf("Debug: Using log file buffer_start metadata (binary format) - sources: %v\n", bufferStartSources) } fmt.Printf("Debug: Earliest buffer_start index: %d\n", earliestBufferIndex) } @@ -585,26 +584,20 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath } // getBufferStartFromEntry extracts LogBufferStart from file entry metadata -// Handles both JSON format (log files) and binary format (Parquet files) +// Only supports binary format (used by both log files and Parquet files) func (c *BrokerClient) getBufferStartFromEntry(entry *filer_pb.Entry) *LogBufferStart { if entry.Extended == nil { return nil } if startData, exists := entry.Extended["buffer_start"]; exists { - // Try binary format first (Parquet files) + // Only support binary format if len(startData) == 8 { startIndex := int64(binary.BigEndian.Uint64(startData)) if startIndex > 0 { return &LogBufferStart{StartIndex: startIndex} } } - - // Try JSON format (log files) - var bufferStart LogBufferStart - if err := json.Unmarshal(startData, &bufferStart); err == nil { - return &bufferStart - } } return nil diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 1d9eac0dd..18aeb3f1b 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -2,6 +2,7 @@ package engine import ( "context" + "encoding/binary" "encoding/json" "fmt" "math" @@ -2158,13 +2159,16 @@ func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBuffer return nil, nil } - // Only support buffer_start format - if startJson, exists := entry.Extended["buffer_start"]; exists { - var bufferStart LogBufferStart - if err := json.Unmarshal(startJson, &bufferStart); err != nil { - return nil, fmt.Errorf("failed to parse buffer start: %v", err) + // Only support binary buffer_start format + if startData, exists := entry.Extended["buffer_start"]; exists { + if len(startData) == 8 { + startIndex := int64(binary.BigEndian.Uint64(startData)) + if startIndex > 0 { + return &LogBufferStart{StartIndex: startIndex}, nil + } + } else { + return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData)) } - return &bufferStart, nil } return nil, nil diff --git a/weed/query/engine/engine_test.go b/weed/query/engine/engine_test.go index 505d53699..6d1c3c9c1 100644 --- a/weed/query/engine/engine_test.go +++ b/weed/query/engine/engine_test.go @@ -3,7 +3,6 @@ package engine import ( "context" "encoding/binary" - "encoding/json" "errors" "testing" @@ -1140,18 +1139,18 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) { } // Tests for log buffer deduplication functionality -func TestSQLEngine_GetLogBufferStartFromFile_NewFormat(t *testing.T) { +func TestSQLEngine_GetLogBufferStartFromFile_BinaryFormat(t *testing.T) { engine := NewTestSQLEngine() - // Create sample buffer start (new ultra-efficient format) - bufferStart := LogBufferStart{StartIndex: 1609459100000000001} - startJson, _ := json.Marshal(bufferStart) + // Create sample buffer start (binary format) + bufferStartBytes := make([]byte, 8) + binary.BigEndian.PutUint64(bufferStartBytes, uint64(1609459100000000001)) // Create file entry with buffer start + some chunks entry := &filer_pb.Entry{ Name: "test-log-file", Extended: map[string][]byte{ - "buffer_start": startJson, + "buffer_start": bufferStartBytes, }, Chunks: []*filer_pb.FileChunk{ {FileId: "chunk1", Offset: 0, Size: 1000}, @@ -1166,7 +1165,7 @@ func TestSQLEngine_GetLogBufferStartFromFile_NewFormat(t *testing.T) { assert.NotNil(t, result) assert.Equal(t, int64(1609459100000000001), result.StartIndex) - // Test extraction works correctly with the new format + // Test extraction works correctly with the binary format } func TestSQLEngine_GetLogBufferStartFromFile_NoMetadata(t *testing.T) { @@ -1187,18 +1186,18 @@ func TestSQLEngine_GetLogBufferStartFromFile_NoMetadata(t *testing.T) { func TestSQLEngine_GetLogBufferStartFromFile_InvalidData(t *testing.T) { engine := NewTestSQLEngine() - // Create file entry with invalid buffer start + // Create file entry with invalid buffer start (wrong size) entry := &filer_pb.Entry{ Name: "test-log-file", Extended: map[string][]byte{ - "buffer_start": []byte("invalid-json"), + "buffer_start": []byte("invalid-binary"), }, } // Test extraction result, err := engine.getLogBufferStartFromFile(entry) assert.Error(t, err) - assert.Contains(t, err.Error(), "failed to parse buffer start") + assert.Contains(t, err.Error(), "invalid buffer_start format: expected 8 bytes") assert.Nil(t, result) } @@ -1256,14 +1255,14 @@ func TestSQLEngine_LogBufferDeduplication_ServerRestartScenario(t *testing.T) { // prevent false positive duplicates across server restarts } -func TestBrokerClient_ParquetBufferStartForBrokerQuery(t *testing.T) { - // Test scenario: getBufferStartFromEntry should handle both JSON and binary formats - // This tests the dual format support for buffer_start metadata +func TestBrokerClient_BinaryBufferStartFormat(t *testing.T) { + // Test scenario: getBufferStartFromEntry should only support binary format + // This tests the standardized binary format for buffer_start metadata realBrokerClient := &BrokerClient{} - // Test binary format (Parquet files) - parquetEntry := &filer_pb.Entry{ - Name: "2025-01-07-14-30.parquet", + // Test binary format (used by both log files and Parquet files) + binaryEntry := &filer_pb.Entry{ + Name: "2025-01-07-14-30-45", IsDirectory: false, Extended: map[string][]byte{ "buffer_start": func() []byte { @@ -1275,22 +1274,26 @@ func TestBrokerClient_ParquetBufferStartForBrokerQuery(t *testing.T) { }, } - bufferStart := realBrokerClient.getBufferStartFromEntry(parquetEntry) + bufferStart := realBrokerClient.getBufferStartFromEntry(binaryEntry) assert.NotNil(t, bufferStart) - assert.Equal(t, int64(2000001), bufferStart.StartIndex, "Should parse binary buffer_start from Parquet file") + assert.Equal(t, int64(2000001), bufferStart.StartIndex, "Should parse binary buffer_start metadata") - // Test JSON format (log files) - logEntry := &filer_pb.Entry{ - Name: "2025-01-07-14-30-45", + // Test Parquet file (same binary format) + parquetEntry := &filer_pb.Entry{ + Name: "2025-01-07-14-30.parquet", IsDirectory: false, Extended: map[string][]byte{ - "buffer_start": []byte(`{"start_index": 1500001}`), + "buffer_start": func() []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(1500001)) + return buf + }(), }, } - bufferStart = realBrokerClient.getBufferStartFromEntry(logEntry) + bufferStart = realBrokerClient.getBufferStartFromEntry(parquetEntry) assert.NotNil(t, bufferStart) - assert.Equal(t, int64(1500001), bufferStart.StartIndex, "Should parse JSON buffer_start from log file") + assert.Equal(t, int64(1500001), bufferStart.StartIndex, "Should parse binary buffer_start from Parquet file") // Test missing metadata emptyEntry := &filer_pb.Entry{ @@ -1301,4 +1304,16 @@ func TestBrokerClient_ParquetBufferStartForBrokerQuery(t *testing.T) { bufferStart = realBrokerClient.getBufferStartFromEntry(emptyEntry) assert.Nil(t, bufferStart, "Should return nil for entry without buffer_start metadata") + + // Test invalid format (wrong size) + invalidEntry := &filer_pb.Entry{ + Name: "invalid-metadata", + IsDirectory: false, + Extended: map[string][]byte{ + "buffer_start": []byte("invalid"), + }, + } + + bufferStart = realBrokerClient.getBufferStartFromEntry(invalidEntry) + assert.Nil(t, bufferStart, "Should return nil for invalid buffer_start metadata") }