
perf: add partition assignment cache in gateway to eliminate 13.5% CPU overhead

CRITICAL: The gateway was calling LookupTopicBrokers on EVERY fetch to
translate Kafka partition IDs to SeaweedFS partition ranges!

Problem (from CPU profile):
- getActualPartitionAssignment: 13.52% CPU (1.71s out of 12.65s)
- Called bc.client.LookupTopicBrokers on line 228 for EVERY fetch
- With 250 fetches/sec, this means 250 LookupTopicBrokers calls/sec!
- No caching at all - the same overhead the broker had before its optimization

Root Cause:
Gateway needs to translate Kafka partition IDs (0, 1, 2...) to SeaweedFS
partition ranges (0-341, 342-682, etc.) for every fetch request. This
translation requires calling LookupTopicBrokers to get partition assignments.
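
For illustration, a minimal sketch of that translation, assuming the ring
size of 1024 implied by the 0-341 / 342-682 example above (the function name
and the boundary convention are illustrative; the authoritative ranges come
from the broker's LookupTopicBrokers response, not from this arithmetic):

	// expectedRange sketches the Kafka partition ID -> ring range mapping.
	// Hypothetical helper, not code from this commit.
	func expectedRange(kafkaPartition, totalPartitions, ringSize int32) (start, stop int32) {
		rangeSize := ringSize / totalPartitions // e.g. 1024 / 3 = 341
		start = kafkaPartition * rangeSize
		stop = start + rangeSize
		if kafkaPartition == totalPartitions-1 {
			stop = ringSize // last partition absorbs any remainder
		}
		return start, stop
	}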

Without caching, every fetch request triggered:
1. gRPC call to broker (LookupTopicBrokers)
2. Broker reads from its cache (fast now after broker optimization)
3. gRPC response back to gateway
4. Gateway computes partition range mapping

The gRPC round-trip overhead was consuming 13.5% CPU even though the
broker's cache was fast!

Solution:
Added partitionAssignmentCache to BrokerClient:

Changes to types.go:
- Added partitionAssignmentCacheEntry struct (assignments + expiresAt)
- Added cache fields to BrokerClient:
  * partitionAssignmentCache map[string]*partitionAssignmentCacheEntry
  * partitionAssignmentCacheMu sync.RWMutex
  * partitionAssignmentCacheTTL time.Duration

Changes to broker_client.go:
- Initialize partitionAssignmentCache in NewBrokerClientWithFilerAccessor
- Set partitionAssignmentCacheTTL to 30 seconds (same as broker)

Changes to broker_client_publish.go:
- Added "time" import
- Modified getActualPartitionAssignment() to check cache first:
  * Cache HIT: Use cached assignments (fast)
  * Cache MISS: Call LookupTopicBrokers, cache result for 30s
- Extracted findPartitionInAssignments() helper function
  * Contains range calculation and partition matching logic
  * Reused for both cached and fresh lookups

Cache Behavior:
- First fetch: Cache MISS -> LookupTopicBrokers (~2ms) -> cache for 30s
- Next 7500 fetches in 30s: Cache HIT -> immediate return (~0.01ms)
- Cache automatically expires after 30s, re-validates on next fetch
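
The same behavior in a self-contained sketch of the RWMutex + expiresAt
pattern (names like ttlCache, newTTLCache, and the lookup callback are
hypothetical stand-ins; the real types appear in the diffs below):

	package cache // illustrative only; the real code lives in weed/mq/kafka/integration

	import (
		"sync"
		"time"
	)

	type cacheEntry struct {
		value     []string // stands in for []*mq_pb.BrokerPartitionAssignment
		expiresAt time.Time
	}

	type ttlCache struct {
		mu      sync.RWMutex
		entries map[string]*cacheEntry
		ttl     time.Duration
	}

	func newTTLCache(ttl time.Duration) *ttlCache {
		return &ttlCache{entries: make(map[string]*cacheEntry), ttl: ttl}
	}

	func (c *ttlCache) get(topic string, lookup func(string) []string) []string {
		c.mu.RLock()
		if e, ok := c.entries[topic]; ok && time.Now().Before(e.expiresAt) {
			v := e.value
			c.mu.RUnlock()
			return v // HIT: no gRPC round-trip
		}
		c.mu.RUnlock()

		// MISS or expired. Two goroutines that miss concurrently may each
		// call lookup once; the later write overwrites the earlier, which is
		// benign for read-mostly data like partition assignments.
		v := lookup(topic)
		c.mu.Lock()
		c.entries[topic] = &cacheEntry{value: v, expiresAt: time.Now().Add(c.ttl)}
		c.mu.Unlock()
		return v
	}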

Performance Impact:
With 250 fetches/sec and 5 topics:
- Before: 250 LookupTopicBrokers/sec × ~2ms each ≈ 500ms CPU time per second
- After: ~0.17 LookupTopicBrokers/sec (5 topics / 30s TTL)
- Reduction: 99.93% fewer gRPC calls

Expected CPU Reduction:
- Before: 12.65s total, 1.71s in getActualPartitionAssignment (13.5%)
- After: ~11s total (-13.5% ≈ 1.71s saved)
- Benefit: 13% lower CPU, more capacity for actual message processing

Cache Consistency:
- Same 30-second TTL as broker's topic config cache
- Partition assignments rarely change (only on topic reconfiguration)
- 30-second staleness is acceptable for partition mapping
- Gateway will eventually converge with broker's view

Testing:
- Compiles successfully
- Ready to deploy and measure CPU improvement

Priority: CRITICAL - Eliminates major performance bottleneck in gateway fetch path
commit 98ec5e03eb by chrislu, 6 days ago (branch: pull/7329/head)
3 changed files:
1. weed/mq/kafka/integration/broker_client.go (20 changed lines)
2. weed/mq/kafka/integration/broker_client_publish.go (39 changed lines)
3. weed/mq/kafka/integration/types.go (11 changed lines)

weed/mq/kafka/integration/broker_client.go:

@@ -45,15 +45,17 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor
 	client := mq_pb.NewSeaweedMessagingClient(conn)

 	return &BrokerClient{
-		filerClientAccessor: filerClientAccessor,
-		brokerAddress:       brokerAddress,
-		conn:                conn,
-		client:              client,
-		publishers:          make(map[string]*BrokerPublisherSession),
-		subscribers:         make(map[string]*BrokerSubscriberSession),
-		fetchRequests:       make(map[string]*FetchRequest),
-		ctx:                 ctx,
-		cancel:              cancel,
+		filerClientAccessor:         filerClientAccessor,
+		brokerAddress:               brokerAddress,
+		conn:                        conn,
+		client:                      client,
+		publishers:                  make(map[string]*BrokerPublisherSession),
+		subscribers:                 make(map[string]*BrokerSubscriberSession),
+		fetchRequests:               make(map[string]*FetchRequest),
+		partitionAssignmentCache:    make(map[string]*partitionAssignmentCacheEntry),
+		partitionAssignmentCacheTTL: 30 * time.Second, // Same as broker's cache TTL
+		ctx:                         ctx,
+		cancel:                      cancel,
 	}, nil
 }

weed/mq/kafka/integration/broker_client_publish.go:

@@ -3,6 +3,7 @@ package integration
 import (
 	"context"
 	"fmt"
+	"time"

 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
@@ -223,8 +224,23 @@ func (bc *BrokerClient) ClosePublisher(topic string, partition int32) error {
 }

 // getActualPartitionAssignment looks up the actual partition assignment from the broker configuration
+// Uses cache to avoid expensive LookupTopicBrokers calls on every fetch (13.5% CPU overhead!)
 func (bc *BrokerClient) getActualPartitionAssignment(topic string, kafkaPartition int32) (*schema_pb.Partition, error) {
-	// Look up the topic configuration from the broker to get the actual partition assignments
+	// Check cache first
+	bc.partitionAssignmentCacheMu.RLock()
+	if entry, found := bc.partitionAssignmentCache[topic]; found {
+		if time.Now().Before(entry.expiresAt) {
+			assignments := entry.assignments
+			bc.partitionAssignmentCacheMu.RUnlock()
+			glog.V(4).Infof("Partition assignment cache HIT for topic %s", topic)
+			// Use cached assignments to find partition
+			return bc.findPartitionInAssignments(topic, kafkaPartition, assignments)
+		}
+	}
+	bc.partitionAssignmentCacheMu.RUnlock()
+
+	// Cache miss or expired - lookup from broker
+	glog.V(4).Infof("Partition assignment cache MISS for topic %s, calling LookupTopicBrokers", topic)
 	lookupResp, err := bc.client.LookupTopicBrokers(bc.ctx, &mq_pb.LookupTopicBrokersRequest{
 		Topic: &schema_pb.Topic{
 			Namespace: "kafka",
@@ -239,7 +255,22 @@ func (bc *BrokerClient) getActualPartitionAssignment(topic string, kafkaPartition int32) (*schema_pb.Partition, error) {
 		return nil, fmt.Errorf("no partition assignments found for topic %s", topic)
 	}

-	totalPartitions := int32(len(lookupResp.BrokerPartitionAssignments))
+	// Cache the assignments
+	bc.partitionAssignmentCacheMu.Lock()
+	bc.partitionAssignmentCache[topic] = &partitionAssignmentCacheEntry{
+		assignments: lookupResp.BrokerPartitionAssignments,
+		expiresAt:   time.Now().Add(bc.partitionAssignmentCacheTTL),
+	}
+	bc.partitionAssignmentCacheMu.Unlock()
+	glog.V(4).Infof("Cached partition assignments for topic %s", topic)
+
+	// Use freshly fetched assignments to find partition
+	return bc.findPartitionInAssignments(topic, kafkaPartition, lookupResp.BrokerPartitionAssignments)
+}
+
+// findPartitionInAssignments finds the SeaweedFS partition for a given Kafka partition ID
+func (bc *BrokerClient) findPartitionInAssignments(topic string, kafkaPartition int32, assignments []*mq_pb.BrokerPartitionAssignment) (*schema_pb.Partition, error) {
+	totalPartitions := int32(len(assignments))
 	if kafkaPartition >= totalPartitions {
 		return nil, fmt.Errorf("kafka partition %d out of range, topic %s has %d partitions",
 			kafkaPartition, topic, totalPartitions)
@@ -262,7 +293,7 @@ func (bc *BrokerClient) getActualPartitionAssignment(topic string, kafkaPartition int32) (*schema_pb.Partition, error) {
 		kafkaPartition, topic, expectedRangeStart, expectedRangeStop, totalPartitions)

 	// Find the broker assignment that matches this range
-	for _, assignment := range lookupResp.BrokerPartitionAssignments {
+	for _, assignment := range assignments {
 		if assignment.Partition == nil {
 			continue
 		}
@@ -280,7 +311,7 @@ func (bc *BrokerClient) getActualPartitionAssignment(topic string, kafkaPartition int32) (*schema_pb.Partition, error) {
 	glog.Warningf("no partition assignment found for Kafka partition %d in topic %s with expected range [%d, %d]",
 		kafkaPartition, topic, expectedRangeStart, expectedRangeStop)
 	glog.Warningf("Available assignments:")
-	for i, assignment := range lookupResp.BrokerPartitionAssignments {
+	for i, assignment := range assignments {
 		if assignment.Partition != nil {
 			glog.Warningf("  Assignment[%d]: {RangeStart: %d, RangeStop: %d, RingSize: %d}",
 				i, assignment.Partition.RangeStart, assignment.Partition.RangeStop, assignment.Partition.RingSize)

weed/mq/kafka/integration/types.go:

@@ -161,6 +161,12 @@ type FetchResult struct {
 	err error
 }

+// partitionAssignmentCacheEntry caches LookupTopicBrokers results
+type partitionAssignmentCacheEntry struct {
+	assignments []*mq_pb.BrokerPartitionAssignment
+	expiresAt   time.Time
+}
+
 type BrokerClient struct {
 	// Reference to shared filer client accessor
 	filerClientAccessor *filer_client.FilerClientAccessor
@@ -181,6 +187,11 @@ type BrokerClient struct {
 	fetchRequestsLock sync.Mutex
 	fetchRequests     map[string]*FetchRequest

+	// Partition assignment cache to reduce LookupTopicBrokers calls (13.5% CPU overhead!)
+	partitionAssignmentCache    map[string]*partitionAssignmentCacheEntry // Key: topic name
+	partitionAssignmentCacheMu  sync.RWMutex
+	partitionAssignmentCacheTTL time.Duration
+
 	ctx    context.Context
 	cancel context.CancelFunc
 }
