From ac8e6c8c8247258f79805c4ec6664ec605505250 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 1 Sep 2025 08:56:27 -0700 Subject: [PATCH] actual column types --- weed/query/engine/broker_client.go | 78 ++++++++++++++++-------------- 1 file changed, 43 insertions(+), 35 deletions(-) diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index 9c678df4d..495573fa2 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -9,6 +9,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -17,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + jsonpb "google.golang.org/protobuf/encoding/protojson" ) // BrokerClient handles communication with SeaweedFS MQ broker @@ -268,53 +270,59 @@ func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]stri } // GetTopicSchema retrieves schema information for a specific topic -// Assumption: Topic metadata includes schema information +// Reads the actual schema from topic configuration stored in filer func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, error) { - if err := c.findBrokerBalancer(); err != nil { - return nil, err + // Get filer client to read topic configuration + filerClient, err := c.GetFilerClient() + if err != nil { + return nil, fmt.Errorf("failed to get filer client: %v", err) } - // TODO: Implement proper schema retrieval - // This might be part of LookupTopicBrokers or a dedicated GetTopicSchema method + var recordType *schema_pb.RecordType + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Read topic.conf file from /topics/{namespace}/{topic}/topic.conf + topicDir := fmt.Sprintf("/topics/%s/%s", namespace, topicName) - conn, err := grpc.Dial(c.brokerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, fmt.Errorf("failed to connect to broker at %s: %v", c.brokerAddress, err) - } - defer conn.Close() + // First check if topic directory exists + _, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ + Directory: topicDir, + Name: "topic.conf", + }) + if err != nil { + return fmt.Errorf("topic %s.%s not found: %v", namespace, topicName, err) + } - client := mq_pb.NewSeaweedMessagingClient(conn) + // Read the topic.conf file content + data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") + if err != nil { + return fmt.Errorf("failed to read topic.conf for %s.%s: %v", namespace, topicName, err) + } - // Use LookupTopicBrokers to get topic information - resp, err := client.LookupTopicBrokers(ctx, &mq_pb.LookupTopicBrokersRequest{ - Topic: &schema_pb.Topic{ - Namespace: namespace, - Name: topicName, - }, + // Parse the configuration + conf := &mq_pb.ConfigureTopicResponse{} + if err = jsonpb.Unmarshal(data, conf); err != nil { + return fmt.Errorf("failed to unmarshal topic %s.%s configuration: %v", namespace, topicName, err) + } + + // Extract the record type (schema) + if conf.RecordType != nil { + recordType = conf.RecordType + } else { + return fmt.Errorf("no schema found for topic %s.%s", namespace, topicName) + } + + return nil }) + if err != nil { - return nil, fmt.Errorf("failed to lookup topic %s.%s: %v", namespace, topicName, err) + return nil, err } - // TODO: Extract schema from topic metadata - // For now, return a placeholder schema - if len(resp.BrokerPartitionAssignments) == 0 { - return nil, fmt.Errorf("topic %s.%s not found", namespace, topicName) + if recordType == nil { + return nil, fmt.Errorf("no record type found for topic %s.%s", namespace, topicName) } - // Placeholder schema - real implementation would extract from topic metadata - return &schema_pb.RecordType{ - Fields: []*schema_pb.Field{ - { - Name: "timestamp", - Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, - }, - { - Name: "data", - Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, - }, - }, - }, nil + return recordType, nil } // ConfigureTopic creates or modifies a topic configuration