You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							312 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							312 lines
						
					
					
						
							11 KiB
						
					
					
				| package broker | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"fmt" | |
| 	"strings" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/filer" | |
| 	"github.com/seaweedfs/seaweedfs/weed/glog" | |
| 	"github.com/seaweedfs/seaweedfs/weed/mq/topic" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" | |
| ) | |
| 
 | |
| // LookupTopicBrokers returns the brokers that are serving the topic | |
| func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) { | |
| 	if !b.isLockOwner() { | |
| 		proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { | |
| 			resp, err = client.LookupTopicBrokers(ctx, request) | |
| 			return nil | |
| 		}) | |
| 		if proxyErr != nil { | |
| 			return nil, proxyErr | |
| 		} | |
| 		return resp, err | |
| 	} | |
| 
 | |
| 	t := topic.FromPbTopic(request.Topic) | |
| 	ret := &mq_pb.LookupTopicBrokersResponse{} | |
| 	conf := &mq_pb.ConfigureTopicResponse{} | |
| 	ret.Topic = request.Topic | |
| 	if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil { | |
| 		glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err) | |
| 	} else { | |
| 		err = b.ensureTopicActiveAssignments(t, conf) | |
| 		ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments | |
| 	} | |
| 
 | |
| 	return ret, err | |
| } | |
| 
 | |
| func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) { | |
| 	if !b.isLockOwner() { | |
| 		proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { | |
| 			resp, err = client.ListTopics(ctx, request) | |
| 			return nil | |
| 		}) | |
| 		if proxyErr != nil { | |
| 			return nil, proxyErr | |
| 		} | |
| 		return resp, err | |
| 	} | |
| 
 | |
| 	ret := &mq_pb.ListTopicsResponse{} | |
| 
 | |
| 	// Scan the filer directory structure to find all topics | |
| 	err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { | |
| 		// List all namespaces under /topics | |
| 		stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ | |
| 			Directory: filer.TopicsDir, | |
| 			Limit:     1000, | |
| 		}) | |
| 		if err != nil { | |
| 			glog.V(0).Infof("list namespaces in %s: %v", filer.TopicsDir, err) | |
| 			return err | |
| 		} | |
| 
 | |
| 		// Process each namespace | |
| 		for { | |
| 			resp, err := stream.Recv() | |
| 			if err != nil { | |
| 				if err.Error() == "EOF" { | |
| 					break | |
| 				} | |
| 				return err | |
| 			} | |
| 
 | |
| 			if !resp.Entry.IsDirectory { | |
| 				continue | |
| 			} | |
| 
 | |
| 			namespaceName := resp.Entry.Name | |
| 			namespacePath := fmt.Sprintf("%s/%s", filer.TopicsDir, namespaceName) | |
| 
 | |
| 			// List all topics in this namespace | |
| 			topicStream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ | |
| 				Directory: namespacePath, | |
| 				Limit:     1000, | |
| 			}) | |
| 			if err != nil { | |
| 				glog.V(0).Infof("list topics in namespace %s: %v", namespacePath, err) | |
| 				continue | |
| 			} | |
| 
 | |
| 			// Process each topic in the namespace | |
| 			for { | |
| 				topicResp, err := topicStream.Recv() | |
| 				if err != nil { | |
| 					if err.Error() == "EOF" { | |
| 						break | |
| 					} | |
| 					glog.V(0).Infof("error reading topic stream in namespace %s: %v", namespaceName, err) | |
| 					break | |
| 				} | |
| 
 | |
| 				if !topicResp.Entry.IsDirectory { | |
| 					continue | |
| 				} | |
| 
 | |
| 				topicName := topicResp.Entry.Name | |
| 
 | |
| 				// Check if topic.conf exists | |
| 				topicPath := fmt.Sprintf("%s/%s", namespacePath, topicName) | |
| 				confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ | |
| 					Directory: topicPath, | |
| 					Name:      filer.TopicConfFile, | |
| 				}) | |
| 				if err != nil { | |
| 					glog.V(0).Infof("lookup topic.conf in %s: %v", topicPath, err) | |
| 					continue | |
| 				} | |
| 
 | |
| 				if confResp.Entry != nil { | |
| 					// This is a valid topic | |
| 					topic := &schema_pb.Topic{ | |
| 						Namespace: namespaceName, | |
| 						Name:      topicName, | |
| 					} | |
| 					ret.Topics = append(ret.Topics, topic) | |
| 				} | |
| 			} | |
| 		} | |
| 
 | |
