From 80d3085d5490a02ed92950d40087eb30f8610b2a Mon Sep 17 00:00:00 2001 From: Lars Lehtonen Date: Tue, 31 Mar 2026 20:53:41 -0700 Subject: [PATCH] Prune Query Engine (#8865) * chore(weed/query/engine): prune unused functions * chore(weed/query/engine): prune unused test-only function --- weed/query/engine/broker_client.go | 76 ----- weed/query/engine/data_conversion.go | 21 -- weed/query/engine/engine.go | 355 -------------------- weed/query/engine/engine_test.go | 63 ---- weed/query/engine/hybrid_message_scanner.go | 128 ------- weed/query/engine/parquet_scanner.go | 86 ----- weed/query/engine/system_columns.go | 8 - 7 files changed, 737 deletions(-) diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index a34e79b3e..8df893546 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -2,7 +2,6 @@ package engine import ( "context" - "encoding/binary" "fmt" "io" "strings" @@ -18,7 +17,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" - "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" jsonpb "google.golang.org/protobuf/encoding/protojson" @@ -509,77 +507,3 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi return logEntries, nil } - -// getEarliestBufferStart finds the earliest buffer_start index from disk files in the partition -// -// This method handles three scenarios for seamless broker querying: -// 1. Live log files exist: Uses their buffer_start metadata (most recent boundaries) -// 2. Only Parquet files exist: Uses Parquet buffer_start metadata (preserved from archived sources) -// 3. Mixed files: Uses earliest buffer_start from all sources for comprehensive coverage -// -// This ensures continuous real-time querying capability even after log file compaction/archival -func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath string) (int64, error) { - filerClient, err := c.GetFilerClient() - if err != nil { - return 0, fmt.Errorf("failed to get filer client: %v", err) - } - - var earliestBufferIndex int64 = -1 // -1 means no buffer_start found - var logFileCount, parquetFileCount int - var bufferStartSources []string // Track which files provide buffer_start - - err = filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { - // Skip directories - if entry.IsDirectory { - return nil - } - - // Count file types for scenario detection - if strings.HasSuffix(entry.Name, ".parquet") { - parquetFileCount++ - } else { - logFileCount++ - } - - // Extract buffer_start from file extended attributes (both log files and parquet files) - bufferStart := c.getBufferStartFromEntry(entry) - if bufferStart != nil && bufferStart.StartIndex > 0 { - if earliestBufferIndex == -1 || bufferStart.StartIndex < earliestBufferIndex { - earliestBufferIndex = bufferStart.StartIndex - } - bufferStartSources = append(bufferStartSources, entry.Name) - } - - return nil - }) - - if err != nil { - return 0, fmt.Errorf("failed to scan partition directory: %v", err) - } - - if earliestBufferIndex == -1 { - return 0, fmt.Errorf("no buffer_start metadata found in partition") - } - - return earliestBufferIndex, nil -} - -// getBufferStartFromEntry extracts LogBufferStart from file entry metadata -// Only supports binary format (used by both log files and Parquet files) -func (c *BrokerClient) getBufferStartFromEntry(entry *filer_pb.Entry) *LogBufferStart { - if entry.Extended == nil { - return nil - } - - if startData, exists := entry.Extended["buffer_start"]; exists { - // Only support binary format - if len(startData) == 8 { - startIndex := int64(binary.BigEndian.Uint64(startData)) - if startIndex > 0 { - return &LogBufferStart{StartIndex: startIndex} - } - } - } - - return nil -} diff --git a/weed/query/engine/data_conversion.go b/weed/query/engine/data_conversion.go index f626d8f2e..77a4b4cb3 100644 --- a/weed/query/engine/data_conversion.go +++ b/weed/query/engine/data_conversion.go @@ -170,27 +170,6 @@ func (e *SQLEngine) convertRawValueToSchemaValue(rawValue interface{}) *schema_p } } -// convertJSONValueToSchemaValue converts JSON values to schema_pb.Value -func (e *SQLEngine) convertJSONValueToSchemaValue(jsonValue interface{}) *schema_pb.Value { - switch v := jsonValue.(type) { - case string: - return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}} - case float64: - // JSON numbers are always float64, try to detect if it's actually an integer - if v == float64(int64(v)) { - return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)}} - } - return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}} - case bool: - return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}} - case nil: - return nil - default: - // Convert other types to string - return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}} - } -} - // Helper functions for aggregation processing // isNullValue checks if a schema_pb.Value is null or empty diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index e00fd78ca..ac66a7453 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -2175,361 +2175,6 @@ func (e *SQLEngine) executeRegularSelectWithHybridScanner(ctx context.Context, h return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil } -// executeSelectStatementWithBrokerStats handles SELECT queries with broker buffer statistics capture -// This is used by EXPLAIN queries to capture complete data source information including broker memory -func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) { - // Parse FROM clause to get table (topic) information - if len(stmt.From) != 1 { - err := fmt.Errorf("SELECT supports single table queries only") - return &QueryResult{Error: err}, err - } - - // Extract table reference - var database, tableName string - switch table := stmt.From[0].(type) { - case *AliasedTableExpr: - switch tableExpr := table.Expr.(type) { - case TableName: - tableName = tableExpr.Name.String() - if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" { - database = tableExpr.Qualifier.String() - } - default: - err := fmt.Errorf("unsupported table expression: %T", tableExpr) - return &QueryResult{Error: err}, err - } - default: - err := fmt.Errorf("unsupported FROM clause: %T", table) - return &QueryResult{Error: err}, err - } - - // Use current database context if not specified - if database == "" { - database = e.catalog.GetCurrentDatabase() - if database == "" { - database = "default" - } - } - - // Auto-discover and register topic if not already in catalog - if _, err := e.catalog.GetTableInfo(database, tableName); err != nil { - // Topic not in catalog, try to discover and register it - if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil { - // Return error immediately for non-existent topics instead of falling back to sample data - return &QueryResult{Error: regErr}, regErr - } - } - - // Create HybridMessageScanner for the topic (reads both live logs + Parquet files) - // Get filerClient from broker connection (works with both real and mock brokers) - var filerClient filer_pb.FilerClient - var filerClientErr error - filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient() - if filerClientErr != nil { - // Return error if filer client is not available for topic access - return &QueryResult{Error: filerClientErr}, filerClientErr - } - - hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e) - if err != nil { - // Handle quiet topics gracefully: topics exist but have no active schema/brokers - if IsNoSchemaError(err) { - // Return empty result for quiet topics (normal in production environments) - return &QueryResult{ - Columns: []string{}, - Rows: [][]sqltypes.Value{}, - Database: database, - Table: tableName, - }, nil - } - // Return error for other access issues (truly non-existent topics, etc.) - topicErr := fmt.Errorf("failed to access topic %s.%s: %v", database, tableName, err) - return &QueryResult{Error: topicErr}, topicErr - } - - // Parse SELECT columns and detect aggregation functions - var columns []string - var aggregations []AggregationSpec - selectAll := false - hasAggregations := false - _ = hasAggregations // Used later in aggregation routing - // Track required base columns for arithmetic expressions - baseColumnsSet := make(map[string]bool) - - for _, selectExpr := range stmt.SelectExprs { - switch expr := selectExpr.(type) { - case *StarExpr: - selectAll = true - case *AliasedExpr: - switch col := expr.Expr.(type) { - case *ColName: - colName := col.Name.String() - columns = append(columns, colName) - baseColumnsSet[colName] = true - case *ArithmeticExpr: - // Handle arithmetic expressions like id+user_id and string concatenation like name||suffix - columns = append(columns, e.getArithmeticExpressionAlias(col)) - // Extract base columns needed for this arithmetic expression - e.extractBaseColumns(col, baseColumnsSet) - case *SQLVal: - // Handle string/numeric literals like 'good', 123, etc. - columns = append(columns, e.getSQLValAlias(col)) - case *FuncExpr: - // Distinguish between aggregation functions and string functions - funcName := strings.ToUpper(col.Name.String()) - if e.isAggregationFunction(funcName) { - // Handle aggregation functions - aggSpec, err := e.parseAggregationFunction(col, expr) - if err != nil { - return &QueryResult{Error: err}, err - } - aggregations = append(aggregations, *aggSpec) - hasAggregations = true - } else if e.isStringFunction(funcName) { - // Handle string functions like UPPER, LENGTH, etc. - columns = append(columns, e.getStringFunctionAlias(col)) - // Extract base columns needed for this string function - e.extractBaseColumnsFromFunction(col, baseColumnsSet) - } else if e.isDateTimeFunction(funcName) { - // Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC - columns = append(columns, e.getDateTimeFunctionAlias(col)) - // Extract base columns needed for this datetime function - e.extractBaseColumnsFromFunction(col, baseColumnsSet) - } else { - return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName) - } - default: - err := fmt.Errorf("unsupported SELECT expression: %T", col) - return &QueryResult{Error: err}, err - } - default: - err := fmt.Errorf("unsupported SELECT expression: %T", expr) - return &QueryResult{Error: err}, err - } - } - - // If we have aggregations, use aggregation query path - if hasAggregations { - return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt) - } - - // Parse WHERE clause for predicate pushdown - var predicate func(*schema_pb.RecordValue) bool - if stmt.Where != nil { - predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs) - if err != nil { - return &QueryResult{Error: err}, err - } - } - - // Parse LIMIT and OFFSET clauses - // Use -1 to distinguish "no LIMIT" from "LIMIT 0" - limit := -1 - offset := 0 - if stmt.Limit != nil && stmt.Limit.Rowcount != nil { - switch limitExpr := stmt.Limit.Rowcount.(type) { - case *SQLVal: - if limitExpr.Type == IntVal { - var parseErr error - limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64) - if parseErr != nil { - return &QueryResult{Error: parseErr}, parseErr - } - if limit64 > math.MaxInt32 || limit64 < 0 { - return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64) - } - limit = int(limit64) - } - } - } - - // Parse OFFSET clause if present - if stmt.Limit != nil && stmt.Limit.Offset != nil { - switch offsetExpr := stmt.Limit.Offset.(type) { - case *SQLVal: - if offsetExpr.Type == IntVal { - var parseErr error - offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64) - if parseErr != nil { - return &QueryResult{Error: parseErr}, parseErr - } - if offset64 > math.MaxInt32 || offset64 < 0 { - return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64) - } - offset = int(offset64) - } - } - } - - // Build hybrid scan options - // Extract time filters from WHERE clause to optimize scanning - startTimeNs, stopTimeNs := int64(0), int64(0) - if stmt.Where != nil { - startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) - } - - hybridScanOptions := HybridScanOptions{ - StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons - StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons - Limit: limit, - Offset: offset, - Predicate: predicate, - } - - if !selectAll { - // Convert baseColumnsSet to slice for hybrid scan options - baseColumns := make([]string, 0, len(baseColumnsSet)) - for columnName := range baseColumnsSet { - baseColumns = append(baseColumns, columnName) - } - // Use base columns (not expression aliases) for data retrieval - if len(baseColumns) > 0 { - hybridScanOptions.Columns = baseColumns - } else { - // If no base columns found (shouldn't happen), use original columns - hybridScanOptions.Columns = columns - } - } - - // Execute the hybrid scan with stats capture for EXPLAIN - var results []HybridScanResult - if plan != nil { - // EXPLAIN mode - capture broker buffer stats - var stats *HybridScanStats - results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions) - if err != nil { - return &QueryResult{Error: err}, err - } - - // Populate plan with broker buffer information - if stats != nil { - plan.BrokerBufferQueried = stats.BrokerBufferQueried - plan.BrokerBufferMessages = stats.BrokerBufferMessages - plan.BufferStartIndex = stats.BufferStartIndex - - // Add broker_buffer to data sources if buffer was queried - if stats.BrokerBufferQueried { - // Check if broker_buffer is already in data sources - hasBrokerBuffer := false - for _, source := range plan.DataSources { - if source == "broker_buffer" { - hasBrokerBuffer = true - break - } - } - if !hasBrokerBuffer { - plan.DataSources = append(plan.DataSources, "broker_buffer") - } - } - } - - // Populate execution plan details with source file information for Data Sources Tree - if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil { - // Add partition paths to execution plan details - plan.Details["partition_paths"] = partitions - // Persist time filter details for downstream pruning/diagnostics - plan.Details[PlanDetailStartTimeNs] = startTimeNs - plan.Details[PlanDetailStopTimeNs] = stopTimeNs - - // Collect actual file information for each partition - var parquetFiles []string - var liveLogFiles []string - parquetSources := make(map[string]bool) - - var parquetReadErrors []string - var liveLogListErrors []string - for _, partitionPath := range partitions { - // Get parquet files for this partition - if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil { - // Prune files by time range with debug logging - filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs) - - // Further prune by column statistics from WHERE clause - if stmt.Where != nil { - beforeColumnPrune := len(filteredStats) - filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr) - columnPrunedCount := beforeColumnPrune - len(filteredStats) - - if columnPrunedCount > 0 { - // Track column statistics optimization - if !contains(plan.OptimizationsUsed, "column_statistics_pruning") { - plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning") - } - } - } - for _, stats := range filteredStats { - parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName)) - } - } else { - parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err)) - } - - // Merge accurate parquet sources from metadata - if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil { - for src := range sources { - parquetSources[src] = true - } - } - - // Get live log files for this partition - if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil { - for _, fileName := range liveFiles { - // Exclude live log files that have been converted to parquet (deduplicated) - if parquetSources[fileName] { - continue - } - liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName)) - } - } else { - liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err)) - } - } - - if len(parquetFiles) > 0 { - plan.Details["parquet_files"] = parquetFiles - } - if len(liveLogFiles) > 0 { - plan.Details["live_log_files"] = liveLogFiles - } - if len(parquetReadErrors) > 0 { - plan.Details["error_parquet_statistics"] = parquetReadErrors - } - if len(liveLogListErrors) > 0 { - plan.Details["error_live_log_listing"] = liveLogListErrors - } - - // Update scan statistics for execution plan display - plan.PartitionsScanned = len(partitions) - plan.ParquetFilesScanned = len(parquetFiles) - plan.LiveLogFilesScanned = len(liveLogFiles) - } else { - // Handle partition discovery error - plan.Details["error_partition_discovery"] = discoverErr.Error() - } - } else { - // Normal mode - just get results - results, err = hybridScanner.Scan(ctx, hybridScanOptions) - if err != nil { - return &QueryResult{Error: err}, err - } - } - - // Convert to SQL result format - if selectAll { - if len(columns) > 0 { - // SELECT *, specific_columns - include both auto-discovered and explicit columns - return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil - } else { - // SELECT * only - let converter determine all columns (excludes system columns) - columns = nil - return hybridScanner.ConvertToSQLResult(results, columns), nil - } - } - - // Handle custom column expressions (including arithmetic) - return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil -} - // extractTimeFilters extracts time range filters from WHERE clause for optimization // This allows push-down of time-based queries to improve scan performance // Returns (startTimeNs, stopTimeNs) where 0 means unbounded diff --git a/weed/query/engine/engine_test.go b/weed/query/engine/engine_test.go index 96c5507b0..42a5f4911 100644 --- a/weed/query/engine/engine_test.go +++ b/weed/query/engine/engine_test.go @@ -1254,69 +1254,6 @@ func TestSQLEngine_LogBufferDeduplication_ServerRestartScenario(t *testing.T) { // prevent false positive duplicates across server restarts } -func TestBrokerClient_BinaryBufferStartFormat(t *testing.T) { - // Test scenario: getBufferStartFromEntry should only support binary format - // This tests the standardized binary format for buffer_start metadata - realBrokerClient := &BrokerClient{} - - // Test binary format (used by both log files and Parquet files) - binaryEntry := &filer_pb.Entry{ - Name: "2025-01-07-14-30-45", - IsDirectory: false, - Extended: map[string][]byte{ - "buffer_start": func() []byte { - // Binary format: 8-byte BigEndian - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, uint64(2000001)) - return buf - }(), - }, - } - - bufferStart := realBrokerClient.getBufferStartFromEntry(binaryEntry) - assert.NotNil(t, bufferStart) - assert.Equal(t, int64(2000001), bufferStart.StartIndex, "Should parse binary buffer_start metadata") - - // Test Parquet file (same binary format) - parquetEntry := &filer_pb.Entry{ - Name: "2025-01-07-14-30.parquet", - IsDirectory: false, - Extended: map[string][]byte{ - "buffer_start": func() []byte { - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, uint64(1500001)) - return buf - }(), - }, - } - - bufferStart = realBrokerClient.getBufferStartFromEntry(parquetEntry) - assert.NotNil(t, bufferStart) - assert.Equal(t, int64(1500001), bufferStart.StartIndex, "Should parse binary buffer_start from Parquet file") - - // Test missing metadata - emptyEntry := &filer_pb.Entry{ - Name: "no-metadata", - IsDirectory: false, - Extended: nil, - } - - bufferStart = realBrokerClient.getBufferStartFromEntry(emptyEntry) - assert.Nil(t, bufferStart, "Should return nil for entry without buffer_start metadata") - - // Test invalid format (wrong size) - invalidEntry := &filer_pb.Entry{ - Name: "invalid-metadata", - IsDirectory: false, - Extended: map[string][]byte{ - "buffer_start": []byte("invalid"), - }, - } - - bufferStart = realBrokerClient.getBufferStartFromEntry(invalidEntry) - assert.Nil(t, bufferStart, "Should return nil for invalid buffer_start metadata") -} - // TestGetSQLValAlias tests the getSQLValAlias function, particularly for SQL injection prevention func TestGetSQLValAlias(t *testing.T) { engine := &SQLEngine{} diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index b8477acfb..4df91024b 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -320,12 +320,6 @@ func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options Hybr return results, stats, nil } -// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication -func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) { - results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options) - return results, err -} - // scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) { var results []HybridScanResult @@ -436,27 +430,6 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, return results, stats, nil } -// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue -func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) { - // Parse the message data as RecordValue - recordValue := &schema_pb.RecordValue{} - if err := proto.Unmarshal(msg.Value, recordValue); err != nil { - return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err) - } - - // Add system columns - if recordValue.Fields == nil { - recordValue.Fields = make(map[string]*schema_pb.Value) - } - - // Add timestamp - recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ - Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs}, - } - - return recordValue, string(msg.Key), nil -} - // discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem // This finds real partition directories like v2025-09-01-07-16-34/0000-0630/ func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) { @@ -521,15 +494,6 @@ func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([ return allPartitions, nil } -// scanPartitionHybrid scans a specific partition using the hybrid approach -// This is where the magic happens - seamlessly reading ALL data sources: -// 1. Unflushed in-memory data from brokers (REAL-TIME) -// 2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED) -func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) { - results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options) - return results, err -} - // scanPartitionHybridWithStats scans a specific partition using streaming merge for memory efficiency // PERFORMANCE IMPROVEMENT: Uses heap-based streaming merge instead of collecting all data and sorting // - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records @@ -647,23 +611,6 @@ func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (i return fileCount, nil } -// isControlEntry checks if a log entry is a control entry without actual data -// Based on MQ system analysis, control entries are: -// 1. DataMessages with populated Ctrl field (publisher close signals) -// 2. Entries with empty keys (as filtered by subscriber) -// NOTE: Messages with empty data but valid keys (like NOOP messages) are NOT control entries -func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool { - // Pre-decode DataMessage if needed - var dataMessage *mq_pb.DataMessage - if len(logEntry.Data) > 0 { - dataMessage = &mq_pb.DataMessage{} - if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil { - dataMessage = nil // Failed to decode, treat as raw data - } - } - return hms.isControlEntryWithDecoded(logEntry, dataMessage) -} - // isControlEntryWithDecoded checks if a log entry is a control entry using pre-decoded DataMessage // This avoids duplicate protobuf unmarshaling when the DataMessage is already decoded func (hms *HybridMessageScanner) isControlEntryWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) bool { @@ -682,26 +629,6 @@ func (hms *HybridMessageScanner) isControlEntryWithDecoded(logEntry *filer_pb.Lo return false } -// isNullOrEmpty checks if a schema_pb.Value is null or empty -func isNullOrEmpty(value *schema_pb.Value) bool { - if value == nil { - return true - } - - switch v := value.Kind.(type) { - case *schema_pb.Value_StringValue: - return v.StringValue == "" - case *schema_pb.Value_BytesValue: - return len(v.BytesValue) == 0 - case *schema_pb.Value_ListValue: - return v.ListValue == nil || len(v.ListValue.Values) == 0 - case nil: - return true // No kind set means null - default: - return false - } -} - // isSchemaless checks if the scanner is configured for a schema-less topic // Schema-less topics only have system fields: _ts_ns, _key, and _value func (hms *HybridMessageScanner) isSchemaless() bool { @@ -736,61 +663,6 @@ func (hms *HybridMessageScanner) isSchemaless() bool { return hasValue && dataFieldCount == 1 } -// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue -// This handles both: -// 1. Live log entries (raw message format) -// 2. Parquet entries (already in schema_pb.RecordValue format) -// 3. Schema-less topics (raw bytes in _value field) -func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) { - // For schema-less topics, put raw data directly into _value field - if hms.isSchemaless() { - recordValue := &schema_pb.RecordValue{ - Fields: make(map[string]*schema_pb.Value), - } - recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ - Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, - } - recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ - Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, - } - recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{ - Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data}, - } - return recordValue, "live_log", nil - } - - // Try to unmarshal as RecordValue first (Parquet format) - recordValue := &schema_pb.RecordValue{} - if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil { - // This is an archived message from Parquet files - // FIX: Add system columns from LogEntry to RecordValue - if recordValue.Fields == nil { - recordValue.Fields = make(map[string]*schema_pb.Value) - } - - // Add system columns from LogEntry - recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ - Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, - } - recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ - Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, - } - - return recordValue, "parquet_archive", nil - } - - // If not a RecordValue, this is raw live message data - parse with schema - return hms.parseRawMessageWithSchema(logEntry) -} - -// min returns the minimum of two integers -func min(a, b int) int { - if a < b { - return a - } - return b -} - // parseRawMessageWithSchema parses raw live message data using the topic's schema // This provides proper type conversion and field mapping instead of treating everything as strings func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) { diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go index 7a470817b..9bcced904 100644 --- a/weed/query/engine/parquet_scanner.go +++ b/weed/query/engine/parquet_scanner.go @@ -6,8 +6,6 @@ import ( "math/big" "time" - "github.com/parquet-go/parquet-go" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -172,90 +170,6 @@ func (ps *ParquetScanner) scanPartition(ctx context.Context, partition topic.Par return results, nil } -// scanParquetFile scans a single Parquet file (real implementation) -func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.Entry, options ScanOptions) ([]ScanResult, error) { - var results []ScanResult - - // Create reader for the Parquet file (same pattern as logstore) - lookupFileIdFn := filer.LookupFn(ps.filerClient) - fileSize := filer.FileSize(entry) - visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) - chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) - readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn) - readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount) - - // Create Parquet reader - parquetReader := parquet.NewReader(readerAt) - defer parquetReader.Close() - - rows := make([]parquet.Row, 128) // Read in batches like logstore - - for { - rowCount, readErr := parquetReader.ReadRows(rows) - - // Process rows even if EOF - for i := 0; i < rowCount; i++ { - // Convert Parquet row to schema value - recordValue, err := schema.ToRecordValue(ps.recordSchema, ps.parquetLevels, rows[i]) - if err != nil { - return nil, fmt.Errorf("failed to convert row: %v", err) - } - - // Extract system columns - timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value() - key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue() - - // Apply time filtering - if options.StartTimeNs > 0 && timestamp < options.StartTimeNs { - continue - } - if options.StopTimeNs > 0 && timestamp >= options.StopTimeNs { - break // Assume data is time-ordered - } - - // Apply predicate filtering (WHERE clause) - if options.Predicate != nil && !options.Predicate(recordValue) { - continue - } - - // Apply column projection - values := make(map[string]*schema_pb.Value) - if len(options.Columns) == 0 { - // Select all columns (excluding system columns from user view) - for name, value := range recordValue.Fields { - if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY { - values[name] = value - } - } - } else { - // Select specified columns only - for _, columnName := range options.Columns { - if value, exists := recordValue.Fields[columnName]; exists { - values[columnName] = value - } - } - } - - results = append(results, ScanResult{ - Values: values, - Timestamp: timestamp, - Key: key, - }) - - // Apply row limit - if options.Limit > 0 && len(results) >= options.Limit { - return results, nil - } - } - - if readErr != nil { - break // EOF or error - } - } - - return results, nil -} - // generateSampleData creates sample data for testing when no real Parquet files exist func (ps *ParquetScanner) generateSampleData(options ScanOptions) []ScanResult { now := time.Now().UnixNano() diff --git a/weed/query/engine/system_columns.go b/weed/query/engine/system_columns.go index a982416ed..256ad90ff 100644 --- a/weed/query/engine/system_columns.go +++ b/weed/query/engine/system_columns.go @@ -50,14 +50,6 @@ func (e *SQLEngine) getSystemColumnDisplayName(columnName string) string { } } -// isSystemColumnDisplayName checks if a column name is a system column display name -func (e *SQLEngine) isSystemColumnDisplayName(columnName string) bool { - lowerName := strings.ToLower(columnName) - return lowerName == SW_DISPLAY_NAME_TIMESTAMP || - lowerName == SW_COLUMN_NAME_KEY || - lowerName == SW_COLUMN_NAME_SOURCE -} - // getSystemColumnInternalName returns the internal name for a system column display name func (e *SQLEngine) getSystemColumnInternalName(displayName string) string { lowerName := strings.ToLower(displayName)