diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go
index 56b9cda5f..6fb0f0ce9 100644
--- a/weed/mq/topic/topic.go
+++ b/weed/mq/topic/topic.go
@@ -5,11 +5,14 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strings"
+	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 	jsonpb "google.golang.org/protobuf/encoding/protojson"
 )
 
@@ -102,3 +105,70 @@ func (t Topic) WriteConfFile(client filer_pb.SeaweedFilerClient, conf *mq_pb.Con
 	}
 	return nil
 }
+
+// DiscoverPartitions lists every partition directory of the topic by walking
+// <topic dir>/<version dir>/<partition dir> through the filer.
+//
+// Entries that are not directories, or whose names fail IsValidVersionDirectory or
+// IsValidPartitionDirectory, are skipped. Returned paths are full paths built from
+// t.Dir(). On error the result is nil so callers never act on a partial listing.
+func (t Topic) DiscoverPartitions(ctx context.Context, filerClient filer_pb.FilerClient) ([]string, error) {
+	topicDir := t.Dir()
+	var partitionPaths []string
+
+	// Scan the topic directory for version directories (e.g., v2025-09-01-07-16-34)
+	err := filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(topicDir), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
+		if !versionEntry.IsDirectory {
+			return nil // Skip non-directories
+		}
+
+		// Skip directories that don't match the version format (e.g., "v2025-09-01-07-16-34")
+		if !IsValidVersionDirectory(versionEntry.Name) {
+			return nil
+		}
+
+		// Scan partition directories within this version (e.g., 0000-0630)
+		versionDir := fmt.Sprintf("%s/%s", topicDir, versionEntry.Name)
+		return filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
+			if !partitionEntry.IsDirectory {
+				return nil // Skip non-directories
+			}
+
+			// Skip names that are not a valid partition boundary (e.g., "0000-0630")
+			if !IsValidPartitionDirectory(partitionEntry.Name) {
+				return nil
+			}
+
+			partitionPaths = append(partitionPaths, fmt.Sprintf("%s/%s", versionDir, partitionEntry.Name))
+			return nil
+		})
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return partitionPaths, nil
+}
+
+// IsValidVersionDirectory checks if a directory name matches the topic version format
+// Format: v2025-09-01-07-16-34
+func IsValidVersionDirectory(name string) bool {
+	if !strings.HasPrefix(name, "v") || len(name) != 20 {
+		return false
+	}
+
+	// Try to parse the timestamp part
+	timestampStr := name[1:] // Remove 'v' prefix
+	_, err := time.Parse("2006-01-02-15-04-05", timestampStr)
+	return err == nil
+}
+
+// IsValidPartitionDirectory checks if a directory name matches the partition boundary format
+// Format: 0000-0630 (rangeStart-rangeStop)
+func IsValidPartitionDirectory(name string) bool {
+	// Use existing ParsePartitionBoundary function to validate
+	start, stop := ParsePartitionBoundary(name)
+
+	// Valid partition ranges should have start < stop (and not both be 0, which indicates parse error)
+	return start < stop && start >= 0
+}
diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go
index 13857fd59..513975087 100644
--- a/weed/query/engine/catalog.go
+++ b/weed/query/engine/catalog.go
@@ -150,12 +150,18 @@ func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error)
 
 	db, exists := c.databases[database]
 	if !exists {
-		return nil, fmt.Errorf("database '%s' not found", database)
+		return nil, TableNotFoundError{
+			Database: database,
+			Table:    "",
+		}
 	}
 
 	tableInfo, exists := db.Tables[table]
 	if !exists {
-		return nil, fmt.Errorf("table '%s' not found in database '%s'", table, database)
+		return nil, TableNotFoundError{
+			Database: database,
+			Table:    table,
+		}
 	}
 
 	return tableInfo, nil
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index 8cd127a4c..dc250a2dc 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
+	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
 	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
@@ -265,7 +266,15 @@ func ParseSQL(sql string) (Statement, error) {
 		return parseSelectStatement(sql)
 	}
 
-	return nil, fmt.Errorf("unsupported statement type: %s", sqlUpper)
+	// strings.Fields returns an empty slice for blank input, so guard the index to avoid a panic.
+	statementType := "(empty)"
+	if fields := strings.Fields(sqlUpper); len(fields) > 0 {
+		statementType = fields[0]
+	}
+	return nil, UnsupportedFeatureError{
+		Feature: fmt.Sprintf("statement type: %s", statementType),
+		Reason:  "statement parsing not implemented",
+	}
 }
 
 // parseSelectStatement parses SELECT statements using a lightweight parser
@@ -280,7 +289,10 @@ func parseSelectStatement(sql string) (*SelectStatement, error) {
 	// Find SELECT clause
 	selectIdx := strings.Index(sqlUpper, "SELECT")
 	if selectIdx == -1 {
-		return nil, fmt.Errorf("SELECT keyword not found")
+		return nil, ParseError{
+			Query:   sql,
+			Message: "SELECT keyword not found",
+		}
 	}
 
 	// Find FROM clause
@@ -3089,10 +3101,10 @@ func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partiti
 	return rowCount, nil
 }
 
