Browse Source

handling errors

pull/7185/head
chrislu 1 month ago
parent
commit
69e6902072
  1. 65
      weed/mq/topic/topic.go
  2. 10
      weed/query/engine/catalog.go
  3. 47
      weed/query/engine/engine.go
  4. 53
      weed/query/engine/errors.go
  5. 38
      weed/server/postgres/protocol.go
  6. 44
      weed/shell/command_mq_topic_truncate.go

65
weed/mq/topic/topic.go

@ -5,11 +5,14 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
jsonpb "google.golang.org/protobuf/encoding/protojson"
)
@ -102,3 +105,65 @@ func (t Topic) WriteConfFile(client filer_pb.SeaweedFilerClient, conf *mq_pb.Con
}
return nil
}
// DiscoverPartitions discovers all partition directories for a topic by scanning the filesystem
// This centralizes partition discovery logic used across query engine, shell commands, etc.
func (t Topic) DiscoverPartitions(ctx context.Context, filerClient filer_pb.FilerClient) ([]string, error) {
var partitionPaths []string
// Scan the topic directory for version directories (e.g., v2025-09-01-07-16-34)
err := filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(t.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
if !versionEntry.IsDirectory {
return nil // Skip non-directories
}
// Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
if !IsValidVersionDirectory(versionEntry.Name) {
// Skip directories that don't match the version format
return nil
}
// Scan partition directories within this version (e.g., 0000-0630)
versionDir := fmt.Sprintf("%s/%s", t.Dir(), versionEntry.Name)
return filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
if !partitionEntry.IsDirectory {
return nil // Skip non-directories
}
// Parse partition boundary from directory name (e.g., "0000-0630")
if !IsValidPartitionDirectory(partitionEntry.Name) {
return nil // Skip invalid partition names
}
// Add this partition path to the list
partitionPath := fmt.Sprintf("%s/%s", versionDir, partitionEntry.Name)
partitionPaths = append(partitionPaths, partitionPath)
return nil
})
})
return partitionPaths, err
}
// IsValidVersionDirectory checks if a directory name matches the topic version format
// Format: v2025-09-01-07-16-34
func IsValidVersionDirectory(name string) bool {
if !strings.HasPrefix(name, "v") || len(name) != 20 {
return false
}
// Try to parse the timestamp part
timestampStr := name[1:] // Remove 'v' prefix
_, err := time.Parse("2006-01-02-15-04-05", timestampStr)
return err == nil
}
// IsValidPartitionDirectory checks if a directory name matches the partition boundary format
// Format: 0000-0630 (rangeStart-rangeStop)
func IsValidPartitionDirectory(name string) bool {
// Use existing ParsePartitionBoundary function to validate
start, stop := ParsePartitionBoundary(name)
// Valid partition ranges should have start < stop (and not both be 0, which indicates parse error)
return start < stop && start >= 0
}

10
weed/query/engine/catalog.go

@ -150,12 +150,18 @@ func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error)
db, exists := c.databases[database]
if !exists {
return nil, fmt.Errorf("database '%s' not found", database)
return nil, TableNotFoundError{
Database: database,
Table: "",
}
}
tableInfo, exists := db.Tables[table]
if !exists {
return nil, fmt.Errorf("table '%s' not found in database '%s'", table, database)
return nil, TableNotFoundError{
Database: database,
Table: table,
}
}
return tableInfo, nil

47
weed/query/engine/engine.go

