diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index c780a6cdf..55659ca0f 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -408,7 +408,9 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ glog.V(0).Infof("VolumeEcShardsToVolume: %v", req) - v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId)) + // collect .ec00 ~ .ec09 files + shardFileNames := make([]string, erasure_coding.DataShardsCount) + v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames) if !found { return nil, fmt.Errorf("ec volume %d not found", req.VolumeId) } @@ -417,6 +419,12 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) } + for shardId := 0; shardId < DataShardsCount; shardId++ { + if shardFileNames[shardId] == "" { + return nil, fmt.Errorf("ec volume %d missing shard %d", req.VolumeId, shardId) + } + } + dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName() // calculate .dat file size datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName) @@ -425,7 +433,7 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ } // write .dat file from .ec00 ~ .ec09 files - if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize); err != nil { + if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize, shardFileNames); err != nil { return nil, fmt.Errorf("WriteDatFile %s: %v", dataBaseFileName, err) } diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index ee21eb803..5af354277 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -38,6 +38,22 @@ func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId) { } } +func (l *DiskLocation) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) { + l.ecVolumesLock.RLock() + defer l.ecVolumesLock.RUnlock() + + ecVolume, found = l.ecVolumes[vid] + if !found { + return + } + for _, ecShard := range ecVolume.Shards { + if ecShard.ShardId < erasure_coding.ShardId(len(shardFileNames)) { + shardFileNames[ecShard.ShardId] = erasure_coding.EcShardFileName(ecVolume.Collection, l.Directory, int(ecVolume.VolumeId)) + erasure_coding.ToExt(int(ecShard.ShardId)) + } + } + return +} + func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) { l.ecVolumesLock.RLock() defer l.ecVolumesLock.RUnlock() diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go index a545d9975..9a415fc78 100644 --- a/weed/storage/erasure_coding/ec_decoder.go +++ b/weed/storage/erasure_coding/ec_decoder.go @@ -151,7 +151,7 @@ func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId } // WriteDatFile generates .dat from from .ec00 ~ .ec09 files -func WriteDatFile(baseFileName string, datFileSize int64) error { +func WriteDatFile(baseFileName string, datFileSize int64, shardFileNames []string) error { datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if openErr != nil { @@ -161,13 +161,19 @@ func WriteDatFile(baseFileName string, datFileSize int64) error { inputFiles := make([]*os.File, DataShardsCount) + defer func() { + for shardId := 0; shardId < DataShardsCount; shardId++ { + if inputFiles[shardId] != nil { + inputFiles[shardId].Close() + } + } + }() + for shardId := 0; shardId < DataShardsCount; shardId++ { - shardFileName := baseFileName + ToExt(shardId) - inputFiles[shardId], openErr = os.OpenFile(shardFileName, os.O_RDONLY, 0) + inputFiles[shardId], openErr = os.OpenFile(shardFileNames[shardId], os.O_RDONLY, 0) if openErr != nil { return openErr } - defer inputFiles[shardId].Close() } for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize { diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index b46803fb2..484ad3aad 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -116,6 +116,17 @@ func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, boo return nil, false } +// shardFiles is a list of shard files, which is used to return the shard locations +func (s *Store) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) { + for _, location := range s.Locations { + if s, found := location.CollectEcShards(vid, shardFileNames); found { + ecVolume = s + found = true + } + } + return +} + func (s *Store) DestroyEcVolume(vid needle.VolumeId) { for _, location := range s.Locations { location.DestroyEcVolume(vid)