From 3a8c1055a24094d22bdc33bdd01435097a6268a9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 27 May 2019 22:00:36 -0700 Subject: [PATCH] refactoring ecx to ecVolume --- weed/storage/disk_location_ec.go | 11 ++++- weed/storage/erasure_coding/ec_shard.go | 42 ---------------- weed/storage/erasure_coding/ec_volume.go | 63 ++++++++++++++++++++++-- 3 files changed, 69 insertions(+), 47 deletions(-) diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index 2aec1502b..bcc09a1c0 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -50,7 +50,15 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err) } l.ecVolumesLock.Lock() - l.ecVolumes[vid].AddEcVolumeShard(ecVolumeShard) + ecVolume, found := l.ecVolumes[vid] + if !found { + ecVolume, err = erasure_coding.NewEcVolume(l.Directory, collection, vid) + if err != nil { + return fmt.Errorf("failed to create ec volume %d: %v", vid, err) + } + l.ecVolumes[vid] = ecVolume + } + ecVolume.AddEcVolumeShard(ecVolumeShard) l.ecVolumesLock.Unlock() return nil @@ -69,6 +77,7 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding if len(ecVolume.Shards) == 0 { delete(l.ecVolumes, vid) } + ecVolume.Close() return true } diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index fa4bfcecd..037c4ba48 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -6,9 +6,7 @@ import ( "path" "strconv" - "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/storage/types" ) type ShardId uint8 @@ -20,8 +18,6 @@ type EcVolumeShard struct { dir string ecdFile *os.File ecdFileSize int64 - ecxFile *os.File - ecxFileSize int64 } func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { @@ -30,16 +26,6 @@ func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, sha baseFileName := v.FileName() - // open ecx file - if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil { - return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e) - } - ecxFi, statErr := v.ecxFile.Stat() - if statErr != nil { - return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr) - } - v.ecxFileSize = ecxFi.Size() - // open ecd file if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil { return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e) @@ -76,34 +62,6 @@ func (shard *EcVolumeShard) Close() { _ = shard.ecdFile.Close() shard.ecdFile = nil } - if shard.ecxFile != nil { - _ = shard.ecxFile.Close() - shard.ecxFile = nil - } -} - -func (shard *EcVolumeShard) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) { - var key types.NeedleId - buf := make([]byte, types.NeedleMapEntrySize) - l, h := int64(0), shard.ecxFileSize/types.NeedleMapEntrySize - for l < h { - m := (l + h) / 2 - if _, err := shard.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { - return types.Offset{}, 0, err - } - key, offset, size = idx.IdxFileEntry(buf) - if key == needleId { - return - } - if key < needleId { - l = m + 1 - } else { - h = m - } - } - - err = fmt.Errorf("needle id %d not found", needleId) - return } func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) { diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 5fba31f95..b4432c884 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -1,16 +1,42 @@ package erasure_coding import ( + "fmt" "math" + "os" "sort" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" ) type EcVolume struct { - Shards []*EcVolumeShard + Shards []*EcVolumeShard + VolumeId needle.VolumeId + Collection string + dir string + ecxFile *os.File + ecxFileSize int64 +} + +func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { + ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid} + + baseFileName := EcShardFileName(collection, dir, int(vid)) + + // open ecx file + if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); err != nil { + return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, err) + } + ecxFi, statErr := ev.ecxFile.Stat() + if statErr != nil { + return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr) + } + ev.ecxFileSize = ecxFi.Size() + + return } func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool { @@ -55,6 +81,10 @@ func (ev *EcVolume) Close() { for _, s := range ev.Shards { s.Close() } + if ev.ecxFile != nil { + _ = ev.ecxFile.Close() + ev.ecxFile = nil + } } func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) { @@ -76,15 +106,40 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle) (offset types.Offset, size uint32, intervals []Interval, err error) { - shard := ev.Shards[0] // find the needle from ecx file - offset, size, err = shard.findNeedleFromEcx(n.Id) + offset, size, err = ev.findNeedleFromEcx(n.Id) if err != nil { return types.Offset{}, 0, nil, err } + shard := ev.Shards[0] + // calculate the locations in the ec shards - intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shard.ecxFileSize, offset.ToAcutalOffset(), size) + intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), size) + + return +} + +func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) { + var key types.NeedleId + buf := make([]byte, types.NeedleMapEntrySize) + l, h := int64(0), ev.ecxFileSize/types.NeedleMapEntrySize + for l < h { + m := (l + h) / 2 + if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { + return types.Offset{}, 0, err + } + key, offset, size = idx.IdxFileEntry(buf) + if key == needleId { + return + } + if key < needleId { + l = m + 1 + } else { + h = m + } + } + err = fmt.Errorf("needle id %d not found", needleId) return }