diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 4fcc066fc..e63c3cba1 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -34,6 +34,8 @@ type DiskLocation struct { ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume ecVolumesLock sync.RWMutex + ecShardNotifyHandler func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume) + isDiskSpaceLow bool closeCh chan struct{} } @@ -259,7 +261,7 @@ func (l *DiskLocation) loadExistingVolumesWithId(needleMapKind NeedleMapKind, ld l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout, diskId) glog.V(2).Infof("Store started on dir: %s with %d volumes max %d (disk ID: %d)", l.Directory, len(l.volumes), l.MaxVolumeCount, diskId) - l.loadAllEcShards() + l.loadAllEcShardsWithCallback(l.ecShardNotifyHandler) glog.V(2).Infof("Store started on dir: %s with %d ec shards (disk ID: %d)", l.Directory, len(l.ecVolumes), diskId) } diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index b370555da..73b37e893 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -137,6 +137,10 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding } func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId) (err error) { + return l.loadEcShardsWithCallback(shards, collection, vid, nil) +} + +func (l *DiskLocation) loadEcShardsWithCallback(shards []string, collection string, vid needle.VolumeId, onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) (err error) { for _, shard := range shards { shardId, err := strconv.ParseInt(path.Ext(shard)[3:], 10, 64) @@ -149,16 +153,23 @@ func (l *DiskLocation) loadEcShards(shards []string, collection string, vid need return fmt.Errorf("shard ID out of range: %d", shardId) } - _, err = l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId)) + ecVolume, err := l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId)) if err != nil { return fmt.Errorf("failed to load ec shard %v: %w", shard, err) } + if onShardLoad != nil { + onShardLoad(collection, vid, erasure_coding.ShardId(shardId), ecVolume) + } } return nil } func (l *DiskLocation) loadAllEcShards() (err error) { + return l.loadAllEcShardsWithCallback(nil) +} + +func (l *DiskLocation) loadAllEcShardsWithCallback(onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) (err error) { dirEntries, err := os.ReadDir(l.Directory) if err != nil { @@ -222,7 +233,7 @@ func (l *DiskLocation) loadAllEcShards() (err error) { } if ext == ".ecx" && volumeId == prevVolumeId && collection == prevCollection { - l.handleFoundEcxFile(sameVolumeShards, collection, volumeId) + l.handleFoundEcxFile(sameVolumeShards, collection, volumeId, onShardLoad) reset() continue } @@ -277,7 +288,7 @@ func (l *DiskLocation) EcShardCount() int { // handleFoundEcxFile processes a complete group of EC shards when their .ecx file is found. // This includes validation, loading, and cleanup of incomplete/invalid EC volumes. -func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, volumeId needle.VolumeId) { +func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, volumeId needle.VolumeId, onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) { // Check if this is an incomplete EC encoding (not a distributed EC volume) // Key distinction: if .dat file still exists, EC encoding may have failed // If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers @@ -297,7 +308,7 @@ func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, vo } // Attempt to load the EC shards - if err := l.loadEcShards(shards, collection, volumeId); err != nil { + if err := l.loadEcShardsWithCallback(shards, collection, volumeId, onShardLoad); err != nil { // If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used // If .dat is gone, log error but don't clean up (may be waiting for shards from other servers) if datExists { diff --git a/weed/storage/store.go b/weed/storage/store.go index 7a336d1ff..b214b3533 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -86,6 +86,12 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, Id: id, NeedleMapKind: needleMapKind} s.Locations = make([]*DiskLocation, 0) + s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 1024) + s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 1024) + + s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 1024) + s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 1024) + var wg sync.WaitGroup for i := 0; i < len(dirnames); i++ { location := NewDiskLocation(dirnames[i], int32(maxVolumeCounts[i]), minFreeSpaces[i], idxFolder, diskTypes[i]) @@ -93,6 +99,33 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) diskId := uint32(i) // Track disk ID + + location.ecShardNotifyHandler = func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume) { + var shardSize int64 + if shard, found := ecVolume.FindEcVolumeShard(shardId); found { + shardSize = shard.Size() + } + si := erasure_coding.NewShardsInfo() + si.Set(shardId, erasure_coding.ShardSize(shardSize)) + + // Use non-blocking send during startup to avoid deadlock + // The channel reader only starts after connecting to master, but we're loading during startup + select { + case s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{ + Id: uint32(vid), + Collection: collection, + EcIndexBits: si.Bitmap(), + ShardSizes: si.SizesInt64(), + DiskType: string(location.DiskType), + ExpireAtSec: ecVolume.ExpireAtSec, + DiskId: diskId, + }: + default: + // Channel full during startup - this is OK, heartbeat will report EC shards later + glog.V(2).Infof("NewEcShardsChan full during startup for shard %d.%d, will be reported in heartbeat", vid, shardId) + } + } + wg.Add(1) go func(id uint32, diskLoc *DiskLocation) { defer wg.Done() @@ -101,12 +134,6 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, } wg.Wait() - s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3) - s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3) - - s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3) - s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3) - return } func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, ver needle.Version, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {