From a4f3d82c57bca13321dca257891836ff36c7eca5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 27 May 2019 01:29:46 -0700 Subject: [PATCH] convert needle id to ec intervals to read from --- weed/server/volume_grpc_client_to_master.go | 3 - weed/server/volume_server_handlers_read.go | 17 +++- weed/storage/disk_location_ec.go | 11 ++ weed/storage/erasure_coding/ec_locate.go | 27 ++--- weed/storage/erasure_coding/ec_shard.go | 107 ++++++++++++++++++++ weed/storage/erasure_coding/ec_test.go | 26 ++--- weed/storage/erasure_coding/ec_volume.go | 74 ++++---------- weed/storage/store.go | 23 ++--- weed/storage/store_ec.go | 32 ++++-- 9 files changed, 215 insertions(+), 105 deletions(-) create mode 100644 weed/storage/erasure_coding/ec_shard.go diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 571c5716c..0fa61d71d 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -66,9 +66,6 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA glog.V(0).Infof("Heartbeat to: %v", masterNode) vs.currentMaster = masterNode - vs.store.Client = stream - defer func() { vs.store.Client = nil }() - doneChan := make(chan error, 1) go func() { diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 816afcb8b..4af63e2ce 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -40,7 +40,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } glog.V(4).Infoln("volume", volumeId, "reading", n) - if !vs.store.HasVolume(volumeId) { + hasVolume := vs.store.HasVolume(volumeId) + _, hasEcShard := vs.store.HasEcShard(volumeId) + if !hasVolume && !hasEcShard { if !vs.ReadRedirect { glog.V(2).Infoln("volume is not local:", err, r.URL.Path) w.WriteHeader(http.StatusNotFound) @@ -65,10 +67,15 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } cookie := n.Cookie - count, e := vs.store.ReadVolumeNeedle(volumeId, n) - glog.V(4).Infoln("read bytes", count, "error", e) - if e != nil || count < 0 { - glog.V(0).Infof("read %s error: %v", r.URL.Path, e) + var count int + if hasVolume { + count, err = vs.store.ReadVolumeNeedle(volumeId, n) + } else if hasEcShard { + count, err = vs.store.ReadEcShardNeedle(volumeId, n) + } + glog.V(4).Infoln("read bytes", count, "error", err) + if err != nil || count < 0 { + glog.V(0).Infof("read %s error: %v", r.URL.Path, err) w.WriteHeader(http.StatusNotFound) return } diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index 05acbb98b..e91c0f262 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -16,6 +16,17 @@ var ( re = regexp.MustCompile("\\.ec[0-9][0-9]") ) +func (l *DiskLocation) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards, bool) { + l.ecShardsLock.RLock() + defer l.ecShardsLock.RUnlock() + + ecShards, ok := l.ecShards[vid] + if ok { + return ecShards, true + } + return nil, false +} + func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) { l.ecShardsLock.RLock() defer l.ecShardsLock.RUnlock() diff --git a/weed/storage/erasure_coding/ec_locate.go b/weed/storage/erasure_coding/ec_locate.go index b570f750c..ee8af3382 100644 --- a/weed/storage/erasure_coding/ec_locate.go +++ b/weed/storage/erasure_coding/ec_locate.go @@ -1,22 +1,25 @@ package erasure_coding type Interval struct { - blockIndex int - innerBlockOffset int64 - size uint32 - isLargeBlock bool + BlockIndex int + InnerBlockOffset int64 + Size uint32 + IsLargeBlock bool + LargeBlockRowsCount int } -func locateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) { +func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) { blockIndex, isLargeBlock, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset) - nLargeBlockRows := int(datSize / (largeBlockLength * DataShardsCount)) + // adding DataShardsCount*smallBlockLength to ensure we can derive the number of large block size from a shard size + nLargeBlockRows := int((datSize + DataShardsCount*smallBlockLength) / (largeBlockLength * DataShardsCount)) for size > 0 { interval := Interval{ - blockIndex: blockIndex, - innerBlockOffset: innerBlockOffset, - isLargeBlock: isLargeBlock, + BlockIndex: blockIndex, + InnerBlockOffset: innerBlockOffset, + IsLargeBlock: isLargeBlock, + LargeBlockRowsCount: nLargeBlockRows, } blockRemaining := largeBlockLength - innerBlockOffset @@ -25,14 +28,14 @@ func locateData(largeBlockLength, smallBlockLength int64, datSize int64, offset } if int64(size) <= blockRemaining { - interval.size = size + interval.Size = size intervals = append(intervals, interval) return } - interval.size = uint32(blockRemaining) + interval.Size = uint32(blockRemaining) intervals = append(intervals, interval) - size -= interval.size + size -= interval.Size blockIndex += 1 if isLargeBlock && blockIndex == nLargeBlockRows*DataShardsCount { isLargeBlock = false diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go new file mode 100644 index 000000000..084d3f03b --- /dev/null +++ b/weed/storage/erasure_coding/ec_shard.go @@ -0,0 +1,107 @@ +package erasure_coding + +import ( + "fmt" + "os" + "path" + "strconv" + + "github.com/chrislusf/seaweedfs/weed/storage/idx" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +type ShardId uint8 + +type EcVolumeShard struct { + VolumeId needle.VolumeId + ShardId ShardId + Collection string + dir string + ecdFile *os.File + ecdFileSize int64 + ecxFile *os.File + ecxFileSize int64 +} + +func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { + + v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId} + + baseFileName := v.FileName() + + // open ecx file + if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil { + return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e) + } + ecxFi, statErr := v.ecxFile.Stat() + if statErr != nil { + return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr) + } + v.ecxFileSize = ecxFi.Size() + + // open ecd file + if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil { + return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e) + } + ecdFi, statErr := v.ecdFile.Stat() + if statErr != nil { + return nil, fmt.Errorf("can not stat ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), statErr) + } + v.ecdFileSize = ecdFi.Size() + + return +} + +func (shard *EcVolumeShard) String() string { + return fmt.Sprintf("ec shard %v:%v, dir:%s, Collection:%s", shard.VolumeId, shard.ShardId, shard.dir, shard.Collection) +} + +func (shard *EcVolumeShard) FileName() (fileName string) { + return EcShardFileName(shard.Collection, shard.dir, int(shard.VolumeId)) +} + +func EcShardFileName(collection string, dir string, id int) (fileName string) { + idString := strconv.Itoa(id) + if collection == "" { + fileName = path.Join(dir, idString) + } else { + fileName = path.Join(dir, collection+"_"+idString) + } + return +} + +func (shard *EcVolumeShard) Close() { + if shard.ecdFile != nil { + _ = shard.ecdFile.Close() + shard.ecdFile = nil + } + if shard.ecxFile != nil { + _ = shard.ecxFile.Close() + shard.ecxFile = nil + } +} + +func (shard *EcVolumeShard) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) { + var key types.NeedleId + buf := make([]byte, types.NeedleMapEntrySize) + l, h := int64(0), shard.ecxFileSize/types.NeedleMapEntrySize + for l < h { + m := (l + h) / 2 + if _, err := shard.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { + return types.Offset{}, 0, err + } + key, offset, size = idx.IdxFileEntry(buf) + if key == needleId { + return + } + if key < needleId { + l = m + 1 + } else { + h = m + } + } + + err = fmt.Errorf("needle id %d not found", needleId) + return +} diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go index ecf73ac96..83b0bc23a 100644 --- a/weed/storage/erasure_coding/ec_test.go +++ b/weed/storage/erasure_coding/ec_test.go @@ -103,7 +103,7 @@ func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, er func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) (data []byte, err error) { - intervals := locateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size) + intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size) nLargeBlockRows := int(datSize / (largeBlockSize * DataShardsCount)) @@ -123,20 +123,20 @@ func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uin } func readOneInterval(interval Interval, ecFiles []*os.File, nLargeBlockRows int) (data []byte, err error) { - ecFileOffset := interval.innerBlockOffset - rowIndex := interval.blockIndex / DataShardsCount - if interval.isLargeBlock { + ecFileOffset := interval.InnerBlockOffset + rowIndex := interval.BlockIndex / DataShardsCount + if interval.IsLargeBlock { ecFileOffset += int64(rowIndex) * largeBlockSize } else { ecFileOffset += int64(nLargeBlockRows)*largeBlockSize + int64(rowIndex)*smallBlockSize } - ecFileIndex := interval.blockIndex % DataShardsCount + ecFileIndex := interval.BlockIndex % DataShardsCount - data = make([]byte, interval.size) + data = make([]byte, interval.Size) err = readFromFile(ecFiles[ecFileIndex], data, ecFileOffset) { // do some ec testing - ecData, err := readFromOtherEcFiles(ecFiles, ecFileIndex, ecFileOffset, interval.size) + ecData, err := readFromOtherEcFiles(ecFiles, ecFileIndex, ecFileOffset, interval.Size) if err != nil { return nil, fmt.Errorf("ec reconstruct error: %v", err) } @@ -194,7 +194,7 @@ func removeGeneratedFiles(baseFileName string) { } func TestLocateData(t *testing.T) { - intervals := locateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize, 1) + intervals := LocateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize, 1) if len(intervals) != 1 { t.Errorf("unexpected interval size %d", len(intervals)) } @@ -202,13 +202,13 @@ func TestLocateData(t *testing.T) { t.Errorf("unexpected interval %+v", intervals[0]) } - intervals = locateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100) + intervals = LocateData(largeBlockSize, smallBlockSize, DataShardsCount*largeBlockSize+1, DataShardsCount*largeBlockSize/2+100, DataShardsCount*largeBlockSize+1-DataShardsCount*largeBlockSize/2-100) fmt.Printf("%+v\n", intervals) } func (this Interval) sameAs(that Interval) bool { - return this.isLargeBlock == that.isLargeBlock && - this.innerBlockOffset == that.innerBlockOffset && - this.blockIndex == that.blockIndex && - this.size == that.size + return this.IsLargeBlock == that.IsLargeBlock && + this.InnerBlockOffset == that.InnerBlockOffset && + this.BlockIndex == that.BlockIndex && + this.Size == that.Size } diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 11dcd0860..d57a28449 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -1,44 +1,15 @@ package erasure_coding import ( - "fmt" "math" - "os" - "path" "sort" - "strconv" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" ) -type ShardId uint8 - -type EcVolumeShard struct { - VolumeId needle.VolumeId - ShardId ShardId - Collection string - dir string - ecdFile *os.File - ecxFile *os.File -} type EcVolumeShards []*EcVolumeShard -func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { - - v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId} - - baseFileName := v.FileName() - if v.ecxFile, e = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); e != nil { - return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, e) - } - if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil { - return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e) - } - - return -} - func (shards *EcVolumeShards) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool { for _, s := range *shards { if s.ShardId == ecVolumeShard.ShardId { @@ -68,6 +39,15 @@ func (shards *EcVolumeShards) DeleteEcVolumeShard(ecVolumeShard *EcVolumeShard) return true } +func (shards *EcVolumeShards) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) { + for _, s := range *shards { + if s.ShardId == shardId { + return s, true + } + } + return nil, false +} + func (shards *EcVolumeShards) Close() { for _, s := range *shards { s.Close() @@ -91,31 +71,19 @@ func (shards *EcVolumeShards) ToVolumeEcShardInformationMessage() (messages []*m return } -func (v *EcVolumeShard) String() string { - return fmt.Sprintf("ec shard %v:%v, dir:%s, Collection:%s", v.VolumeId, v.ShardId, v.dir, v.Collection) -} - -func (v *EcVolumeShard) FileName() (fileName string) { - return EcShardFileName(v.Collection, v.dir, int(v.VolumeId)) -} +func (shards *EcVolumeShards) ReadEcShardNeedle(n *needle.Needle) (int, error) { -func EcShardFileName(collection string, dir string, id int) (fileName string) { - idString := strconv.Itoa(id) - if collection == "" { - fileName = path.Join(dir, idString) - } else { - fileName = path.Join(dir, collection+"_"+idString) + shard := (*shards)[0] + // find the needle from ecx file + offset, size, err := shard.findNeedleFromEcx(n.Id) + if err != nil { + return 0, err } - return -} -func (v *EcVolumeShard) Close() { - if v.ecdFile != nil { - _ = v.ecdFile.Close() - v.ecdFile = nil - } - if v.ecxFile != nil { - _ = v.ecxFile.Close() - v.ecxFile = nil - } + // calculate the locations in the ec shards + intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shard.ecxFileSize, offset.ToAcutalOffset(), size) + + // TODO read the intervals + + return len(intervals), nil } diff --git a/weed/storage/store.go b/weed/storage/store.go index ad8f2d6b9..d5474d87f 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -18,18 +18,17 @@ const ( * A VolumeServer contains one Store */ type Store struct { - volumeSizeLimit uint64 //read from the master - Ip string - Port int - PublicUrl string - Locations []*DiskLocation - dataCenter string //optional informaton, overwriting master setting if exists - rack string //optional information, overwriting master setting if exists - connected bool - Client master_pb.Seaweed_SendHeartbeatClient - NeedleMapType NeedleMapType - NewVolumesChan chan master_pb.VolumeShortInformationMessage - DeletedVolumesChan chan master_pb.VolumeShortInformationMessage + volumeSizeLimit uint64 //read from the master + Ip string + Port int + PublicUrl string + Locations []*DiskLocation + dataCenter string //optional informaton, overwriting master setting if exists + rack string //optional information, overwriting master setting if exists + connected bool + NeedleMapType NeedleMapType + NewVolumesChan chan master_pb.VolumeShortInformationMessage + DeletedVolumesChan chan master_pb.VolumeShortInformationMessage NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage } diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index ad935609d..ed7c6484b 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -33,9 +33,9 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er var shardBits erasure_coding.ShardBits s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{ - Id: uint32(vid), - Collection: collection, - EcIndexBits: uint32(shardBits.AddShardId(shardId)), + Id: uint32(vid), + Collection: collection, + EcIndexBits: uint32(shardBits.AddShardId(shardId)), } return nil } @@ -53,9 +53,9 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar var shardBits erasure_coding.ShardBits message := master_pb.VolumeEcShardInformationMessage{ - Id: uint32(vid), - Collection: ecShard.Collection, - EcIndexBits: uint32(shardBits.AddShardId(shardId)), + Id: uint32(vid), + Collection: ecShard.Collection, + EcIndexBits: uint32(shardBits.AddShardId(shardId)), } for _, location := range s.Locations { @@ -69,7 +69,7 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId) } -func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) { +func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) { for _, location := range s.Locations { if v, found := location.FindEcShard(vid, shardId); found { return v, found @@ -77,3 +77,21 @@ func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) } return nil, false } + +func (s *Store) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards, bool) { + for _, location := range s.Locations { + if s, found := location.HasEcShard(vid); found { + return s, true + } + } + return nil, false +} + +func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) { + for _, location := range s.Locations { + if ecShards, found := location.HasEcShard(vid); found { + return ecShards.ReadEcShardNeedle(n) + } + } + return 0, fmt.Errorf("ec shard %d not found", vid) +}