@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
@ -265,7 +266,10 @@ func ParseSQL(sql string) (Statement, error) {
return parseSelectStatement(sql)
}
return nil, fmt.Errorf("unsupported statement type: %s", sqlUpper)
return nil, UnsupportedFeatureError{
Feature: fmt.Sprintf("statement type: %s", strings.Fields(sqlUpper)[0]),
Reason: "statement parsing not implemented",
}
}
// parseSelectStatement parses SELECT statements using a lightweight parser
@ -280,7 +284,10 @@ func parseSelectStatement(sql string) (*SelectStatement, error) {
// Find SELECT clause
selectIdx := strings.Index(sqlUpper, "SELECT")
if selectIdx == -1 {
return nil, fmt.Errorf("SELECT keyword not found")
return nil, ParseError{
Query: sql,
Message: "SELECT keyword not found",
}
}
// Find FROM clause
@ -3089,10 +3096,10 @@ func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partiti
return rowCount, nil
}
// discoverTopicPartitions discovers all partitions for a given topic
// discoverTopicPartitions discovers all partitions for a given topic using centralized logic
func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]string, error) {
// Use the same discovery logic as in hybrid_message_scanner.go
topicPath := fmt.Sprintf("/topics/%s/%s", namespace, topicName)
// Use centralized topic partition discovery
t := topic.NewTopic(namespace, topicName)
// Get FilerClient from BrokerClient
filerClient, err := e.catalog.brokerClient.GetFilerClient()
@ -3100,35 +3107,7 @@ func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]stri
return nil, err
}
var partitions []string
err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(topicPath), "", func(entry *filer_pb.Entry, isLast bool) error {
if !entry.IsDirectory {
return nil
}
// Check if this looks like a partition directory (format: vYYYY-MM-DD-HH-MM-SS)
if strings.HasPrefix(entry.Name, "v") && len(entry.Name) == 20 {
// This is a time-based partition directory
// Look for numeric subdirectories (partition IDs)
partitionBasePath := fmt.Sprintf("%s/%s", topicPath, entry.Name)
err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionBasePath), "", func(subEntry *filer_pb.Entry, isLast bool) error {
if subEntry.IsDirectory {
// Check if this is a numeric partition directory (format: 0000-XXXX)
if len(subEntry.Name) >= 4 {
partitionPath := fmt.Sprintf("%s/%s", entry.Name, subEntry.Name)
partitions = append(partitions, partitionPath)
}
}
return nil
})
if err != nil {
return err
}
}
return nil
})
return partitions, err
return t.DiscoverPartitions(context.Background(), filerClient)
}
// getTopicTotalRowCount returns the total number of rows in a topic (combining parquet and live logs)

53
weed/query/engine/errors.go

