Browse Source

feat: add context timeout propagation to produce path

This commit adds proper context propagation throughout the produce path,
enabling client-side timeouts to be honored on the broker side. Previously,
only fetch operations respected client timeouts - produce operations continued
indefinitely even if the client gave up.

Changes:
- Add ctx parameter to ProduceRecord and ProduceRecordValue signatures
- Add ctx parameter to PublishRecord and PublishRecordValue in BrokerClient
- Add ctx parameter to handleProduce and related internal functions
- Update all callers (protocol handlers, mocks, tests) to pass context
- Add context cancellation checks in PublishRecord before operations

Benefits:
- Faster failure detection when client times out
- No orphaned publish operations consuming broker resources
- Resource efficiency improvements (no goroutine/stream/lock leaks)
- Consistent timeout behavior between produce and fetch paths
- Better error handling with proper cancellation signals

This fixes the root cause of CI test timeouts where produce operations
continued indefinitely after clients gave up, leading to cascading delays.
pull/7329/head
chrislu 4 weeks ago
parent
commit
e1a4bff794
  1. 6
      weed/mq/kafka/gateway/test_mock_handler.go
  2. 26
      weed/mq/kafka/integration/broker_client_publish.go
  3. 10
      weed/mq/kafka/integration/seaweedmq_handler.go
  4. 11
      weed/mq/kafka/integration/seaweedmq_handler_test.go
  5. 6
      weed/mq/kafka/protocol/handler.go
  6. 12
      weed/mq/kafka/protocol/metadata_blocking_test.go
  7. 51
      weed/mq/kafka/protocol/produce.go

6
weed/mq/kafka/gateway/test_mock_handler.go

