|
|
@ -20,7 +20,7 @@ import ( |
|
|
|
func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) { |
|
|
|
// get or generate a local partition
|
|
|
|
topicKey := t.String() |
|
|
|
|
|
|
|
|
|
|
|
// Check cache first to avoid expensive filer reads (60% CPU overhead!)
|
|
|
|
b.topicConfCacheMu.RLock() |
|
|
|
if entry, found := b.topicConfCache[topicKey]; found { |
|
|
@ -37,7 +37,7 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio |
|
|
|
} |
|
|
|
} |
|
|
|
b.topicConfCacheMu.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
// Cache miss or expired - read from filer
|
|
|
|
glog.V(4).Infof("TopicConf cache MISS for %s, reading from filer", topicKey) |
|
|
|
conf, readConfErr := b.fca.ReadTopicConfFromFiler(t) |
|
|
@ -45,7 +45,7 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio |
|
|
|
glog.Errorf("topic %v not found: %v", t, readConfErr) |
|
|
|
return nil, fmt.Errorf("topic %v not found: %w", t, readConfErr) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Cache the result
|
|
|
|
b.topicConfCacheMu.Lock() |
|
|
|
b.topicConfCache[topicKey] = &topicConfCacheEntry{ |
|
|
|