From b9ad795dce861ca1da84de169ce6af442e1525f3 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 15 Oct 2025 23:20:43 -0700 Subject: [PATCH] refactor: merge topicExistsCache and topicConfCache into unified topicCache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Merged two separate caches into one unified cache to simplify code and reduce memory usage. The unified cache stores both topic existence and configuration in a single structure. Design: - Single topicCacheEntry with optional *ConfigureTopicResponse - If conf != nil: topic exists with full configuration - If conf == nil: topic doesn't exist (negative cache) - Same 30-second TTL for both existence and config caching Changes to broker_server.go: - Removed topicExistsCacheEntry struct - Removed topicConfCacheEntry struct - Added unified topicCacheEntry struct (conf can be nil) - Removed topicExistsCache, topicExistsCacheMu, topicExistsCacheTTL - Removed topicConfCache, topicConfCacheMu, topicConfCacheTTL - Added unified topicCache, topicCacheMu, topicCacheTTL - Updated NewMessageBroker() to initialize single cache Changes to broker_topic_conf_read_write.go: - Modified GetOrGenerateLocalPartition() to use unified cache - Added negative caching (conf=nil) when topic not found - Renamed invalidateTopicConfCache() to invalidateTopicCache() - Single cache lookup instead of two separate checks Changes to broker_grpc_lookup.go: - Modified TopicExists() to use unified cache - Check: exists = (entry.conf != nil) - Only cache negative results (conf=nil) in TopicExists - Positive results cached by GetOrGenerateLocalPartition - Removed old invalidateTopicExistsCache() function Changes to broker_grpc_configure.go: - Updated invalidateTopicExistsCache() calls to invalidateTopicCache() - Two call sites updated Benefits: 1. Code Simplification: One cache instead of two 2. Memory Reduction: Single map, single mutex, single TTL 3. 
Consistency: No risk of cache desync between existence and config 4. Less Lock Contention: One lock instead of two 5. Easier Maintenance: Single invalidation function 6. Same Performance: Still eliminates 60% CPU overhead Cache Behavior: - TopicExists: Lightweight check, only caches negative (conf=nil) - GetOrGenerateLocalPartition: Full config read, caches both positive (conf != nil) and negative (conf=nil) results - Both share same 30s TTL - Both use same invalidation on topic create/update/delete Testing: - ✅ Compiles successfully - Ready for integration testing This refactor maintains all performance benefits while simplifying the codebase and reducing memory footprint. --- weed/mq/broker/broker_grpc_configure.go | 8 +-- weed/mq/broker/broker_grpc_lookup.go | 51 +++++++-------- weed/mq/broker/broker_server.go | 50 +++++++-------- .../mq/broker/broker_topic_conf_read_write.go | 62 ++++++++++++------- 4 files changed, 88 insertions(+), 83 deletions(-) diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 40dd71db1..3d3ed0d1c 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -77,8 +77,8 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. return nil, fmt.Errorf("update topic schemas: %w", err) } - // Invalidate TopicExists cache since we just updated the topic - b.invalidateTopicExistsCache(t) + // Invalidate topic cache since we just updated the topic + b.invalidateTopicCache(t) glog.V(0).Infof("updated schemas for topic %s", request.Topic) return resp, nil @@ -105,8 +105,8 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. 
return nil, fmt.Errorf("configure topic: %w", err) } - // Invalidate TopicExists cache since we just created/updated the topic - b.invalidateTopicExistsCache(t) + // Invalidate topic cache since we just created/updated the topic + b.invalidateTopicCache(t) b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 680fba87b..17187e526 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -179,7 +179,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List } // TopicExists checks if a topic exists in memory or filer -// Caches both positive and negative results to reduce filer load +// Uses unified cache (checks if config is non-nil) to reduce filer load func (b *MessageQueueBroker) TopicExists(ctx context.Context, request *mq_pb.TopicExistsRequest) (*mq_pb.TopicExistsResponse, error) { if !b.isLockOwner() { var resp *mq_pb.TopicExistsResponse @@ -210,19 +210,20 @@ func (b *MessageQueueBroker) TopicExists(ctx context.Context, request *mq_pb.Top return &mq_pb.TopicExistsResponse{Exists: true}, nil } - // Check cache for filer lookup results (both positive and negative) - b.topicExistsCacheMu.RLock() - if entry, found := b.topicExistsCache[topicKey]; found { + // Check unified cache (if conf != nil, topic exists; if conf == nil, doesn't exist) + b.topicCacheMu.RLock() + if entry, found := b.topicCache[topicKey]; found { if time.Now().Before(entry.expiresAt) { - b.topicExistsCacheMu.RUnlock() - glog.V(4).Infof("TopicExists cache HIT for %s: %v", topicKey, entry.exists) - return &mq_pb.TopicExistsResponse{Exists: entry.exists}, nil + exists := entry.conf != nil + b.topicCacheMu.RUnlock() + glog.V(4).Infof("Topic cache HIT for %s: exists=%v", topicKey, exists) + return &mq_pb.TopicExistsResponse{Exists: exists}, nil } } - b.topicExistsCacheMu.RUnlock() + b.topicCacheMu.RUnlock() - // 
Cache miss or expired - query filer for persisted topics - glog.V(4).Infof("TopicExists cache MISS for %s, querying filer", topicKey) + // Cache miss or expired - query filer for persisted topics (lightweight check) + glog.V(4).Infof("Topic cache MISS for %s, querying filer for existence", topicKey) exists := false err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { topicPath := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, request.Topic.Namespace, request.Topic.Name) @@ -242,28 +243,24 @@ func (b *MessageQueueBroker) TopicExists(ctx context.Context, request *mq_pb.Top return &mq_pb.TopicExistsResponse{Exists: false}, nil } - // Update cache with result (both positive and negative) - b.topicExistsCacheMu.Lock() - b.topicExistsCache[topicKey] = &topicExistsCacheEntry{ - exists: exists, - expiresAt: time.Now().Add(b.topicExistsCacheTTL), + // Update unified cache with lightweight result (don't read full config yet) + // Cache existence info: conf=nil for non-existent (we don't have full config yet for existent) + b.topicCacheMu.Lock() + if !exists { + // Negative cache: topic definitely doesn't exist + b.topicCache[topicKey] = &topicCacheEntry{ + conf: nil, + expiresAt: time.Now().Add(b.topicCacheTTL), + } + glog.V(4).Infof("Topic cached as non-existent: %s", topicKey) } - b.topicExistsCacheMu.Unlock() - glog.V(4).Infof("TopicExists cached result for %s: %v", topicKey, exists) + // Note: For positive existence, we don't cache here to avoid partial state + // The config will be cached when GetOrGenerateLocalPartition reads it + b.topicCacheMu.Unlock() return &mq_pb.TopicExistsResponse{Exists: exists}, nil } -// invalidateTopicExistsCache removes a topic from the cache -// Should be called when a topic is created or deleted -func (b *MessageQueueBroker) invalidateTopicExistsCache(t topic.Topic) { - topicKey := t.String() - b.topicExistsCacheMu.Lock() - delete(b.topicExistsCache, topicKey) - b.topicExistsCacheMu.Unlock() - 
glog.V(4).Infof("Invalidated TopicExists cache for %s", topicKey) -} - // GetTopicConfiguration returns the complete configuration of a topic including schema and partition assignments func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request *mq_pb.GetTopicConfigurationRequest) (resp *mq_pb.GetTopicConfigurationResponse, err error) { if !b.isLockOwner() { diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index ecf05d663..38e022a7c 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -39,13 +39,11 @@ func (option *MessageQueueBrokerOption) BrokerAddress() pb.ServerAddress { return pb.NewServerAddress(option.Ip, option.Port, 0) } -type topicExistsCacheEntry struct { - exists bool - expiresAt time.Time -} - -type topicConfCacheEntry struct { - conf *mq_pb.ConfigureTopicResponse +// topicCacheEntry caches both topic existence and configuration +// If conf is nil, topic doesn't exist (negative cache) +// If conf is non-nil, topic exists with this configuration (positive cache) +type topicCacheEntry struct { + conf *mq_pb.ConfigureTopicResponse // nil = topic doesn't exist expiresAt time.Time } @@ -66,16 +64,12 @@ type MessageQueueBroker struct { // Removed gatewayRegistry - no longer needed accessLock sync.Mutex fca *filer_client.FilerClientAccessor - // TopicExists cache to reduce filer lookups - // Caches both positive (topic exists) and negative (topic doesn't exist) results - topicExistsCache map[string]*topicExistsCacheEntry - topicExistsCacheMu sync.RWMutex - topicExistsCacheTTL time.Duration - // TopicConf cache to reduce expensive filer reads and JSON unmarshaling - // Caches topic configuration to avoid 60% CPU overhead on every fetch - topicConfCache map[string]*topicConfCacheEntry - topicConfCacheMu sync.RWMutex - topicConfCacheTTL time.Duration + // Unified topic cache for both existence and configuration + // Caches topic config (positive: conf != nil) and non-existence 
(negative: conf == nil) + // Eliminates 60% CPU overhead from repeated filer reads and JSON unmarshaling + topicCache map[string]*topicCacheEntry + topicCacheMu sync.RWMutex + topicCacheTTL time.Duration } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { @@ -84,18 +78,16 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial subCoordinator := sub_coordinator.NewSubCoordinator() mqBroker = &MessageQueueBroker{ - option: option, - grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)), - filers: make(map[pb.ServerAddress]struct{}), - localTopicManager: topic.NewLocalTopicManager(), - PubBalancer: pubBalancer, - SubCoordinator: subCoordinator, - offsetManager: nil, // Will be initialized below - topicExistsCache: make(map[string]*topicExistsCacheEntry), - topicExistsCacheTTL: 30 * time.Second, // Cache for 30 seconds to reduce filer load - topicConfCache: make(map[string]*topicConfCacheEntry), - topicConfCacheTTL: 30 * time.Second, // Cache topic config to avoid 60% CPU overhead + option: option, + grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)), + filers: make(map[pb.ServerAddress]struct{}), + localTopicManager: topic.NewLocalTopicManager(), + PubBalancer: pubBalancer, + SubCoordinator: subCoordinator, + offsetManager: nil, // Will be initialized below + topicCache: make(map[string]*topicCacheEntry), + topicCacheTTL: 30 * time.Second, // Unified cache for existence + config (eliminates 60% CPU overhead) } // Create FilerClientAccessor that adapts broker's single filer to the new multi-filer interface 
fca := &filer_client.FilerClientAccessor{ diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index f02d2ac75..f926afd01 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -20,14 +20,21 @@ import ( func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) { // get or generate a local partition topicKey := t.String() - - // Check cache first to avoid expensive filer reads (60% CPU overhead!) - b.topicConfCacheMu.RLock() - if entry, found := b.topicConfCache[topicKey]; found { + + // Check unified cache first to avoid expensive filer reads (60% CPU overhead!) + b.topicCacheMu.RLock() + if entry, found := b.topicCache[topicKey]; found { if time.Now().Before(entry.expiresAt) { conf := entry.conf - b.topicConfCacheMu.RUnlock() - glog.V(4).Infof("TopicConf cache HIT for %s", topicKey) + b.topicCacheMu.RUnlock() + + // If conf is nil, topic was cached as non-existent + if conf == nil { + glog.V(4).Infof("Topic cache HIT for %s: topic doesn't exist", topicKey) + return nil, fmt.Errorf("topic %v not found (cached)", t) + } + + glog.V(4).Infof("Topic cache HIT for %s", topicKey) localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf) if getOrGenError != nil { glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) @@ -36,24 +43,33 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio return localTopicPartition, nil } } - b.topicConfCacheMu.RUnlock() - + b.topicCacheMu.RUnlock() + // Cache miss or expired - read from filer - glog.V(4).Infof("TopicConf cache MISS for %s, reading from filer", topicKey) + glog.V(4).Infof("Topic cache MISS for %s, reading from filer", topicKey) conf, readConfErr := b.fca.ReadTopicConfFromFiler(t) + + // Cache the result (even if error/not 
found - negative caching) + b.topicCacheMu.Lock() if readConfErr != nil { + // Negative cache: topic doesn't exist + b.topicCache[topicKey] = &topicCacheEntry{ + conf: nil, + expiresAt: time.Now().Add(b.topicCacheTTL), + } + b.topicCacheMu.Unlock() + glog.V(4).Infof("Topic cached as non-existent: %s", topicKey) glog.Errorf("topic %v not found: %v", t, readConfErr) return nil, fmt.Errorf("topic %v not found: %w", t, readConfErr) } - - // Cache the result - b.topicConfCacheMu.Lock() - b.topicConfCache[topicKey] = &topicConfCacheEntry{ + + // Positive cache: topic exists with config + b.topicCache[topicKey] = &topicCacheEntry{ conf: conf, - expiresAt: time.Now().Add(b.topicConfCacheTTL), + expiresAt: time.Now().Add(b.topicCacheTTL), } - b.topicConfCacheMu.Unlock() - glog.V(4).Infof("TopicConf cached for %s", topicKey) + b.topicCacheMu.Unlock() + glog.V(4).Infof("Topic config cached for %s", topicKey) localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf) if getOrGenError != nil { @@ -63,14 +79,14 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio return localTopicPartition, nil } -// invalidateTopicConfCache removes a topic config from the cache -// Should be called when a topic configuration is updated -func (b *MessageQueueBroker) invalidateTopicConfCache(t topic.Topic) { +// invalidateTopicCache removes a topic from the unified cache +// Should be called when a topic is created, deleted, or config is updated +func (b *MessageQueueBroker) invalidateTopicCache(t topic.Topic) { topicKey := t.String() - b.topicConfCacheMu.Lock() - delete(b.topicConfCache, topicKey) - b.topicConfCacheMu.Unlock() - glog.V(4).Infof("Invalidated TopicConf cache for %s", topicKey) + b.topicCacheMu.Lock() + delete(b.topicCache, topicKey) + b.topicCacheMu.Unlock() + glog.V(4).Infof("Invalidated topic cache for %s", topicKey) } func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf 
*mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {