From e1a4bff7941e8c531e775c1c42f739f6a318341d Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 15 Oct 2025 20:31:44 -0700
Subject: [PATCH] feat: add context timeout propagation to produce path

This commit adds proper context propagation throughout the produce path,
so that client-side timeouts are honored on the broker side. Previously,
only fetch operations respected client timeouts; produce operations
continued indefinitely even after the client had given up.

Changes:
- Add ctx parameter to ProduceRecord and ProduceRecordValue signatures
- Add ctx parameter to PublishRecord and PublishRecordValue in BrokerClient
- Add ctx parameter to handleProduce and related internal functions
- Update all callers (protocol handlers, mocks, tests) to pass context
- Add context cancellation checks in PublishRecord before operations

Benefits:
- Faster failure detection when a client times out
- No orphaned publish operations consuming broker resources
- No leaked goroutines, streams, or locks from abandoned publishes
- Consistent timeout behavior between the produce and fetch paths
- Better error handling via proper cancellation signals

This fixes the root cause of CI test timeouts: produce operations that
outlived their clients piled up and caused cascading delays.
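As an illustration (a sketch, not code from this patch; the 5s timeout and
variable names are arbitrary), a gateway-side caller can now bound a produce
with a deadline, and cancellation surfaces as an error instead of a hung
publish:

    ctx, cancel := context.WithTimeout(req.ctx, 5*time.Second)
    defer cancel()
    offset, err := h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
    if err != nil {
        // the broker client wraps context.DeadlineExceeded / context.Canceled
        return err
    }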
---
 weed/mq/kafka/gateway/test_mock_handler.go         |  6 +--
 .../integration/broker_client_publish.go           | 26 +++++++++-
 .../mq/kafka/integration/seaweedmq_handler.go      | 10 ++--
 .../integration/seaweedmq_handler_test.go          | 11 ++--
 weed/mq/kafka/protocol/handler.go                  |  6 +--
 .../kafka/protocol/metadata_blocking_test.go       | 12 ++---
 weed/mq/kafka/protocol/produce.go                  | 51 ++++++++++---------
 7 files changed, 76 insertions(+), 46 deletions(-)

diff --git a/weed/mq/kafka/gateway/test_mock_handler.go b/weed/mq/kafka/gateway/test_mock_handler.go
index 4bb0e28b1..8a76f811d 100644
--- a/weed/mq/kafka/gateway/test_mock_handler.go
+++ b/weed/mq/kafka/gateway/test_mock_handler.go
@@ -98,7 +98,7 @@ func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTop
 	return info, exists
 }
 
-func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
@@ -128,8 +128,8 @@ func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32
 	return offset, nil
 }
 
-func (m *mockSeaweedMQHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
-	return m.ProduceRecord(topicName, partitionID, key, recordValueBytes)
+func (m *mockSeaweedMQHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+	return m.ProduceRecord(ctx, topicName, partitionID, key, recordValueBytes)
 }
 
 func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
diff --git a/weed/mq/kafka/integration/broker_client_publish.go b/weed/mq/kafka/integration/broker_client_publish.go
index 4feda2973..15121de16 100644
--- a/weed/mq/kafka/integration/broker_client_publish.go
+++ b/weed/mq/kafka/integration/broker_client_publish.go
@@ -1,6 +1,7 @@
 package integration
 
 import (
+	"context"
 	"fmt"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -10,7 +11,12 @@ import (
 )
 
 // PublishRecord publishes a single record to SeaweedMQ broker
-func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
+// ctx controls the publish timeout - if client cancels, publish operation is cancelled
+func (bc *BrokerClient) PublishRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
+	// Check context before starting
+	if err := ctx.Err(); err != nil {
+		return 0, fmt.Errorf("context cancelled before publish: %w", err)
+	}
 	session, err := bc.getOrCreatePublisher(topic, partition)
 	if err != nil {
 		return 0, err
@@ -26,6 +32,11 @@ func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte,
 	session.mu.Lock()
 	defer session.mu.Unlock()
 
+	// Check context after acquiring lock
+	if err := ctx.Err(); err != nil {
+		return 0, fmt.Errorf("context cancelled after lock: %w", err)
+	}
+
 	// Send data message using broker API format
 	dataMsg := &mq_pb.DataMessage{
 		Key:   key,
@@ -68,7 +79,13 @@ func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte,
 }
 
 // PublishRecordValue publishes a RecordValue message to SeaweedMQ via broker
-func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
+// ctx controls the publish timeout - if client cancels, publish operation is cancelled
+func (bc *BrokerClient) PublishRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
+	// Check context before starting
+	if err := ctx.Err(); err != nil {
+		return 0, fmt.Errorf("context cancelled before publish: %w", err)
+	}
+
 	session, err := bc.getOrCreatePublisher(topic, partition)
 	if err != nil {
 		return 0, err
@@ -82,6 +99,11 @@ func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []
 	session.mu.Lock()
 	defer session.mu.Unlock()
 
+	// Check context after acquiring lock
+	if err := ctx.Err(); err != nil {
+		return 0, fmt.Errorf("context cancelled after lock: %w", err)
+	}
+
 	// Send data message with RecordValue in the Value field
 	dataMsg := &mq_pb.DataMessage{
 		Key:   key,
diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go
index 0cb2e47bc..734916bb6 100644
--- a/weed/mq/kafka/integration/seaweedmq_handler.go
+++ b/weed/mq/kafka/integration/seaweedmq_handler.go
@@ -216,7 +216,8 @@ func (h *SeaweedMQHandler) GetFilerAddress() string {
 }
 
 // ProduceRecord publishes a record to SeaweedMQ and lets SMQ generate the offset
-func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
+// ctx controls the publish timeout - if client cancels, broker operation is cancelled
+func (h *SeaweedMQHandler) ProduceRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
 	if len(key) > 0 {
 	}
 	if len(value) > 0 {
@@ -237,7 +238,7 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
 	if h.brokerClient == nil {
 		publishErr = fmt.Errorf("no broker client available")
 	} else {
-		smqOffset, publishErr = h.brokerClient.PublishRecord(topic, partition, key, value, timestamp)
+		smqOffset, publishErr = h.brokerClient.PublishRecord(ctx, topic, partition, key, value, timestamp)
 	}
 
 	if publishErr != nil {
@@ -258,7 +259,8 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
 
 // ProduceRecordValue produces a record using RecordValue format to SeaweedMQ
 // ALWAYS uses broker's assigned offset - no ledger involved
-func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
+// ctx controls the publish timeout - if client cancels, broker operation is cancelled
+func (h *SeaweedMQHandler) ProduceRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
 	// Verify topic exists
 	if !h.TopicExists(topic) {
 		return 0, fmt.Errorf("topic %s does not exist", topic)
@@ -273,7 +275,7 @@ func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key
 	if h.brokerClient == nil {
 		publishErr = fmt.Errorf("no broker client available")
 	} else {
-		smqOffset, publishErr = h.brokerClient.PublishRecordValue(topic, partition, key, recordValueBytes, timestamp)
+		smqOffset, publishErr = h.brokerClient.PublishRecordValue(ctx, topic, partition, key, recordValueBytes, timestamp)
 	}
 
 	if publishErr != nil {
diff --git a/weed/mq/kafka/integration/seaweedmq_handler_test.go b/weed/mq/kafka/integration/seaweedmq_handler_test.go
index a01152e79..d16d8e10f 100644
--- a/weed/mq/kafka/integration/seaweedmq_handler_test.go
+++ b/weed/mq/kafka/integration/seaweedmq_handler_test.go
@@ -1,6 +1,7 @@
 package integration
 
 import (
+	"context"
 	"testing"
 	"time"
 )
@@ -269,7 +270,7 @@ func TestSeaweedMQHandler_ProduceRecord(t *testing.T) {
 	key := []byte("produce-key")
 	value := []byte("produce-value")
 
-	offset, err := handler.ProduceRecord(topicName, 0, key, value)
+	offset, err := handler.ProduceRecord(context.Background(), topicName, 0, key, value)
 	if err != nil {
 		t.Fatalf("Failed to produce record: %v", err)
 	}
@@ -316,7 +317,7 @@ func TestSeaweedMQHandler_MultiplePartitions(t *testing.T) {
 		key := []byte("partition-key")
 		value := []byte("partition-value")
 
-		offset, err := handler.ProduceRecord(topicName, partitionID, key, value)
+		offset, err := handler.ProduceRecord(context.Background(), topicName, partitionID, key, value)
 		if err != nil {
 			t.Fatalf("Failed to produce to partition %d: %v", partitionID, err)
 		}
@@ -366,7 +367,7 @@ func TestSeaweedMQHandler_FetchRecords(t *testing.T) {
 	var producedOffsets []int64
 
 	for i, record := range testRecords {
-		offset, err := handler.ProduceRecord(topicName, 0, []byte(record.key), []byte(record.value))
+		offset, err := handler.ProduceRecord(context.Background(), topicName, 0, []byte(record.key), []byte(record.value))
 		if err != nil {
 			t.Fatalf("Failed to produce record %d: %v", i, err)
 		}
@@ -463,7 +464,7 @@ func TestSeaweedMQHandler_FetchRecords_ErrorHandling(t *testing.T) {
 	}
 
 	// Test with very small maxBytes
-	_, err = handler.ProduceRecord(topicName, 0, []byte("key"), []byte("value"))
+	_, err = handler.ProduceRecord(context.Background(), topicName, 0, []byte("key"), []byte("value"))
 	if err != nil {
 		t.Fatalf("Failed to produce test record: %v", err)
 	}
@@ -490,7 +491,7 @@ func TestSeaweedMQHandler_ErrorHandling(t *testing.T) {
 	defer handler.Close()
 
 	// Try to produce to non-existent topic
-	_, err = handler.ProduceRecord("non-existent-topic", 0, []byte("key"), []byte("value"))
+	_, err = handler.ProduceRecord(context.Background(), "non-existent-topic", 0, []byte("key"), []byte("value"))
 	if err == nil {
 		t.Errorf("Producing to non-existent topic should fail")
 	}
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 87bb0d5ed..a2066a467 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -132,8 +132,8 @@ type SeaweedMQHandlerInterface interface {
 	DeleteTopic(topic string) error
 	GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool)
 	// Ledger methods REMOVED - SMQ handles Kafka offsets natively
-	ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error)
-	ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
+	ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error)
+	ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
 	// GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations)
 	// ctx is used to control the fetch timeout (should match Kafka fetch request's MaxWaitTime)
 	GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error)
@@ -1060,7 +1060,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
 		response, err = h.handleDeleteTopics(req.correlationID, req.requestBody)
 
 	case APIKeyProduce:
-		response, err = h.handleProduce(req.correlationID, req.apiVersion, req.requestBody)
+		response, err = h.handleProduce(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
 
 	case APIKeyFetch:
 		response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
diff --git a/weed/mq/kafka/protocol/metadata_blocking_test.go b/weed/mq/kafka/protocol/metadata_blocking_test.go
index 403489210..d6e5ee893 100644
--- a/weed/mq/kafka/protocol/metadata_blocking_test.go
+++ b/weed/mq/kafka/protocol/metadata_blocking_test.go
@@ -163,11 +163,11 @@ func (h *FastMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo
 	return nil, false
 }
 
-func (h *FastMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+func (h *FastMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
 	return 0, fmt.Errorf("not implemented")
 }
 
-func (h *FastMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+func (h *FastMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
 	return 0, fmt.Errorf("not implemented")
 }
 
@@ -234,11 +234,11 @@ func (h *BlockingMockHandler) GetTopicInfo(name string) (*integration.KafkaTopic
 	return nil, false
 }
 
-func (h *BlockingMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+func (h *BlockingMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
 	return 0, fmt.Errorf("not implemented")
 }
 
-func (h *BlockingMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+func (h *BlockingMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
 	return 0, fmt.Errorf("not implemented")
 }
 
@@ -320,11 +320,11 @@ func (h *TimeoutAwareMockHandler) GetTopicInfo(name string) (*integration.KafkaT
 	return nil, false
 }
 
-func (h *TimeoutAwareMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+func (h *TimeoutAwareMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
 	return 0, fmt.Errorf("not implemented")
 }
 
-func (h *TimeoutAwareMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+func (h *TimeoutAwareMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
 	return 0, fmt.Errorf("not implemented")
 }
 
diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index cae73aaa1..126410175 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -1,6 +1,7 @@
 package protocol
 
 import (
+	"context"
 	"encoding/binary"
 	"fmt"
 	"strings"
@@ -12,20 +13,20 @@ import (
 	"google.golang.org/protobuf/proto"
 )
 
-func (h *Handler) handleProduce(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
 	// Version-specific handling
 	switch apiVersion {
 	case 0, 1:
-		return h.handleProduceV0V1(correlationID, apiVersion, requestBody)
+		return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody)
 	case 2, 3, 4, 5, 6, 7:
-		return h.handleProduceV2Plus(correlationID, apiVersion, requestBody)
+		return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody)
 	default:
 		return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
 	}
 }
 
-func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
 	// Parse Produce v0/v1 request
 	// Request format: client_id + acks(2) + timeout(4) + topics_array
 
@@ -147,11 +148,11 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
 			// Process the record set
 			recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
 			if parseErr != nil {
-				errorCode = 42 // INVALID_RECORD
-			} else if recordCount > 0 {
-				// Use SeaweedMQ integration
-				offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
-				if err != nil {
+				errorCode = 42 // INVALID_RECORD
+			} else if recordCount > 0 {
+				// Use SeaweedMQ integration
+				offset, err := h.produceToSeaweedMQ(ctx, topicName, int32(partitionID), recordSetData)
+				if err != nil {
 					// Check if this is a schema validation error and add delay to prevent overloading
 					if h.isSchemaValidationError(err) {
 						time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
@@ -232,7 +233,8 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total
 }
 
 // produceToSeaweedMQ publishes a single record to SeaweedMQ (simplified for Phase 2)
-func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetData []byte) (int64, error) {
+// ctx controls the publish timeout - if client cancels, produce operation is cancelled
+func (h *Handler) produceToSeaweedMQ(ctx context.Context, topic string, partition int32, recordSetData []byte) (int64, error) {
 	// Extract all records from the record set and publish each one
 	// extractAllRecords handles fallback internally for various cases
 	records := h.extractAllRecords(recordSetData)
@@ -244,7 +246,7 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat
 	// Publish all records and return the offset of the first record (base offset)
 	var baseOffset int64
 	for idx, kv := range records {
-		offsetProduced, err := h.produceSchemaBasedRecord(topic, partition, kv.Key, kv.Value)
+		offsetProduced, err := h.produceSchemaBasedRecord(ctx, topic, partition, kv.Key, kv.Value)
 		if err != nil {
 			return 0, err
 		}
@@ -581,7 +583,7 @@ func decodeVarint(data []byte) (int64, int) {
 }
 
 // handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+)
-func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
 	startTime := time.Now()
 
 	// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
@@ -725,7 +727,7 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
 			} else {
 				var firstOffsetSet bool
 				for idx, kv := range records {
-					offsetProduced, prodErr := h.produceSchemaBasedRecord(topicName, int32(partitionID), kv.Key, kv.Value)
+					offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
 					if prodErr != nil {
 						// Check if this is a schema validation error and add delay to prevent overloading
 						if h.isSchemaValidationError(prodErr) {
@@ -795,7 +797,8 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
 }
 
 // processSchematizedMessage processes a message that may contain schema information
-func (h *Handler) processSchematizedMessage(topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error {
+// ctx controls the publish timeout - if client cancels, process operation is cancelled
+func (h *Handler) processSchematizedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error {
 	// System topics should bypass schema processing entirely
 	if h.isSystemTopic(topicName) {
 		return nil // Skip schema processing for system topics
@@ -820,11 +823,12 @@ func (h *Handler) processSchematizedMessage(topicName string, partitionID int32,
 	}
 
 	// Store the decoded message using SeaweedMQ
-	return h.storeDecodedMessage(topicName, partitionID, originalKey, decodedMsg)
+	return h.storeDecodedMessage(ctx, topicName, partitionID, originalKey, decodedMsg)
 }
 
 // storeDecodedMessage stores a decoded message using mq.broker integration
-func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error {
+// ctx controls the publish timeout - if client cancels, store operation is cancelled
+func (h *Handler) storeDecodedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error {
 	// Use broker client if available
 	if h.IsBrokerIntegrationEnabled() {
 		// Use the original Kafka message key
@@ -853,7 +857,7 @@ func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, origi
 	// NOT just the Avro payload, so we can return them as-is during fetch without re-encoding
 	value := decodedMsg.Envelope.OriginalBytes
 
-	_, err := h.seaweedMQHandler.ProduceRecord(topicName, partitionID, key, value)
+	_, err := h.seaweedMQHandler.ProduceRecord(ctx, topicName, partitionID, key, value)
 	if err != nil {
 		return fmt.Errorf("failed to produce to SeaweedMQ: %w", err)
 	}
@@ -1141,18 +1145,19 @@ func (h *Handler) isSystemTopic(topicName string) bool {
 }
 
 // produceSchemaBasedRecord produces a record using schema-based encoding to RecordValue
-func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
+// ctx controls the publish timeout - if client cancels, produce operation is cancelled
+func (h *Handler) produceSchemaBasedRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
 	// System topics should always bypass schema processing and be stored as-is
 	if h.isSystemTopic(topic) {
-		offset, err := h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
+		offset, err := h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
 		return offset, err
 	}
 
 	// If schema management is not enabled, fall back to raw message handling
 	isEnabled := h.IsSchemaEnabled()
 	if !isEnabled {
-		return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
+		return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
 	}
 
 	var keyDecodedMsg *schema.DecodedMessage
@@ -1190,7 +1195,7 @@ func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []
 	// If neither key nor value is schematized, fall back to raw message handling
 	// This is OK for non-schematized messages (no magic byte 0x00)
 	if keyDecodedMsg == nil && valueDecodedMsg == nil {
-		return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
+		return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
 	}
 
 	// Process key schema if present
@@ -1261,10 +1266,10 @@ func (h *Handler) produceSchemaBasedRecord(topic string, partition int32, key []
 		// CRITICAL FIX: Store the DECODED RecordValue (not the original Confluent Wire Format)
 		// This enables SQL queries to work properly. Kafka consumers will receive the RecordValue
 		// which can be re-encoded to Confluent Wire Format during fetch if needed
-		return h.seaweedMQHandler.ProduceRecordValue(topic, partition, finalKey, recordValueBytes)
+		return h.seaweedMQHandler.ProduceRecordValue(ctx, topic, partition, finalKey, recordValueBytes)
 	} else {
 		// Send with raw format for non-schematized data
-		return h.seaweedMQHandler.ProduceRecord(topic, partition, finalKey, recordValueBytes)
+		return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, finalKey, recordValueBytes)
 	}
 }
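
Illustrative behavior check (a sketch under assumptions, not part of the
patch): with the new ctx.Err() guards, an already-cancelled context fails
fast before any broker I/O. This assumes a handler and topic set up as in
the integration tests above, and that the wrapped context error is
propagated with %w all the way up, as PublishRecord does:

    ctx, cancel := context.WithCancel(context.Background())
    cancel() // simulate a client that already gave up
    _, err := handler.ProduceRecord(ctx, topicName, 0, []byte("k"), []byte("v"))
    if !errors.Is(err, context.Canceled) {
        t.Fatalf("expected context.Canceled, got %v", err)
    }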