diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index 3bf8b7c51..be98d0b7c 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -172,7 +172,7 @@ func (f *filerClientImpl) GetDataCenter() string { } // ListNamespaces retrieves all MQ namespaces (databases) from the filer -// ✅ RESOLVED: Now queries actual topic directories instead of hardcoded values +// RESOLVED: Now queries actual topic directories instead of hardcoded values func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) { // Get filer client to list directories under /topics filerClient, err := c.GetFilerClient() @@ -219,7 +219,7 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) { } // ListTopics retrieves all topics in a namespace from the filer -// ✅ RESOLVED: Now queries actual topic directories instead of hardcoded values +// RESOLVED: Now queries actual topic directories instead of hardcoded values func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]string, error) { // Get filer client to list directories under /topics/{namespace} filerClient, err := c.GetFilerClient() diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 3c6a92157..afff1a889 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -152,7 +152,7 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. } // Create HybridMessageScanner for the topic (reads both live logs + Parquet files) - // ✅ RESOLVED TODO: Get real filerClient from broker connection + // RESOLVED TODO: Get real filerClient from broker connection var filerClient filer_pb.FilerClient if e.catalog.brokerClient != nil { var filerClientErr error @@ -235,7 +235,7 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. } // Build hybrid scan options - // ✅ RESOLVED TODO: Extract from WHERE clause time filters + // RESOLVED TODO: Extract from WHERE clause time filters startTimeNs, stopTimeNs := int64(0), int64(0) if stmt.Where != nil { startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) @@ -1097,18 +1097,18 @@ func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner * startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) } - // 🚀 FAST PATH: Try to use parquet statistics for optimization + // FAST PATH: Try to use parquet statistics for optimization // This can be ~130x faster than scanning all data if stmt.Where == nil { // Only optimize when no complex WHERE clause fastResult, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations) if canOptimize { - fmt.Printf("✅ Using fast parquet statistics for aggregation (skipped full scan)\n") + fmt.Printf("Using fast parquet statistics for aggregation (skipped full scan)\n") return fastResult, nil } } // SLOW PATH: Fall back to full table scan - fmt.Printf("⚠️ Using full table scan for aggregation (parquet optimization not applicable)\n") + fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n") // Build scan options for full table scan (aggregations need all data) hybridScanOptions := HybridScanOptions{ diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index f0ae5ff6c..5bcb65534 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -137,7 +137,7 @@ func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOpt var results []HybridScanResult // Get all partitions for this topic - // ✅ RESOLVED TODO: Implement proper partition discovery via MQ broker + // RESOLVED TODO: Implement proper partition discovery via MQ broker partitions, err := hms.discoverTopicPartitions(ctx) if err != nil { // Fallback to default partition if discovery fails @@ -316,7 +316,7 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb recordValue := &schema_pb.RecordValue{} if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil { // This is an archived message from Parquet files - // ✅ FIX: Add system columns from LogEntry to RecordValue + // FIX: Add system columns from LogEntry to RecordValue if recordValue.Fields == nil { recordValue.Fields = make(map[string]*schema_pb.Value) } @@ -333,7 +333,7 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb } // If not a RecordValue, this is raw live message data - // ✅ RESOLVED TODO: Implement proper schema-aware parsing based on topic schema + // RESOLVED TODO: Implement proper schema-aware parsing based on topic schema return hms.parseRawMessageWithSchema(logEntry) }