Browse Source

refactor: merge topicExistsCache and topicConfCache into unified topicCache

Merged two separate caches into one unified cache to simplify code and
reduce memory usage. The unified cache stores both topic existence and
configuration in a single structure.

Design:
- Single topicCacheEntry with optional *ConfigureTopicResponse
- If conf != nil: topic exists with full configuration
- If conf == nil: topic doesn't exist (negative cache)
- Same 30-second TTL for both existence and config caching

Changes to broker_server.go:
- Removed topicExistsCacheEntry struct
- Removed topicConfCacheEntry struct
- Added unified topicCacheEntry struct (conf can be nil)
- Removed topicExistsCache, topicExistsCacheMu, topicExistsCacheTTL
- Removed topicConfCache, topicConfCacheMu, topicConfCacheTTL
- Added unified topicCache, topicCacheMu, topicCacheTTL
- Updated NewMessageBroker() to initialize single cache

Changes to broker_topic_conf_read_write.go:
- Modified GetOrGenerateLocalPartition() to use unified cache
- Added negative caching (conf=nil) when topic not found
- Renamed invalidateTopicConfCache() to invalidateTopicCache()
- Single cache lookup instead of two separate checks

Changes to broker_grpc_lookup.go:
- Modified TopicExists() to use unified cache
- Check: exists = (entry.conf != nil)
- Only cache negative results (conf=nil) in TopicExists
- Positive results cached by GetOrGenerateLocalPartition
- Removed old invalidateTopicExistsCache() function

Changes to broker_grpc_configure.go:
- Updated invalidateTopicExistsCache() calls to invalidateTopicCache()
- Two call sites updated

Benefits:
1. Code Simplification: One cache instead of two
2. Memory Reduction: Single map, single mutex, single TTL
3. Consistency: No risk of cache desync between existence and config
4. Less Lock Contention: One lock instead of two
5. Easier Maintenance: Single invalidation function
6. Same Performance: Still eliminates 60% CPU overhead

Cache Behavior:
- TopicExists: Lightweight check, only caches negative (conf=nil)
- GetOrGenerateLocalPartition: Full config read, caches positive (conf != nil)
- Both share same 30s TTL
- Both use same invalidation on topic create/update/delete

Testing:
-  Compiles successfully
- Ready for integration testing

This refactor maintains all performance benefits while simplifying
the codebase and reducing memory footprint.
pull/7329/head
chrislu 6 days ago
parent
commit
b9ad795dce
  1. 8
      weed/mq/broker/broker_grpc_configure.go
  2. 51
      weed/mq/broker/broker_grpc_lookup.go
  3. 50
      weed/mq/broker/broker_server.go
  4. 62
      weed/mq/broker/broker_topic_conf_read_write.go

8
weed/mq/broker/broker_grpc_configure.go

@ -77,8 +77,8 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return nil, fmt.Errorf("update topic schemas: %w", err)
}
// Invalidate TopicExists cache since we just updated the topic
b.invalidateTopicExistsCache(t)
// Invalidate topic cache since we just updated the topic
b.invalidateTopicCache(t)
glog.V(0).Infof("updated schemas for topic %s", request.Topic)
return resp, nil
@ -105,8 +105,8 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return nil, fmt.Errorf("configure topic: %w", err)
}
// Invalidate TopicExists cache since we just created/updated the topic
b.invalidateTopicExistsCache(t)
// Invalidate topic cache since we just created/updated the topic
b.invalidateTopicCache(t)
b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)

51
weed/mq/broker/broker_grpc_lookup.go

