From 4e2af080df55a5eb2cc0ff9a4b786ca5219d7210 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 1 Jan 2026 15:39:54 -0800 Subject: [PATCH] optimize: enable immediate EC shard reporting during startup (#7933) * optimize: enable immediate EC shard reporting during startup Ported the immediate EC shard reporting feature from Enterprise to Community version. This allows the master to be notified about EC shards immediately during volume server startup, instead of waiting for the first heartbeat. Changes: 1. Updated NewStore to initialize notification channels BEFORE loading volumes (fixes potential nil panic). 2. Added ecShardNotifyHandler to report EC shards to NewEcShardsChan during startup. 3. Implemented non-blocking channel send for EC reporting to prevent deadlock when loading many EC shards (fixing the enterprise bug 17ac1290c). 4. Updated DiskLocation and EC loading logic to support the callback. This optimization improves cluster state consistency and startup speed for EC-heavy clusters. * optimize: report actual EC shard size during startup * optimize: increase notification channel buffer size to 1024 * optimize: fix variable shadowing in store.go --- weed/storage/disk_location.go | 4 +++- weed/storage/disk_location_ec.go | 19 ++++++++++++---- weed/storage/store.go | 39 +++++++++++++++++++++++++++----- 3 files changed, 51 insertions(+), 11 deletions(-) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 4fcc066fc..e63c3cba1 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -34,6 +34,8 @@ type DiskLocation struct { ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume ecVolumesLock sync.RWMutex + ecShardNotifyHandler func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume) + isDiskSpaceLow bool closeCh chan struct{} } @@ -259,7 +261,7 @@ func (l *DiskLocation) loadExistingVolumesWithId(needleMapKind NeedleMapKind, ld l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout, diskId) glog.V(2).Infof("Store started on dir: %s with %d volumes max %d (disk ID: %d)", l.Directory, len(l.volumes), l.MaxVolumeCount, diskId) - l.loadAllEcShards() + l.loadAllEcShardsWithCallback(l.ecShardNotifyHandler) glog.V(2).Infof("Store started on dir: %s with %d ec shards (disk ID: %d)", l.Directory, len(l.ecVolumes), diskId) } diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index b370555da..73b37e893 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -137,6 +137,10 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding } func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId) (err error) { + return l.loadEcShardsWithCallback(shards, collection, vid, nil) +} + +func (l *DiskLocation) loadEcShardsWithCallback(shards []string, collection string, vid needle.VolumeId, onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) (err error) { for _, shard := range shards { shardId, err := strconv.ParseInt(path.Ext(shard)[3:], 10, 64) @@ -149,16 +153,23 @@ func (l *DiskLocation) loadEcShards(shards []string, collection string, vid need return fmt.Errorf("shard ID out of range: %d", shardId) } - _, err = l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId)) + ecVolume, err := l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId)) if err != nil { return fmt.Errorf("failed to load ec shard %v: %w", shard, err) } + if onShardLoad != nil { + onShardLoad(collection, vid, erasure_coding.ShardId(shardId), ecVolume) + } } return nil } func (l *DiskLocation) loadAllEcShards() (err error) { + return l.loadAllEcShardsWithCallback(nil) +} + +func (l *DiskLocation) loadAllEcShardsWithCallback(onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) (err error) { dirEntries, err := os.ReadDir(l.Directory) if err != nil { @@ -222,7 +233,7 @@ func (l *DiskLocation) loadAllEcShards() (err error) { } if ext == ".ecx" && volumeId == prevVolumeId && collection == prevCollection { - l.handleFoundEcxFile(sameVolumeShards, collection, volumeId) + l.handleFoundEcxFile(sameVolumeShards, collection, volumeId, onShardLoad) reset() continue } @@ -277,7 +288,7 @@ func (l *DiskLocation) EcShardCount() int { // handleFoundEcxFile processes a complete group of EC shards when their .ecx file is found. // This includes validation, loading, and cleanup of incomplete/invalid EC volumes. -func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, volumeId needle.VolumeId) { +func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, volumeId needle.VolumeId, onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) { // Check if this is an incomplete EC encoding (not a distributed EC volume) // Key distinction: if .dat file still exists, EC encoding may have failed // If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers @@ -297,7 +308,7 @@ func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, vo } // Attempt to load the EC shards - if err := l.loadEcShards(shards, collection, volumeId); err != nil { + if err := l.loadEcShardsWithCallback(shards, collection, volumeId, onShardLoad); err != nil { // If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used // If .dat is gone, log error but don't clean up (may be waiting for shards from other servers) if datExists { diff --git a/weed/storage/store.go b/weed/storage/store.go index 7a336d1ff..b214b3533 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -86,6 +86,12 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, Id: id, NeedleMapKind: needleMapKind} s.Locations = make([]*DiskLocation, 0) + s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 1024) + s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 1024) + + s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 1024) + s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 1024) + var wg sync.WaitGroup for i := 0; i < len(dirnames); i++ { location := NewDiskLocation(dirnames[i], int32(maxVolumeCounts[i]), minFreeSpaces[i], idxFolder, diskTypes[i]) @@ -93,6 +99,33 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) diskId := uint32(i) // Track disk ID + + location.ecShardNotifyHandler = func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume) { + var shardSize int64 + if shard, found := ecVolume.FindEcVolumeShard(shardId); found { + shardSize = shard.Size() + } + si := erasure_coding.NewShardsInfo() + si.Set(shardId, erasure_coding.ShardSize(shardSize)) + + // Use non-blocking send during startup to avoid deadlock + // The channel reader only starts after connecting to master, but we're loading during startup + select { + case s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{ + Id: uint32(vid), + Collection: collection, + EcIndexBits: si.Bitmap(), + ShardSizes: si.SizesInt64(), + DiskType: string(location.DiskType), + ExpireAtSec: ecVolume.ExpireAtSec, + DiskId: diskId, + }: + default: + // Channel full during startup - this is OK, heartbeat will report EC shards later + glog.V(2).Infof("NewEcShardsChan full during startup for shard %d.%d, will be reported in heartbeat", vid, shardId) + } + } + wg.Add(1) go func(id uint32, diskLoc *DiskLocation) { defer wg.Done() @@ -101,12 +134,6 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, } wg.Wait() - s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3) - s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3) - - s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3) - s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3) - return } func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, ver needle.Version, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {