|
|
@ -9,6 +9,7 @@ import ( |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/cluster" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
@ -17,6 +18,7 @@ import ( |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|
|
|
"google.golang.org/grpc" |
|
|
|
"google.golang.org/grpc/credentials/insecure" |
|
|
|
jsonpb "google.golang.org/protobuf/encoding/protojson" |
|
|
|
) |
|
|
|
|
|
|
|
// BrokerClient handles communication with SeaweedFS MQ broker
|
|
|
@ -268,53 +270,59 @@ func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]stri |
|
|
|
} |
|
|
|
|
|
|
|
// GetTopicSchema retrieves schema information for a specific topic
|
|
|
|
// Assumption: Topic metadata includes schema information
|
|
|
|
// Reads the actual schema from topic configuration stored in filer
|
|
|
|
func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, error) { |
|
|
|
if err := c.findBrokerBalancer(); err != nil { |
|
|
|
return nil, err |
|
|
|
// Get filer client to read topic configuration
|
|
|
|
filerClient, err := c.GetFilerClient() |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("failed to get filer client: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
// TODO: Implement proper schema retrieval
|
|
|
|
// This might be part of LookupTopicBrokers or a dedicated GetTopicSchema method
|
|
|
|
var recordType *schema_pb.RecordType |
|
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
// Read topic.conf file from /topics/{namespace}/{topic}/topic.conf
|
|
|
|
topicDir := fmt.Sprintf("/topics/%s/%s", namespace, topicName) |
|
|
|
|
|
|
|
conn, err := grpc.Dial(c.brokerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("failed to connect to broker at %s: %v", c.brokerAddress, err) |
|
|
|
} |
|
|
|
defer conn.Close() |
|
|
|
// First check if topic directory exists
|
|
|
|
_, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ |
|
|
|
Directory: topicDir, |
|
|
|
Name: "topic.conf", |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("topic %s.%s not found: %v", namespace, topicName, err) |
|
|
|
} |
|
|
|
|
|
|
|
client := mq_pb.NewSeaweedMessagingClient(conn) |
|
|
|
// Read the topic.conf file content
|
|
|
|
data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to read topic.conf for %s.%s: %v", namespace, topicName, err) |
|
|
|
} |
|
|
|
|
|
|
|
// Use LookupTopicBrokers to get topic information
|
|
|
|
resp, err := client.LookupTopicBrokers(ctx, &mq_pb.LookupTopicBrokersRequest{ |
|
|
|
Topic: &schema_pb.Topic{ |
|
|
|
Namespace: namespace, |
|
|
|
Name: topicName, |
|
|
|
}, |
|
|
|
// Parse the configuration
|
|
|
|
conf := &mq_pb.ConfigureTopicResponse{} |
|
|
|
if err = jsonpb.Unmarshal(data, conf); err != nil { |
|
|
|
return fmt.Errorf("failed to unmarshal topic %s.%s configuration: %v", namespace, topicName, err) |
|
|
|
} |
|
|
|
|
|
|
|
// Extract the record type (schema)
|
|
|
|
if conf.RecordType != nil { |
|
|
|
recordType = conf.RecordType |
|
|
|
} else { |
|
|
|
return fmt.Errorf("no schema found for topic %s.%s", namespace, topicName) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("failed to lookup topic %s.%s: %v", namespace, topicName, err) |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
// TODO: Extract schema from topic metadata
|
|
|
|
// For now, return a placeholder schema
|
|
|
|
if len(resp.BrokerPartitionAssignments) == 0 { |
|
|
|
return nil, fmt.Errorf("topic %s.%s not found", namespace, topicName) |
|
|
|
if recordType == nil { |
|
|
|
return nil, fmt.Errorf("no record type found for topic %s.%s", namespace, topicName) |
|
|
|
} |
|
|
|
|
|
|
|
// Placeholder schema - real implementation would extract from topic metadata
|
|
|
|
return &schema_pb.RecordType{ |
|
|
|
Fields: []*schema_pb.Field{ |
|
|
|
{ |
|
|
|
Name: "timestamp", |
|
|
|
Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, |
|
|
|
}, |
|
|
|
{ |
|
|
|
Name: "data", |
|
|
|
Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, |
|
|
|
}, |
|
|
|
}, |
|
|
|
}, nil |
|
|
|
return recordType, nil |
|
|
|
} |
|
|
|
|
|
|
|
// ConfigureTopic creates or modifies a topic configuration
|
|
|
|