|
|
@ -713,7 +713,8 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. |
|
|
|
if _, err := e.catalog.GetTableInfo(database, tableName); err != nil { |
|
|
|
// Topic not in catalog, try to discover and register it
|
|
|
|
if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil { |
|
|
|
fmt.Printf("Warning: Failed to discover topic %s.%s: %v\n", database, tableName, regErr) |
|
|
|
// Return error immediately for non-existent topics instead of falling back to sample data
|
|
|
|
return &QueryResult{Error: regErr}, regErr |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -723,14 +724,15 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. |
|
|
|
var filerClientErr error |
|
|
|
filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient() |
|
|
|
if filerClientErr != nil { |
|
|
|
// Log warning but continue with sample data fallback
|
|
|
|
fmt.Printf("Warning: Failed to get filer client: %v, using sample data\n", filerClientErr) |
|
|
|
// Return error if filer client is not available for topic access
|
|
|
|
return &QueryResult{Error: filerClientErr}, filerClientErr |
|
|
|
} |
|
|
|
|
|
|
|
hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName) |
|
|
|
if err != nil { |
|
|
|
// Fallback to sample data if topic doesn't exist or filer unavailable
|
|
|
|
return e.executeSelectWithSampleData(ctx, stmt, database, tableName) |
|
|
|
// Return error for topic access issues instead of misleading sample data
|
|
|
|
topicErr := fmt.Errorf("failed to access topic %s.%s: %v", database, tableName, err) |
|
|
|
return &QueryResult{Error: topicErr}, topicErr |
|
|
|
} |
|
|
|
|
|
|
|
// Parse SELECT columns and detect aggregation functions
|
|
|
|