You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

615 lines
19 KiB

package dash
import (
"context"
"fmt"
"io"
"path/filepath"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// GetTopics retrieves message queue topics data
func (s *AdminServer) GetTopics() (*TopicsData, error) {
var topics []TopicInfo
// Find broker leader and get topics
brokerLeader, err := s.findBrokerLeader()
if err != nil {
// If no broker leader found, return empty data
return &TopicsData{
Topics: topics,
TotalTopics: len(topics),
LastUpdated: time.Now(),
}, nil
}
// Connect to broker leader and list topics
err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.ListTopics(ctx, &mq_pb.ListTopicsRequest{})
if err != nil {
return err
}
// Convert protobuf topics to TopicInfo - only include available data
for _, pbTopic := range resp.Topics {
topicInfo := TopicInfo{
Name: fmt.Sprintf("%s.%s", pbTopic.Namespace, pbTopic.Name),
Partitions: 0, // Will be populated by LookupTopicBrokers call
Retention: TopicRetentionInfo{
Enabled: false,
DisplayValue: 0,
DisplayUnit: "days",
},
}
// Get topic configuration to get partition count and retention info
lookupResp, err := client.LookupTopicBrokers(ctx, &mq_pb.LookupTopicBrokersRequest{
Topic: pbTopic,
})
if err == nil {
topicInfo.Partitions = len(lookupResp.BrokerPartitionAssignments)
}
// Get topic configuration for retention information
configResp, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
Topic: pbTopic,
})
if err == nil && configResp.Retention != nil {
topicInfo.Retention = convertTopicRetention(configResp.Retention)
}
topics = append(topics, topicInfo)
}
return nil
})
if err != nil {
// If connection fails, return empty data
return &TopicsData{
Topics: topics,
TotalTopics: len(topics),
LastUpdated: time.Now(),
}, nil
}
return &TopicsData{
Topics: topics,
TotalTopics: len(topics),
LastUpdated: time.Now(),
// Don't include TotalMessages and TotalSize as they're not available
}, nil
}
// GetSubscribers retrieves message queue subscribers data
func (s *AdminServer) GetSubscribers() (*SubscribersData, error) {
var subscribers []SubscriberInfo
// Find broker leader and get subscriber info from broker stats
brokerLeader, err := s.findBrokerLeader()
if err != nil {
// If no broker leader found, return empty data
return &SubscribersData{
Subscribers: subscribers,
TotalSubscribers: len(subscribers),
ActiveSubscribers: 0,
LastUpdated: time.Now(),
}, nil
}
// Connect to broker leader and get subscriber information
// Note: SeaweedMQ doesn't have a direct API to list all subscribers
// We would need to collect this information from broker statistics
// For now, return empty data structure as subscriber info is not
// directly available through the current MQ API
err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
// TODO: Implement subscriber data collection from broker statistics
// This would require access to broker internal statistics about
// active subscribers, consumer groups, etc.
return nil
})
if err != nil {
// If connection fails, return empty data
return &SubscribersData{
Subscribers: subscribers,
TotalSubscribers: len(subscribers),
ActiveSubscribers: 0,
LastUpdated: time.Now(),
}, nil
}
activeCount := 0
for _, sub := range subscribers {
if sub.Status == "active" {
activeCount++
}
}
return &SubscribersData{
Subscribers: subscribers,
TotalSubscribers: len(subscribers),
ActiveSubscribers: activeCount,
LastUpdated: time.Now(),
}, nil
}
// GetTopicDetails retrieves detailed information about a specific topic
func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetailsData, error) {
// Find broker leader
brokerLeader, err := s.findBrokerLeader()
if err != nil {
return nil, fmt.Errorf("failed to find broker leader: %v", err)
}
var topicDetails *TopicDetailsData
// Connect to broker leader and get topic configuration
err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Get topic configuration using the new API
configResp, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
Name: topicName,
},
})
if err != nil {
return fmt.Errorf("failed to get topic configuration: %v", err)
}
// Initialize topic details
topicDetails = &TopicDetailsData{
TopicName: fmt.Sprintf("%s.%s", namespace, topicName),
Namespace: namespace,
Name: topicName,
Partitions: []PartitionInfo{},
Schema: []SchemaFieldInfo{},
Publishers: []PublisherInfo{},
Subscribers: []TopicSubscriberInfo{},
ConsumerGroupOffsets: []ConsumerGroupOffsetInfo{},
Retention: convertTopicRetention(configResp.Retention),
CreatedAt: time.Unix(0, configResp.CreatedAtNs),
LastUpdated: time.Unix(0, configResp.LastUpdatedNs),
}
// Set current time if timestamps are not available
if configResp.CreatedAtNs == 0 {
topicDetails.CreatedAt = time.Now()
}
if configResp.LastUpdatedNs == 0 {
topicDetails.LastUpdated = time.Now()
}
// Process partitions
for _, assignment := range configResp.BrokerPartitionAssignments {
if assignment.Partition != nil {
partitionInfo := PartitionInfo{
ID: assignment.Partition.RangeStart,
LeaderBroker: assignment.LeaderBroker,
FollowerBroker: assignment.FollowerBroker,
MessageCount: 0, // Will be enhanced later with actual stats
TotalSize: 0, // Will be enhanced later with actual stats
LastDataTime: time.Time{}, // Will be enhanced later
CreatedAt: time.Now(),
}
topicDetails.Partitions = append(topicDetails.Partitions, partitionInfo)
}
}
// Process schema from RecordType
if configResp.RecordType != nil {
topicDetails.Schema = convertRecordTypeToSchemaFields(configResp.RecordType)
}
// Get publishers information
publishersResp, err := client.GetTopicPublishers(ctx, &mq_pb.GetTopicPublishersRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
Name: topicName,
},
})
if err != nil {
// Log error but don't fail the entire request
glog.V(0).Infof("failed to get topic publishers for %s.%s: %v", namespace, topicName, err)
} else {
glog.V(1).Infof("got %d publishers for topic %s.%s", len(publishersResp.Publishers), namespace, topicName)
topicDetails.Publishers = convertTopicPublishers(publishersResp.Publishers)
}
// Get subscribers information
subscribersResp, err := client.GetTopicSubscribers(ctx, &mq_pb.GetTopicSubscribersRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
Name: topicName,
},
})
if err != nil {
// Log error but don't fail the entire request
glog.V(0).Infof("failed to get topic subscribers for %s.%s: %v", namespace, topicName, err)
} else {
glog.V(1).Infof("got %d subscribers for topic %s.%s", len(subscribersResp.Subscribers), namespace, topicName)
topicDetails.Subscribers = convertTopicSubscribers(subscribersResp.Subscribers)
}
return nil
})
if err != nil {
return nil, err
}
// Get consumer group offsets from the filer
offsets, err := s.GetConsumerGroupOffsets(namespace, topicName)
if err != nil {
// Log error but don't fail the entire request
glog.V(0).Infof("failed to get consumer group offsets for %s.%s: %v", namespace, topicName, err)
} else {
glog.V(1).Infof("got %d consumer group offsets for topic %s.%s", len(offsets), namespace, topicName)
topicDetails.ConsumerGroupOffsets = offsets
}
return topicDetails, nil
}
// GetConsumerGroupOffsets retrieves consumer group offsets for a topic from the filer
func (s *AdminServer) GetConsumerGroupOffsets(namespace, topicName string) ([]ConsumerGroupOffsetInfo, error) {
var offsets []ConsumerGroupOffsetInfo
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// Get the topic directory: /topics/namespace/topicName
topicObj := topic.NewTopic(namespace, topicName)
topicDir := topicObj.Dir()
// List all version directories under the topic directory (e.g., v2025-07-10-05-44-34)
versionStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
Directory: topicDir,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: false,
Limit: 1000,
})
if err != nil {
return fmt.Errorf("failed to list topic directory %s: %v", topicDir, err)
}
// Process each version directory
for {
versionResp, err := versionStream.Recv()
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("failed to receive version entries: %v", err)
}
// Only process directories that are versions (start with "v")
if versionResp.Entry.IsDirectory && strings.HasPrefix(versionResp.Entry.Name, "v") {
versionDir := filepath.Join(topicDir, versionResp.Entry.Name)
// List all partition directories under the version directory (e.g., 0315-0630)
partitionStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
Directory: versionDir,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: false,
Limit: 1000,
})
if err != nil {
glog.Warningf("Failed to list version directory %s: %v", versionDir, err)
continue
}
// Process each partition directory
for {
partitionResp, err := partitionStream.Recv()
if err != nil {
if err == io.EOF {
break
}
glog.Warningf("Failed to receive partition entries: %v", err)
break
}
// Only process directories that are partitions (format: NNNN-NNNN)
if partitionResp.Entry.IsDirectory {
// Parse partition range to get partition start ID (e.g., "0315-0630" -> 315)
var partitionStart, partitionStop int32
if n, err := fmt.Sscanf(partitionResp.Entry.Name, "%04d-%04d", &partitionStart, &partitionStop); n != 2 || err != nil {
// Skip directories that don't match the partition format
continue
}
partitionDir := filepath.Join(versionDir, partitionResp.Entry.Name)
// List all .offset files in this partition directory
offsetStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
Directory: partitionDir,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: false,
Limit: 1000,
})
if err != nil {
glog.Warningf("Failed to list partition directory %s: %v", partitionDir, err)
continue
}
// Process each offset file
for {
offsetResp, err := offsetStream.Recv()
if err != nil {
if err == io.EOF {
break
}
glog.Warningf("Failed to receive offset entries: %v", err)
break
}
// Only process .offset files
if !offsetResp.Entry.IsDirectory && strings.HasSuffix(offsetResp.Entry.Name, ".offset") {
consumerGroup := strings.TrimSuffix(offsetResp.Entry.Name, ".offset")
// Read the offset value from the file
offsetData, err := filer.ReadInsideFiler(client, partitionDir, offsetResp.Entry.Name)
if err != nil {
glog.Warningf("Failed to read offset file %s: %v", offsetResp.Entry.Name, err)
continue
}
if len(offsetData) == 8 {
offset := int64(util.BytesToUint64(offsetData))
// Get the file modification time
lastUpdated := time.Unix(offsetResp.Entry.Attributes.Mtime, 0)
offsets = append(offsets, ConsumerGroupOffsetInfo{
ConsumerGroup: consumerGroup,
PartitionID: partitionStart, // Use partition start as the ID
Offset: offset,
LastUpdated: lastUpdated,
})
}
}
}
}
}
}
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to get consumer group offsets: %v", err)
}
return offsets, nil
}
// convertRecordTypeToSchemaFields converts a protobuf RecordType to SchemaFieldInfo slice
func convertRecordTypeToSchemaFields(recordType *schema_pb.RecordType) []SchemaFieldInfo {
var schemaFields []SchemaFieldInfo
if recordType == nil || recordType.Fields == nil {
return schemaFields
}
for _, field := range recordType.Fields {
schemaField := SchemaFieldInfo{
Name: field.Name,
Type: getFieldTypeString(field.Type),
Required: field.IsRequired,
}
schemaFields = append(schemaFields, schemaField)
}
return schemaFields
}
// getFieldTypeString converts a protobuf Type to a human-readable string
func getFieldTypeString(fieldType *schema_pb.Type) string {
if fieldType == nil {
return "unknown"
}
switch kind := fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
return getScalarTypeString(kind.ScalarType)
case *schema_pb.Type_RecordType:
return "record"
case *schema_pb.Type_ListType:
elementType := getFieldTypeString(kind.ListType.ElementType)
return fmt.Sprintf("list<%s>", elementType)
default:
return "unknown"
}
}
// getScalarTypeString converts a protobuf ScalarType to a string
func getScalarTypeString(scalarType schema_pb.ScalarType) string {
switch scalarType {
case schema_pb.ScalarType_BOOL:
return "bool"
case schema_pb.ScalarType_INT32:
return "int32"
case schema_pb.ScalarType_INT64:
return "int64"
case schema_pb.ScalarType_FLOAT:
return "float"
case schema_pb.ScalarType_DOUBLE:
return "double"
case schema_pb.ScalarType_BYTES:
return "bytes"
case schema_pb.ScalarType_STRING:
return "string"
default:
return "unknown"
}
}
// convertTopicPublishers converts protobuf TopicPublisher slice to PublisherInfo slice
func convertTopicPublishers(publishers []*mq_pb.TopicPublisher) []PublisherInfo {
publisherInfos := make([]PublisherInfo, 0, len(publishers))
for _, publisher := range publishers {
publisherInfo := PublisherInfo{
PublisherName: publisher.PublisherName,
ClientID: publisher.ClientId,
PartitionID: publisher.Partition.RangeStart,
Broker: publisher.Broker,
IsActive: publisher.IsActive,
LastPublishedOffset: publisher.LastPublishedOffset,
LastAckedOffset: publisher.LastAckedOffset,
}
// Convert timestamps
if publisher.ConnectTimeNs > 0 {
publisherInfo.ConnectTime = time.Unix(0, publisher.ConnectTimeNs)
}
if publisher.LastSeenTimeNs > 0 {
publisherInfo.LastSeenTime = time.Unix(0, publisher.LastSeenTimeNs)
}
publisherInfos = append(publisherInfos, publisherInfo)
}
return publisherInfos
}
// convertTopicSubscribers converts protobuf TopicSubscriber slice to TopicSubscriberInfo slice
func convertTopicSubscribers(subscribers []*mq_pb.TopicSubscriber) []TopicSubscriberInfo {
subscriberInfos := make([]TopicSubscriberInfo, 0, len(subscribers))
for _, subscriber := range subscribers {
subscriberInfo := TopicSubscriberInfo{
ConsumerGroup: subscriber.ConsumerGroup,
ConsumerID: subscriber.ConsumerId,
ClientID: subscriber.ClientId,
PartitionID: subscriber.Partition.RangeStart,
Broker: subscriber.Broker,
IsActive: subscriber.IsActive,
CurrentOffset: subscriber.CurrentOffset,
LastReceivedOffset: subscriber.LastReceivedOffset,
}
// Convert timestamps
if subscriber.ConnectTimeNs > 0 {
subscriberInfo.ConnectTime = time.Unix(0, subscriber.ConnectTimeNs)
}
if subscriber.LastSeenTimeNs > 0 {
subscriberInfo.LastSeenTime = time.Unix(0, subscriber.LastSeenTimeNs)
}
subscriberInfos = append(subscriberInfos, subscriberInfo)
}
return subscriberInfos
}
// findBrokerLeader finds the current broker leader
func (s *AdminServer) findBrokerLeader() (string, error) {
// First, try to find any broker from the cluster
var brokers []string
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.BrokerType,
})
if err != nil {
return err
}
for _, node := range resp.ClusterNodes {
brokers = append(brokers, node.Address)
}
return nil
})
if err != nil {
return "", fmt.Errorf("failed to list brokers: %v", err)
}
if len(brokers) == 0 {
return "", fmt.Errorf("no brokers found in cluster")
}
// Try each broker to find the leader
for _, brokerAddr := range brokers {
err := s.withBrokerClient(brokerAddr, func(client mq_pb.SeaweedMessagingClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// Try to find broker leader
_, err := client.FindBrokerLeader(ctx, &mq_pb.FindBrokerLeaderRequest{
FilerGroup: "",
})
if err == nil {
return nil // This broker is the leader
}
return err
})
if err == nil {
return brokerAddr, nil
}
}
return "", fmt.Errorf("no broker leader found")
}
// withBrokerClient connects to a message queue broker and executes a function
func (s *AdminServer) withBrokerClient(brokerAddress string, fn func(client mq_pb.SeaweedMessagingClient) error) error {
return pb.WithBrokerGrpcClient(false, brokerAddress, s.grpcDialOption, fn)
}
// convertTopicRetention converts protobuf retention to TopicRetentionInfo
func convertTopicRetention(retention *mq_pb.TopicRetention) TopicRetentionInfo {
if retention == nil || !retention.Enabled {
return TopicRetentionInfo{
Enabled: false,
RetentionSeconds: 0,
DisplayValue: 0,
DisplayUnit: "days",
}
}
// Convert seconds to human-readable format
seconds := retention.RetentionSeconds
var displayValue int32
var displayUnit string
if seconds >= 86400 { // >= 1 day
displayValue = int32(seconds / 86400)
displayUnit = "days"
} else if seconds >= 3600 { // >= 1 hour
displayValue = int32(seconds / 3600)
displayUnit = "hours"
} else {
displayValue = int32(seconds)
displayUnit = "seconds"
}
return TopicRetentionInfo{
Enabled: retention.Enabled,
RetentionSeconds: seconds,
DisplayValue: displayValue,
DisplayUnit: displayUnit,
}
}