Browse Source

refactoring ecx to ecVolume

pull/991/head
Chris Lu 6 years ago
parent
commit
3a8c1055a2
  1. 11
      weed/storage/disk_location_ec.go
  2. 42
      weed/storage/erasure_coding/ec_shard.go
  3. 61
      weed/storage/erasure_coding/ec_volume.go

11
weed/storage/disk_location_ec.go

@ -50,7 +50,15 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err) return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err)
} }
l.ecVolumesLock.Lock() l.ecVolumesLock.Lock()
l.ecVolumes[vid].AddEcVolumeShard(ecVolumeShard)
ecVolume, found := l.ecVolumes[vid]
if !found {
ecVolume, err = erasure_coding.NewEcVolume(l.Directory, collection, vid)
if err != nil {
return fmt.Errorf("failed to create ec volume %d: %v", vid, err)
}
l.ecVolumes[vid] = ecVolume
}
ecVolume.AddEcVolumeShard(ecVolumeShard)
l.ecVolumesLock.Unlock() l.ecVolumesLock.Unlock()
return nil return nil
@ -69,6 +77,7 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding
if len(ecVolume.Shards) == 0 { if len(ecVolume.Shards) == 0 {
delete(l.ecVolumes, vid) delete(l.ecVolumes, vid)
} }
ecVolume.Close()
return true return true
} }

42
weed/storage/erasure_coding/ec_shard.go

@ -6,9 +6,7 @@ import (
"path" "path"
"strconv" "strconv"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
) )
type ShardId uint8 type ShardId uint8
@ -20,8 +18,6 @@ type EcVolumeShard struct {
dir string dir string
ecdFile *os.File ecdFile *os.File
ecdFileSize int64 ecdFileSize int64
ecxFile *os.File
ecxFileSize int64
} }
func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
@ -30,16 +26,6 @@ func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, sha
baseFileName := v.FileName() baseFileName := v.FileName()
// open ecx file
if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil {
return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e)
}
ecxFi, statErr := v.ecxFile.Stat()
if statErr != nil {
return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
}
v.ecxFileSize = ecxFi.Size()
// open ecd file // open ecd file
if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil { if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e) return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
@ -76,34 +62,6 @@ func (shard *EcVolumeShard) Close() {
_ = shard.ecdFile.Close() _ = shard.ecdFile.Close()
shard.ecdFile = nil shard.ecdFile = nil
} }
if shard.ecxFile != nil {
_ = shard.ecxFile.Close()
shard.ecxFile = nil
}
}
func (shard *EcVolumeShard) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
l, h := int64(0), shard.ecxFileSize/types.NeedleMapEntrySize
for l < h {
m := (l + h) / 2
if _, err := shard.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
return types.Offset{}, 0, err
}
key, offset, size = idx.IdxFileEntry(buf)
if key == needleId {
return
}
if key < needleId {
l = m + 1
} else {
h = m
}
}
err = fmt.Errorf("needle id %d not found", needleId)
return
} }
func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) { func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) {

61
weed/storage/erasure_coding/ec_volume.go

@ -1,16 +1,42 @@
package erasure_coding package erasure_coding
import ( import (
"fmt"
"math" "math"
"os"
"sort" "sort"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
type EcVolume struct { type EcVolume struct {
Shards []*EcVolumeShard Shards []*EcVolumeShard
VolumeId needle.VolumeId
Collection string
dir string
ecxFile *os.File
ecxFileSize int64
}
func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid}
baseFileName := EcShardFileName(collection, dir, int(vid))
// open ecx file
if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); err != nil {
return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, err)
}
ecxFi, statErr := ev.ecxFile.Stat()
if statErr != nil {
return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
}
ev.ecxFileSize = ecxFi.Size()
return
} }
func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool { func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
@ -55,6 +81,10 @@ func (ev *EcVolume) Close() {
for _, s := range ev.Shards { for _, s := range ev.Shards {
s.Close() s.Close()
} }
if ev.ecxFile != nil {
_ = ev.ecxFile.Close()
ev.ecxFile = nil
}
} }
func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) { func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
@ -76,15 +106,40 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle) (offset types.Offset, size uint32, intervals []Interval, err error) { func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle) (offset types.Offset, size uint32, intervals []Interval, err error) {
shard := ev.Shards[0]
// find the needle from ecx file // find the needle from ecx file
offset, size, err = shard.findNeedleFromEcx(n.Id)
offset, size, err = ev.findNeedleFromEcx(n.Id)
if err != nil { if err != nil {
return types.Offset{}, 0, nil, err return types.Offset{}, 0, nil, err
} }
shard := ev.Shards[0]
// calculate the locations in the ec shards // calculate the locations in the ec shards
intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shard.ecxFileSize, offset.ToAcutalOffset(), size)
intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), size)
return
}
func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
l, h := int64(0), ev.ecxFileSize/types.NeedleMapEntrySize
for l < h {
m := (l + h) / 2
if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
return types.Offset{}, 0, err
}
key, offset, size = idx.IdxFileEntry(buf)
if key == needleId {
return
}
if key < needleId {
l = m + 1
} else {
h = m
}
}
err = fmt.Errorf("needle id %d not found", needleId)
return return
} }
Loading…
Cancel
Save