From cf9ad266082050ac4db03a73900399e7e34d9284 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 1 Sep 2025 10:50:01 -0700 Subject: [PATCH] scan topic messages --- weed/mq/logstore/read_log_from_disk.go | 8 +- weed/query/engine/broker_client.go | 3 +- weed/query/engine/engine.go | 41 +++++++++++ weed/query/engine/hybrid_message_scanner.go | 81 +++++++++++++-------- 4 files changed, 95 insertions(+), 38 deletions(-) diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go index 19b96a88d..61c231461 100644 --- a/weed/mq/logstore/read_log_from_disk.go +++ b/weed/mq/logstore/read_log_from_disk.go @@ -3,6 +3,10 @@ package logstore import ( "context" "fmt" + "math" + "strings" + "time" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" @@ -11,9 +15,6 @@ import ( util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "google.golang.org/protobuf/proto" - "math" - "strings" - "time" ) func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType { @@ -90,7 +91,6 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top for _, urlString := range urlStrings { // TODO optimization opportunity: reuse the buffer var data []byte - // fmt.Printf("reading %s/%s %s\n", partitionDir, entry.Name, urlString) if data, _, err = util_http.Get(urlString); err == nil { processed = true if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil { diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index 495573fa2..3bf8b7c51 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -162,8 +162,7 @@ func (f *filerClientImpl) WithFilerClient(followRedirect bool, fn func(client fi // AdjustedUrl implements the FilerClient interface (placeholder implementation) func (f *filerClientImpl) AdjustedUrl(location *filer_pb.Location) string { - // Simple implementation for MQ data access - may need adjustment for production - return fmt.Sprintf("http://%s", location.Url) + return location.Url } // GetDataCenter implements the FilerClient interface (placeholder implementation) diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 581fceb6f..9d776fc52 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -13,6 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" + "github.com/seaweedfs/seaweedfs/weed/util/http" "github.com/xwb1989/sqlparser" ) @@ -36,6 +37,12 @@ type QueryResult struct { // NewSQLEngine creates a new SQL execution engine // Uses master address for service discovery and initialization func NewSQLEngine(masterAddress string) *SQLEngine { + // Initialize global HTTP client if not already done + // This is needed for reading partition data from the filer + if http.GetGlobalHttpClient() == nil { + http.InitGlobalHttpClient() + } + return &SQLEngine{ catalog: NewSchemaCatalog(masterAddress), } @@ -135,6 +142,14 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. } } + // Auto-discover and register topic if not already in catalog + if _, err := e.catalog.GetTableInfo(database, tableName); err != nil { + // Topic not in catalog, try to discover and register it + if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil { + fmt.Printf("Warning: Failed to discover topic %s.%s: %v\n", database, tableName, regErr) + } + } + // Create HybridMessageScanner for the topic (reads both live logs + Parquet files) // ✅ RESOLVED TODO: Get real filerClient from broker connection var filerClient filer_pb.FilerClient @@ -972,3 +987,29 @@ func (e *SQLEngine) dropTable(ctx context.Context, stmt *sqlparser.DDL) (*QueryR return result, nil } + +// discoverAndRegisterTopic attempts to discover an existing topic and register it in the SQL catalog +func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error { + // First, check if topic exists by trying to get its schema from the broker/filer + recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName) + if err != nil { + return fmt.Errorf("topic %s.%s not found or no schema available: %v", database, tableName, err) + } + + // Create a schema object from the discovered record type + mqSchema := &schema.Schema{ + Namespace: database, + Name: tableName, + RecordType: recordType, + RevisionId: 1, // Default to revision 1 for discovered topics + } + + // Register the topic in the SQL catalog + err = e.catalog.RegisterTopic(database, tableName, mqSchema) + if err != nil { + return fmt.Errorf("failed to register discovered topic %s.%s: %v", database, tableName, err) + } + + fmt.Printf("Auto-discovered and registered topic: %s.%s\n", database, tableName) + return nil +} diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index 2e48d5f7b..3907f333f 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -15,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" + "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "google.golang.org/protobuf/proto" ) @@ -141,52 +142,68 @@ func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOpt return results, nil } -// discoverTopicPartitions discovers the actual partitions for this topic -// Uses filerClient to read topic configuration and determine partition layout +// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem +// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/ func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) { if hms.filerClient == nil { return nil, fmt.Errorf("filerClient not available for partition discovery") } - // Read topic configuration from filer - var topicConf *mq_pb.ConfigureTopicResponse + var allPartitions []topic.Partition var err error - err = hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - topicConf, err = hms.topic.ReadConfFile(client) - return err - }) - if err != nil { - return nil, fmt.Errorf("failed to read topic config for partition discovery: %v", err) - } + // Scan the topic directory for actual partition versions (timestamped directories) + // List all version directories in the topic directory + err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error { + if !versionEntry.IsDirectory { + return nil // Skip non-directories + } - // Generate partitions based on topic configuration - partitionCount := int32(4) // Default partition count - if len(topicConf.BrokerPartitionAssignments) > 0 { - partitionCount = int32(len(topicConf.BrokerPartitionAssignments)) - } + // Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34") + versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name) + if parseErr != nil { + // Skip directories that don't match the version format + return nil + } + + // Scan partition directories within this version + versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name) + return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error { + if !partitionEntry.IsDirectory { + return nil // Skip non-directories + } - // Create partition ranges following SeaweedFS MQ pattern - rangeSize := topic.PartitionCount / partitionCount - var partitions []topic.Partition + // Parse partition boundary from directory name (e.g., "0000-0630") + rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name) + if rangeStart == rangeStop { + return nil // Skip invalid partition names + } - for i := int32(0); i < partitionCount; i++ { - rangeStart := i * rangeSize - rangeStop := (i + 1) * rangeSize - if i == partitionCount-1 { - // Last partition covers remaining range - rangeStop = topic.PartitionCount - } + // Create partition object + partition := topic.Partition{ + RangeStart: rangeStart, + RangeStop: rangeStop, + RingSize: topic.PartitionCount, + UnixTimeNs: versionTime.UnixNano(), + } - partitions = append(partitions, topic.Partition{ - RangeStart: rangeStart, - RangeStop: rangeStop, - RingSize: topic.PartitionCount, - UnixTimeNs: time.Now().UnixNano(), + allPartitions = append(allPartitions, partition) + return nil }) + }) + + if err != nil { + return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err) + } + + // If no partitions found, use fallback + if len(allPartitions) == 0 { + fmt.Printf("No partitions found in filesystem for topic %s, using default partition\n", hms.topic.String()) + return []topic.Partition{{RangeStart: 0, RangeStop: 1000}}, nil } - return partitions, nil + fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String()) + return allPartitions, nil } // scanPartitionHybrid scans a specific partition using the hybrid approach