-// discoverTopicPartitions discovers all partitions for a given topic
+// discoverTopicPartitions discovers all partitions for a given topic using centralized logic
 func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]string, error) {
-	// Use the same discovery logic as in hybrid_message_scanner.go
-	topicPath := fmt.Sprintf("/topics/%s/%s", namespace, topicName)
+	// Use centralized topic partition discovery
+	t := topic.NewTopic(namespace, topicName)
 
 	// Get FilerClient from BrokerClient
 	filerClient, err := e.catalog.brokerClient.GetFilerClient()
@@ -3100,35 +3112,9 @@ func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]stri
 		return nil, err
 	}
 
-	var partitions []string
-	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(topicPath), "", func(entry *filer_pb.Entry, isLast bool) error {
-		if !entry.IsDirectory {
-			return nil
-		}
-
-		// Check if this looks like a partition directory (format: vYYYY-MM-DD-HH-MM-SS)
-		if strings.HasPrefix(entry.Name, "v") && len(entry.Name) == 20 {
-			// This is a time-based partition directory
-			// Look for numeric subdirectories (partition IDs)
-			partitionBasePath := fmt.Sprintf("%s/%s", topicPath, entry.Name)
-			err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionBasePath), "", func(subEntry *filer_pb.Entry, isLast bool) error {
-				if subEntry.IsDirectory {
-					// Check if this is a numeric partition directory (format: 0000-XXXX)
-					if len(subEntry.Name) >= 4 {
-						partitionPath := fmt.Sprintf("%s/%s", entry.Name, subEntry.Name)
-						partitions = append(partitions, partitionPath)
-					}
-				}
-				return nil
-			})
-			if err != nil {
-				return err
-			}
-		}
-		return nil
-	})
-
-	return partitions, err
+	// NOTE(review): DiscoverPartitions returns full paths built from the topic directory, while the
+	// code removed here returned "<version>/<partition>" relative to it — confirm callers expect full paths.
+	return t.DiscoverPartitions(context.Background(), filerClient)
 }
 
 // getTopicTotalRowCount returns the total number of rows in a topic (combining parquet and live logs)
