package broker

import (
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq"
	"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
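
// Note: topicCache, topicCacheMu, topicCacheTTL, and the topicCacheEntry type are
// fields/types of MessageQueueBroker defined elsewhere in this package. As a minimal
// sketch (illustrative only, not the authoritative definition), the cache entry used
// below is assumed to look like:
//
//	type topicCacheEntry struct {
//		conf      *mq_pb.ConfigureTopicResponse // nil marks a negative entry (topic absent)
//		expiresAt time.Time                     // entry is valid until this instant
//	}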

func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) {
	// get or generate a local partition using cached topic config
	conf, err := b.getTopicConfFromCache(t)
	if err != nil {
		glog.Errorf("topic %v not found: %v", t, err)
		return nil, fmt.Errorf("topic %v not found: %w", t, err)
	}

	localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
	if getOrGenError != nil {
		glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
		return nil, fmt.Errorf("topic %v partition %v not setup: %w", t, partition, getOrGenError)
	}
	return localTopicPartition, nil
}
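
// Illustrative call site (hypothetical, not part of this file): a handler that
// needs the partition before publishing or subscribing might do
//
//	localPartition, err := b.GetOrGenerateLocalPartition(t, partition)
//	if err != nil {
//		return err
//	}
//	// ... publish to or read from localPartition ...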

// invalidateTopicCache removes a topic from the unified cache
// Should be called when a topic is created, deleted, or config is updated
func (b *MessageQueueBroker) invalidateTopicCache(t topic.Topic) {
	topicKey := t.String()
	b.topicCacheMu.Lock()
	delete(b.topicCache, topicKey)
	b.topicCacheMu.Unlock()
	glog.V(4).Infof("Invalidated topic cache for %s", topicKey)
}

// getTopicConfFromCache reads topic configuration with caching.
// Returns the config, or an error if the topic is not found. Uses the unified cache to
// avoid expensive filer reads. On a cache miss, it validates broker assignments to
// ensure they are still active; doing this only on misses, rather than on every lookup,
// avoids the roughly 14% CPU overhead that per-lookup validation incurred.
// This is the public API for reading topic config - always use it instead of direct filer reads.
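//
// Cache outcomes on each call:
//  1. fresh entry, conf != nil: return the cached config (fast path, read lock only)
//  2. fresh entry, conf == nil: the topic is negatively cached; report it as not found
//  3. miss or expired entry: read from the filer, validate assignments, then re-cache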
func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.ConfigureTopicResponse, error) {
	topicKey := t.String()

	// Check unified cache first
	b.topicCacheMu.RLock()
	if entry, found := b.topicCache[topicKey]; found {
		if time.Now().Before(entry.expiresAt) {
			conf := entry.conf
			b.topicCacheMu.RUnlock()

			// If conf is nil, topic was cached as non-existent
			if conf == nil {
				glog.V(4).Infof("Topic cache HIT for %s: topic doesn't exist", topicKey)
				return nil, fmt.Errorf("topic %v not found (cached)", t)
			}

			glog.V(4).Infof("Topic cache HIT for %s (skipping assignment validation)", topicKey)
			// Cache hit - return immediately without validating assignments
			// Assignments were validated when we first cached this config
			return conf, nil
		}
	}
	b.topicCacheMu.RUnlock()

	// Cache miss or expired - read from filer
	glog.V(4).Infof("Topic cache MISS for %s, reading from filer", topicKey)
	conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)

	if readConfErr != nil {
		// Negative cache: topic doesn't exist
		b.topicCacheMu.Lock()
		b.topicCache[topicKey] = &topicCacheEntry{
			conf:      nil,
			expiresAt: time.Now().Add(b.topicCacheTTL),
		}
		b.topicCacheMu.Unlock()
		glog.V(4).Infof("Topic cached as non-existent: %s", topicKey)
		return nil, fmt.Errorf("topic %v not found: %w", t, readConfErr)
	}

	// Validate broker assignments before caching (NOT holding cache lock)
	// This ensures cached configs always have valid broker assignments
	// Only done on cache miss (not on every lookup), saving 14% CPU
	glog.V(4).Infof("Validating broker assignments for %s", topicKey)
	hasChanges := b.ensureTopicActiveAssignmentsUnsafe(t, conf)
	if hasChanges {
		glog.V(0).Infof("topic %v partition assignments updated due to broker changes", t)
		// Save updated assignments to filer immediately to ensure persistence
		if err := b.fca.SaveTopicConfToFiler(t, conf); err != nil {
			glog.Errorf("failed to save updated topic config for %s: %v", topicKey, err)
			// Don't cache on error - let next request retry
			return conf, err
		}
		// Invalidate and re-populate the entry while holding the write lock, so that
		// no other goroutine can observe stale data between the two steps.
		b.topicCacheMu.Lock()
		delete(b.topicCache, topicKey)
		// Cache the updated config with validated assignments
		b.topicCache[topicKey] = &topicCacheEntry{
			conf:      conf,
			expiresAt: time.Now().Add(b.topicCacheTTL),
		}
		b.topicCacheMu.Unlock()
		glog.V(4).Infof("Updated cache for %s after assignment update", topicKey)
		return conf, nil
	}

	// Positive cache: topic exists with validated assignments
	b.topicCacheMu.Lock()
	b.topicCache[topicKey] = &topicCacheEntry{
		conf:      conf,
		expiresAt: time.Now().Add(b.topicCacheTTL),
	}
	b.topicCacheMu.Unlock()
	glog.V(4).Infof("Topic config cached for %s", topicKey)

	return conf, nil
}

func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
	b.accessLock.Lock()
	defer b.accessLock.Unlock()

	if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
		localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition, conf)
		if err != nil {
			return nil, false, err
		}
	}
	return localPartition, isGenerated, nil
}

func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
	self := b.option.BrokerAddress()
	glog.V(4).Infof("genLocalPartitionFromFiler for %s %s, self=%s", t, partition, self)
	glog.V(4).Infof("conf.BrokerPartitionAssignments: %v", conf.BrokerPartitionAssignments)
	for _, assignment := range conf.BrokerPartitionAssignments {
		assignmentPartition := topic.FromPbPartition(assignment.Partition)
		glog.V(4).Infof("checking assignment: LeaderBroker=%s, Partition=%s", assignment.LeaderBroker, assignmentPartition)
		glog.V(4).Infof("comparing self=%s with LeaderBroker=%s: %v", self, assignment.LeaderBroker, assignment.LeaderBroker == string(self))
		glog.V(4).Infof("comparing partition=%s with assignmentPartition=%s: %v", partition.String(), assignmentPartition.String(), partition.Equals(assignmentPartition))
		glog.V(4).Infof("logical comparison (RangeStart, RangeStop only): %v", partition.LogicalEquals(assignmentPartition))
		glog.V(4).Infof("partition details: RangeStart=%d, RangeStop=%d, RingSize=%d, UnixTimeNs=%d", partition.RangeStart, partition.RangeStop, partition.RingSize, partition.UnixTimeNs)
		glog.V(4).Infof("assignmentPartition details: RangeStart=%d, RangeStop=%d, RingSize=%d, UnixTimeNs=%d", assignmentPartition.RangeStart, assignmentPartition.RangeStop, assignmentPartition.RingSize, assignmentPartition.UnixTimeNs)
		if assignment.LeaderBroker == string(self) && partition.LogicalEquals(assignmentPartition) {
			glog.V(4).Infof("Creating local partition for %s %s", t, partition)
			localPartition = topic.NewLocalPartition(partition, b.option.LogFlushInterval, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition))

			// Initialize offset from existing data to ensure continuity on restart
			b.initializePartitionOffsetFromExistingData(localPartition, t, partition)

			b.localTopicManager.AddLocalPartition(t, localPartition)
			isGenerated = true
			glog.V(4).Infof("Successfully added local partition %s %s to localTopicManager", t, partition)
			break
		}
	}

	if !isGenerated {
		glog.V(4).Infof("No matching assignment found for %s %s", t, partition)
	}

	return localPartition, isGenerated, nil
}

// ensureTopicActiveAssignmentsUnsafe validates that partition assignments reference active brokers
// Returns true if assignments were changed. Caller must save config to filer if hasChanges=true.
// Note: Assumes caller holds topicCacheMu lock or is OK with concurrent access to conf
func (b *MessageQueueBroker) ensureTopicActiveAssignmentsUnsafe(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (hasChanges bool) {
	// also fix assignee broker if invalid
	hasChanges = pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments)
	return hasChanges
}

func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
	// Validate and save if needed
	hasChanges := b.ensureTopicActiveAssignmentsUnsafe(t, conf)
	if hasChanges {
		glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
		if err = b.fca.SaveTopicConfToFiler(t, conf); err != nil {
			return err
		}
	}

	return err
}

// initializePartitionOffsetFromExistingData initializes the LogBuffer offset from existing data on filer
// This ensures offset continuity when SMQ restarts
func (b *MessageQueueBroker) initializePartitionOffsetFromExistingData(localPartition *topic.LocalPartition, t topic.Topic, partition topic.Partition) {
	// Create a function to get the highest existing offset from chunk metadata
	getHighestOffsetFn := func() (int64, error) {
		// Use the existing chunk metadata approach to find the highest offset
		if b.fca == nil {
			return -1, fmt.Errorf("no filer client accessor available")
		}

		// Use the same logic as getOffsetRangeFromChunkMetadata but only get the highest offset
		_, highWaterMark, err := b.getOffsetRangeFromChunkMetadata(t, partition)
		if err != nil {
			return -1, err
		}

		// The high water mark is the next offset to be assigned, so the highest existing offset is hwm - 1
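		// (e.g. if offsets 0..41 have been written, the high water mark is 42 and
		// the highest existing offset is 41)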
		if highWaterMark > 0 {
			return highWaterMark - 1, nil
		}

		return -1, nil // No existing data
	}

	// Initialize the LogBuffer offset from existing data
	if err := localPartition.LogBuffer.InitializeOffsetFromExistingData(getHighestOffsetFn); err != nil {
		glog.V(0).Infof("Failed to initialize offset for partition %s %s: %v", t, partition, err)
	}
}

// getOffsetRangeFromChunkMetadata reads chunk metadata to find both earliest and latest offsets
func (b *MessageQueueBroker) getOffsetRangeFromChunkMetadata(t topic.Topic, partition topic.Partition) (earliestOffset int64, highWaterMark int64, err error) {
	if b.fca == nil {
		return 0, 0, fmt.Errorf("filer client accessor not available")
	}

	// Get the topic path and find the latest version
	topicPath := fmt.Sprintf("/topics/%s/%s", t.Namespace, t.Name)
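	// e.g. a topic "events" in namespace "app" lives under "/topics/app/events"
	// (hypothetical names, shown only to illustrate the layout)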

	// First, list the topic versions to find the latest
	var latestVersion string
	err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
			Directory: topicPath,
		})
		if err != nil {
			return err
		}

		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}
			if resp.Entry.IsDirectory && strings.HasPrefix(resp.Entry.Name, "v") {
				if latestVersion == "" || resp.Entry.Name > latestVersion {
					latestVersion = resp.Entry.Name
				}
			}
		}
		return nil
	})
	if err != nil {
		return 0, 0, fmt.Errorf("failed to list topic versions: %v", err)
	}

	if latestVersion == "" {
		glog.V(0).Infof("No version directory found for topic %s", t)
		return 0, 0, nil
	}

	// Find the partition directory
	versionPath := fmt.Sprintf("%s/%s", topicPath, latestVersion)
	var partitionDir string
	err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
			Directory: versionPath,
		})
		if err != nil {
			return err
		}

		// Look for the partition directory that matches our partition range
		targetPartitionName := fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
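		// e.g. a partition covering ring range [0, 1024) yields the directory name "0000-1024"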
		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}
			if resp.Entry.IsDirectory && resp.Entry.Name == targetPartitionName {
				partitionDir = resp.Entry.Name
				break
			}
		}
		return nil
	})
	if err != nil {
		return 0, 0, fmt.Errorf("failed to list partition directories: %v", err)
	}

	if partitionDir == "" {
		glog.V(0).Infof("No partition directory found for topic %s partition %s", t, partition)
		return 0, 0, nil
	}

	// Scan all message files to find the highest offset_max and lowest offset_min
	partitionPath := fmt.Sprintf("%s/%s", versionPath, partitionDir)
	highWaterMark = 0
	earliestOffset = -1 // -1 indicates no data found yet

	err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
			Directory: partitionPath,
		})
		if err != nil {
			return err
		}

		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}
			if !resp.Entry.IsDirectory && resp.Entry.Name != "checkpoint.offset" {
				// Check for offset ranges in Extended attributes (both log files and parquet files)
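				// (offsets are persisted as 8-byte big-endian uint64 values under the
				// mq.ExtendedAttrOffsetMin / mq.ExtendedAttrOffsetMax keys)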
				if resp.Entry.Extended != nil {
					fileType := "log"
					if strings.HasSuffix(resp.Entry.Name, ".parquet") {
						fileType = "parquet"
					}

					// Track maximum offset for high water mark
					if maxOffsetBytes, exists := resp.Entry.Extended[mq.ExtendedAttrOffsetMax]; exists && len(maxOffsetBytes) == 8 {
						maxOffset := int64(binary.BigEndian.Uint64(maxOffsetBytes))
						if maxOffset > highWaterMark {
							highWaterMark = maxOffset
						}
						glog.V(2).Infof("%s file %s has offset_max=%d", fileType, resp.Entry.Name, maxOffset)
					}

					// Track minimum offset for earliest offset
					if minOffsetBytes, exists := resp.Entry.Extended[mq.ExtendedAttrOffsetMin]; exists && len(minOffsetBytes) == 8 {
						minOffset := int64(binary.BigEndian.Uint64(minOffsetBytes))
						if earliestOffset == -1 || minOffset < earliestOffset {
							earliestOffset = minOffset
						}
						glog.V(2).Infof("%s file %s has offset_min=%d", fileType, resp.Entry.Name, minOffset)
					}
				}
			}
		}
		return nil
	})
	if err != nil {
		return 0, 0, fmt.Errorf("failed to scan message files: %v", err)
	}

	// High water mark is the next offset after the highest written offset
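	// (e.g. a highest stored offset_max of 41 yields a high water mark of 42)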
	if highWaterMark > 0 {
		highWaterMark++
	}

	// If no data found, set earliest offset to 0
	if earliestOffset == -1 {
		earliestOffset = 0
	}

	glog.V(0).Infof("Offset range for topic %s partition %s: earliest=%d, highWaterMark=%d", t, partition, earliestOffset, highWaterMark)
	return earliestOffset, highWaterMark, nil
}