| 		return nil | |
| 	}) | |
| 
 | |
| 	if err != nil { | |
| 		glog.V(0).Infof("list topics from filer: %v", err) | |
| 		// Return empty response on error | |
| 		return &mq_pb.ListTopicsResponse{}, nil | |
| 	} | |
| 
 | |
| 	return ret, nil | |
| } | |
| 
 | |
| // GetTopicConfiguration returns the complete configuration of a topic including schema and partition assignments | |
| func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request *mq_pb.GetTopicConfigurationRequest) (resp *mq_pb.GetTopicConfigurationResponse, err error) { | |
| 	if !b.isLockOwner() { | |
| 		proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { | |
| 			resp, err = client.GetTopicConfiguration(ctx, request) | |
| 			return nil | |
| 		}) | |
| 		if proxyErr != nil { | |
| 			return nil, proxyErr | |
| 		} | |
| 		return resp, err | |
| 	} | |
| 
 | |
| 	t := topic.FromPbTopic(request.Topic) | |
| 	var conf *mq_pb.ConfigureTopicResponse | |
| 	var createdAtNs, modifiedAtNs int64 | |
| 
 | |
| 	if conf, createdAtNs, modifiedAtNs, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil { | |
| 		glog.V(0).Infof("get topic configuration %s: %v", request.Topic, err) | |
| 		return nil, fmt.Errorf("failed to read topic configuration: %v", err) | |
| 	} | |
| 
 | |
| 	// Ensure topic assignments are active | |
| 	err = b.ensureTopicActiveAssignments(t, conf) | |
| 	if err != nil { | |
| 		glog.V(0).Infof("ensure topic active assignments %s: %v", request.Topic, err) | |
| 		return nil, fmt.Errorf("failed to ensure topic assignments: %v", err) | |
| 	} | |
| 
 | |
| 	// Build the response with complete configuration including metadata | |
| 	ret := &mq_pb.GetTopicConfigurationResponse{ | |
| 		Topic:                      request.Topic, | |
| 		PartitionCount:             int32(len(conf.BrokerPartitionAssignments)), | |
| 		RecordType:                 conf.RecordType, | |
| 		BrokerPartitionAssignments: conf.BrokerPartitionAssignments, | |
| 		CreatedAtNs:                createdAtNs, | |
| 		LastUpdatedNs:              modifiedAtNs, | |
| 		Retention:                  conf.Retention, | |
| 	} | |
| 
 | |
| 	return ret, nil | |
| } | |
| 
 | |
| // GetTopicPublishers returns the active publishers for a topic | |
| func (b *MessageQueueBroker) GetTopicPublishers(ctx context.Context, request *mq_pb.GetTopicPublishersRequest) (resp *mq_pb.GetTopicPublishersResponse, err error) { | |
| 	if !b.isLockOwner() { | |
| 		proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { | |
| 			resp, err = client.GetTopicPublishers(ctx, request) | |
| 			return nil | |
| 		}) | |
| 		if proxyErr != nil { | |
| 			return nil, proxyErr | |
| 		} | |
| 		return resp, err | |
| 	} | |
| 
 | |
| 	t := topic.FromPbTopic(request.Topic) | |
| 	var publishers []*mq_pb.TopicPublisher | |
| 
 | |
| 	// Get topic configuration to find partition assignments | |
| 	var conf *mq_pb.ConfigureTopicResponse | |
| 	if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil { | |
| 		glog.V(0).Infof("get topic configuration for publishers %s: %v", request.Topic, err) | |
| 		return nil, fmt.Errorf("failed to read topic configuration: %v", err) | |
| 	} | |
| 
 | |
