diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index ec7d156f8..5eec21b69 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -174,7 +174,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
 	}
 
 	if err != nil {
-		glog.V(0).Infof("📋 ListTopics: filer scan failed: %v (returning %d in-memory topics)", err, len(inMemoryTopics))
+		glog.V(0).Infof("ListTopics: filer scan failed: %v (returning %d in-memory topics)", err, len(inMemoryTopics))
 		// Still return in-memory topics even if filer fails
 	} else {
 		glog.V(4).Infof("📋 ListTopics completed successfully: %d total topics (in-memory + persisted)", len(ret.Topics))
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index f5af77cbc..138d1023e 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -49,20 +49,20 @@ func (b *MessageQueueBroker) invalidateTopicCache(t topic.Topic) {
 // This is the public API for reading topic config - always use this instead of direct filer reads.
 func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.ConfigureTopicResponse, error) {
 	topicKey := t.String()
-	
+
 	// Check unified cache first
 	b.topicCacheMu.RLock()
 	if entry, found := b.topicCache[topicKey]; found {
 		if time.Now().Before(entry.expiresAt) {
 			conf := entry.conf
 			b.topicCacheMu.RUnlock()
-			
+
 			// If conf is nil, topic was cached as non-existent
 			if conf == nil {
 				glog.V(4).Infof("Topic cache HIT for %s: topic doesn't exist", topicKey)
 				return nil, fmt.Errorf("topic %v not found (cached)", t)
 			}
-			
+
 			glog.V(4).Infof("Topic cache HIT for %s (skipping assignment validation)", topicKey)
 			// Cache hit - return immediately without validating assignments
 			// Assignments were validated when we first cached this config
@@ -70,11 +70,11 @@ func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.Config
 		}
 	}
 	b.topicCacheMu.RUnlock()
-	
+
 	// Cache miss or expired - read from filer
 	glog.V(4).Infof("Topic cache MISS for %s, reading from filer", topicKey)
 	conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
-	
+
 	if readConfErr != nil {
 		// Negative cache: topic doesn't exist
 		b.topicCacheMu.Lock()
@@ -86,7 +86,7 @@ func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.Config
 		glog.V(4).Infof("Topic cached as non-existent: %s", topicKey)
 		return nil, fmt.Errorf("topic %v not found: %w", t, readConfErr)
 	}
-	
+
 	// Validate broker assignments before caching (NOT holding cache lock)
 	// This ensures cached configs always have valid broker assignments
 	// Only done on cache miss (not on every lookup), saving 14% CPU
@@ -100,15 +100,21 @@ func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.Config
 			// Don't cache on error - let next request retry
 			return conf, err
 		}
-		// Invalidate cache to force fresh read on next request
-		// This ensures all brokers see the updated assignments
+		// Store the freshly validated config under the lock instead of merely
+		// invalidating the entry; this closes the window in which a concurrent
+		// reader could observe the deletion and re-cache stale assignments.
 		b.topicCacheMu.Lock()
 		delete(b.topicCache, topicKey)
+		// Cache the updated config with validated assignments
+		b.topicCache[topicKey] = &topicCacheEntry{
+			conf:      conf,
+			expiresAt: time.Now().Add(b.topicCacheTTL),
+		}
 		b.topicCacheMu.Unlock()
-		glog.V(4).Infof("Invalidated cache for %s after assignment update", topicKey)
+		glog.V(4).Infof("Updated cache for %s after assignment update", topicKey)
 		return conf, nil
 	}
-	
+
 	// Positive cache: topic exists with validated assignments
 	b.topicCacheMu.Lock()
 	b.topicCache[topicKey] = &topicCacheEntry{
@@ -117,7 +123,7 @@ func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.Config
 	}
 	b.topicCacheMu.Unlock()
 	glog.V(4).Infof("Topic config cached for %s", topicKey)
-	
+
 	return conf, nil
 }
diff --git a/weed/mq/kafka/integration/broker_client_publish.go b/weed/mq/kafka/integration/broker_client_publish.go
index 4c24c6d91..64a62cf53 100644
--- a/weed/mq/kafka/integration/broker_client_publish.go
+++ b/weed/mq/kafka/integration/broker_client_publish.go
@@ -3,6 +3,7 @@ package integration
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -145,14 +146,46 @@ func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*Br
 	}
 	bc.publishersLock.RUnlock()
 
-	// Create new publisher stream
-	bc.publishersLock.Lock()
-	defer bc.publishersLock.Unlock()
+	// Serialize creation per topic-partition: a per-key creation lock ensures
+	// only one goroutine creates and initializes each publisher, while
+	// publishers for other topic-partitions are created in parallel.
+	// Initialize the lock map under the write lock; a bare nil check outside
+	// the lock would race with concurrent initialization.
+	bc.publishersLock.Lock()
+	if bc.publisherCreationLocks == nil {
+		bc.publisherCreationLocks = make(map[string]*sync.Mutex)
+	}
+	bc.publishersLock.Unlock()
+
+	bc.publishersLock.RLock()
+	creationLock, exists := bc.publisherCreationLocks[key]
+	if !exists {
+		// Need to create a creation lock for this topic-partition
+		bc.publishersLock.RUnlock()
+		bc.publishersLock.Lock()
+		// Double-check if someone else created it
+		if lock, exists := bc.publisherCreationLocks[key]; exists {
+			creationLock = lock
+		} else {
+			creationLock = &sync.Mutex{}
+			bc.publisherCreationLocks[key] = creationLock
+		}
+		bc.publishersLock.Unlock()
+	} else {
+		bc.publishersLock.RUnlock()
+	}
 
-	// Double-check after acquiring write lock
+	// Acquire the creation lock - only one goroutine per key proceeds
+	creationLock.Lock()
+	defer creationLock.Unlock()
+
+	// Double-check if the publisher was created while we waited for the lock
+	bc.publishersLock.RLock()
 	if session, exists := bc.publishers[key]; exists {
+		bc.publishersLock.RUnlock()
 		return session, nil
 	}
+	bc.publishersLock.RUnlock()
 
 	// Create the stream
 	stream, err := bc.client.PublishMessage(bc.ctx)
@@ -160,13 +193,13 @@ func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*Br
 		return nil, fmt.Errorf("failed to create publish stream: %v", err)
 	}
 
-	// Get the actual partition assignment from the broker instead of using Kafka partition mapping
+	// Get the actual partition assignment from the broker
 	actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get actual partition assignment: %v", err)
 	}
 
-	// Send init message using the actual partition structure that the broker allocated
+	// Send init message
 	if err := stream.Send(&mq_pb.PublishMessageRequest{
 		Message: &mq_pb.PublishMessageRequest_Init{
 			Init: &mq_pb.PublishMessageRequest_InitMessage{
@@ -183,9 +216,7 @@ func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*Br
 		return nil, fmt.Errorf("failed to send init message: %v", err)
 	}
 
-	// CRITICAL: Consume the "hello" message sent by broker after init
-	// Broker sends empty PublishMessageResponse{} on line 137 of broker_grpc_pub.go
-	// Without this, first Recv() in PublishRecord gets hello instead of data ack
+	// Consume the "hello" message sent by broker after init
 	helloResp, err := stream.Recv()
 	if err != nil {
 		return nil, fmt.Errorf("failed to receive hello message: %v", err)
@@ -200,7 +231,11 @@ func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*Br
 		Stream:    stream,
 	}
 
+	// Store in the map under the publishersLock
+	bc.publishersLock.Lock()
 	bc.publishers[key] = session
+	bc.publishersLock.Unlock()
+
 	return session, nil
 }
 
diff --git a/weed/mq/kafka/integration/types.go b/weed/mq/kafka/integration/types.go
index 3e7efcbdd..d707045e6 100644
--- a/weed/mq/kafka/integration/types.go
+++ b/weed/mq/kafka/integration/types.go
@@ -179,6 +179,9 @@ type BrokerClient struct {
 	publishersLock sync.RWMutex
 	publishers     map[string]*BrokerPublisherSession
 
+	// Publisher creation locks to prevent concurrent creation attempts for the same topic-partition
+	publisherCreationLocks map[string]*sync.Mutex
+
 	// Subscriber streams for offset tracking
 	subscribersLock sync.RWMutex
 	subscribers     map[string]*BrokerSubscriberSession
diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index 319568a78..75d82d8a4 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -55,7 +55,15 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a
 	timeout := binary.BigEndian.Uint32(requestBody[offset : offset+4])
 	offset += 4
-	_ = timeout // unused for now
+
+	// Apply the client-specified timeout to the request context so that
+	// downstream broker calls respect the client's deadline instead of
+	// blocking indefinitely
+	if timeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
+		defer cancel()
+	}
 
 	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
 	offset += 4
@@ -620,9 +628,18 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 	acks := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
 	offset += 2
-	_ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) // timeout
+	timeout := binary.BigEndian.Uint32(requestBody[offset : offset+4])
 	offset += 4
 
+	// Apply the client-specified timeout to the request context so that
+	// downstream broker calls respect the client's deadline instead of
+	// blocking indefinitely
+	if timeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
+		defer cancel()
+	}
+
 	// Remember if this is fire-and-forget mode
 	isFireAndForget := acks == 0
 	if isFireAndForget {
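
A note on the cache change in broker_topic_conf_read_write.go: the fix stores the freshly validated config in the same critical section that removes the stale entry, instead of only deleting it. A minimal sketch of this update-in-place pattern, using hypothetical names (ttlCache, entry, newTTLCache) rather than the broker's actual types:

```go
package sketch

import (
	"sync"
	"time"
)

// entry holds a cached value; a nil conf records "known not to exist"
// (negative caching), mirroring the broker's topicCacheEntry.
type entry struct {
	conf      any
	expiresAt time.Time
}

type ttlCache struct {
	mu   sync.RWMutex
	data map[string]*entry
	ttl  time.Duration
}

func newTTLCache(ttl time.Duration) *ttlCache {
	return &ttlCache{data: make(map[string]*entry), ttl: ttl}
}

// get returns the cached value and whether a fresh entry was found.
func (c *ttlCache) get(key string) (any, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if e, ok := c.data[key]; ok && time.Now().Before(e.expiresAt) {
		return e.conf, true
	}
	return nil, false
}

// put replaces any existing entry with the validated value in a single
// critical section. Compared with delete-then-recache, concurrent readers
// see either the old entry or the new one, never a missing key.
func (c *ttlCache) put(key string, conf any) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = &entry{conf: conf, expiresAt: time.Now().Add(c.ttl)}
}
```

With delete-only invalidation, every broker pays a filer round-trip on the next lookup, and a concurrent reader can re-cache stale data in the gap; replacing the entry atomically avoids both.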
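The per-key creation lock added in broker_client_publish.go follows a general getOrCreate pattern: a registry lock guards the maps, while a per-key mutex serializes the expensive creation so that stream setup for one topic-partition never blocks others. A generic sketch of that pattern (registry, newRegistry, and getOrCreate are illustrative names, not SeaweedFS APIs):

```go
package sketch

import "sync"

// registry caches created items and serializes creation per key.
type registry[T any] struct {
	mu       sync.RWMutex
	items    map[string]T
	creating map[string]*sync.Mutex
}

func newRegistry[T any]() *registry[T] {
	return &registry[T]{
		items:    make(map[string]T),
		creating: make(map[string]*sync.Mutex),
	}
}

func (r *registry[T]) getOrCreate(key string, create func() (T, error)) (T, error) {
	// Fast path: already created.
	r.mu.RLock()
	if item, ok := r.items[key]; ok {
		r.mu.RUnlock()
		return item, nil
	}
	r.mu.RUnlock()

	// Fetch or install the per-key creation lock.
	r.mu.Lock()
	lock, ok := r.creating[key]
	if !ok {
		lock = &sync.Mutex{}
		r.creating[key] = lock
	}
	r.mu.Unlock()

	// At most one goroutine per key runs the creation; goroutines
	// working on other keys are not blocked.
	lock.Lock()
	defer lock.Unlock()

	// Double-check: a racing goroutine may have finished creation
	// while we waited on the per-key lock.
	r.mu.RLock()
	if item, ok := r.items[key]; ok {
		r.mu.RUnlock()
		return item, nil
	}
	r.mu.RUnlock()

	item, err := create() // slow work runs outside r.mu
	if err != nil {
		var zero T
		return zero, err
	}

	r.mu.Lock()
	r.items[key] = item
	r.mu.Unlock()
	return item, nil
}
```

The double-check after acquiring the per-key lock is what makes the pattern correct: the loser of the creation race finds the winner's session in the map and returns it instead of opening a second stream.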
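Finally, the produce.go change derives a per-request deadline from the timeout field that Kafka clients send in Produce requests. A small sketch of the helper this boils down to (withClientTimeout is an illustrative name; the handlers inline the same logic):

```go
package sketch

import (
	"context"
	"time"
)

// withClientTimeout derives a context that expires after the client-supplied
// timeout (milliseconds on the wire, 0 meaning "no timeout"). The returned
// cancel func must always be called to release the timer.
func withClientTimeout(ctx context.Context, timeoutMs uint32) (context.Context, context.CancelFunc) {
	if timeoutMs == 0 {
		return ctx, func() {} // no client timeout: leave the parent context as-is
	}
	return context.WithTimeout(ctx, time.Duration(timeoutMs)*time.Millisecond)
}
```

Because the derived context flows into the broker publish path, a slow or stuck broker call now fails with context.DeadlineExceeded at the client's requested timeout instead of hanging the handler.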