From 618cb8988544034dcbc2162250646c331ec49f5e Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 2 Sep 2025 00:42:37 -0700 Subject: [PATCH] the parquet file should also remember the first buffer_start attribute from the sources --- weed/mq/logstore/log_to_parquet.go | 42 +++++++++++++++++++++++++++--- weed/query/engine/broker_client.go | 24 ++++++++++++----- 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go index 3e04f64ae..f0f62fa26 100644 --- a/weed/mq/logstore/log_to_parquet.go +++ b/weed/mq/logstore/log_to_parquet.go @@ -287,13 +287,21 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin // write to parquet file to partitionDir parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05")) - // Collect source log file names for deduplication metadata + // Collect source log file names and buffer_start metadata for deduplication var sourceLogFiles []string + var earliestBufferStart int64 for _, logFile := range logFileGroups { sourceLogFiles = append(sourceLogFiles, logFile.Name) + + // Extract buffer_start from log file metadata + if bufferStart := getBufferStartFromLogFile(logFile); bufferStart > 0 { + if earliestBufferStart == 0 || bufferStart < earliestBufferStart { + earliestBufferStart = bufferStart + } + } } - if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles); err != nil { + if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart); err != nil { return fmt.Errorf("save parquet file %s: %v", parquetFileName, err) } @@ -301,7 +309,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin } -func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string) error { +func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64) error { uploader, err := operation.NewUploader() if err != nil { return fmt.Errorf("new uploader: %w", err) @@ -340,6 +348,13 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile entry.Extended["sources"] = sourceLogFilesJson } + // Store earliest buffer_start for precise broker deduplication + if earliestBufferStart > 0 { + bufferStartBytes := make([]byte, 8) + binary.BigEndian.PutUint64(bufferStartBytes, uint64(earliestBufferStart)) + entry.Extended["buffer_start"] = bufferStartBytes + } + for i := int64(0); i < chunkCount; i++ { fileId, uploadResult, err, _ := uploader.UploadWithRetry( filerClient, @@ -472,3 +487,24 @@ func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (proc return } + +// getBufferStartFromLogFile extracts the buffer_start index from log file extended metadata +func getBufferStartFromLogFile(logFile *filer_pb.Entry) int64 { + if logFile.Extended == nil { + return 0 + } + + // Parse buffer_start format (same as used in query engine) + if startJson, exists := logFile.Extended["buffer_start"]; exists { + // LogBufferStart struct (JSON format) + type LogBufferStart struct { + StartIndex int64 `json:"start_index"` + } + var bufferStart LogBufferStart + if err := json.Unmarshal(startJson, &bufferStart); err == nil { + return bufferStart.StartIndex + } + } + + return 0 +} diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index 76055be25..daac30794 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -2,6 +2,7 @@ package engine import ( "context" + "encoding/binary" "encoding/json" "fmt" "io" @@ -517,7 +518,7 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi } // getEarliestBufferStart finds the earliest buffer_start index from disk files in the partition -// This is used for precise deduplication - any buffer index >= this value may still be in memory +// This checks both live log files and Parquet files for the most precise deduplication func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath string) (int64, error) { filerClient, err := c.GetFilerClient() if err != nil { @@ -527,12 +528,12 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath var earliestBufferIndex int64 = -1 // -1 means no buffer_start found err = filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { - // Skip directories and parquet files - if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") { + // Skip directories + if entry.IsDirectory { return nil } - // Extract buffer_start from file extended attributes + // Extract buffer_start from file extended attributes (both log files and parquet files) bufferStart := c.getBufferStartFromEntry(entry) if bufferStart != nil && bufferStart.StartIndex > 0 { if earliestBufferIndex == -1 || bufferStart.StartIndex < earliestBufferIndex { @@ -555,15 +556,24 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath } // getBufferStartFromEntry extracts LogBufferStart from file entry metadata +// Handles both JSON format (log files) and binary format (Parquet files) func (c *BrokerClient) getBufferStartFromEntry(entry *filer_pb.Entry) *LogBufferStart { if entry.Extended == nil { return nil } - // Parse buffer_start format - if startJson, exists := entry.Extended["buffer_start"]; exists { + if startData, exists := entry.Extended["buffer_start"]; exists { + // Try binary format first (Parquet files) + if len(startData) == 8 { + startIndex := int64(binary.BigEndian.Uint64(startData)) + if startIndex > 0 { + return &LogBufferStart{StartIndex: startIndex} + } + } + + // Try JSON format (log files) var bufferStart LogBufferStart - if err := json.Unmarshal(startJson, &bufferStart); err == nil { + if err := json.Unmarshal(startData, &bufferStart); err == nil { return &bufferStart } }