You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							312 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							312 lines
						
					
					
						
							11 KiB
						
					
					
				
								package broker
							 | 
						|
								
							 | 
						|
								import (
									"context"
									"errors"
									"fmt"
									"io"
									"strings"

									"github.com/seaweedfs/seaweedfs/weed/filer"
									"github.com/seaweedfs/seaweedfs/weed/glog"
									"github.com/seaweedfs/seaweedfs/weed/mq/topic"
									"github.com/seaweedfs/seaweedfs/weed/pb"
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
									"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
									"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
								)
							 | 
						|
								
							 | 
						|
								// LookupTopicBrokers returns the brokers that are serving the topic
							 | 
						|
								func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
							 | 
						|
									if !b.isLockOwner() {
							 | 
						|
										proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
							 | 
						|
											resp, err = client.LookupTopicBrokers(ctx, request)
							 | 
						|
											return nil
							 | 
						|
										})
							 | 
						|
										if proxyErr != nil {
							 | 
						|
											return nil, proxyErr
							 | 
						|
										}
							 | 
						|
										return resp, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t := topic.FromPbTopic(request.Topic)
							 | 
						|
									ret := &mq_pb.LookupTopicBrokersResponse{}
							 | 
						|
									conf := &mq_pb.ConfigureTopicResponse{}
							 | 
						|
									ret.Topic = request.Topic
							 | 
						|
									if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil {
							 | 
						|
										glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
							 | 
						|
									} else {
							 | 
						|
										err = b.ensureTopicActiveAssignments(t, conf)
							 | 
						|
										ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return ret, err
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
							 | 
						|
									if !b.isLockOwner() {
							 | 
						|
										proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
							 | 
						|
											resp, err = client.ListTopics(ctx, request)
							 | 
						|
											return nil
							 | 
						|
										})
							 | 
						|
										if proxyErr != nil {
							 | 
						|
											return nil, proxyErr
							 | 
						|
										}
							 | 
						|
										return resp, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									ret := &mq_pb.ListTopicsResponse{}
							 | 
						|
								
							 | 
						|
									// Scan the filer directory structure to find all topics
							 | 
						|
									err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
							 | 
						|
										// List all namespaces under /topics
							 | 
						|
										stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
							 | 
						|
											Directory: filer.TopicsDir,
							 | 
						|
											Limit:     1000,
							 | 
						|
										})
							 | 
						|
										if err != nil {
							 | 
						|
											glog.V(0).Infof("list namespaces in %s: %v", filer.TopicsDir, err)
							 | 
						|
											return err
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Process each namespace
							 | 
						|
										for {
							 | 
						|
											resp, err := stream.Recv()
							 | 
						|
											if err != nil {
							 | 
						|
												if err.Error() == "EOF" {
							 | 
						|
													break
							 | 
						|
												}
							 | 
						|
												return err
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											if !resp.Entry.IsDirectory {
							 | 
						|
												continue
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											namespaceName := resp.Entry.Name
							 | 
						|
											namespacePath := fmt.Sprintf("%s/%s", filer.TopicsDir, namespaceName)
							 | 
						|
								
							 | 
						|
											// List all topics in this namespace
							 | 
						|
											topicStream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
							 | 
						|
												Directory: namespacePath,
							 | 
						|
												Limit:     1000,
							 | 
						|
											})
							 | 
						|
											if err != nil {
							 | 
						|
												glog.V(0).Infof("list topics in namespace %s: %v", namespacePath, err)
							 | 
						|
												continue
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// Process each topic in the namespace
							 | 
						|
											for {
							 | 
						|
												topicResp, err := topicStream.Recv()
							 | 
						|
												if err != nil {
							 | 
						|
													if err.Error() == "EOF" {
							 | 
						|
														break
							 | 
						|
													}
							 | 
						|
													glog.V(0).Infof("error reading topic stream in namespace %s: %v", namespaceName, err)
							 | 
						|
													break
							 | 
						|
												}
							 | 
						|
								
							 | 
						|
												if !topicResp.Entry.IsDirectory {
							 | 
						|
													continue
							 | 
						|
												}
							 | 
						|
								
							 | 
						|
												topicName := topicResp.Entry.Name
							 | 
						|
								
							 | 
						|
												// Check if topic.conf exists
							 | 
						|
												topicPath := fmt.Sprintf("%s/%s", namespacePath, topicName)
							 | 
						|
												confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
							 | 
						|
													Directory: topicPath,
							 | 
						|
													Name:      filer.TopicConfFile,
							 | 
						|
												})
							 | 
						|
												if err != nil {
							 | 
						|
													glog.V(0).Infof("lookup topic.conf in %s: %v", topicPath, err)
							 | 
						|
													continue
							 | 
						|
												}
							 | 
						|
								
							 | 
						|
												if confResp.Entry != nil {
							 | 
						|
													// This is a valid topic
							 | 
						|
													topic := &schema_pb.Topic{
							 | 
						|
														Namespace: namespaceName,
							 | 
						|
														Name:      topicName,
							 | 
						|
													}
							 | 
						|
													ret.Topics = append(ret.Topics, topic)
							 | 
						|
												}
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										return nil
							 | 
						|
									})
							 | 
						|
								
							 | 
						|
									if err != nil {
							 | 
						|
										glog.V(0).Infof("list topics from filer: %v", err)
							 | 
						|
										// Return empty response on error
							 | 
						|
										return &mq_pb.ListTopicsResponse{}, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return ret, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetTopicConfiguration returns the complete configuration of a topic including schema and partition assignments
							 | 
						|
								func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request *mq_pb.GetTopicConfigurationRequest) (resp *mq_pb.GetTopicConfigurationResponse, err error) {
							 | 
						|
									if !b.isLockOwner() {
							 | 
						|
										proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
							 | 
						|
											resp, err = client.GetTopicConfiguration(ctx, request)
							 | 
						|
											return nil
							 | 
						|
										})
							 | 
						|
										if proxyErr != nil {
							 | 
						|
											return nil, proxyErr
							 | 
						|
										}
							 | 
						|
										return resp, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t := topic.FromPbTopic(request.Topic)
							 | 
						|
									var conf *mq_pb.ConfigureTopicResponse
							 | 
						|
									var createdAtNs, modifiedAtNs int64
							 | 
						|
								
							 | 
						|
									if conf, createdAtNs, modifiedAtNs, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
							 | 
						|
										glog.V(0).Infof("get topic configuration %s: %v", request.Topic, err)
							 | 
						|
										return nil, fmt.Errorf("failed to read topic configuration: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Ensure topic assignments are active
							 | 
						|
									err = b.ensureTopicActiveAssignments(t, conf)
							 | 
						|
									if err != nil {
							 | 
						|
										glog.V(0).Infof("ensure topic active assignments %s: %v", request.Topic, err)
							 | 
						|
										return nil, fmt.Errorf("failed to ensure topic assignments: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Build the response with complete configuration including metadata
							 | 
						|
									ret := &mq_pb.GetTopicConfigurationResponse{
							 | 
						|
										Topic:                      request.Topic,
							 | 
						|
										PartitionCount:             int32(len(conf.BrokerPartitionAssignments)),
							 | 
						|
										RecordType:                 conf.RecordType,
							 | 
						|
										BrokerPartitionAssignments: conf.BrokerPartitionAssignments,
							 | 
						|
										CreatedAtNs:                createdAtNs,
							 | 
						|
										LastUpdatedNs:              modifiedAtNs,
							 | 
						|
										Retention:                  conf.Retention,
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return ret, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetTopicPublishers returns the active publishers for a topic
							 | 
						|
								func (b *MessageQueueBroker) GetTopicPublishers(ctx context.Context, request *mq_pb.GetTopicPublishersRequest) (resp *mq_pb.GetTopicPublishersResponse, err error) {
							 | 
						|
									if !b.isLockOwner() {
							 | 
						|
										proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
							 | 
						|
											resp, err = client.GetTopicPublishers(ctx, request)
							 | 
						|
											return nil
							 | 
						|
										})
							 | 
						|
										if proxyErr != nil {
							 | 
						|
											return nil, proxyErr
							 | 
						|
										}
							 | 
						|
										return resp, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t := topic.FromPbTopic(request.Topic)
							 | 
						|
									var publishers []*mq_pb.TopicPublisher
							 | 
						|
								
							 | 
						|
									// Get topic configuration to find partition assignments
							 | 
						|
									var conf *mq_pb.ConfigureTopicResponse
							 | 
						|
									if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
							 | 
						|
										glog.V(0).Infof("get topic configuration for publishers %s: %v", request.Topic, err)
							 | 
						|
										return nil, fmt.Errorf("failed to read topic configuration: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Collect publishers from each partition that is hosted on this broker
							 | 
						|
									for _, assignment := range conf.BrokerPartitionAssignments {
							 | 
						|
										// Only collect from partitions where this broker is the leader
							 | 
						|
										if assignment.LeaderBroker == b.option.BrokerAddress().String() {
							 | 
						|
											partition := topic.FromPbPartition(assignment.Partition)
							 | 
						|
											if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil {
							 | 
						|
												// Get publisher information from local partition
							 | 
						|
												localPartition.Publishers.ForEachPublisher(func(clientName string, publisher *topic.LocalPublisher) {
							 | 
						|
													connectTimeNs, lastSeenTimeNs := publisher.GetTimestamps()
							 | 
						|
													lastPublishedOffset, lastAckedOffset := publisher.GetOffsets()
							 | 
						|
													publishers = append(publishers, &mq_pb.TopicPublisher{
							 | 
						|
														PublisherName:       clientName,
							 | 
						|
														ClientId:            clientName, // For now, client name is used as client ID
							 | 
						|
														Partition:           assignment.Partition,
							 | 
						|
														ConnectTimeNs:       connectTimeNs,
							 | 
						|
														LastSeenTimeNs:      lastSeenTimeNs,
							 | 
						|
														Broker:              assignment.LeaderBroker,
							 | 
						|
														IsActive:            true,
							 | 
						|
														LastPublishedOffset: lastPublishedOffset,
							 | 
						|
														LastAckedOffset:     lastAckedOffset,
							 | 
						|
													})
							 | 
						|
												})
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return &mq_pb.GetTopicPublishersResponse{
							 | 
						|
										Publishers: publishers,
							 | 
						|
									}, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetTopicSubscribers returns the active subscribers for a topic
							 | 
						|
								func (b *MessageQueueBroker) GetTopicSubscribers(ctx context.Context, request *mq_pb.GetTopicSubscribersRequest) (resp *mq_pb.GetTopicSubscribersResponse, err error) {
							 | 
						|
									if !b.isLockOwner() {
							 | 
						|
										proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
							 | 
						|
											resp, err = client.GetTopicSubscribers(ctx, request)
							 | 
						|
											return nil
							 | 
						|
										})
							 | 
						|
										if proxyErr != nil {
							 | 
						|
											return nil, proxyErr
							 | 
						|
										}
							 | 
						|
										return resp, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									t := topic.FromPbTopic(request.Topic)
							 | 
						|
									var subscribers []*mq_pb.TopicSubscriber
							 | 
						|
								
							 | 
						|
									// Get topic configuration to find partition assignments
							 | 
						|
									var conf *mq_pb.ConfigureTopicResponse
							 | 
						|
									if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
							 | 
						|
										glog.V(0).Infof("get topic configuration for subscribers %s: %v", request.Topic, err)
							 | 
						|
										return nil, fmt.Errorf("failed to read topic configuration: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Collect subscribers from each partition that is hosted on this broker
							 | 
						|
									for _, assignment := range conf.BrokerPartitionAssignments {
							 | 
						|
										// Only collect from partitions where this broker is the leader
							 | 
						|
										if assignment.LeaderBroker == b.option.BrokerAddress().String() {
							 | 
						|
											partition := topic.FromPbPartition(assignment.Partition)
							 | 
						|
											if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil {
							 | 
						|
												// Get subscriber information from local partition
							 | 
						|
												localPartition.Subscribers.ForEachSubscriber(func(clientName string, subscriber *topic.LocalSubscriber) {
							 | 
						|
													// Parse client name to extract consumer group and consumer ID
							 | 
						|
													// Format is typically: "consumerGroup/consumerID"
							 | 
						|
													consumerGroup := "default"
							 | 
						|
													consumerID := clientName
							 | 
						|
													if idx := strings.Index(clientName, "/"); idx != -1 {
							 | 
						|
														consumerGroup = clientName[:idx]
							 | 
						|
														consumerID = clientName[idx+1:]
							 | 
						|
													}
							 | 
						|
								
							 | 
						|
													connectTimeNs, lastSeenTimeNs := subscriber.GetTimestamps()
							 | 
						|
													lastReceivedOffset, lastAckedOffset := subscriber.GetOffsets()
							 | 
						|
								
							 | 
						|
													subscribers = append(subscribers, &mq_pb.TopicSubscriber{
							 | 
						|
														ConsumerGroup:      consumerGroup,
							 | 
						|
														ConsumerId:         consumerID,
							 | 
						|
														ClientId:           clientName, // Full client name as client ID
							 | 
						|
														Partition:          assignment.Partition,
							 | 
						|
														ConnectTimeNs:      connectTimeNs,
							 | 
						|
														LastSeenTimeNs:     lastSeenTimeNs,
							 | 
						|
														Broker:             assignment.LeaderBroker,
							 | 
						|
														IsActive:           true,
							 | 
						|
														CurrentOffset:      lastAckedOffset, // for compatibility
							 | 
						|
														LastReceivedOffset: lastReceivedOffset,
							 | 
						|
													})
							 | 
						|
												})
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return &mq_pb.GetTopicSubscribersResponse{
							 | 
						|
										Subscribers: subscribers,
							 | 
						|
									}, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (b *MessageQueueBroker) isLockOwner() bool {
							 | 
						|
									return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String()
							 | 
						|
								}
							 |