package broker

import (
	"context"
	"fmt"
	"io"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {

	req, err := stream.Recv()
	if err != nil {
		return err
	}
	if req.GetInit() == nil {
		glog.Errorf("missing init message")
		return fmt.Errorf("missing init message")
	}

	// Create a cancellable context so we can properly clean up when the client disconnects
	ctx, cancel := context.WithCancel(stream.Context())
	defer cancel() // Ensure context is cancelled when function exits

	clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)

	t := topic.FromPbTopic(req.GetInit().Topic)
	partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())

	glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)

	glog.V(4).Infof("Calling GetOrGenerateLocalPartition for %s %s", t, partition)
	localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
	if getOrGenErr != nil {
		glog.V(4).Infof("GetOrGenerateLocalPartition failed: %v", getOrGenErr)
		return getOrGenErr
	}
	glog.V(4).Infof("GetOrGenerateLocalPartition succeeded, localTopicPartition=%v", localTopicPartition != nil)
	if localTopicPartition == nil {
		return fmt.Errorf("failed to get or generate local partition for topic %v partition %v", t, partition)
	}

	subscriber := topic.NewLocalSubscriber()
	localTopicPartition.Subscribers.AddSubscriber(clientName, subscriber)
	glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
	isConnected := true

	var counter int64
	startPosition := b.getRequestPosition(req.GetInit())
	imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().SlidingWindowSize))
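	// The in-flight tracker gives this subscriber at-least-once delivery:
	// each message stays "in flight", keyed by message key, from the moment
	// it is sent until the client acks it, and the Subscribe callback below
	// holds back a message while an earlier one with the same key is still
	// unacked.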

	defer func() {
		isConnected = false
		// Clean up any in-flight messages to prevent them from blocking other subscribers
		if cleanedCount := imt.Cleanup(); cleanedCount > 0 {
			glog.V(0).Infof("Subscriber %s cleaned up %d in-flight messages on disconnect", clientName, cleanedCount)
		}
		localTopicPartition.Subscribers.RemoveSubscriber(clientName)
		glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
		// Use topic-aware shutdown logic to prevent aggressive removal of system topics
		if localTopicPartition.MaybeShutdownLocalPartitionForTopic(t.Name) {
			b.localTopicManager.RemoveLocalPartition(t, partition)
		}
	}()

	// connect to the follower
	var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
	glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker)
	if req.GetInit().FollowerBroker != "" {
		follower := req.GetInit().FollowerBroker
		if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil {
			return fmt.Errorf("fail to dial %s: %v", follower, err)
		} else {
			defer func() {
				glog.V(0).Infof("closing SubscribeFollowMe connection to %s", follower)
				if subscribeFollowMeStream != nil {
					subscribeFollowMeStream.CloseSend()
				}
				// followerGrpcConnection.Close()
			}()
			followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
			if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil {
				return fmt.Errorf("fail to subscribe to %s: %v", follower, err)
			} else {
				if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
					Message: &mq_pb.SubscribeFollowMeRequest_Init{
						Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{
							Topic:         req.GetInit().Topic,
							Partition:     req.GetInit().GetPartitionOffset().Partition,
							ConsumerGroup: req.GetInit().ConsumerGroup,
						},
					},
				}); err != nil {
					return fmt.Errorf("fail to send init to %s: %v", follower, err)
				}
			}
		}
		glog.V(0).Infof("follower %s connected", follower)
	}
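
	// Consumer acks are mirrored to the follower in the receive loop below
	// so it can track this consumer group's progress; a Close message is
	// sent to the follower when the receive loop ends.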

	// Channel to handle seek requests - signals Subscribe loop to restart from new offset
	seekChan := make(chan *mq_pb.SubscribeMessageRequest_SeekMessage, 1)

	go func() {
		defer cancel() // CRITICAL: Cancel context when Recv goroutine exits (client disconnect)

		var lastOffset int64

		for {
			ack, err := stream.Recv()

			if err != nil {
				if err == io.EOF {
					// the client has called CloseSend(). This is to ack the close.
					stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
						Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
							IsEndOfStream: true,
						},
					}})
					break
				}
				glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
				break
			}
			// Handle seek messages
			if seekMsg := ack.GetSeek(); seekMsg != nil {
				glog.V(0).Infof("Subscriber %s received seek request to offset %d (type %v)",
					clientName, seekMsg.Offset, seekMsg.OffsetType)

				// Send seek request to Subscribe loop
				select {
				case seekChan <- seekMsg:
					glog.V(0).Infof("Subscriber %s seek request queued", clientName)
				default:
					glog.V(0).Infof("Subscriber %s seek request dropped (already pending)", clientName)
					// Send error response if seek is already in progress
					stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
						Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
							Error: "Seek already in progress",
						},
					}})
				}
				continue
			}

			if ack.GetAck().GetKey() == nil {
				// skip control messages, which carry no ack key (the nil-safe
				// getters also cover the case where this is not an ack at all)
				continue
			}
			imt.AcknowledgeMessage(ack.GetAck().GetKey(), ack.GetAck().GetTsNs())

			currentLastOffset := imt.GetOldestAckedTimestamp()
			// Update acknowledged offset and last seen time for this subscriber when it sends an ack
			subscriber.UpdateAckedOffset(currentLastOffset)
			if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
				if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
					Message: &mq_pb.SubscribeFollowMeRequest_Ack{
						Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{
							TsNs: currentLastOffset,
						},
					},
				}); err != nil {
					glog.Errorf("Error sending ack to follower: %v", err)
					break
				}
				lastOffset = currentLastOffset
			}
		}
		if lastOffset > 0 {
			glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
			if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
				glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
			}
		}
		if subscribeFollowMeStream != nil {
			if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
				Message: &mq_pb.SubscribeFollowMeRequest_Close{
					Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
				},
			}); err != nil {
				if err != io.EOF {
					glog.Errorf("Error sending close to follower: %v", err)
				}
			}
		}
	}()

	// Create a goroutine to handle context cancellation and wake up the condition variable
	// This is created ONCE per subscriber, not per callback invocation
	go func() {
		<-ctx.Done()
		// Wake up the condition variable when context is cancelled
		localTopicPartition.ListenersLock.Lock()
		localTopicPartition.ListenersCond.Broadcast()
		localTopicPartition.ListenersLock.Unlock()
	}()
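	// Without this broadcast, a Subscribe callback blocked in
	// ListenersCond.Wait (below) could sleep past a client disconnect,
	// since a condition-variable wait is not interrupted by context
	// cancellation on its own.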

	// Subscribe loop - can be restarted when seek is requested
	currentPosition := startPosition
subscribeLoop:
	for {
		// Context for this iteration of Subscribe (can be cancelled by seek)
		subscribeCtx, subscribeCancel := context.WithCancel(ctx)

		// Start Subscribe in a goroutine so we can interrupt it with seek
		subscribeDone := make(chan error, 1)
		go func() {
			subscribeErr := localTopicPartition.Subscribe(clientName, currentPosition, func() bool {
				// Check cancellation before waiting
				if subscribeCtx.Err() != nil || !isConnected {
					return false
				}

				// Wait for new data using condition variable (blocking, not polling)
				localTopicPartition.ListenersLock.Lock()
				localTopicPartition.ListenersCond.Wait()
				localTopicPartition.ListenersLock.Unlock()

				// After waking up, check if we should stop
				return subscribeCtx.Err() == nil && isConnected
			}, func(logEntry *filer_pb.LogEntry) (bool, error) {
				// Wait for the message to be acknowledged with a timeout to prevent infinite loops
				const maxWaitTime = 30 * time.Second
				const checkInterval = 137 * time.Millisecond
				startTime := time.Now()

				for imt.IsInflight(logEntry.Key) {
					// Check if we've exceeded the maximum wait time
					if time.Since(startTime) > maxWaitTime {
						glog.Warningf("Subscriber %s: message with key %s has been in-flight for more than %v, forcing acknowledgment",
							clientName, string(logEntry.Key), maxWaitTime)
						// Force remove the message from in-flight tracking to prevent infinite loop
						imt.AcknowledgeMessage(logEntry.Key, logEntry.TsNs)
						break
					}

					time.Sleep(checkInterval)

					// Check if the client has disconnected by monitoring the context
					select {
					case <-subscribeCtx.Done():
						err := subscribeCtx.Err()
						if err == context.Canceled {
							// Subscribe cancelled (seek or disconnect)
							return false, nil
						}
						glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
						return false, nil
					default:
						// Continue processing the request
					}
				}
				if logEntry.Key != nil {
					imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
				}
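				// The key is now tracked as in-flight: a later message with
				// the same key will wait in the loop above until this one is
				// acked, preserving per-key ordering across deliveries.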

				// Create the message to send
				dataMsg := &mq_pb.DataMessage{
					Key:   logEntry.Key,
					Value: logEntry.Data,
					TsNs:  logEntry.TsNs,
				}

				if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
					Data: dataMsg,
				}}); err != nil {
					glog.Errorf("Error sending data: %v", err)
					return false, err
				}

				// Update received offset and last seen time for this subscriber
				subscriber.UpdateReceivedOffset(logEntry.TsNs)

				counter++
				return false, nil
			})
			subscribeDone <- subscribeErr
		}()

		// Wait for either Subscribe to complete or a seek request
		select {
		case err = <-subscribeDone:
			subscribeCancel()
			// Subscribe finished - with an error, after cancellation, or on
			// its own (which shouldn't happen in streaming mode) - exit loop
			break subscribeLoop

		case seekMsg := <-seekChan:
			// Seek requested - cancel current Subscribe and restart from new offset
			glog.V(0).Infof("Subscriber %s seeking from offset %d to offset %d (type %v)",
				clientName, currentPosition.GetOffset(), seekMsg.Offset, seekMsg.OffsetType)

			// Cancel current Subscribe iteration
			subscribeCancel()

			// Wait for Subscribe to finish cancelling
			<-subscribeDone

			// Update position for next iteration
			currentPosition = b.getRequestPositionFromSeek(seekMsg)
			glog.V(0).Infof("Subscriber %s restarting Subscribe from new offset %d", clientName, seekMsg.Offset)

			// Send acknowledgment that seek completed
			stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
				Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
					Error: "", // Empty error means success
				},
			}})

			// Loop will restart with new position
		}
	}

	return err
}

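// getRequestPosition resolves the start position requested in the init
// message. The second argument to log_buffer.NewMessagePosition is a
// sentinel batch index recording how the position was chosen: -2 exact
// timestamp (also used when resuming from a saved offset), -3
// reset-to-earliest, -4 reset-to-latest, -5 resume-or-earliest, -6
// resume-or-latest.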
func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (startPosition log_buffer.MessagePosition) {
	if initMessage == nil {
		return
	}
	offset := initMessage.GetPartitionOffset()
	offsetType := initMessage.OffsetType

	// reset to earliest or latest
	if offsetType == schema_pb.OffsetType_RESET_TO_EARLIEST {
		startPosition = log_buffer.NewMessagePosition(1, -3)
		return
	}
	if offsetType == schema_pb.OffsetType_RESET_TO_LATEST {
		startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
		return
	}

	// use the exact timestamp
	if offsetType == schema_pb.OffsetType_EXACT_TS_NS {
		startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
		return
	}

	// use exact offset (native offset-based positioning)
	if offsetType == schema_pb.OffsetType_EXACT_OFFSET {
		startPosition = log_buffer.NewMessagePositionFromOffset(offset.StartOffset)
		return
	}

	// reset to specific offset
	if offsetType == schema_pb.OffsetType_RESET_TO_OFFSET {
		startPosition = log_buffer.NewMessagePositionFromOffset(offset.StartOffset)
		return
	}

	// try to resume
	if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
		glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
		startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
		return
	}

	if offsetType == schema_pb.OffsetType_RESUME_OR_EARLIEST {
		startPosition = log_buffer.NewMessagePosition(1, -5)
	} else if offsetType == schema_pb.OffsetType_RESUME_OR_LATEST {
		startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -6)
	}
	return
}

// getRequestPositionFromSeek converts a seek request to a MessagePosition.
// This is used when implementing full seek support in the Subscribe loop.
func (b *MessageQueueBroker) getRequestPositionFromSeek(seekMsg *mq_pb.SubscribeMessageRequest_SeekMessage) (startPosition log_buffer.MessagePosition) {
	if seekMsg == nil {
		return
	}

	offsetType := seekMsg.OffsetType
	offset := seekMsg.Offset

	// reset to earliest or latest
	if offsetType == schema_pb.OffsetType_RESET_TO_EARLIEST {
		startPosition = log_buffer.NewMessagePosition(1, -3)
		return
	}
	if offsetType == schema_pb.OffsetType_RESET_TO_LATEST {
		startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
		return
	}

	// use the exact timestamp
	if offsetType == schema_pb.OffsetType_EXACT_TS_NS {
		startPosition = log_buffer.NewMessagePosition(offset, -2)
		return
	}

	// use exact offset (native offset-based positioning)
	if offsetType == schema_pb.OffsetType_EXACT_OFFSET {
		startPosition = log_buffer.NewMessagePositionFromOffset(offset)
		return
	}

	// reset to specific offset
	if offsetType == schema_pb.OffsetType_RESET_TO_OFFSET {
		startPosition = log_buffer.NewMessagePositionFromOffset(offset)
		return
	}

	// default to exact offset
	startPosition = log_buffer.NewMessagePositionFromOffset(offset)
	return
}
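
// Illustrative client-side sketch of the call pattern SubscribeMessage
// expects, assuming a dialed mq_pb.SeaweedMessagingClient named client, an
// already-built init message, and a hypothetical process handler; the Ack
// wrapper names follow standard protobuf oneof conventions and should be
// checked against the generated mq_pb code.
//
//	stream, _ := client.SubscribeMessage(ctx)
//	stream.Send(&mq_pb.SubscribeMessageRequest{
//		Message: &mq_pb.SubscribeMessageRequest_Init{Init: init},
//	})
//	for {
//		resp, err := stream.Recv()
//		if err != nil {
//			break
//		}
//		if data := resp.GetData(); data != nil {
//			process(data)
//			// ack with the message key and timestamp so the broker's
//			// in-flight tracker can release the key
//			stream.Send(&mq_pb.SubscribeMessageRequest{
//				Message: &mq_pb.SubscribeMessageRequest_Ack{
//					Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
//						Key:  data.Key,
//						TsNs: data.TsNs,
//					},
//				},
//			})
//		}
//	}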