Browse Source

refactor dedup

pull/7329/head
chrislu 7 days ago
parent
commit
9e78705a98
  1. 142
      weed/mq/kafka/integration/broker_client_subscribe.go

142
weed/mq/kafka/integration/broker_client_subscribe.go

@ -10,6 +10,30 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// createSubscribeInitMessage creates a subscribe init message with the given parameters
func createSubscribeInitMessage(topic string, actualPartition *schema_pb.Partition, startOffset int64, offsetType schema_pb.OffsetType, consumerGroup string, consumerID string) *mq_pb.SubscribeMessageRequest {
return &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
ConsumerGroup: consumerGroup,
ConsumerId: consumerID,
ClientId: "kafka-gateway",
Topic: &schema_pb.Topic{
Namespace: "kafka",
Name: topic,
},
PartitionOffset: &schema_pb.PartitionOffset{
Partition: actualPartition,
StartTsNs: 0,
StartOffset: startOffset,
},
OffsetType: offsetType,
SlidingWindowSize: 10,
},
},
}
}
// CreateFreshSubscriber creates a new subscriber session without caching
// This ensures each fetch gets fresh data from the requested offset
// consumerGroup and consumerID are passed from Kafka client for proper tracking in SMQ
@ -28,40 +52,14 @@ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, sta
return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
}
// Convert Kafka offset to SeaweedMQ OffsetType
var offsetType schema_pb.OffsetType
var startTimestamp int64
var startOffsetValue int64
// Use EXACT_OFFSET to read from the specific offset
offsetType = schema_pb.OffsetType_EXACT_OFFSET
startTimestamp = 0
startOffsetValue = startOffset
offsetType := schema_pb.OffsetType_EXACT_OFFSET
// Send init message to start subscription with Kafka client's consumer group and ID
initReq := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
ConsumerGroup: consumerGroup,
ConsumerId: consumerID,
ClientId: "kafka-gateway",
Topic: &schema_pb.Topic{
Namespace: "kafka",
Name: topic,
},
PartitionOffset: &schema_pb.PartitionOffset{
Partition: actualPartition,
StartTsNs: startTimestamp,
StartOffset: startOffsetValue,
},
OffsetType: offsetType,
SlidingWindowSize: 10,
},
},
}
initReq := createSubscribeInitMessage(topic, actualPartition, startOffset, offsetType, consumerGroup, consumerID)
glog.V(0).Infof("[SUBSCRIBE-INIT] CreateFreshSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID)
topic, partition, startOffset, offsetType, consumerGroup, consumerID)
if err := stream.Send(initReq); err != nil {
return nil, fmt.Errorf("failed to send subscribe init: %v", err)
@ -186,53 +184,32 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
}
// Convert Kafka offset to appropriate SeaweedMQ OffsetType and parameters
// Convert Kafka offset to appropriate SeaweedMQ OffsetType
var offsetType schema_pb.OffsetType
var startTimestamp int64
var startOffsetValue int64
var offsetValue int64
if startOffset == -1 {
// Kafka offset -1 typically means "latest"
offsetType = schema_pb.OffsetType_RESET_TO_LATEST
startTimestamp = 0 // Not used with RESET_TO_LATEST
startOffsetValue = 0 // Not used with RESET_TO_LATEST
offsetValue = 0 // Not used with RESET_TO_LATEST
glog.V(0).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)")
} else {
// CRITICAL FIX: Use EXACT_OFFSET to position subscriber at the exact Kafka offset
// This allows the subscriber to read from both buffer and disk at the correct position
offsetType = schema_pb.OffsetType_EXACT_OFFSET
startTimestamp = 0 // Not used with EXACT_OFFSET
startOffsetValue = startOffset // Use the exact Kafka offset
offsetValue = startOffset // Use the exact Kafka offset
glog.V(0).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset)
}
glog.V(0).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s (timestamp=%d)",
topic, partition, startOffset, offsetType, startTimestamp)
glog.V(0).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s",
topic, partition, startOffset, offsetType)
glog.V(0).Infof("[SUBSCRIBE-INIT] GetOrCreateSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID)
topic, partition, offsetValue, offsetType, consumerGroup, consumerID)
// Send init message using the actual partition structure that the broker allocated
if err := stream.Send(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
ConsumerGroup: consumerGroup,
ConsumerId: consumerID,
ClientId: "kafka-gateway",
Topic: &schema_pb.Topic{
Namespace: "kafka",
Name: topic,
},
PartitionOffset: &schema_pb.PartitionOffset{
Partition: actualPartition,
StartTsNs: startTimestamp,
StartOffset: startOffsetValue,
},
OffsetType: offsetType, // Use the correct offset type
SlidingWindowSize: 10,
},
},
}); err != nil {
initReq := createSubscribeInitMessage(topic, actualPartition, offsetValue, offsetType, consumerGroup, consumerID)
if err := stream.Send(initReq); err != nil {
return nil, fmt.Errorf("failed to send subscribe init: %v", err)
}
@ -383,36 +360,16 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
// Use EXACT_OFFSET to position subscriber at the exact Kafka offset
offsetType := schema_pb.OffsetType_EXACT_OFFSET
startTimestamp := int64(0)
startOffsetValue := requestedOffset
glog.V(0).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d",
topic, partition, requestedOffset)
glog.V(0).Infof("[SUBSCRIBE-INIT] ReadRecordsFromOffset (backward seek) sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
topic, partition, startOffsetValue, offsetType, consumerGroup, consumerID)
topic, partition, requestedOffset, offsetType, consumerGroup, consumerID)
// Send init message using the actual partition structure
if err := stream.Send(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
ConsumerGroup: consumerGroup,
ConsumerId: consumerID,
ClientId: "kafka-gateway",
Topic: &schema_pb.Topic{
Namespace: "kafka",
Name: topic,
},
PartitionOffset: &schema_pb.PartitionOffset{
Partition: actualPartition,
StartTsNs: startTimestamp,
StartOffset: startOffsetValue,
},
OffsetType: offsetType,
SlidingWindowSize: 10,
},
},
}); err != nil {
initReq := createSubscribeInitMessage(topic, actualPartition, requestedOffset, offsetType, consumerGroup, consumerID)
if err := stream.Send(initReq); err != nil {
bc.subscribersLock.Unlock()
_ = stream.CloseSend()
return nil, fmt.Errorf("failed to send subscribe init: %v", err)
@ -779,26 +736,7 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO
}
// Send init message with new offset
initReq := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
ConsumerGroup: consumerGroup,
ConsumerId: consumerID,
ClientId: "kafka-gateway",
Topic: &schema_pb.Topic{
Namespace: "kafka",
Name: session.Topic,
},
PartitionOffset: &schema_pb.PartitionOffset{
Partition: actualPartition,
StartTsNs: 0,
StartOffset: newOffset,
},
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
SlidingWindowSize: 10,
},
},
}
initReq := createSubscribeInitMessage(session.Topic, actualPartition, newOffset, schema_pb.OffsetType_EXACT_OFFSET, consumerGroup, consumerID)
if err := stream.Send(initReq); err != nil {
cancel()

Loading…
Cancel
Save