From 9e78705a98faaa229f943137b78ca397d58d11d6 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 Oct 2025 13:05:08 -0700 Subject: [PATCH] refactor dedup --- .../integration/broker_client_subscribe.go | 142 +++++------------- 1 file changed, 40 insertions(+), 102 deletions(-) diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index e1509642a..721b71b22 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -10,6 +10,30 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) +// createSubscribeInitMessage creates a subscribe init message with the given parameters +func createSubscribeInitMessage(topic string, actualPartition *schema_pb.Partition, startOffset int64, offsetType schema_pb.OffsetType, consumerGroup string, consumerID string) *mq_pb.SubscribeMessageRequest { + return &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Init{ + Init: &mq_pb.SubscribeMessageRequest_InitMessage{ + ConsumerGroup: consumerGroup, + ConsumerId: consumerID, + ClientId: "kafka-gateway", + Topic: &schema_pb.Topic{ + Namespace: "kafka", + Name: topic, + }, + PartitionOffset: &schema_pb.PartitionOffset{ + Partition: actualPartition, + StartTsNs: 0, + StartOffset: startOffset, + }, + OffsetType: offsetType, + SlidingWindowSize: 10, + }, + }, + } +} + // CreateFreshSubscriber creates a new subscriber session without caching // This ensures each fetch gets fresh data from the requested offset // consumerGroup and consumerID are passed from Kafka client for proper tracking in SMQ @@ -28,40 +52,14 @@ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, sta return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err) } - // Convert Kafka offset to SeaweedMQ OffsetType - var offsetType schema_pb.OffsetType - var startTimestamp int64 - var startOffsetValue int64 - // Use EXACT_OFFSET to read from the specific offset - offsetType = schema_pb.OffsetType_EXACT_OFFSET - startTimestamp = 0 - startOffsetValue = startOffset + offsetType := schema_pb.OffsetType_EXACT_OFFSET // Send init message to start subscription with Kafka client's consumer group and ID - initReq := &mq_pb.SubscribeMessageRequest{ - Message: &mq_pb.SubscribeMessageRequest_Init{ - Init: &mq_pb.SubscribeMessageRequest_InitMessage{ - ConsumerGroup: consumerGroup, - ConsumerId: consumerID, - ClientId: "kafka-gateway", - Topic: &schema_pb.Topic{ - Namespace: "kafka", - Name: topic, - }, - PartitionOffset: &schema_pb.PartitionOffset{ - Partition: actualPartition, - StartTsNs: startTimestamp, - StartOffset: startOffsetValue, - }, - OffsetType: offsetType, - SlidingWindowSize: 10, - }, - }, - } + initReq := createSubscribeInitMessage(topic, actualPartition, startOffset, offsetType, consumerGroup, consumerID) glog.V(0).Infof("[SUBSCRIBE-INIT] CreateFreshSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", - topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID) + topic, partition, startOffset, offsetType, consumerGroup, consumerID) if err := stream.Send(initReq); err != nil { return nil, fmt.Errorf("failed to send subscribe init: %v", err) @@ -186,53 +184,32 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err) } - // Convert Kafka offset to appropriate SeaweedMQ OffsetType and parameters + // Convert Kafka offset to appropriate SeaweedMQ OffsetType var offsetType schema_pb.OffsetType - var startTimestamp int64 - var startOffsetValue int64 + var offsetValue int64 if startOffset == -1 { // Kafka offset -1 typically means "latest" offsetType = schema_pb.OffsetType_RESET_TO_LATEST - startTimestamp = 0 // Not used with RESET_TO_LATEST - startOffsetValue = 0 // Not used with RESET_TO_LATEST + offsetValue = 0 // Not used with RESET_TO_LATEST glog.V(0).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)") } else { // CRITICAL FIX: Use EXACT_OFFSET to position subscriber at the exact Kafka offset // This allows the subscriber to read from both buffer and disk at the correct position offsetType = schema_pb.OffsetType_EXACT_OFFSET - startTimestamp = 0 // Not used with EXACT_OFFSET - startOffsetValue = startOffset // Use the exact Kafka offset + offsetValue = startOffset // Use the exact Kafka offset glog.V(0).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset) } - glog.V(0).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s (timestamp=%d)", - topic, partition, startOffset, offsetType, startTimestamp) + glog.V(0).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s", + topic, partition, startOffset, offsetType) glog.V(0).Infof("[SUBSCRIBE-INIT] GetOrCreateSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", - topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID) + topic, partition, offsetValue, offsetType, consumerGroup, consumerID) // Send init message using the actual partition structure that the broker allocated - if err := stream.Send(&mq_pb.SubscribeMessageRequest{ - Message: &mq_pb.SubscribeMessageRequest_Init{ - Init: &mq_pb.SubscribeMessageRequest_InitMessage{ - ConsumerGroup: consumerGroup, - ConsumerId: consumerID, - ClientId: "kafka-gateway", - Topic: &schema_pb.Topic{ - Namespace: "kafka", - Name: topic, - }, - PartitionOffset: &schema_pb.PartitionOffset{ - Partition: actualPartition, - StartTsNs: startTimestamp, - StartOffset: startOffsetValue, - }, - OffsetType: offsetType, // Use the correct offset type - SlidingWindowSize: 10, - }, - }, - }); err != nil { + initReq := createSubscribeInitMessage(topic, actualPartition, offsetValue, offsetType, consumerGroup, consumerID) + if err := stream.Send(initReq); err != nil { return nil, fmt.Errorf("failed to send subscribe init: %v", err) } @@ -383,36 +360,16 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok // Use EXACT_OFFSET to position subscriber at the exact Kafka offset offsetType := schema_pb.OffsetType_EXACT_OFFSET - startTimestamp := int64(0) - startOffsetValue := requestedOffset glog.V(0).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d", topic, partition, requestedOffset) glog.V(0).Infof("[SUBSCRIBE-INIT] ReadRecordsFromOffset (backward seek) sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", - topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID) + topic, partition, requestedOffset, offsetType, consumerGroup, consumerID) // Send init message using the actual partition structure - if err := stream.Send(&mq_pb.SubscribeMessageRequest{ - Message: &mq_pb.SubscribeMessageRequest_Init{ - Init: &mq_pb.SubscribeMessageRequest_InitMessage{ - ConsumerGroup: consumerGroup, - ConsumerId: consumerID, - ClientId: "kafka-gateway", - Topic: &schema_pb.Topic{ - Namespace: "kafka", - Name: topic, - }, - PartitionOffset: &schema_pb.PartitionOffset{ - Partition: actualPartition, - StartTsNs: startTimestamp, - StartOffset: startOffsetValue, - }, - OffsetType: offsetType, - SlidingWindowSize: 10, - }, - }, - }); err != nil { + initReq := createSubscribeInitMessage(topic, actualPartition, requestedOffset, offsetType, consumerGroup, consumerID) + if err := stream.Send(initReq); err != nil { bc.subscribersLock.Unlock() _ = stream.CloseSend() return nil, fmt.Errorf("failed to send subscribe init: %v", err) @@ -779,26 +736,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO } // Send init message with new offset - initReq := &mq_pb.SubscribeMessageRequest{ - Message: &mq_pb.SubscribeMessageRequest_Init{ - Init: &mq_pb.SubscribeMessageRequest_InitMessage{ - ConsumerGroup: consumerGroup, - ConsumerId: consumerID, - ClientId: "kafka-gateway", - Topic: &schema_pb.Topic{ - Namespace: "kafka", - Name: session.Topic, - }, - PartitionOffset: &schema_pb.PartitionOffset{ - Partition: actualPartition, - StartTsNs: 0, - StartOffset: newOffset, - }, - OffsetType: schema_pb.OffsetType_EXACT_OFFSET, - SlidingWindowSize: 10, - }, - }, - } + initReq := createSubscribeInitMessage(session.Topic, actualPartition, newOffset, schema_pb.OffsetType_EXACT_OFFSET, consumerGroup, consumerID) if err := stream.Send(initReq); err != nil { cancel()