package broker

import (
	"context"
	"fmt"
	"io"
	"math/rand/v2"
	"net"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"google.golang.org/grpc/peer"
	"google.golang.org/protobuf/proto"
)

// PUB
// 1. gRPC API to configure a topic
//    1.1 create a topic with existing partition count
//    1.2 assign partitions to brokers
// 2. gRPC API to lookup topic partitions
// 3. gRPC API to publish by topic partitions

// SUB
// 1. gRPC API to lookup a topic's partitions

// Re-balance topic partitions for publishing
//   1. collect stats from all the brokers
//   2. Rebalance and configure a new generation of partitions on brokers
//   3. Tell brokers to close the current generation of publishing.
// Publishers need to look up again and publish to the new generation of partitions.

// Re-balance topic partitions for subscribing
//   1. collect stats from all the brokers
// Subscribers need to listen for new partitions and connect to the brokers.
// Each subscription may not get data. It can act as a backup.
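
// The publish stream implemented below proceeds as follows:
//  1. The publisher sends an init message naming the topic, partition, ack interval, and publisher name.
//  2. The broker verifies it is the leader for that partition, sets up the local partition and any
//     follower connections, and replies with an empty "hello" response.
//  3. Each subsequent data message is written to the local partition, assigned an offset, and
//     acknowledged immediately with that offset.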

func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {

	req, err := stream.Recv()
	if err != nil {
		return err
	}
	response := &mq_pb.PublishMessageResponse{}

	initMessage := req.GetInit()
	if initMessage == nil {
		response.ErrorCode, response.Error = CreateBrokerError(BrokerErrorInvalidRecord, "missing init message")
		glog.Errorf("missing init message")
		return stream.Send(response)
	}

	// Check whether the current broker should be the leader for the topic partition
	leaderBroker, err := b.findBrokerForTopicPartition(initMessage.Topic, initMessage.Partition)
	if err != nil {
		response.ErrorCode, response.Error = CreateBrokerError(BrokerErrorTopicNotFound, fmt.Sprintf("failed to find leader for topic partition: %v", err))
		glog.Errorf("failed to find leader for topic partition: %v", err)
		return stream.Send(response)
	}

	currentBrokerAddress := fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port)
	if leaderBroker != currentBrokerAddress {
		response.ErrorCode, response.Error = CreateBrokerError(BrokerErrorNotLeaderOrFollower, fmt.Sprintf("not the leader for this partition, leader is: %s", leaderBroker))
		glog.V(1).Infof("rejecting publish request: not the leader for partition, leader is: %s", leaderBroker)
		return stream.Send(response)
	}

	// get or generate a local partition
	t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
	localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
	if getOrGenErr != nil {
		response.ErrorCode, response.Error = CreateBrokerError(BrokerErrorTopicNotFound, fmt.Sprintf("topic %v not found: %v", t, getOrGenErr))
		glog.Errorf("topic %v not found: %v", t, getOrGenErr)
		return stream.Send(response)
	}

	// connect to follower brokers
	if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
		response.ErrorCode, response.Error = CreateBrokerError(BrokerErrorFollowerConnectionFailed, followerErr.Error())
		glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
		return stream.Send(response)
	}

	// register this publisher on the local partition
	clientName := fmt.Sprintf("%v-%4d", findClientAddress(stream.Context()), rand.IntN(10000))
	publisher := topic.NewLocalPublisher()
	localTopicPartition.Publishers.AddPublisher(clientName, publisher)
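	// The publisher entry tracks per-stream progress (UpdatePublishedOffset is called for each
	// accepted message below) and is removed again in the deferred cleanup when the stream ends.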

	// DISABLED: Periodic ack goroutine not needed with immediate per-message acks
	// Immediate acks provide correct offset information for Kafka Gateway
	var receivedSequence, acknowledgedSequence int64
	var isClosed bool
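	// receivedSequence and acknowledgedSequence track ack progress for the disabled periodic-ack
	// loop below; isClosed is set by a deferred function so that loop, if ever re-enabled,
	// exits when the stream ends.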

	if false {
		ackInterval := int64(1)
		if initMessage.AckInterval > 0 {
			ackInterval = int64(initMessage.AckInterval)
		}
		go func() {
			defer func() {
				// println("stop sending ack to publisher", initMessage.PublisherName)
			}()

			lastAckTime := time.Now()
			for !isClosed {
				receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
				if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 100*time.Millisecond) {
					acknowledgedSequence = receivedSequence
					response := &mq_pb.PublishMessageResponse{
						AckTsNs: acknowledgedSequence,
					}
					if err := stream.Send(response); err != nil {
						glog.Errorf("Error sending response %v: %v", response, err)
					}
					// Update acknowledged offset for this publisher
					publisher.UpdateAckedOffset(acknowledgedSequence)
					// println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
					lastAckTime = time.Now()
				} else {
					time.Sleep(10 * time.Millisecond) // Reduced from 1s to 10ms for faster acknowledgments
				}
			}
		}()
	}

	defer func() {
		// remove the publisher
		localTopicPartition.Publishers.RemovePublisher(clientName)
		// Use topic-aware shutdown logic to prevent aggressive removal of system topics
		if localTopicPartition.MaybeShutdownLocalPartitionForTopic(t.Name) {
			b.localTopicManager.RemoveLocalPartition(t, p)
			glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
		}
	}()

	// send a hello message
	stream.Send(&mq_pb.PublishMessageResponse{})
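	// The empty response tells the publisher that initialization succeeded and data messages may follow.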

	defer func() {
		isClosed = true
	}()

	// process each published message
	for {
		// receive a message
		req, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}
			glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
			break
		}

		// Process the received message
		dataMessage := req.GetData()
		if dataMessage == nil {
			continue
		}

		// Validate RecordValue structure only for schema-based messages
		// Note: Only messages sent via ProduceRecordValue should be in RecordValue format
		// Regular Kafka messages and offset management messages are stored as raw bytes
		if dataMessage.Value != nil {
			record := &schema_pb.RecordValue{}
			if err := proto.Unmarshal(dataMessage.Value, record); err == nil {
				// Successfully unmarshaled as RecordValue - validate structure
				if err := b.validateRecordValue(record, initMessage.Topic); err != nil {
					glog.V(1).Infof("RecordValue validation failed on topic %v partition %v: %v", initMessage.Topic, initMessage.Partition, err)
				}
			}
			// Note: We don't log errors for non-RecordValue messages since most Kafka messages
			// are raw bytes and should not be expected to be in RecordValue format
		}

		// The control message should still be sent to the follower
		// to avoid timing issues when acking messages.

		// Send to the local partition with offset assignment
		t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)

		// Create offset assignment function for this partition
		assignOffsetFn := func() (int64, error) {
			return b.offsetManager.AssignOffset(t, p)
		}
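		// assignOffsetFn lets the local partition obtain the next offset for (t, p) from the
		// broker's offsetManager at publish time.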

		// Use offset-aware publishing
		assignedOffset, err := localTopicPartition.PublishWithOffset(dataMessage, assignOffsetFn)
		if err != nil {
			return fmt.Errorf("topic %v partition %v publish error: %w", initMessage.Topic, initMessage.Partition, err)
		}

		// No ForceFlush - subscribers use per-subscriber notification channels for instant wake-up
		// Data is served from in-memory LogBuffer with <1ms latency
		glog.V(2).Infof("Published offset %d to %s", assignedOffset, initMessage.Topic.Name)

		// Send immediate per-message ack WITH offset
		// This is critical for Gateway to return correct offsets to Kafka clients
		response := &mq_pb.PublishMessageResponse{
			AckTsNs:        dataMessage.TsNs,
			AssignedOffset: assignedOffset,
		}
		if err := stream.Send(response); err != nil {
			glog.Errorf("Error sending immediate ack %v: %v", response, err)
			return fmt.Errorf("failed to send ack: %v", err)
		}

		// Update published offset and last seen time for this publisher
		publisher.UpdatePublishedOffset(assignedOffset)
	}

	glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)

	return nil
}

// validateRecordValue validates the structure and content of a RecordValue message
// Since RecordValue messages are created from successful protobuf unmarshaling,
// their structure is already guaranteed to be valid by the protobuf library.
// Schema validation (if applicable) already happened during Kafka gateway decoding.
func (b *MessageQueueBroker) validateRecordValue(record *schema_pb.RecordValue, topic *schema_pb.Topic) error {
	// Check for nil RecordValue
	if record == nil {
		return fmt.Errorf("RecordValue is nil")
	}

	// Check for nil Fields map
	if record.Fields == nil {
		return fmt.Errorf("RecordValue.Fields is nil")
	}

	// Check for empty Fields map
	if len(record.Fields) == 0 {
		return fmt.Errorf("RecordValue has no fields")
	}

	// If protobuf unmarshaling succeeded, the RecordValue is structurally valid
	return nil
}

// duplicated from master_grpc_server.go
func findClientAddress(ctx context.Context) string {
	// fmt.Printf("FromContext %+v\n", ctx)
	pr, ok := peer.FromContext(ctx)
	if !ok {
		glog.Error("failed to get peer from ctx")
		return ""
	}
	if pr.Addr == net.Addr(nil) {
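		// a nil net.Addr interface value means the transport recorded no peer address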
		glog.Error("failed to get peer address")
		return ""
	}
	return pr.Addr.String()
}

// GetPartitionRangeInfo returns comprehensive range information for a partition (offsets, timestamps, etc.)
func (b *MessageQueueBroker) GetPartitionRangeInfo(ctx context.Context, req *mq_pb.GetPartitionRangeInfoRequest) (*mq_pb.GetPartitionRangeInfoResponse, error) {
	if req.Topic == nil || req.Partition == nil {
		return &mq_pb.GetPartitionRangeInfoResponse{
			Error: "topic and partition are required",
		}, nil
	}

	t := topic.FromPbTopic(req.Topic)
	p := topic.FromPbPartition(req.Partition)

	// Get offset information from the broker's internal method
	info, err := b.GetPartitionOffsetInfoInternal(t, p)
	if err != nil {
		return &mq_pb.GetPartitionRangeInfoResponse{
			Error: fmt.Sprintf("failed to get partition range info: %v", err),
		}, nil
	}

	// TODO: Get timestamp range information from chunk metadata or the log buffer.
	// For now, return zero values for timestamps - this can be enhanced later
	// to read the extended attributes (ts_min, ts_max) from filer metadata.
	timestampRange := &mq_pb.TimestampRangeInfo{
		EarliestTimestampNs: 0, // TODO: Read from chunk metadata ts_min
		LatestTimestampNs:   0, // TODO: Read from chunk metadata ts_max
	}

	return &mq_pb.GetPartitionRangeInfoResponse{
		OffsetRange: &mq_pb.OffsetRangeInfo{
			EarliestOffset: info.EarliestOffset,
			LatestOffset:   info.LatestOffset,
			HighWaterMark:  info.HighWaterMark,
		},
		TimestampRange:      timestampRange,
		RecordCount:         info.RecordCount,
		ActiveSubscriptions: info.ActiveSubscriptions,
	}, nil
}