diff --git a/weed/mq/kafka/integration/broker_client.go b/weed/mq/kafka/integration/broker_client.go
index 4030ab0e2..7a633d93e 100644
--- a/weed/mq/kafka/integration/broker_client.go
+++ b/weed/mq/kafka/integration/broker_client.go
@@ -45,15 +45,17 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor
 	client := mq_pb.NewSeaweedMessagingClient(conn)
 
 	return &BrokerClient{
-		filerClientAccessor: filerClientAccessor,
-		brokerAddress:       brokerAddress,
-		conn:                conn,
-		client:              client,
-		publishers:          make(map[string]*BrokerPublisherSession),
-		subscribers:         make(map[string]*BrokerSubscriberSession),
-		fetchRequests:       make(map[string]*FetchRequest),
-		ctx:                 ctx,
-		cancel:              cancel,
+		filerClientAccessor:         filerClientAccessor,
+		brokerAddress:               brokerAddress,
+		conn:                        conn,
+		client:                      client,
+		publishers:                  make(map[string]*BrokerPublisherSession),
+		subscribers:                 make(map[string]*BrokerSubscriberSession),
+		fetchRequests:               make(map[string]*FetchRequest),
+		partitionAssignmentCache:    make(map[string]*partitionAssignmentCacheEntry),
+		partitionAssignmentCacheTTL: 30 * time.Second, // Same as broker's cache TTL
+		ctx:                         ctx,
+		cancel:                      cancel,
 	}, nil
 }
 
diff --git a/weed/mq/kafka/integration/broker_client_publish.go b/weed/mq/kafka/integration/broker_client_publish.go
index 388e22d9f..4c24c6d91 100644
--- a/weed/mq/kafka/integration/broker_client_publish.go
+++ b/weed/mq/kafka/integration/broker_client_publish.go
@@ -3,6 +3,7 @@ package integration
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
@@ -223,8 +224,23 @@ func (bc *BrokerClient) ClosePublisher(topic string, partition int32) error {
 }
 
 // getActualPartitionAssignment looks up the actual partition assignment from the broker configuration
+// Uses cache to avoid expensive LookupTopicBrokers calls on every fetch (13.5% CPU overhead!)
 func (bc *BrokerClient) getActualPartitionAssignment(topic string, kafkaPartition int32) (*schema_pb.Partition, error) {
-	// Look up the topic configuration from the broker to get the actual partition assignments
+	// Check cache first
+	bc.partitionAssignmentCacheMu.RLock()
+	if entry, found := bc.partitionAssignmentCache[topic]; found {
+		if time.Now().Before(entry.expiresAt) {
+			assignments := entry.assignments
+			bc.partitionAssignmentCacheMu.RUnlock()
+			glog.V(4).Infof("Partition assignment cache HIT for topic %s", topic)
+			// Use cached assignments to find partition
+			return bc.findPartitionInAssignments(topic, kafkaPartition, assignments)
+		}
+	}
+	bc.partitionAssignmentCacheMu.RUnlock()
+
+	// Cache miss or expired - lookup from broker
+	glog.V(4).Infof("Partition assignment cache MISS for topic %s, calling LookupTopicBrokers", topic)
 	lookupResp, err := bc.client.LookupTopicBrokers(bc.ctx, &mq_pb.LookupTopicBrokersRequest{
 		Topic: &schema_pb.Topic{
 			Namespace: "kafka",
@@ -239,7 +255,22 @@ func (bc *BrokerClient) getActualPartitionAssignment(topic string, kafkaPartitio
 		return nil, fmt.Errorf("no partition assignments found for topic %s", topic)
 	}
 
-	totalPartitions := int32(len(lookupResp.BrokerPartitionAssignments))
+	// Cache the assignments
+	bc.partitionAssignmentCacheMu.Lock()
+	bc.partitionAssignmentCache[topic] = &partitionAssignmentCacheEntry{
+		assignments: lookupResp.BrokerPartitionAssignments,
+		expiresAt:   time.Now().Add(bc.partitionAssignmentCacheTTL),
+	}
+	bc.partitionAssignmentCacheMu.Unlock()
+	glog.V(4).Infof("Cached partition assignments for topic %s", topic)
+
+	// Use freshly fetched assignments to find partition
+	return bc.findPartitionInAssignments(topic, kafkaPartition, lookupResp.BrokerPartitionAssignments)
+}
+
+// findPartitionInAssignments finds the SeaweedFS partition for a given Kafka partition ID
+func (bc *BrokerClient) findPartitionInAssignments(topic string, kafkaPartition int32, assignments []*mq_pb.BrokerPartitionAssignment) (*schema_pb.Partition, error) {
+	totalPartitions := int32(len(assignments))
 	if kafkaPartition >= totalPartitions {
 		return nil, fmt.Errorf("kafka partition %d out of range, topic %s has %d partitions",
 			kafkaPartition, topic, totalPartitions)
@@ -262,7 +293,7 @@ func (bc *BrokerClient) getActualPartitionAssignment(topic string, kafkaPartitio
 		kafkaPartition, topic, expectedRangeStart, expectedRangeStop, totalPartitions)
 
 	// Find the broker assignment that matches this range
-	for _, assignment := range lookupResp.BrokerPartitionAssignments {
+	for _, assignment := range assignments {
 		if assignment.Partition == nil {
 			continue
 		}
@@ -280,7 +311,7 @@ func (bc *BrokerClient) getActualPartitionAssignment(topic string, kafkaPartitio
 	glog.Warningf("no partition assignment found for Kafka partition %d in topic %s with expected range [%d, %d]",
 		kafkaPartition, topic, expectedRangeStart, expectedRangeStop)
 	glog.Warningf("Available assignments:")
-	for i, assignment := range lookupResp.BrokerPartitionAssignments {
+	for i, assignment := range assignments {
 		if assignment.Partition != nil {
 			glog.Warningf("  Assignment[%d]: {RangeStart: %d, RangeStop: %d, RingSize: %d}",
 				i, assignment.Partition.RangeStart, assignment.Partition.RangeStop, assignment.Partition.RingSize)
diff --git a/weed/mq/kafka/integration/types.go b/weed/mq/kafka/integration/types.go
index ac4ae428f..3e7efcbdd 100644
--- a/weed/mq/kafka/integration/types.go
+++ b/weed/mq/kafka/integration/types.go
@@ -161,6 +161,12 @@ type FetchResult struct {
 	err error
 }
 
+// partitionAssignmentCacheEntry caches LookupTopicBrokers results
+type partitionAssignmentCacheEntry struct {
+	assignments []*mq_pb.BrokerPartitionAssignment
+	expiresAt   time.Time
+}
+
 type BrokerClient struct {
 	// Reference to shared filer client accessor
 	filerClientAccessor *filer_client.FilerClientAccessor
@@ -181,6 +187,11 @@ type BrokerClient struct {
 	fetchRequestsLock sync.Mutex
 	fetchRequests     map[string]*FetchRequest
 
+	// Partition assignment cache to reduce LookupTopicBrokers calls (13.5% CPU overhead!)
+	partitionAssignmentCache    map[string]*partitionAssignmentCacheEntry // Key: topic name
+	partitionAssignmentCacheMu  sync.RWMutex
+	partitionAssignmentCacheTTL time.Duration
+
 	ctx    context.Context
 	cancel context.CancelFunc
 }
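For review context, below is a minimal, standalone sketch of the read-through TTL cache pattern this patch introduces. The names (topicCache, cacheEntry, lookup) are illustrative stand-ins and not part of the patch; only the 30-second TTL and the RWMutex read-then-write locking mirror the actual change.

package main

import (
	"fmt"
	"sync"
	"time"
)

// cacheEntry mirrors partitionAssignmentCacheEntry: a cached value plus its expiry deadline.
type cacheEntry struct {
	value     []string
	expiresAt time.Time
}

// topicCache is an illustrative stand-in for the cache fields added to BrokerClient.
type topicCache struct {
	mu      sync.RWMutex
	ttl     time.Duration
	entries map[string]*cacheEntry
	lookup  func(topic string) []string // stand-in for the expensive LookupTopicBrokers call
}

// get follows the same shape as getActualPartitionAssignment: return a fresh cached
// value under a read lock, otherwise release it, perform the lookup, and cache the result.
func (c *topicCache) get(topic string) []string {
	c.mu.RLock()
	if e, ok := c.entries[topic]; ok && time.Now().Before(e.expiresAt) {
		v := e.value
		c.mu.RUnlock()
		return v // cache hit
	}
	c.mu.RUnlock()

	v := c.lookup(topic) // cache miss or expired entry
	c.mu.Lock()
	c.entries[topic] = &cacheEntry{value: v, expiresAt: time.Now().Add(c.ttl)}
	c.mu.Unlock()
	return v
}

func main() {
	calls := 0
	c := &topicCache{
		ttl:     30 * time.Second, // same TTL the patch configures
		entries: make(map[string]*cacheEntry),
		lookup: func(topic string) []string {
			calls++
			return []string{"broker-1:17777"} // placeholder broker address
		},
	}
	c.get("events")
	c.get("events")                      // second call is served from the cache
	fmt.Println("lookup calls:", calls) // prints: lookup calls: 1
}

As in the patch, two callers that miss concurrently may both perform the expensive lookup and the last write wins; that is acceptable here because the lookup is idempotent and the entry expires after the TTL anyway.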