@ -34,3 +34,56 @@ type OptimizationError struct {
func (e OptimizationError) Error() string {
return fmt.Sprintf("optimization failed for %s: %s", e.Strategy, e.Reason)
}
// ParseError represents SQL parsing errors
type ParseError struct {
Query string
Message string
Cause error
}
func (e ParseError) Error() string {
if e.Cause != nil {
return fmt.Sprintf("SQL parse error: %s (%v)", e.Message, e.Cause)
}
return fmt.Sprintf("SQL parse error: %s", e.Message)
}
// TableNotFoundError represents table/topic not found errors
type TableNotFoundError struct {
Database string
Table string
}
func (e TableNotFoundError) Error() string {
if e.Database != "" {
return fmt.Sprintf("table %s.%s not found", e.Database, e.Table)
}
return fmt.Sprintf("table %s not found", e.Table)
}
// ColumnNotFoundError represents column not found errors
type ColumnNotFoundError struct {
Table string
Column string
}
func (e ColumnNotFoundError) Error() string {
if e.Table != "" {
return fmt.Sprintf("column %s not found in table %s", e.Column, e.Table)
}
return fmt.Sprintf("column %s not found", e.Column)
}
// UnsupportedFeatureError represents unsupported SQL features
type UnsupportedFeatureError struct {
Feature string
Reason string
}
func (e UnsupportedFeatureError) Error() string {
if e.Reason != "" {
return fmt.Sprintf("feature not supported: %s (%s)", e.Feature, e.Reason)
}
return fmt.Sprintf("feature not supported: %s", e.Feature)
}

38
weed/server/postgres/protocol.go

@ -77,31 +77,39 @@ func mapErrorToPostgreSQLCode(err error) string {
return "00000" // Success
}
errStr := err.Error()
// Use typed errors for robust error mapping
switch err.(type) {
case engine.ParseError:
return "42601" // Syntax error
case engine.TableNotFoundError:
return "42P01" // Undefined table
case engine.ColumnNotFoundError:
return "42703" // Undefined column
case engine.UnsupportedFeatureError:
return "0A000" // Feature not supported
// Map specific engine error types
switch e := err.(type) {
case engine.AggregationError:
// Aggregation errors are usually function-related issues
if strings.Contains(e.Error(), "unsupported") {
return "0A000" // Feature not supported
}
return "42703" // Undefined column (column-related aggregation issues)
return "42883" // Undefined function (aggregation function issues)
case engine.DataSourceError:
// Data source errors could be table/topic not found
if strings.Contains(e.Error(), "not found") || strings.Contains(e.Error(), "topic") {
return "42P01" // Undefined table
}
return "08000" // Connection exception (data source access issues)
// Data source errors are usually access or connection issues
return "08000" // Connection exception
case engine.OptimizationError:
// Optimization failures are usually feature limitations
return "0A000" // Feature not supported
case engine.NoSchemaError:
// Topic exists but no schema available
return "42P01" // Undefined table (treat as table not found)
}
// Map based on error message patterns
errLower := strings.ToLower(errStr)
// Fallback: analyze error message for backward compatibility with non-typed errors
errLower := strings.ToLower(err.Error())
// Parsing and syntax errors
if strings.Contains(errLower, "parse error") || strings.Contains(errLower, "syntax") {
@ -115,7 +123,7 @@ func mapErrorToPostgreSQLCode(err error) string {
// Table/topic not found
if strings.Contains(errLower, "not found") ||
strings.Contains(errLower, "topic") && strings.Contains(errLower, "available") {
(strings.Contains(errLower, "topic") && strings.Contains(errLower, "available")) {
return "42P01" // Undefined table
}

44
weed/shell/command_mq_topic_truncate.go

@ -70,8 +70,8 @@ func (c *commandMqTopicTruncate) Do(args []string, commandEnv *CommandEnv, write
fmt.Fprintf(writer, "Truncating topic %s.%s...\n", *namespace, *topicName)
// Discover and clear all partitions
partitions, err := c.discoverTopicPartitions(commandEnv, t)
// Discover and clear all partitions using centralized logic
partitions, err := t.DiscoverPartitions(context.Background(), commandEnv)
if err != nil {
return fmt.Errorf("failed to discover topic partitions: %v", err)
}
@ -101,46 +101,6 @@ func (c *commandMqTopicTruncate) Do(args []string, commandEnv *CommandEnv, write
return nil
}
// discoverTopicPartitions discovers all partition directories for a topic
func (c *commandMqTopicTruncate) discoverTopicPartitions(commandEnv *CommandEnv, t topic.Topic) ([]string, error) {
var partitionPaths []string
// Scan the topic directory for version directories (e.g., v2025-09-01-07-16-34)
err := filer_pb.ReadDirAllEntries(context.Background(), commandEnv, util.FullPath(t.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
if !versionEntry.IsDirectory {
return nil // Skip non-directories
}
// Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
_, parseErr := topic.ParseTopicVersion(versionEntry.Name)
if parseErr != nil {
// Skip directories that don't match the version format
return nil
}
// Scan partition directories within this version (e.g., 0000-0630)
versionDir := fmt.Sprintf("%s/%s", t.Dir(), versionEntry.Name)
return filer_pb.ReadDirAllEntries(context.Background(), commandEnv, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
if !partitionEntry.IsDirectory {
return nil // Skip non-directories
}
// Parse partition boundary from directory name (e.g., "0000-0630")
rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
if rangeStart == rangeStop {
return nil // Skip invalid partition names
}
// Add this partition path to the list
partitionPath := fmt.Sprintf("%s/%s", versionDir, partitionEntry.Name)
partitionPaths = append(partitionPaths, partitionPath)
return nil
})
})
return partitionPaths, err
}
// clearPartitionData deletes all data files (log files, parquet files) from a partition directory
// Returns the number of files deleted
func (c *commandMqTopicTruncate) clearPartitionData(commandEnv *CommandEnv, partitionPath string, writer io.Writer) (int, error) {

Loading…
Cancel
Save