Browse Source

convert needle id to ec intervals to read from

pull/991/head
Chris Lu 6 years ago
parent
commit
a4f3d82c57
  1. 3
      weed/server/volume_grpc_client_to_master.go
  2. 17
      weed/server/volume_server_handlers_read.go
  3. 11
      weed/storage/disk_location_ec.go
  4. 27
      weed/storage/erasure_coding/ec_locate.go
  5. 107
      weed/storage/erasure_coding/ec_shard.go
  6. 26
      weed/storage/erasure_coding/ec_test.go
  7. 74
      weed/storage/erasure_coding/ec_volume.go
  8. 23
      weed/storage/store.go
  9. 32
      weed/storage/store_ec.go

3
weed/server/volume_grpc_client_to_master.go

@ -66,9 +66,6 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
glog.V(0).Infof("Heartbeat to: %v", masterNode)
vs.currentMaster = masterNode
vs.store.Client = stream
defer func() { vs.store.Client = nil }()
doneChan := make(chan error, 1)
go func() {

17
weed/server/volume_server_handlers_read.go

@ -40,7 +40,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
glog.V(4).Infoln("volume", volumeId, "reading", n)
if !vs.store.HasVolume(volumeId) {
hasVolume := vs.store.HasVolume(volumeId)
_, hasEcShard := vs.store.HasEcShard(volumeId)
if !hasVolume && !hasEcShard {
if !vs.ReadRedirect {
glog.V(2).Infoln("volume is not local:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
@ -65,10 +67,15 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
cookie := n.Cookie
count, e := vs.store.ReadVolumeNeedle(volumeId, n)
glog.V(4).Infoln("read bytes", count, "error", e)
if e != nil || count < 0 {
glog.V(0).Infof("read %s error: %v", r.URL.Path, e)
var count int
if hasVolume {
count, err = vs.store.ReadVolumeNeedle(volumeId, n)
} else if hasEcShard {
count, err = vs.store.ReadEcShardNeedle(volumeId, n)
}
glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
glog.V(0).Infof("read %s error: %v", r.URL.Path, err)
w.WriteHeader(http.StatusNotFound)
return
}

11
weed/storage/disk_location_ec.go

@ -16,6 +16,17 @@ var (
re = regexp.MustCompile("\\.ec[0-9][0-9]")
)
func (l *DiskLocation) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards, bool) {
l.ecShardsLock.RLock()
defer l.ecShardsLock.RUnlock()
ecShards, ok := l.ecShards[vid]
if ok {
return ecShards, true
}
return nil, false
}
func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
l.ecShardsLock.RLock()
defer l.ecShardsLock.RUnlock()

27
weed/storage/erasure_coding/ec_locate.go

@ -1,22 +1,25 @@
package erasure_coding
type Interval struct {
blockIndex int
innerBlockOffset int64
size uint32
isLargeBlock bool
BlockIndex int
InnerBlockOffset int64
Size uint32
IsLargeBlock bool
LargeBlockRowsCount int
}
func locateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) {
func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) {
blockIndex, isLargeBlock, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset)
nLargeBlockRows := int(datSize / (largeBlockLength * DataShardsCount))
// adding DataShardsCount*smallBlockLength to ensure we can derive the number of large block size from a shard size
nLargeBlockRows := int((datSize + DataShardsCount*smallBlockLength) / (largeBlockLength * DataShardsCount))
for size > 0 {
interval := Interval{
blockIndex: blockIndex,
innerBlockOffset: innerBlockOffset,
isLargeBlock: isLargeBlock,
BlockIndex: blockIndex,
InnerBlockOffset: innerBlockOffset,
IsLargeBlock: isLargeBlock,
LargeBlockRowsCount: nLargeBlockRows,
}
blockRemaining := largeBlockLength - innerBlockOffset
@ -25,14 +28,14 @@ func locateData(largeBlockLength, smallBlockLength int64, datSize int64, offset
}
if int64(size) <= blockRemaining {
interval.size = size
interval.Size = size
intervals = append(intervals, interval)
return
}
interval.size = uint32(blockRemaining)
interval.Size = uint32(blockRemaining)
intervals = append(intervals, interval)
size -= interval.size
size -= interval.Size
blockIndex += 1
if isLargeBlock && blockIndex == nLargeBlockRows*DataShardsCount {
isLargeBlock = false

107
weed/storage/erasure_coding/ec_shard.go

@ -0,0 +1,107 @@
package erasure_coding
import (
"fmt"
"os"
"path"
"strconv"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
type ShardId uint8
type EcVolumeShard struct {
VolumeId needle.VolumeId
ShardId ShardId
Collection string
dir string
ecdFile *os.File
ecdFileSize int64
ecxFile *os.File
ecxFileSize int64
}
func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
baseFileName := v.FileName()
// open ecx file
if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil {
return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e)
}
ecxFi, statErr := v.ecxFile.Stat()
if statErr != nil {
return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
}
v.ecxFileSize = ecxFi.Size()
// open ecd file
if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
}
ecdFi, statErr := v.ecdFile.Stat()
if statErr != nil {
return nil, fmt.Errorf("can not stat ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), statErr)
}
v.ecdFileSize = ecdFi.Size()
return
}
func (shard *EcVolumeShard) String() string {
return fmt.Sprintf("ec shard %v:%v, dir:%s, Collection:%s", shard.VolumeId, shard.ShardId, shard.dir, shard.Collection)
}
func (shard *EcVolumeShard) FileName() (fileName string) {
return EcShardFileName(shard.Collection, shard.dir, int(shard.VolumeId))
}
func EcShardFileName(collection string, dir string, id int) (fileName string) {
idString := strconv.Itoa(id)
if collection == "" {
fileName = path.Join(dir, idString)
} else {
fileName = path.Join(dir, collection+"_"+idString)
}
return
}
func (shard *EcVolumeShard) Close() {
if shard.ecdFile != nil {
_ = shard.ecdFile.Close()
shard.ecdFile = nil
}
if shard.ecxFile != nil {
_ = shard.ecxFile.Close()
shard.ecxFile = nil
}
}
func (shard *EcVolumeShard) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
l, h := int64(0), shard.ecxFileSize/types.NeedleMapEntrySize
for l < h {
m := (l + h) / 2
if _, err := shard.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
return types.Offset{}, 0, err
}
key, offset, size = idx.IdxFileEntry(buf)
if key == needleId {
return
}
if key < needleId {
l = m + 1
} else {
h = m
}
}
err = fmt.Errorf("needle id %d not found", needleId)
return
}

26
weed/storage/erasure_coding/ec_test.go

@ -103,7 +103,7 @@ func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, er
func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) (data []byte, err error) {
intervals := locateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
nLargeBlockRows := int(datSize / (largeBlockSize * DataShardsCount))
@ -123,20 +123,20 @@ func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uin
}
func readOneInterval(interval Interval, ecFiles []*os.File, nLargeBlockRows int) (data []byte, err error) {
ecFileOffset := interval.innerBlockOffset
rowIndex := interval.blockIndex / DataShardsCount
if interval.isLargeBlock {
ecFileOffset := interval.InnerBlockOffset
rowIndex := interval.BlockIndex / DataShardsCount
if interval.IsLargeBlock {
ecFileOffset += int64(rowIndex) * largeBlockSize
} else {
ecFileOffset += int64(nLargeBlockRows)*largeBlockSize + int64(rowIndex)*smallBlockSize
}
ecFileIndex := interval.blockIndex % DataShardsCount
ecFileIndex := interval.BlockIndex % DataShardsCount
data = make([]byte, interval.size)
data = make([]byte, interval.Size)
err = readFromFile(ecFiles[ecFileIndex], data, ecFileOffset)
{ // do some ec testing
ecData, err := readFromOtherEcFiles(ecFiles, ecFileIndex, ecFileOffset, interval.size)
ecData, err := readFromOtherEcFiles(ecFiles, ecFileIndex, ecFileOffset, interval.Size)
if err != nil {
return nil, fmt.Errorf("ec reconstruct error: %v", err)
}
@ -194,7 +194,7 @@ func removeGeneratedFiles(baseFileName string) {
}
func TestLocateData(t *testing.T) {
intervals := locateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize, 1)
intervals := LocateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize, 1)
if len(intervals) != 1 {
t.Errorf("unexpected interval size %d", len(intervals))
}
@ -202,13 +202,13 @@ func TestLocateData(t *testing.T) {
t.Errorf("unexpected interval %+v", intervals[0])
}
intervals = locateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100)
intervals = LocateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100)
fmt.Printf("%+v\n", intervals)
}
func (this Interval) sameAs(that Interval) bool {
return this.isLargeBlock == that.isLargeBlock &&
this.innerBlockOffset == that.innerBlockOffset &&
this.blockIndex == that.blockIndex &&
this.size == that.size
return this.IsLargeBlock == that.IsLargeBlock &&
this.InnerBlockOffset == that.InnerBlockOffset &&
this.BlockIndex == that.BlockIndex &&
this.Size == that.Size
}