diff --git a/weed/query/engine/errors.go b/weed/query/engine/errors.go
index 6bc3e9c21..6a297d92f 100644
--- a/weed/query/engine/errors.go
+++ b/weed/query/engine/errors.go
@@ -34,3 +34,66 @@ type OptimizationError struct {
 func (e OptimizationError) Error() string {
 	return fmt.Sprintf("optimization failed for %s: %s", e.Strategy, e.Reason)
 }
+
+// ParseError represents SQL parsing errors
+type ParseError struct {
+	Query   string
+	Message string
+	Cause   error
+}
+
+func (e ParseError) Error() string {
+	if e.Cause != nil {
+		return fmt.Sprintf("SQL parse error: %s (%v)", e.Message, e.Cause)
+	}
+	return fmt.Sprintf("SQL parse error: %s", e.Message)
+}
+
+// Unwrap exposes Cause so errors.Is and errors.As can inspect the underlying error.
+func (e ParseError) Unwrap() error {
+	return e.Cause
+}
+
+// TableNotFoundError represents table/topic not found errors.
+// An empty Table with a non-empty Database means the database itself is missing.
+type TableNotFoundError struct {
+	Database string
+	Table    string
+}
+
+func (e TableNotFoundError) Error() string {
+	switch {
+	case e.Table == "" && e.Database != "":
+		return fmt.Sprintf("database %s not found", e.Database)
+	case e.Database != "":
+		return fmt.Sprintf("table %s.%s not found", e.Database, e.Table)
+	default:
+		return fmt.Sprintf("table %s not found", e.Table)
+	}
+}
+
+// ColumnNotFoundError represents column not found errors
+type ColumnNotFoundError struct {
+	Table  string
+	Column string
+}
+
+func (e ColumnNotFoundError) Error() string {
+	if e.Table != "" {
+		return fmt.Sprintf("column %s not found in table %s", e.Column, e.Table)
+	}
+	return fmt.Sprintf("column %s not found", e.Column)
+}
+
+// UnsupportedFeatureError represents unsupported SQL features
+type UnsupportedFeatureError struct {
+	Feature string
+	Reason  string
+}
+
+func (e UnsupportedFeatureError) Error() string {
+	if e.Reason != "" {
+		return fmt.Sprintf("feature not supported: %s (%s)", e.Feature, e.Reason)
+	}
+	return fmt.Sprintf("feature not supported: %s", e.Feature)
+}
diff --git a/weed/server/postgres/protocol.go b/weed/server/postgres/protocol.go
index 31773fd29..9d19d3593 100644
--- a/weed/server/postgres/protocol.go
+++ b/weed/server/postgres/protocol.go
@@ -77,31 +77,39 @@ func mapErrorToPostgreSQLCode(err error) string {
 		return "00000" // Success
 	}
 
-	errStr := err.Error()
+	// Use typed errors for robust error mapping
+	switch err.(type) {
+	case engine.ParseError:
+		return "42601" // Syntax error
+
+	case engine.TableNotFoundError:
+		return "42P01" // Undefined table
+
+	case engine.ColumnNotFoundError:
+		return "42703" // Undefined column
+
+	case engine.UnsupportedFeatureError:
+		return "0A000" // Feature not supported
 
-	// Map specific engine error types
-	switch e := err.(type) {
 	case engine.AggregationError:
 		// Aggregation errors are usually function-related issues
-		if strings.Contains(e.Error(), "unsupported") {
-			return "0A000" // Feature not supported
-		}
-		return "42703" // Undefined column (column-related aggregation issues)
+		return "42883" // Undefined function (aggregation function issues)
 
 	case engine.DataSourceError:
-		// Data source errors could be table/topic not found
-		if strings.Contains(e.Error(), "not found") || strings.Contains(e.Error(), "topic") {
-			return "42P01" // Undefined table
-		}
-		return "08000" // Connection exception (data source access issues)
+		// Data source errors are usually access or connection issues
+		return "08000" // Connection exception
 
 	case engine.OptimizationError:
 		// Optimization failures are usually feature limitations
 		return "0A000" // Feature not supported
+
+	case engine.NoSchemaError:
+		// Topic exists but no schema available
+		return "42P01" // Undefined table (treat as table not found)
 	}
 
-	// Map based on error message patterns
-	errLower := strings.ToLower(errStr)
+	// Fallback: analyze error message for backward compatibility with non-typed errors
+	errLower := strings.ToLower(err.Error())
 
 	// Parsing and syntax errors
 	if strings.Contains(errLower, "parse error") || strings.Contains(errLower, "syntax") {
@@ -115,7 +123,7 @@ func mapErrorToPostgreSQLCode(err error) string {
 
 	// Table/topic not found
 	if strings.Contains(errLower, "not found") ||
-		strings.Contains(errLower, "topic") && strings.Contains(errLower, "available") {
+		(strings.Contains(errLower, "topic") && strings.Contains(errLower, "available")) {
 		return "42P01" // Undefined table
 	}
 
diff --git a/weed/shell/command_mq_topic_truncate.go b/weed/shell/command_mq_topic_truncate.go
index 1128e3b88..da4bd407a 100644
--- a/weed/shell/command_mq_topic_truncate.go
+++ b/weed/shell/command_mq_topic_truncate.go
@@ -70,8 +70,8 @@ func (c *commandMqTopicTruncate) Do(args []string, commandEnv *CommandEnv, write
 
 	fmt.Fprintf(writer, "Truncating topic %s.%s...\n", *namespace, *topicName)
 
-	// Discover and clear all partitions
-	partitions, err := c.discoverTopicPartitions(commandEnv, t)
+	// Discover and clear all partitions using centralized logic
+	partitions, err := t.DiscoverPartitions(context.Background(), commandEnv)
 	if err != nil {
 		return fmt.Errorf("failed to discover topic partitions: %v", err)
 	}
@@ -101,46 +101,6 @@ func (c *commandMqTopicTruncate) Do(args []string, commandEnv *CommandEnv, write
 	return nil
 }
 
-// discoverTopicPartitions discovers all partition directories for a topic
-func (c *commandMqTopicTruncate) discoverTopicPartitions(commandEnv *CommandEnv, t topic.Topic) ([]string, error) {
-	var partitionPaths []string
-
-	// Scan the topic directory for version directories (e.g., v2025-09-01-07-16-34)
-	err := filer_pb.ReadDirAllEntries(context.Background(), commandEnv, util.FullPath(t.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
-		if !versionEntry.IsDirectory {
-			return nil // Skip non-directories
-		}
-
-		// Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
-		_, parseErr := topic.ParseTopicVersion(versionEntry.Name)
-		if parseErr != nil {
-			// Skip directories that don't match the version format
-			return nil
-		}
-
-		// Scan partition directories within this version (e.g., 0000-0630)
-		versionDir := fmt.Sprintf("%s/%s", t.Dir(), versionEntry.Name)
-		return filer_pb.ReadDirAllEntries(context.Background(), commandEnv, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
-			if !partitionEntry.IsDirectory {
-				return nil // Skip non-directories
-			}
-
-			// Parse partition boundary from directory name (e.g., "0000-0630")
-			rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
-			if rangeStart == rangeStop {
-				return nil // Skip invalid partition names
-			}
-
-			// Add this partition path to the list
-			partitionPath := fmt.Sprintf("%s/%s", versionDir, partitionEntry.Name)
-			partitionPaths = append(partitionPaths, partitionPath)
-			return nil
-		})
-	})
-
-	return partitionPaths, err
-}
-
 // clearPartitionData deletes all data files (log files, parquet files) from a partition directory
 // Returns the number of files deleted
 func (c *commandMqTopicTruncate) clearPartitionData(commandEnv *CommandEnv, partitionPath string, writer io.Writer) (int, error) {