
buffer start stored as 8 bytes

chrislu, 1 month ago · commit 6fb88a8edb · pull/7185/head
7 changed files:

  1. weed/mq/broker/broker_grpc_query.go (17 changes)
  2. weed/mq/broker/broker_topic_partition_read_write.go (3 changes)
  3. weed/mq/broker/broker_write.go (47 changes)
  4. weed/mq/logstore/log_to_parquet.go (16 changes)
  5. weed/query/engine/broker_client.go (17 changes)
  6. weed/query/engine/engine.go (16 changes)
  7. weed/query/engine/engine_test.go (63 changes)
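
Across these files, the commit replaces the JSON-encoded LogBufferStart stored under entry.Extended["buffer_start"] with a fixed 8-byte big-endian integer. A minimal standalone sketch of the round trip the new code expects (the helper names are illustrative; the commit inlines this logic):

    package main

    import (
    	"encoding/binary"
    	"fmt"
    )

    // encodeBufferStart packs the starting buffer index into the fixed
    // 8-byte big-endian form stored under entry.Extended["buffer_start"].
    func encodeBufferStart(startIndex int64) []byte {
    	buf := make([]byte, 8)
    	binary.BigEndian.PutUint64(buf, uint64(startIndex))
    	return buf
    }

    // decodeBufferStart mirrors the validation the commit adds:
    // exactly 8 bytes, decoded as a big-endian uint64.
    func decodeBufferStart(data []byte) (int64, error) {
    	if len(data) != 8 {
    		return 0, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(data))
    	}
    	return int64(binary.BigEndian.Uint64(data)), nil
    }

    func main() {
    	raw := encodeBufferStart(1609459100000000001)
    	idx, err := decodeBufferStart(raw)
    	fmt.Println(idx, err) // 1609459100000000001 <nil>
    }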

weed/mq/broker/broker_grpc_query.go (17 changes)

@@ -2,7 +2,7 @@ package broker
 import (
 	"context"
-	"encoding/json"
+	"encoding/binary"
 	"fmt"
 	"strings"

@@ -186,13 +186,16 @@ func (b *MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (*
 		return nil, nil
 	}

-	// Only support buffer_start format
-	if startJson, exists := entry.Extended["buffer_start"]; exists {
-		var bufferStart LogBufferStart
-		if err := json.Unmarshal(startJson, &bufferStart); err != nil {
-			return nil, fmt.Errorf("failed to parse buffer start: %v", err)
+	// Only support binary buffer_start format
+	if startData, exists := entry.Extended["buffer_start"]; exists {
+		if len(startData) == 8 {
+			startIndex := int64(binary.BigEndian.Uint64(startData))
+			if startIndex > 0 {
+				return &LogBufferStart{StartIndex: startIndex}, nil
+			}
+		} else {
+			return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData))
 		}
-		return &bufferStart, nil
 	}
 	return nil, nil

weed/mq/broker/broker_topic_partition_read_write.go (3 changes)

@@ -12,8 +12,9 @@ import (
 // LogBufferStart tracks the starting buffer index for a live log file
 // Buffer indexes are monotonically increasing, count = number of chunks
+// Now stored in binary format for efficiency
 type LogBufferStart struct {
-	StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks))
+	StartIndex int64 // Starting buffer index (count = len(chunks))
 }

 func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) log_buffer.LogFlushFuncType {
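
The dropped json tag reflects that LogBufferStart is no longer serialized as JSON; the index is written as a fixed 8-byte value instead. A rough size comparison (standalone sketch, not part of the commit) shows what "binary format for efficiency" buys:

    package main

    import (
    	"encoding/binary"
    	"encoding/json"
    	"fmt"
    )

    // Old on-disk shape, with the json tag the commit removes.
    type LogBufferStart struct {
    	StartIndex int64 `json:"start_index"`
    }

    func main() {
    	start := int64(1609459100000000001)

    	// Old format: a JSON object.
    	jsonBytes, _ := json.Marshal(LogBufferStart{StartIndex: start})

    	// New format: a fixed 8-byte big-endian integer.
    	binBytes := make([]byte, 8)
    	binary.BigEndian.PutUint64(binBytes, uint64(start))

    	fmt.Println(len(jsonBytes), len(binBytes)) // 35 8
    }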

weed/mq/broker/broker_write.go (47 changes)

@@ -2,7 +2,7 @@ package broker
 import (
 	"context"
-	"encoding/json"
+	"encoding/binary"
 	"fmt"
 	"os"
 	"time"

@@ -42,14 +42,12 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data
 		},
 	}

-	// Add buffer start index for deduplication tracking
+	// Add buffer start index for deduplication tracking (binary format)
 	if bufferIndex != 0 {
 		entry.Extended = make(map[string][]byte)
-		bufferStart := LogBufferStart{
-			StartIndex: bufferIndex,
-		}
-		startJson, _ := json.Marshal(bufferStart)
-		entry.Extended["buffer_start"] = startJson
+		bufferStartBytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
+		entry.Extended["buffer_start"] = bufferStartBytes
 	}
 } else if err != nil {
 	return fmt.Errorf("find %s: %v", fullpath, err)

@@ -62,28 +60,27 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data
 		entry.Extended = make(map[string][]byte)
 	}

-	// Check for existing buffer start
+	// Check for existing buffer start (binary format)
 	if existingData, exists := entry.Extended["buffer_start"]; exists {
-		var bufferStart LogBufferStart
-		json.Unmarshal(existingData, &bufferStart)
-		// Verify that the new buffer index is consecutive
-		// Expected index = start + number of existing chunks
-		expectedIndex := bufferStart.StartIndex + int64(len(entry.GetChunks()))
-		if bufferIndex != expectedIndex {
-			// This shouldn't happen in normal operation
-			// Log warning but continue (don't crash the system)
-			fmt.Printf("Warning: non-consecutive buffer index. Expected %d, got %d\n",
-				expectedIndex, bufferIndex)
+		if len(existingData) == 8 {
+			existingStartIndex := int64(binary.BigEndian.Uint64(existingData))
+			// Verify that the new buffer index is consecutive
+			// Expected index = start + number of existing chunks
+			expectedIndex := existingStartIndex + int64(len(entry.GetChunks()))
+			if bufferIndex != expectedIndex {
+				// This shouldn't happen in normal operation
+				// Log warning but continue (don't crash the system)
+				fmt.Printf("Warning: non-consecutive buffer index. Expected %d, got %d\n",
+					expectedIndex, bufferIndex)
+			}
+			// Note: We don't update the start index - it stays the same
 		}
-		// Note: We don't update the start index - it stays the same
 	} else {
 		// No existing buffer start, create new one (shouldn't happen for existing files)
-		bufferStart := LogBufferStart{
-			StartIndex: bufferIndex,
-		}
-		startJson, _ := json.Marshal(bufferStart)
-		entry.Extended["buffer_start"] = startJson
+		bufferStartBytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
+		entry.Extended["buffer_start"] = bufferStartBytes
 	}
 }
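
The consecutiveness check above rests on a simple invariant: a file whose buffer_start decodes to S and which already holds N chunks should receive buffer index S+N next. A minimal illustration with hypothetical values (not from the commit):

    package main

    import "fmt"

    func main() {
    	// Hypothetical values: a file whose buffer_start decodes to 100
    	// and which already holds 3 chunks.
    	existingStartIndex := int64(100) // decoded from buffer_start
    	existingChunks := int64(3)       // len(entry.GetChunks())

    	// The next flush for this file should carry index start + chunks.
    	expectedIndex := existingStartIndex + existingChunks // 103

    	newBufferIndex := int64(104) // what the broker actually received
    	if newBufferIndex != expectedIndex {
    		fmt.Printf("Warning: non-consecutive buffer index. Expected %d, got %d\n",
    			expectedIndex, newBufferIndex)
    	}
    }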

weed/mq/logstore/log_to_parquet.go (16 changes)

@@ -494,15 +494,13 @@ func getBufferStartFromLogFile(logFile *filer_pb.Entry) int64 {
 		return 0
 	}

-	// Parse buffer_start format (same as used in query engine)
-	if startJson, exists := logFile.Extended["buffer_start"]; exists {
-		// LogBufferStart struct (JSON format)
-		type LogBufferStart struct {
-			StartIndex int64 `json:"start_index"`
-		}
-		var bufferStart LogBufferStart
-		if err := json.Unmarshal(startJson, &bufferStart); err == nil {
-			return bufferStart.StartIndex
+	// Parse buffer_start binary format
+	if startData, exists := logFile.Extended["buffer_start"]; exists {
+		if len(startData) == 8 {
+			startIndex := int64(binary.BigEndian.Uint64(startData))
+			if startIndex > 0 {
+				return startIndex
+			}
 		}
 	}

weed/query/engine/broker_client.go (17 changes)

@@ -3,7 +3,6 @@ package engine
 import (
 	"context"
 	"encoding/binary"
-	"encoding/json"
 	"fmt"
 	"io"
 	"strconv"

@@ -563,12 +562,12 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath
 	// Debug: Show buffer_start determination logic in EXPLAIN mode
 	if isDebugMode(ctx) && len(bufferStartSources) > 0 {
 		if logFileCount == 0 && parquetFileCount > 0 {
-			fmt.Printf("Debug: Using Parquet buffer_start metadata (no log files) - sources: %v\n", bufferStartSources)
+			fmt.Printf("Debug: Using Parquet buffer_start metadata (binary format, no log files) - sources: %v\n", bufferStartSources)
 		} else if logFileCount > 0 && parquetFileCount > 0 {
-			fmt.Printf("Debug: Using mixed sources for buffer_start - log files: %d, Parquet files: %d, sources: %v\n",
+			fmt.Printf("Debug: Using mixed sources for buffer_start (binary format) - log files: %d, Parquet files: %d, sources: %v\n",
 				logFileCount, parquetFileCount, bufferStartSources)
 		} else {
-			fmt.Printf("Debug: Using log file buffer_start metadata - sources: %v\n", bufferStartSources)
+			fmt.Printf("Debug: Using log file buffer_start metadata (binary format) - sources: %v\n", bufferStartSources)
 		}
 		fmt.Printf("Debug: Earliest buffer_start index: %d\n", earliestBufferIndex)
 	}

@@ -585,26 +584,20 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath
 }

 // getBufferStartFromEntry extracts LogBufferStart from file entry metadata
-// Handles both JSON format (log files) and binary format (Parquet files)
+// Only supports binary format (used by both log files and Parquet files)
 func (c *BrokerClient) getBufferStartFromEntry(entry *filer_pb.Entry) *LogBufferStart {
 	if entry.Extended == nil {
 		return nil
 	}

 	if startData, exists := entry.Extended["buffer_start"]; exists {
-		// Try binary format first (Parquet files)
+		// Only support binary format
 		if len(startData) == 8 {
 			startIndex := int64(binary.BigEndian.Uint64(startData))
 			if startIndex > 0 {
 				return &LogBufferStart{StartIndex: startIndex}
 			}
 		}
-		// Try JSON format (log files)
-		var bufferStart LogBufferStart
-		if err := json.Unmarshal(startData, &bufferStart); err == nil {
-			return &bufferStart
-		}
 	}
 	return nil

weed/query/engine/engine.go (16 changes)

@@ -2,6 +2,7 @@ package engine
 import (
 	"context"
+	"encoding/binary"
 	"encoding/json"
 	"fmt"
 	"math"

@@ -2158,13 +2159,16 @@ func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBuffer
 		return nil, nil
 	}

-	// Only support buffer_start format
-	if startJson, exists := entry.Extended["buffer_start"]; exists {
-		var bufferStart LogBufferStart
-		if err := json.Unmarshal(startJson, &bufferStart); err != nil {
-			return nil, fmt.Errorf("failed to parse buffer start: %v", err)
+	// Only support binary buffer_start format
+	if startData, exists := entry.Extended["buffer_start"]; exists {
+		if len(startData) == 8 {
+			startIndex := int64(binary.BigEndian.Uint64(startData))
+			if startIndex > 0 {
+				return &LogBufferStart{StartIndex: startIndex}, nil
+			}
+		} else {
+			return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData))
 		}
-		return &bufferStart, nil
 	}
 	return nil, nil

weed/query/engine/engine_test.go (63 changes)

@@ -3,7 +3,6 @@ package engine
 import (
 	"context"
 	"encoding/binary"
-	"encoding/json"
 	"errors"
 	"testing"

@@ -1140,18 +1139,18 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) {
 }

 // Tests for log buffer deduplication functionality
-func TestSQLEngine_GetLogBufferStartFromFile_NewFormat(t *testing.T) {
+func TestSQLEngine_GetLogBufferStartFromFile_BinaryFormat(t *testing.T) {
 	engine := NewTestSQLEngine()

-	// Create sample buffer start (new ultra-efficient format)
-	bufferStart := LogBufferStart{StartIndex: 1609459100000000001}
-	startJson, _ := json.Marshal(bufferStart)
+	// Create sample buffer start (binary format)
+	bufferStartBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(bufferStartBytes, uint64(1609459100000000001))

 	// Create file entry with buffer start + some chunks
 	entry := &filer_pb.Entry{
 		Name: "test-log-file",
 		Extended: map[string][]byte{
-			"buffer_start": startJson,
+			"buffer_start": bufferStartBytes,
 		},
 		Chunks: []*filer_pb.FileChunk{
 			{FileId: "chunk1", Offset: 0, Size: 1000},

@@ -1166,7 +1165,7 @@ func TestSQLEngine_GetLogBufferStartFromFile_NewFormat(t *testing.T) {
 	assert.NotNil(t, result)
 	assert.Equal(t, int64(1609459100000000001), result.StartIndex)

-	// Test extraction works correctly with the new format
+	// Test extraction works correctly with the binary format
 }

 func TestSQLEngine_GetLogBufferStartFromFile_NoMetadata(t *testing.T) {

@@ -1187,18 +1186,18 @@ func TestSQLEngine_GetLogBufferStartFromFile_NoMetadata(t *testing.T) {
 func TestSQLEngine_GetLogBufferStartFromFile_InvalidData(t *testing.T) {
 	engine := NewTestSQLEngine()

-	// Create file entry with invalid buffer start
+	// Create file entry with invalid buffer start (wrong size)
 	entry := &filer_pb.Entry{
 		Name: "test-log-file",
 		Extended: map[string][]byte{
-			"buffer_start": []byte("invalid-json"),
+			"buffer_start": []byte("invalid-binary"),
 		},
 	}

 	// Test extraction
 	result, err := engine.getLogBufferStartFromFile(entry)
 	assert.Error(t, err)
-	assert.Contains(t, err.Error(), "failed to parse buffer start")
+	assert.Contains(t, err.Error(), "invalid buffer_start format: expected 8 bytes")
 	assert.Nil(t, result)
 }

@@ -1256,14 +1255,14 @@ func TestSQLEngine_LogBufferDeduplication_ServerRestartScenario(t *testing.T) {
 	// prevent false positive duplicates across server restarts
 }

-func TestBrokerClient_ParquetBufferStartForBrokerQuery(t *testing.T) {
-	// Test scenario: getBufferStartFromEntry should handle both JSON and binary formats
-	// This tests the dual format support for buffer_start metadata
+func TestBrokerClient_BinaryBufferStartFormat(t *testing.T) {
+	// Test scenario: getBufferStartFromEntry should only support binary format
+	// This tests the standardized binary format for buffer_start metadata
 	realBrokerClient := &BrokerClient{}

-	// Test binary format (Parquet files)
-	parquetEntry := &filer_pb.Entry{
-		Name:        "2025-01-07-14-30.parquet",
+	// Test binary format (used by both log files and Parquet files)
+	binaryEntry := &filer_pb.Entry{
+		Name:        "2025-01-07-14-30-45",
 		IsDirectory: false,
 		Extended: map[string][]byte{
 			"buffer_start": func() []byte {

@@ -1275,22 +1274,26 @@ func TestBrokerClient_ParquetBufferStartForBrokerQuery(t *testing.T) {
 		},
 	}

-	bufferStart := realBrokerClient.getBufferStartFromEntry(parquetEntry)
+	bufferStart := realBrokerClient.getBufferStartFromEntry(binaryEntry)
 	assert.NotNil(t, bufferStart)
-	assert.Equal(t, int64(2000001), bufferStart.StartIndex, "Should parse binary buffer_start from Parquet file")
+	assert.Equal(t, int64(2000001), bufferStart.StartIndex, "Should parse binary buffer_start metadata")

-	// Test JSON format (log files)
-	logEntry := &filer_pb.Entry{
-		Name:        "2025-01-07-14-30-45",
+	// Test Parquet file (same binary format)
+	parquetEntry := &filer_pb.Entry{
+		Name:        "2025-01-07-14-30.parquet",
 		IsDirectory: false,
 		Extended: map[string][]byte{
-			"buffer_start": []byte(`{"start_index": 1500001}`),
+			"buffer_start": func() []byte {
+				buf := make([]byte, 8)
+				binary.BigEndian.PutUint64(buf, uint64(1500001))
+				return buf
+			}(),
 		},
 	}

-	bufferStart = realBrokerClient.getBufferStartFromEntry(logEntry)
+	bufferStart = realBrokerClient.getBufferStartFromEntry(parquetEntry)
 	assert.NotNil(t, bufferStart)
-	assert.Equal(t, int64(1500001), bufferStart.StartIndex, "Should parse JSON buffer_start from log file")
+	assert.Equal(t, int64(1500001), bufferStart.StartIndex, "Should parse binary buffer_start from Parquet file")

 	// Test missing metadata
 	emptyEntry := &filer_pb.Entry{

@@ -1301,4 +1304,16 @@ func TestBrokerClient_ParquetBufferStartForBrokerQuery(t *testing.T) {
 	bufferStart = realBrokerClient.getBufferStartFromEntry(emptyEntry)
 	assert.Nil(t, bufferStart, "Should return nil for entry without buffer_start metadata")

+	// Test invalid format (wrong size)
+	invalidEntry := &filer_pb.Entry{
+		Name:        "invalid-metadata",
+		IsDirectory: false,
+		Extended: map[string][]byte{
+			"buffer_start": []byte("invalid"),
+		},
+	}
+	bufferStart = realBrokerClient.getBufferStartFromEntry(invalidEntry)
+	assert.Nil(t, bufferStart, "Should return nil for invalid buffer_start metadata")
 }