74
weed/storage/erasure_coding/ec_volume.go

@ -1,44 +1,15 @@
package erasure_coding
import (
"fmt"
"math"
"os"
"path"
"sort"
"strconv"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
type ShardId uint8
type EcVolumeShard struct {
VolumeId needle.VolumeId
ShardId ShardId
Collection string
dir string
ecdFile *os.File
ecxFile *os.File
}
type EcVolumeShards []*EcVolumeShard
func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
baseFileName := v.FileName()
if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil {
return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e)
}
if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
}
return
}
func (shards *EcVolumeShards) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
for _, s := range *shards {
if s.ShardId == ecVolumeShard.ShardId {
@ -68,6 +39,15 @@ func (shards *EcVolumeShards) DeleteEcVolumeShard(ecVolumeShard *EcVolumeShard)
return true
}
func (shards *EcVolumeShards) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
for _, s := range *shards {
if s.ShardId == shardId {
return s, true
}
}
return nil, false
}
func (shards *EcVolumeShards) Close() {
for _, s := range *shards {
s.Close()
@ -91,31 +71,19 @@ func (shards *EcVolumeShards) ToVolumeEcShardInformationMessage() (messages []*m
return
}
func (v *EcVolumeShard) String() string {
return fmt.Sprintf("ec shard %v:%v, dir:%s, Collection:%s", v.VolumeId, v.ShardId, v.dir, v.Collection)
}
func (v *EcVolumeShard) FileName() (fileName string) {
return EcShardFileName(v.Collection, v.dir, int(v.VolumeId))
}
func (shards *EcVolumeShards) ReadEcShardNeedle(n *needle.Needle) (int, error) {
func EcShardFileName(collection string, dir string, id int) (fileName string) {
idString := strconv.Itoa(id)
if collection == "" {
fileName = path.Join(dir, idString)
} else {
fileName = path.Join(dir, collection+"_"+idString)
shard := (*shards)[0]
// find the needle from ecx file
offset, size, err := shard.findNeedleFromEcx(n.Id)
if err != nil {
return 0, err
}
return
}
func (v *EcVolumeShard) Close() {
if v.ecdFile != nil {
_ = v.ecdFile.Close()
v.ecdFile = nil
}
if v.ecxFile != nil {
_ = v.ecxFile.Close()
v.ecxFile = nil
}
// calculate the locations in the ec shards
intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shard.ecxFileSize, offset.ToAcutalOffset(), size)
// TODO read the intervals
return len(intervals), nil
}

23
weed/storage/store.go

@ -18,18 +18,17 @@ const (
* A VolumeServer contains one Store
*/
type Store struct {
volumeSizeLimit uint64 //read from the master
Ip string
Port int
PublicUrl string
Locations []*DiskLocation
dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
Client master_pb.Seaweed_SendHeartbeatClient
NeedleMapType NeedleMapType
NewVolumesChan chan master_pb.VolumeShortInformationMessage
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
volumeSizeLimit uint64 //read from the master
Ip string
Port int
PublicUrl string
Locations []*DiskLocation
dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
NeedleMapType NeedleMapType
NewVolumesChan chan master_pb.VolumeShortInformationMessage
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
}

32
weed/storage/store_ec.go

@ -33,9 +33,9 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
var shardBits erasure_coding.ShardBits
s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
}
return nil
}
@ -53,9 +53,9 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar
var shardBits erasure_coding.ShardBits
message := master_pb.VolumeEcShardInformationMessage{
Id: uint32(vid),
Collection: ecShard.Collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
Id: uint32(vid),
Collection: ecShard.Collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
}
for _, location := range s.Locations {
@ -69,7 +69,7 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar
return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
}
func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
for _, location := range s.Locations {
if v, found := location.FindEcShard(vid, shardId); found {
return v, found
@ -77,3 +77,21 @@ func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId)
}
return nil, false
}
func (s *Store) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards, bool) {
for _, location := range s.Locations {
if s, found := location.HasEcShard(vid); found {
return s, true
}
}
return nil, false
}
func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) {
for _, location := range s.Locations {
if ecShards, found := location.HasEcShard(vid); found {
return ecShards.ReadEcShardNeedle(n)
}
}
return 0, fmt.Errorf("ec shard %d not found", vid)
}
Loading…
Cancel
Save