| 	// Collect publishers from each partition that is hosted on this broker | |
| 	for _, assignment := range conf.BrokerPartitionAssignments { | |
| 		// Only collect from partitions where this broker is the leader | |
| 		if assignment.LeaderBroker == b.option.BrokerAddress().String() { | |
| 			partition := topic.FromPbPartition(assignment.Partition) | |
| 			if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil { | |
| 				// Get publisher information from local partition | |
| 				localPartition.Publishers.ForEachPublisher(func(clientName string, publisher *topic.LocalPublisher) { | |
| 					connectTimeNs, lastSeenTimeNs := publisher.GetTimestamps() | |
| 					lastPublishedOffset, lastAckedOffset := publisher.GetOffsets() | |
| 					publishers = append(publishers, &mq_pb.TopicPublisher{ | |
| 						PublisherName:       clientName, | |
| 						ClientId:            clientName, // For now, client name is used as client ID | |
| 						Partition:           assignment.Partition, | |
| 						ConnectTimeNs:       connectTimeNs, | |
| 						LastSeenTimeNs:      lastSeenTimeNs, | |
| 						Broker:              assignment.LeaderBroker, | |
| 						IsActive:            true, | |
| 						LastPublishedOffset: lastPublishedOffset, | |
| 						LastAckedOffset:     lastAckedOffset, | |
| 					}) | |
| 				}) | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| 	return &mq_pb.GetTopicPublishersResponse{ | |
| 		Publishers: publishers, | |
| 	}, nil | |
| } | |
| 
 | |
| // GetTopicSubscribers returns the active subscribers for a topic | |
| func (b *MessageQueueBroker) GetTopicSubscribers(ctx context.Context, request *mq_pb.GetTopicSubscribersRequest) (resp *mq_pb.GetTopicSubscribersResponse, err error) { | |
| 	if !b.isLockOwner() { | |
| 		proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { | |
| 			resp, err = client.GetTopicSubscribers(ctx, request) | |
| 			return nil | |
| 		}) | |
| 		if proxyErr != nil { | |
| 			return nil, proxyErr | |
| 		} | |
| 		return resp, err | |
| 	} | |
| 
 | |
| 	t := topic.FromPbTopic(request.Topic) | |
| 	var subscribers []*mq_pb.TopicSubscriber | |
| 
 | |
| 	// Get topic configuration to find partition assignments | |
| 	var conf *mq_pb.ConfigureTopicResponse | |
| 	if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil { | |
| 		glog.V(0).Infof("get topic configuration for subscribers %s: %v", request.Topic, err) | |
| 		return nil, fmt.Errorf("failed to read topic configuration: %v", err) | |
| 	} | |
| 
 | |
| 	// Collect subscribers from each partition that is hosted on this broker | |
| 	for _, assignment := range conf.BrokerPartitionAssignments { | |
| 		// Only collect from partitions where this broker is the leader | |
| 		if assignment.LeaderBroker == b.option.BrokerAddress().String() { | |
| 			partition := topic.FromPbPartition(assignment.Partition) | |
| 			if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil { | |
| 				// Get subscriber information from local partition | |
| 				localPartition.Subscribers.ForEachSubscriber(func(clientName string, subscriber *topic.LocalSubscriber) { | |
| 					// Parse client name to extract consumer group and consumer ID | |
| 					// Format is typically: "consumerGroup/consumerID" | |
| 					consumerGroup := "default" | |
| 					consumerID := clientName | |
| 					if idx := strings.Index(clientName, "/"); idx != -1 { | |
| 						consumerGroup = clientName[:idx] | |
| 						consumerID = clientName[idx+1:] | |
| 					} | |
| 
 | |
| 					connectTimeNs, lastSeenTimeNs := subscriber.GetTimestamps() | |
| 					lastReceivedOffset, lastAckedOffset := subscriber.GetOffsets() | |
| 
 | |
| 					subscribers = append(subscribers, &mq_pb.TopicSubscriber{ | |
| 						ConsumerGroup:      consumerGroup, | |
| 						ConsumerId:         consumerID, | |
| 						ClientId:           clientName, // Full client name as client ID | |
| 						Partition:          assignment.Partition, | |
| 						ConnectTimeNs:      connectTimeNs, | |
| 						LastSeenTimeNs:     lastSeenTimeNs, | |
| 						Broker:             assignment.LeaderBroker, | |
| 						IsActive:           true, | |
| 						CurrentOffset:      lastAckedOffset, // for compatibility | |
| 						LastReceivedOffset: lastReceivedOffset, | |
| 					}) | |
| 				}) | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| 	return &mq_pb.GetTopicSubscribersResponse{ | |
| 		Subscribers: subscribers, | |
| 	}, nil | |
| } | |
| 
 | |
| func (b *MessageQueueBroker) isLockOwner() bool { | |
| 	return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String() | |
| }
 |