
Apply client-specified timeout to context

pull/7329/head
chrislu committed 5 days ago
commit 3e32331f38
  1. weed/mq/broker/broker_grpc_lookup.go (2 changed lines)
  2. weed/mq/broker/broker_topic_conf_read_write.go (28 changed lines)
  3. weed/mq/kafka/integration/broker_client_publish.go (53 changed lines)
  4. weed/mq/kafka/integration/types.go (3 changed lines)
  5. weed/mq/kafka/protocol/produce.go (21 changed lines)

weed/mq/broker/broker_grpc_lookup.go (2 changed lines)

@@ -174,7 +174,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
}
if err != nil {
glog.V(0).Infof("📋 ListTopics: filer scan failed: %v (returning %d in-memory topics)", err, len(inMemoryTopics))
glog.V(0).Infof("ListTopics: filer scan failed: %v (returning %d in-memory topics)", err, len(inMemoryTopics))
// Still return in-memory topics even if filer fails
} else {
glog.V(4).Infof("📋 ListTopics completed successfully: %d total topics (in-memory + persisted)", len(ret.Topics))

weed/mq/broker/broker_topic_conf_read_write.go (28 changed lines)

@@ -49,20 +49,20 @@ func (b *MessageQueueBroker) invalidateTopicCache(t topic.Topic) {
// This is the public API for reading topic config - always use this instead of direct filer reads.
func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.ConfigureTopicResponse, error) {
topicKey := t.String()
// Check unified cache first
b.topicCacheMu.RLock()
if entry, found := b.topicCache[topicKey]; found {
if time.Now().Before(entry.expiresAt) {
conf := entry.conf
b.topicCacheMu.RUnlock()
// If conf is nil, topic was cached as non-existent
if conf == nil {
glog.V(4).Infof("Topic cache HIT for %s: topic doesn't exist", topicKey)
return nil, fmt.Errorf("topic %v not found (cached)", t)
}
glog.V(4).Infof("Topic cache HIT for %s (skipping assignment validation)", topicKey)
// Cache hit - return immediately without validating assignments
// Assignments were validated when we first cached this config
@@ -70,11 +70,11 @@ func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.Config
}
}
b.topicCacheMu.RUnlock()
// Cache miss or expired - read from filer
glog.V(4).Infof("Topic cache MISS for %s, reading from filer", topicKey)
conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
if readConfErr != nil {
// Negative cache: topic doesn't exist
b.topicCacheMu.Lock()
@@ -86,7 +86,7 @@ func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.Config
glog.V(4).Infof("Topic cached as non-existent: %s", topicKey)
return nil, fmt.Errorf("topic %v not found: %w", t, readConfErr)
}
// Validate broker assignments before caching (NOT holding cache lock)
// This ensures cached configs always have valid broker assignments
// Only done on cache miss (not on every lookup), saving 14% CPU
@@ -100,15 +100,21 @@ func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.Config
// Don't cache on error - let next request retry
return conf, err
}
// Invalidate cache to force fresh read on next request
// This ensures all brokers see the updated assignments
// CRITICAL FIX: Invalidate cache while holding lock to prevent race condition
// Before the fix, between checking the cache and invalidating it, another goroutine
// could read stale data. Now we hold the lock throughout.
b.topicCacheMu.Lock()
delete(b.topicCache, topicKey)
// Cache the updated config with validated assignments
b.topicCache[topicKey] = &topicCacheEntry{
conf: conf,
expiresAt: time.Now().Add(b.topicCacheTTL),
}
b.topicCacheMu.Unlock()
glog.V(4).Infof("Invalidated cache for %s after assignment update", topicKey)
glog.V(4).Infof("Updated cache for %s after assignment update", topicKey)
return conf, nil
}
// Positive cache: topic exists with validated assignments
b.topicCacheMu.Lock()
b.topicCache[topicKey] = &topicCacheEntry{
@@ -117,7 +123,7 @@ func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.Config
}
b.topicCacheMu.Unlock()
glog.V(4).Infof("Topic config cached for %s", topicKey)
return conf, nil
}
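
The hunks above implement a read-through cache for topic configuration: a TTL'd entry per topic, a negative entry (nil conf) when the topic does not exist, and, as the fix, re-populating the entry under the write lock after an assignment update instead of merely deleting it. Below is a minimal standalone sketch of that pattern, not the SeaweedFS code: confCache, cacheEntry and the load callback are illustrative names, and the broker-assignment validation step is left out.

package example

import (
	"fmt"
	"sync"
	"time"
)

// Illustrative sketch of the cache pattern above; not the SeaweedFS types.
// A nil value records "not found" (negative caching), so repeated lookups for
// missing keys skip the backend until the entry expires.
type cacheEntry struct {
	value     *string
	expiresAt time.Time
}

type confCache struct {
	mu      sync.RWMutex
	entries map[string]*cacheEntry
	ttl     time.Duration
	load    func(key string) (*string, error) // backend read, e.g. from a filer
}

func newConfCache(ttl time.Duration, load func(string) (*string, error)) *confCache {
	return &confCache{entries: make(map[string]*cacheEntry), ttl: ttl, load: load}
}

func (c *confCache) get(key string) (*string, error) {
	// Fast path: read lock only, return a cached hit or a cached miss.
	c.mu.RLock()
	if e, ok := c.entries[key]; ok && time.Now().Before(e.expiresAt) {
		v := e.value
		c.mu.RUnlock()
		if v == nil {
			return nil, fmt.Errorf("%s not found (cached)", key)
		}
		return v, nil
	}
	c.mu.RUnlock()

	// Miss or expired: read from the backend, then store the result
	// (positive or negative) under the write lock in a single step, so other
	// readers never see a window between "invalidate" and "refill".
	v, err := c.load(key)
	if err != nil {
		v = nil // negative-cache the failed read
	}
	c.mu.Lock()
	c.entries[key] = &cacheEntry{value: v, expiresAt: time.Now().Add(c.ttl)}
	c.mu.Unlock()
	if err != nil {
		return nil, fmt.Errorf("%s not found: %w", key, err)
	}
	return v, nil
}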

weed/mq/kafka/integration/broker_client_publish.go (53 changed lines)

@@ -3,6 +3,7 @@ package integration
import (
"context"
"fmt"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -145,14 +146,46 @@ func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*Br
}
bc.publishersLock.RUnlock()
// Create new publisher stream
bc.publishersLock.Lock()
defer bc.publishersLock.Unlock()
// CRITICAL FIX: Prevent multiple concurrent attempts to create the same publisher
// Use a creation lock that is specific to each topic-partition pair
// This ensures only ONE goroutine tries to create/initialize for each publisher
if bc.publisherCreationLocks == nil {
bc.publishersLock.Lock()
if bc.publisherCreationLocks == nil {
bc.publisherCreationLocks = make(map[string]*sync.Mutex)
}
bc.publishersLock.Unlock()
}
bc.publishersLock.RLock()
creationLock, exists := bc.publisherCreationLocks[key]
if !exists {
// Need to create a creation lock for this topic-partition
bc.publishersLock.RUnlock()
bc.publishersLock.Lock()
// Double-check if someone else created it
if lock, exists := bc.publisherCreationLocks[key]; exists {
creationLock = lock
} else {
creationLock = &sync.Mutex{}
bc.publisherCreationLocks[key] = creationLock
}
bc.publishersLock.Unlock()
} else {
bc.publishersLock.RUnlock()
}
// Double-check after acquiring write lock
// Acquire the creation lock - only ONE goroutine will proceed
creationLock.Lock()
defer creationLock.Unlock()
// Double-check if publisher was created while we were waiting for the lock
bc.publishersLock.RLock()
if session, exists := bc.publishers[key]; exists {
bc.publishersLock.RUnlock()
return session, nil
}
bc.publishersLock.RUnlock()
// Create the stream
stream, err := bc.client.PublishMessage(bc.ctx)
@@ -160,13 +193,13 @@ func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*Br
return nil, fmt.Errorf("failed to create publish stream: %v", err)
}
// Get the actual partition assignment from the broker instead of using Kafka partition mapping
// Get the actual partition assignment from the broker
actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
if err != nil {
return nil, fmt.Errorf("failed to get actual partition assignment: %v", err)
}
// Send init message using the actual partition structure that the broker allocated
// Send init message
if err := stream.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Init{
Init: &mq_pb.PublishMessageRequest_InitMessage{
@@ -183,9 +216,7 @@ func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*Br
return nil, fmt.Errorf("failed to send init message: %v", err)
}
// CRITICAL: Consume the "hello" message sent by broker after init
// Broker sends empty PublishMessageResponse{} on line 137 of broker_grpc_pub.go
// Without this, first Recv() in PublishRecord gets hello instead of data ack
// Consume the "hello" message sent by broker after init
helloResp, err := stream.Recv()
if err != nil {
return nil, fmt.Errorf("failed to receive hello message: %v", err)
@@ -200,7 +231,11 @@ func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*Br
Stream: stream,
}
// Store in the map under the publishersLock
bc.publishersLock.Lock()
bc.publishers[key] = session
bc.publishersLock.Unlock()
return session, nil
}
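
The getOrCreatePublisher change above serializes publisher creation per topic-partition: a per-key mutex is looked up (or lazily created) under the map lock, only one goroutine then performs the slow work (open the stream, send the init message, consume the broker's hello response), and the publishers map is re-checked after acquiring that creation lock. Below is a minimal sketch of the per-key creation-lock pattern with double-checked lookup; sessionPool, session and the create callback are illustrative stand-ins rather than the actual BrokerClient API, and the RLock-then-upgrade dance from the diff is simplified to a single write lock.

package example

import "sync"

// Illustrative sketch of the per-key creation-lock pattern; not the actual
// BrokerClient implementation.
type session struct{ key string }

type sessionPool struct {
	mu            sync.RWMutex
	sessions      map[string]*session
	creationLocks map[string]*sync.Mutex
	create        func(key string) (*session, error) // slow: open stream, init, hello
}

func newSessionPool(create func(string) (*session, error)) *sessionPool {
	return &sessionPool{
		sessions:      make(map[string]*session),
		creationLocks: make(map[string]*sync.Mutex),
		create:        create,
	}
}

func (p *sessionPool) getOrCreate(key string) (*session, error) {
	// Fast path: the session already exists.
	p.mu.RLock()
	if s, ok := p.sessions[key]; ok {
		p.mu.RUnlock()
		return s, nil
	}
	p.mu.RUnlock()

	// Look up or lazily create the per-key creation lock.
	p.mu.Lock()
	lock, ok := p.creationLocks[key]
	if !ok {
		lock = &sync.Mutex{}
		p.creationLocks[key] = lock
	}
	p.mu.Unlock()

	// Only one goroutine per key proceeds past this point.
	lock.Lock()
	defer lock.Unlock()

	// Double-check: another goroutine may have created the session while we
	// were waiting on the creation lock.
	p.mu.RLock()
	if s, ok := p.sessions[key]; ok {
		p.mu.RUnlock()
		return s, nil
	}
	p.mu.RUnlock()

	s, err := p.create(key) // no map lock held during the slow call
	if err != nil {
		return nil, err
	}

	p.mu.Lock()
	p.sessions[key] = s
	p.mu.Unlock()
	return s, nil
}

golang.org/x/sync/singleflight would be another way to collapse concurrent creation attempts for the same key into a single call.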

weed/mq/kafka/integration/types.go (3 changed lines)

@@ -179,6 +179,9 @@ type BrokerClient struct {
publishersLock sync.RWMutex
publishers map[string]*BrokerPublisherSession
// Publisher creation locks to prevent concurrent creation attempts for the same topic-partition
publisherCreationLocks map[string]*sync.Mutex
// Subscriber streams for offset tracking
subscribersLock sync.RWMutex
subscribers map[string]*BrokerSubscriberSession

weed/mq/kafka/protocol/produce.go (21 changed lines)

@@ -55,7 +55,15 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a
timeout := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
_ = timeout // unused for now
// CRITICAL FIX: Apply client-specified timeout to context
// If client specifies a timeout, create a new context with that timeout
// This ensures broker connections respect the client's expectations
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
defer cancel()
}
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
@@ -620,9 +628,18 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
acks := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
_ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) // timeout
timeout := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// CRITICAL FIX: Apply client-specified timeout to context
// If client specifies a timeout, create a new context with that timeout
// This ensures broker connections respect the client's expectations
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
defer cancel()
}
// Remember if this is fire-and-forget mode
isFireAndForget := acks == 0
if isFireAndForget {

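Both produce paths above now derive a bounded context from the request's timeout field (milliseconds), so downstream broker calls inherit the client's deadline. A minimal sketch of that pattern, using a hypothetical withClientTimeout helper rather than the inline code in the diff:

package example

import (
	"context"
	"time"
)

// withClientTimeout bounds ctx by a client-specified timeout in milliseconds
// (e.g. the timeout_ms field of a Kafka Produce request). A zero timeout
// leaves ctx unchanged; the returned cancel must always be called to release
// the timer. Hypothetical helper, not part of the SeaweedFS code above.
func withClientTimeout(ctx context.Context, timeoutMs uint32) (context.Context, context.CancelFunc) {
	if timeoutMs == 0 {
		return ctx, func() {}
	}
	return context.WithTimeout(ctx, time.Duration(timeoutMs)*time.Millisecond)
}

A caller would write ctx, cancel := withClientTimeout(ctx, timeout); defer cancel(), and any broker RPC issued with that ctx fails with context.DeadlineExceeded once the client's deadline elapses.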