@ -98,7 +98,7 @@ func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTop
return info, exists return info, exists
} }
func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@ -128,8 +128,8 @@ func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32
return offset, nil return offset, nil
} }
func (m *mockSeaweedMQHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return m.ProduceRecord(topicName, partitionID, key, recordValueBytes)
func (m *mockSeaweedMQHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return m.ProduceRecord(ctx, topicName, partitionID, key, recordValueBytes)
} }
func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) { func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {

26
weed/mq/kafka/integration/broker_client_publish.go

@ -1,6 +1,7 @@
package integration package integration
import ( import (
"context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
@ -10,7 +11,12 @@ import (
) )
// PublishRecord publishes a single record to SeaweedMQ broker // PublishRecord publishes a single record to SeaweedMQ broker
func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
// ctx controls the publish timeout - if client cancels, publish operation is cancelled
func (bc *BrokerClient) PublishRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
// Check context before starting
if err := ctx.Err(); err != nil {
return 0, fmt.Errorf("context cancelled before publish: %w", err)
}
session, err := bc.getOrCreatePublisher(topic, partition) session, err := bc.getOrCreatePublisher(topic, partition)
if err != nil { if err != nil {
@ -26,6 +32,11 @@ func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte,
session.mu.Lock() session.mu.Lock()
defer session.mu.Unlock() defer session.mu.Unlock()
// Check context after acquiring lock
if err := ctx.Err(); err != nil {
return 0, fmt.Errorf("context cancelled after lock: %w", err)
}
// Send data message using broker API format // Send data message using broker API format
dataMsg := &mq_pb.DataMessage{ dataMsg := &mq_pb.DataMessage{
Key: key, Key: key,
@ -68,7 +79,13 @@ func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte,
} }
// PublishRecordValue publishes a RecordValue message to SeaweedMQ via broker // PublishRecordValue publishes a RecordValue message to SeaweedMQ via broker
func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
// ctx controls the publish timeout - if client cancels, publish operation is cancelled
func (bc *BrokerClient) PublishRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
// Check context before starting
if err := ctx.Err(); err != nil {
return 0, fmt.Errorf("context cancelled before publish: %w", err)
}
session, err := bc.getOrCreatePublisher(topic, partition) session, err := bc.getOrCreatePublisher(topic, partition)
if err != nil { if err != nil {
return 0, err return 0, err
@ -82,6 +99,11 @@ func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []
session.mu.Lock() session.mu.Lock()
defer session.mu.Unlock() defer session.mu.Unlock()
// Check context after acquiring lock
if err := ctx.Err(); err != nil {
return 0, fmt.Errorf("context cancelled after lock: %w", err)
}
// Send data message with RecordValue in the Value field // Send data message with RecordValue in the Value field
dataMsg := &mq_pb.DataMessage{ dataMsg := &mq_pb.DataMessage{
Key: key, Key: key,

10
weed/mq/kafka/integration/seaweedmq_handler.go

@ -216,7 +216,8 @@ func (h *SeaweedMQHandler) GetFilerAddress() string {
} }
// ProduceRecord publishes a record to SeaweedMQ and lets SMQ generate the offset // ProduceRecord publishes a record to SeaweedMQ and lets SMQ generate the offset
func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
// ctx controls the publish timeout - if client cancels, broker operation is cancelled
func (h *SeaweedMQHandler) ProduceRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
if len(key) > 0 { if len(key) > 0 {
} }
if len(value) > 0 { if len(value) > 0 {
@ -237,7 +238,7 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
if h.brokerClient == nil { if h.brokerClient == nil {
publishErr = fmt.Errorf("no broker client available") publishErr = fmt.Errorf("no broker client available")
} else { } else {
smqOffset, publishErr = h.brokerClient.PublishRecord(topic, partition, key, value, timestamp)
smqOffset, publishErr = h.brokerClient.PublishRecord(ctx, topic, partition, key, value, timestamp)
} }
if publishErr != nil { if publishErr != nil {
@ -258,7 +259,8 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
// ProduceRecordValue produces a record using RecordValue format to SeaweedMQ // ProduceRecordValue produces a record using RecordValue format to SeaweedMQ
// ALWAYS uses broker's assigned offset - no ledger involved // ALWAYS uses broker's assigned offset - no ledger involved
func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
// ctx controls the publish timeout - if client cancels, broker operation is cancelled
func (h *SeaweedMQHandler) ProduceRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
// Verify topic exists // Verify topic exists
if !h.TopicExists(topic) { if !h.TopicExists(topic) {
return 0, fmt.Errorf("topic %s does not exist", topic) return 0, fmt.Errorf("topic %s does not exist", topic)
@ -273,7 +275,7 @@ func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key
if h.brokerClient == nil { if h.brokerClient == nil {
publishErr = fmt.Errorf("no broker client available") publishErr = fmt.Errorf("no broker client available")
} else { } else {
smqOffset, publishErr = h.brokerClient.PublishRecordValue(topic, partition, key, recordValueBytes, timestamp)
smqOffset, publishErr = h.brokerClient.PublishRecordValue(ctx, topic, partition, key, recordValueBytes, timestamp)
} }
if publishErr != nil { if publishErr != nil {

11
weed/mq/kafka/integration/seaweedmq_handler_test.go

@ -1,6 +1,7 @@
package integration package integration
import ( import (
"context"
"testing" "testing"
"time" "time"
) )
@ -269,7 +270,7 @@ func TestSeaweedMQHandler_ProduceRecord(t *testing.T) {
key := []byte("produce-key") key := []byte("produce-key")
value := []byte("produce-value") value := []byte("produce-value")
offset, err := handler.ProduceRecord(topicName, 0, key, value)
offset, err := handler.ProduceRecord(context.Background(), topicName, 0, key, value)
if err != nil { if err != nil {
t.Fatalf("Failed to produce record: %v", err) t.Fatalf("Failed to produce record: %v", err)
} }
@ -316,7 +317,7 @@ func TestSeaweedMQHandler_MultiplePartitions(t *testing.T) {
key := []byte("partition-key") key := []byte("partition-key")
value := []byte("partition-value") value := []byte("partition-value")
offset, err := handler.ProduceRecord(topicName, partitionID, key, value)
offset, err := handler.ProduceRecord(context.Background(), topicName, partitionID, key, value)
if err != nil { if err != nil {
t.Fatalf("Failed to produce to partition %d: %v", partitionID, err) t.Fatalf("Failed to produce to partition %d: %v", partitionID, err)
} }
@ -366,7 +367,7 @@ func TestSeaweedMQHandler_FetchRecords(t *testing.T) {
var producedOffsets []int64 var producedOffsets []int64
for i, record := range testRecords { for i, record := range testRecords {
offset, err := handler.ProduceRecord(topicName, 0, []byte(record.key), []byte(record.value))
offset, err := handler.ProduceRecord(context.Background(), topicName, 0, []byte(record.key), []byte(record.value))
if err != nil { if err != nil {
t.Fatalf("Failed to produce record %d: %v", i, err) t.Fatalf("Failed to produce record %d: %v", i, err)
} }
@ -463,7 +464,7 @@ func TestSeaweedMQHandler_FetchRecords_ErrorHandling(t *testing.T) {
} }
// Test with very small maxBytes // Test with very small maxBytes
_, err = handler.ProduceRecord(topicName, 0, []byte("key"), []byte("value"))
_, err = handler.ProduceRecord(context.Background(), topicName, 0, []byte("key"), []byte("value"))
if err != nil { if err != nil {
t.Fatalf("Failed to produce test record: %v", err) t.Fatalf("Failed to produce test record: %v", err)
} }
@ -490,7 +491,7 @@ func TestSeaweedMQHandler_ErrorHandling(t *testing.T) {
defer handler.Close() defer handler.Close()
// Try to produce to non-existent topic // Try to produce to non-existent topic
_, err = handler.ProduceRecord("non-existent-topic", 0, []byte("key"), []byte("value"))
_, err = handler.ProduceRecord(context.Background(), "non-existent-topic", 0, []byte("key"), []byte("value"))
if err == nil { if err == nil {
t.Errorf("Producing to non-existent topic should fail") t.Errorf("Producing to non-existent topic should fail")
} }

6
weed/mq/kafka/protocol/handler.go

@ -132,8 +132,8 @@ type SeaweedMQHandlerInterface interface {
DeleteTopic(topic string) error DeleteTopic(topic string) error
GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool) GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool)
// Ledger methods REMOVED - SMQ handles Kafka offsets natively // Ledger methods REMOVED - SMQ handles Kafka offsets natively
ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error)
ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error)
ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
// GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations) // GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations)
// ctx is used to control the fetch timeout (should match Kafka fetch request's MaxWaitTime) // ctx is used to control the fetch timeout (should match Kafka fetch request's MaxWaitTime)
GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error)
@ -1060,7 +1060,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
response, err = h.handleDeleteTopics(req.correlationID, req.requestBody) response, err = h.handleDeleteTopics(req.correlationID, req.requestBody)
case APIKeyProduce: case APIKeyProduce:
response, err = h.handleProduce(req.correlationID, req.apiVersion, req.requestBody)
response, err = h.handleProduce(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
case APIKeyFetch: case APIKeyFetch:
response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody) response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody)

12
weed/mq/kafka/protocol/metadata_blocking_test.go

@ -163,11 +163,11 @@ func (h *FastMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo
return nil, false return nil, false
} }
func (h *FastMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
func (h *FastMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
return 0, fmt.Errorf("not implemented") return 0, fmt.Errorf("not implemented")
} }
func (h *FastMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
func (h *FastMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return 0, fmt.Errorf("not implemented") return 0, fmt.Errorf("not implemented")
} }
@ -234,11 +234,11 @@ func (h *BlockingMockHandler) GetTopicInfo(name string) (*integration.KafkaTopic
return nil, false return nil, false
} }
func (h *BlockingMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
func (h *BlockingMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
return 0, fmt.Errorf("not implemented") return 0, fmt.Errorf("not implemented")
} }
func (h *BlockingMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
func (h *BlockingMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return 0, fmt.Errorf("not implemented") return 0, fmt.Errorf("not implemented")
} }
@ -320,11 +320,11 @@ func (h *TimeoutAwareMockHandler) GetTopicInfo(name string) (*integration.KafkaT
return nil, false return nil, false
} }
func (h *TimeoutAwareMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
func (h *TimeoutAwareMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
return 0, fmt.Errorf("not implemented") return 0, fmt.Errorf("not implemented")
} }
func (h *TimeoutAwareMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
func (h *TimeoutAwareMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return 0, fmt.Errorf("not implemented") return 0, fmt.Errorf("not implemented")
} }

51
weed/mq/kafka/protocol/produce.go

@ -1,6 +1,7 @@
package protocol package protocol
import ( import (
"context"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"strings" "strings"
@ -12,20 +13,20 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
func (h *Handler) handleProduce(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
// Version-specific handling // Version-specific handling
switch apiVersion { switch apiVersion {
case 0, 1: case 0, 1:
return h.handleProduceV0V1(correlationID, apiVersion, requestBody)
return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody)
case 2, 3, 4, 5, 6, 7: case 2, 3, 4, 5, 6, 7:
return h.handleProduceV2Plus(correlationID, apiVersion, requestBody)
return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody)
default: default:
return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion) return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
} }
} }
func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
// Parse Produce v0/v1 request // Parse Produce v0/v1 request
// Request format: client_id + acks(2) + timeout(4) + topics_array // Request format: client_id + acks(2) + timeout(4) + topics_array
@ -147,11 +148,11 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
// Process the record set // Process the record set
recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
if parseErr != nil { if parseErr != nil {
errorCode = 42 // INVALID_RECORD
} else if recordCount > 0 {
// Use SeaweedMQ integration
offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
if err != nil {
errorCode = 42 // INVALID_RECORD
} else if recordCount > 0 {
// Use SeaweedMQ integration
offset, err := h.produceToSeaweedMQ(ctx, topicName, int32(partitionID), recordSetData)
if err != nil {
// Check if this is a schema validation error and add delay to prevent overloading // Check if this is a schema validation error and add delay to prevent overloading
if h.isSchemaValidationError(err) { if h.isSchemaValidationError(err) {
time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
@ -232,7 +233,8 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total
} }
// produceToSeaweedMQ publishes a single record to SeaweedMQ (simplified for Phase 2) // produceToSeaweedMQ publishes a single record to SeaweedMQ (simplified for Phase 2)
func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetData []byte) (int64, error) {
// ctx controls the publish timeout - if client cancels, produce operation is cancelled
func (h *Handler) produceToSeaweedMQ(ctx context.Context, topic string, partition int32, recordSetData []byte) (int64, error) {
// Extract all records from the record set and publish each one // Extract all records from the record set and publish each one
// extractAllRecords handles fallback internally for various cases // extractAllRecords handles fallback internally for various cases
records := h.extractAllRecords(recordSetData) records := h.extractAllRecords(recordSetData)
@ -244,7 +246,7 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat
// Publish all records and return the offset of the first record (base offset) // Publish all records and return the offset of the first record (base offset)
var baseOffset int64 var baseOffset int64
for idx, kv := range records { for idx, kv := range records {
offsetProduced, err := h.produceSchemaBasedRecord(topic, partition, kv.Key, kv.Value)
offsetProduced, err := h.produceSchemaBasedRecord(ctx, topic, partition, kv.Key, kv.Value)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -581,7 +583,7 @@ func decodeVarint(data []byte) (int64, int) {
} }
// handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+) // handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+)
func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
startTime := time.Now() startTime := time.Now()
// For now, use simplified parsing similar to v0/v1 but handle v2+ response format // For now, use simplified parsing similar to v0/v1 but handle v2+ response format
@ -725,7 +727,7 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
} else { } else {
var firstOffsetSet bool var firstOffsetSet bool
for idx, kv := range records { for idx, kv := range records {
offsetProduced, prodErr := h.produceSchemaBasedRecord(topicName, int32(partitionID), kv.Key, kv.Value)
offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
if prodErr != nil { if prodErr != nil {
// Check if this is a schema validation error and add delay to prevent overloading // Check if this is a schema validation error and add delay to prevent overloading
if h.isSchemaValidationError(prodErr) { if h.isSchemaValidationError(prodErr) {
@ -795,7 +797,8 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
} }
// processSchematizedMessage processes a message that may contain schema information // processSchematizedMessage processes a message that may contain schema information
func (h *Handler) processSchematizedMessage(topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error {
// ctx controls the publish timeout - if client cancels, process operation is cancelled
func (h *Handler) processSchematizedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error {
// System topics should bypass schema processing entirely // System topics should bypass schema processing entirely
if h.isSystemTopic(topicName) { if h.isSystemTopic(topicName) {
return nil // Skip schema processing for system topics return nil // Skip schema processing for system topics
@ -820,11 +823,12 @@ func (h *Handler) processSchematizedMessage(topicName string, partitionID int32,
} }
// Store the decoded message using SeaweedMQ // Store the decoded message using SeaweedMQ
return h.storeDecodedMessage(topicName, partitionID, originalKey, decodedMsg)
return h.storeDecodedMessage(ctx, topicName, partitionID, originalKey, decodedMsg)
} }
// storeDecodedMessage stores a decoded message using mq.broker integration // storeDecodedMessage stores a decoded message using mq.broker integration
func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error {
// ctx controls the publish timeout - if client cancels, store operation is cancelled
func (h *Handler) storeDecodedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error {
// Use broker client if available // Use broker client if available
if h.IsBrokerIntegrationEnabled() { if h.IsBrokerIntegrationEnabled() {
// Use the original Kafka message key // Use the original Kafka message key
@ -853,7 +857,7 @@ func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, origi
// NOT just the Avro payload, so we can return them as-is during fetch without re-encoding // NOT just the Avro payload, so we can return them as-is during fetch without re-encoding
value := decodedMsg.Envelope.OriginalBytes value := decodedMsg.Envelope.OriginalBytes
_, err := h.seaweedMQHandler.ProduceRecord(topicName, partitionID, key, value)
_, err := h.seaweedMQHandler.ProduceRecord(ctx, topicName, partitionID, key, value)
if err != nil { if err != nil {
return fmt.Errorf("failed to produce to SeaweedMQ: %w", err) return fmt.Errorf("failed to produce to SeaweedMQ: %w", err)
} }
@ -1141,18 +1145,19 @@ func (h *Handler) isSystemTopic(topicName string) bool {
} }
// produceSchemaBasedRecord produces a record using schema-based encoding to RecordValue // produceSchemaBasedRecord produces a record using schema-based encoding to RecordValue
func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
// ctx controls the publish timeout - if client cancels, produce operation is cancelled
func (h *Handler) produceSchemaBasedRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
// System topics should always bypass schema processing and be stored as-is // System topics should always bypass schema processing and be stored as-is
if h.isSystemTopic(topic) { if h.isSystemTopic(topic) {
offset, err := h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
offset, err := h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
return offset, err return offset, err
} }
// If schema management is not enabled, fall back to raw message handling // If schema management is not enabled, fall back to raw message handling
isEnabled := h.IsSchemaEnabled() isEnabled := h.IsSchemaEnabled()
if !isEnabled { if !isEnabled {
return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
} }
var keyDecodedMsg *schema.DecodedMessage var keyDecodedMsg *schema.DecodedMessage
@ -1190,7 +1195,7 @@ func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []
// If neither key nor value is schematized, fall back to raw message handling // If neither key nor value is schematized, fall back to raw message handling
// This is OK for non-schematized messages (no magic byte 0x00) // This is OK for non-schematized messages (no magic byte 0x00)
if keyDecodedMsg == nil && valueDecodedMsg == nil { if keyDecodedMsg == nil && valueDecodedMsg == nil {
return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
} }
// Process key schema if present // Process key schema if present
@ -1261,10 +1266,10 @@ func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []
// CRITICAL FIX: Store the DECODED RecordValue (not the original Confluent Wire Format) // CRITICAL FIX: Store the DECODED RecordValue (not the original Confluent Wire Format)
// This enables SQL queries to work properly. Kafka consumers will receive the RecordValue // This enables SQL queries to work properly. Kafka consumers will receive the RecordValue
// which can be re-encoded to Confluent Wire Format during fetch if needed // which can be re-encoded to Confluent Wire Format during fetch if needed
return h.seaweedMQHandler.ProduceRecordValue(topic, partition, finalKey, recordValueBytes)
return h.seaweedMQHandler.ProduceRecordValue(ctx, topic, partition, finalKey, recordValueBytes)
} else { } else {
// Send with raw format for non-schematized data // Send with raw format for non-schematized data
return h.seaweedMQHandler.ProduceRecord(topic, partition, finalKey, recordValueBytes)
return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, finalKey, recordValueBytes)
} }
} }

Loading…
Cancel
Save