Browse Source

scan topic messages

pull/7185/head
chrislu 1 month ago
parent
commit
cf9ad26608
  1. 8
      weed/mq/logstore/read_log_from_disk.go
  2. 3
      weed/query/engine/broker_client.go
  3. 41
      weed/query/engine/engine.go
  4. 81
      weed/query/engine/hybrid_message_scanner.go

8
weed/mq/logstore/read_log_from_disk.go

@ -3,6 +3,10 @@ package logstore
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
@ -11,9 +15,6 @@ import (
util_http "github.com/seaweedfs/seaweedfs/weed/util/http" util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"math"
"strings"
"time"
) )
func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType { func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
@ -90,7 +91,6 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
for _, urlString := range urlStrings { for _, urlString := range urlStrings {
// TODO optimization opportunity: reuse the buffer // TODO optimization opportunity: reuse the buffer
var data []byte var data []byte
// fmt.Printf("reading %s/%s %s\n", partitionDir, entry.Name, urlString)
if data, _, err = util_http.Get(urlString); err == nil { if data, _, err = util_http.Get(urlString); err == nil {
processed = true processed = true
if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil { if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {

3
weed/query/engine/broker_client.go

@ -162,8 +162,7 @@ func (f *filerClientImpl) WithFilerClient(followRedirect bool, fn func(client fi
// AdjustedUrl implements the FilerClient interface (placeholder implementation) // AdjustedUrl implements the FilerClient interface (placeholder implementation)
func (f *filerClientImpl) AdjustedUrl(location *filer_pb.Location) string { func (f *filerClientImpl) AdjustedUrl(location *filer_pb.Location) string {
// Simple implementation for MQ data access - may need adjustment for production
return fmt.Sprintf("http://%s", location.Url)
return location.Url
} }
// GetDataCenter implements the FilerClient interface (placeholder implementation) // GetDataCenter implements the FilerClient interface (placeholder implementation)

41
weed/query/engine/engine.go

@ -13,6 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes" "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
"github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/xwb1989/sqlparser" "github.com/xwb1989/sqlparser"
) )
@ -36,6 +37,12 @@ type QueryResult struct {
// NewSQLEngine creates a new SQL execution engine // NewSQLEngine creates a new SQL execution engine
// Uses master address for service discovery and initialization // Uses master address for service discovery and initialization
func NewSQLEngine(masterAddress string) *SQLEngine { func NewSQLEngine(masterAddress string) *SQLEngine {
// Initialize global HTTP client if not already done
// This is needed for reading partition data from the filer
if http.GetGlobalHttpClient() == nil {
http.InitGlobalHttpClient()
}
return &SQLEngine{ return &SQLEngine{
catalog: NewSchemaCatalog(masterAddress), catalog: NewSchemaCatalog(masterAddress),
} }
@ -135,6 +142,14 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser.
} }
} }
// Auto-discover and register topic if not already in catalog
if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
// Topic not in catalog, try to discover and register it
if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
fmt.Printf("Warning: Failed to discover topic %s.%s: %v\n", database, tableName, regErr)
}
}
// Create HybridMessageScanner for the topic (reads both live logs + Parquet files) // Create HybridMessageScanner for the topic (reads both live logs + Parquet files)
// ✅ RESOLVED TODO: Get real filerClient from broker connection // ✅ RESOLVED TODO: Get real filerClient from broker connection
var filerClient filer_pb.FilerClient var filerClient filer_pb.FilerClient
@ -972,3 +987,29 @@ func (e *SQLEngine) dropTable(ctx context.Context, stmt *sqlparser.DDL) (*QueryR
return result, nil return result, nil
} }
// discoverAndRegisterTopic attempts to discover an existing topic and register it in the SQL catalog
func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error {
// First, check if topic exists by trying to get its schema from the broker/filer
recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
if err != nil {
return fmt.Errorf("topic %s.%s not found or no schema available: %v", database, tableName, err)
}
// Create a schema object from the discovered record type
mqSchema := &schema.Schema{
Namespace: database,
Name: tableName,
RecordType: recordType,
RevisionId: 1, // Default to revision 1 for discovered topics
}
// Register the topic in the SQL catalog
err = e.catalog.RegisterTopic(database, tableName, mqSchema)
if err != nil {
return fmt.Errorf("failed to register discovered topic %s.%s: %v", database, tableName, err)
}
fmt.Printf("Auto-discovered and registered topic: %s.%s\n", database, tableName)
return nil
}

81
weed/query/engine/hybrid_message_scanner.go

@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes" "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
@ -141,52 +142,68 @@ func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOpt
return results, nil return results, nil
} }
// discoverTopicPartitions discovers the actual partitions for this topic
// Uses filerClient to read topic configuration and determine partition layout
// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem
// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/
func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) { func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) {
if hms.filerClient == nil { if hms.filerClient == nil {
return nil, fmt.Errorf("filerClient not available for partition discovery") return nil, fmt.Errorf("filerClient not available for partition discovery")
} }
// Read topic configuration from filer
var topicConf *mq_pb.ConfigureTopicResponse
var allPartitions []topic.Partition
var err error var err error
err = hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
topicConf, err = hms.topic.ReadConfFile(client)
return err
})
if err != nil {
return nil, fmt.Errorf("failed to read topic config for partition discovery: %v", err)
}
// Scan the topic directory for actual partition versions (timestamped directories)
// List all version directories in the topic directory
err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
if !versionEntry.IsDirectory {
return nil // Skip non-directories
}
// Generate partitions based on topic configuration
partitionCount := int32(4) // Default partition count
if len(topicConf.BrokerPartitionAssignments) > 0 {
partitionCount = int32(len(topicConf.BrokerPartitionAssignments))
}
// Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name)
if parseErr != nil {
// Skip directories that don't match the version format
return nil
}
// Scan partition directories within this version
versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name)
return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
if !partitionEntry.IsDirectory {
return nil // Skip non-directories
}
// Create partition ranges following SeaweedFS MQ pattern
rangeSize := topic.PartitionCount / partitionCount
var partitions []topic.Partition
// Parse partition boundary from directory name (e.g., "0000-0630")
rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
if rangeStart == rangeStop {
return nil // Skip invalid partition names
}
for i := int32(0); i < partitionCount; i++ {
rangeStart := i * rangeSize
rangeStop := (i + 1) * rangeSize
if i == partitionCount-1 {
// Last partition covers remaining range
rangeStop = topic.PartitionCount
}
// Create partition object
partition := topic.Partition{
RangeStart: rangeStart,
RangeStop: rangeStop,
RingSize: topic.PartitionCount,
UnixTimeNs: versionTime.UnixNano(),
}
partitions = append(partitions, topic.Partition{
RangeStart: rangeStart,
RangeStop: rangeStop,
RingSize: topic.PartitionCount,
UnixTimeNs: time.Now().UnixNano(),
allPartitions = append(allPartitions, partition)
return nil
}) })
})
if err != nil {
return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err)
}
// If no partitions found, use fallback
if len(allPartitions) == 0 {
fmt.Printf("No partitions found in filesystem for topic %s, using default partition\n", hms.topic.String())
return []topic.Partition{{RangeStart: 0, RangeStop: 1000}}, nil
} }
return partitions, nil
fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String())
return allPartitions, nil
} }
// scanPartitionHybrid scans a specific partition using the hybrid approach // scanPartitionHybrid scans a specific partition using the hybrid approach

Loading…
Cancel
Save