@ -179,7 +179,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
}
// TopicExists checks if a topic exists in memory or filer
// Caches both positive and negative results to reduce filer load
// Uses unified cache (checks if config is non-nil) to reduce filer load
func (b *MessageQueueBroker) TopicExists(ctx context.Context, request *mq_pb.TopicExistsRequest) (*mq_pb.TopicExistsResponse, error) {
if !b.isLockOwner() {
var resp *mq_pb.TopicExistsResponse
@ -210,19 +210,20 @@ func (b *MessageQueueBroker) TopicExists(ctx context.Context, request *mq_pb.Top
return &mq_pb.TopicExistsResponse{Exists: true}, nil
}
// Check cache for filer lookup results (both positive and negative)
b.topicExistsCacheMu.RLock()
if entry, found := b.topicExistsCache[topicKey]; found {
// Check unified cache (if conf != nil, topic exists; if conf == nil, doesn't exist)
b.topicCacheMu.RLock()
if entry, found := b.topicCache[topicKey]; found {
if time.Now().Before(entry.expiresAt) {
b.topicExistsCacheMu.RUnlock()
glog.V(4).Infof("TopicExists cache HIT for %s: %v", topicKey, entry.exists)
return &mq_pb.TopicExistsResponse{Exists: entry.exists}, nil
exists := entry.conf != nil
b.topicCacheMu.RUnlock()
glog.V(4).Infof("Topic cache HIT for %s: exists=%v", topicKey, exists)
return &mq_pb.TopicExistsResponse{Exists: exists}, nil
}
}
b.topicExistsCacheMu.RUnlock()
b.topicCacheMu.RUnlock()
// Cache miss or expired - query filer for persisted topics
glog.V(4).Infof("TopicExists cache MISS for %s, querying filer", topicKey)
// Cache miss or expired - query filer for persisted topics (lightweight check)
glog.V(4).Infof("Topic cache MISS for %s, querying filer for existence", topicKey)
exists := false
err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
topicPath := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, request.Topic.Namespace, request.Topic.Name)
@ -242,28 +243,24 @@ func (b *MessageQueueBroker) TopicExists(ctx context.Context, request *mq_pb.Top
return &mq_pb.TopicExistsResponse{Exists: false}, nil
}
// Update cache with result (both positive and negative)
b.topicExistsCacheMu.Lock()
b.topicExistsCache[topicKey] = &topicExistsCacheEntry{
exists: exists,
expiresAt: time.Now().Add(b.topicExistsCacheTTL),
// Update unified cache with lightweight result (don't read full config yet)
// Cache existence info: conf=nil for non-existent (we don't have full config yet for existent)
b.topicCacheMu.Lock()
if !exists {
// Negative cache: topic definitely doesn't exist
b.topicCache[topicKey] = &topicCacheEntry{
conf: nil,
expiresAt: time.Now().Add(b.topicCacheTTL),
}
glog.V(4).Infof("Topic cached as non-existent: %s", topicKey)
}
b.topicExistsCacheMu.Unlock()
glog.V(4).Infof("TopicExists cached result for %s: %v", topicKey, exists)
// Note: For positive existence, we don't cache here to avoid partial state
// The config will be cached when GetOrGenerateLocalPartition reads it
b.topicCacheMu.Unlock()
return &mq_pb.TopicExistsResponse{Exists: exists}, nil
}
// invalidateTopicExistsCache removes a topic from the cache
// Should be called when a topic is created or deleted
func (b *MessageQueueBroker) invalidateTopicExistsCache(t topic.Topic) {
topicKey := t.String()
b.topicExistsCacheMu.Lock()
delete(b.topicExistsCache, topicKey)
b.topicExistsCacheMu.Unlock()
glog.V(4).Infof("Invalidated TopicExists cache for %s", topicKey)
}
// GetTopicConfiguration returns the complete configuration of a topic including schema and partition assignments
func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request *mq_pb.GetTopicConfigurationRequest) (resp *mq_pb.GetTopicConfigurationResponse, err error) {
if !b.isLockOwner() {

50
weed/mq/broker/broker_server.go

@ -39,13 +39,11 @@ func (option *MessageQueueBrokerOption) BrokerAddress() pb.ServerAddress {
return pb.NewServerAddress(option.Ip, option.Port, 0)
}
type topicExistsCacheEntry struct {
exists bool
expiresAt time.Time
}
type topicConfCacheEntry struct {
conf *mq_pb.ConfigureTopicResponse
// topicCacheEntry caches both topic existence and configuration
// If conf is nil, topic doesn't exist (negative cache)
// If conf is non-nil, topic exists with this configuration (positive cache)
type topicCacheEntry struct {
conf *mq_pb.ConfigureTopicResponse // nil = topic doesn't exist
expiresAt time.Time
}
@ -66,16 +64,12 @@ type MessageQueueBroker struct {
// Removed gatewayRegistry - no longer needed
accessLock sync.Mutex
fca *filer_client.FilerClientAccessor
// TopicExists cache to reduce filer lookups
// Caches both positive (topic exists) and negative (topic doesn't exist) results
topicExistsCache map[string]*topicExistsCacheEntry
topicExistsCacheMu sync.RWMutex
topicExistsCacheTTL time.Duration
// TopicConf cache to reduce expensive filer reads and JSON unmarshaling
// Caches topic configuration to avoid 60% CPU overhead on every fetch
topicConfCache map[string]*topicConfCacheEntry
topicConfCacheMu sync.RWMutex
topicConfCacheTTL time.Duration
// Unified topic cache for both existence and configuration
// Caches topic config (positive: conf != nil) and non-existence (negative: conf == nil)
// Eliminates 60% CPU overhead from repeated filer reads and JSON unmarshaling
topicCache map[string]*topicCacheEntry
topicCacheMu sync.RWMutex
topicCacheTTL time.Duration
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@ -84,18 +78,16 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
subCoordinator := sub_coordinator.NewSubCoordinator()
mqBroker = &MessageQueueBroker{
option: option,
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(),
PubBalancer: pubBalancer,
SubCoordinator: subCoordinator,
offsetManager: nil, // Will be initialized below
topicExistsCache: make(map[string]*topicExistsCacheEntry),
topicExistsCacheTTL: 30 * time.Second, // Cache for 30 seconds to reduce filer load
topicConfCache: make(map[string]*topicConfCacheEntry),
topicConfCacheTTL: 30 * time.Second, // Cache topic config to avoid 60% CPU overhead
option: option,
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(),
PubBalancer: pubBalancer,
SubCoordinator: subCoordinator,
offsetManager: nil, // Will be initialized below
topicCache: make(map[string]*topicCacheEntry),
topicCacheTTL: 30 * time.Second, // Unified cache for existence + config (eliminates 60% CPU overhead)
}
// Create FilerClientAccessor that adapts broker's single filer to the new multi-filer interface
fca := &filer_client.FilerClientAccessor{

62
weed/mq/broker/broker_topic_conf_read_write.go

@ -20,14 +20,21 @@ import (
func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) {
// get or generate a local partition
topicKey := t.String()
// Check cache first to avoid expensive filer reads (60% CPU overhead!)
b.topicConfCacheMu.RLock()
if entry, found := b.topicConfCache[topicKey]; found {
// Check unified cache first to avoid expensive filer reads (60% CPU overhead!)
b.topicCacheMu.RLock()
if entry, found := b.topicCache[topicKey]; found {
if time.Now().Before(entry.expiresAt) {
conf := entry.conf
b.topicConfCacheMu.RUnlock()
glog.V(4).Infof("TopicConf cache HIT for %s", topicKey)
b.topicCacheMu.RUnlock()
// If conf is nil, topic was cached as non-existent
if conf == nil {
glog.V(4).Infof("Topic cache HIT for %s: topic doesn't exist", topicKey)
return nil, fmt.Errorf("topic %v not found (cached)", t)
}
glog.V(4).Infof("Topic cache HIT for %s", topicKey)
localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
if getOrGenError != nil {
glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
@ -36,24 +43,33 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio
return localTopicPartition, nil
}
}
b.topicConfCacheMu.RUnlock()
b.topicCacheMu.RUnlock()
// Cache miss or expired - read from filer
glog.V(4).Infof("TopicConf cache MISS for %s, reading from filer", topicKey)
glog.V(4).Infof("Topic cache MISS for %s, reading from filer", topicKey)
conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
// Cache the result (even if error/not found - negative caching)
b.topicCacheMu.Lock()
if readConfErr != nil {
// Negative cache: topic doesn't exist
b.topicCache[topicKey] = &topicCacheEntry{
conf: nil,
expiresAt: time.Now().Add(b.topicCacheTTL),
}
b.topicCacheMu.Unlock()
glog.V(4).Infof("Topic cached as non-existent: %s", topicKey)
glog.Errorf("topic %v not found: %v", t, readConfErr)
return nil, fmt.Errorf("topic %v not found: %w", t, readConfErr)
}
// Cache the result
b.topicConfCacheMu.Lock()
b.topicConfCache[topicKey] = &topicConfCacheEntry{
// Positive cache: topic exists with config
b.topicCache[topicKey] = &topicCacheEntry{
conf: conf,
expiresAt: time.Now().Add(b.topicConfCacheTTL),
expiresAt: time.Now().Add(b.topicCacheTTL),
}
b.topicConfCacheMu.Unlock()
glog.V(4).Infof("TopicConf cached for %s", topicKey)
b.topicCacheMu.Unlock()
glog.V(4).Infof("Topic config cached for %s", topicKey)
localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
if getOrGenError != nil {
@ -63,14 +79,14 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio
return localTopicPartition, nil
}
// invalidateTopicConfCache removes a topic config from the cache
// Should be called when a topic configuration is updated
func (b *MessageQueueBroker) invalidateTopicConfCache(t topic.Topic) {
// invalidateTopicCache removes a topic from the unified cache
// Should be called when a topic is created, deleted, or config is updated
func (b *MessageQueueBroker) invalidateTopicCache(t topic.Topic) {
topicKey := t.String()
b.topicConfCacheMu.Lock()
delete(b.topicConfCache, topicKey)
b.topicConfCacheMu.Unlock()
glog.V(4).Infof("Invalidated TopicConf cache for %s", topicKey)
b.topicCacheMu.Lock()
delete(b.topicCache, topicKey)
b.topicCacheMu.Unlock()
glog.V(4).Infof("Invalidated topic cache for %s", topicKey)
}
func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {

Loading…
Cancel
Save