From 3f9ecee40fd469f9669686752ea8c6b2b8090596 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 28 May 2019 21:29:07 -0700 Subject: [PATCH] working with reading remote intervals --- weed/command/weedfuse/weedfuse.go | 2 +- weed/filer2/filerstore.go | 4 +- weed/filesys/wfs.go | 6 +-- weed/pb/filer_pb/filer_pb_helper_test.go | 1 - weed/security/guard.go | 2 +- weed/server/volume_grpc_copy.go | 4 +- weed/server/volume_grpc_erasure_coding.go | 37 +++++++++++-------- weed/shell/command_ec_encode.go | 15 ++++++-- weed/storage/disk_location_ec_test.go | 4 +- weed/storage/erasure_coding/ec_volume.go | 4 +- weed/storage/erasure_coding/ec_volume_info.go | 6 +-- weed/storage/idx/walk.go | 1 - weed/storage/needle_map/compact_map.go | 22 +++++------ weed/storage/store_ec.go | 27 +++++++------- weed/topology/topology_ec.go | 2 +- 15 files changed, 75 insertions(+), 62 deletions(-) diff --git a/weed/command/weedfuse/weedfuse.go b/weed/command/weedfuse/weedfuse.go index 5897ea066..4c0d12874 100644 --- a/weed/command/weedfuse/weedfuse.go +++ b/weed/command/weedfuse/weedfuse.go @@ -8,8 +8,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/command" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/kardianos/osext" "github.com/jacobsa/daemonize" + "github.com/kardianos/osext" ) var ( diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index 3db50bf17..fa5979b30 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -30,9 +30,9 @@ type FilerStoreWrapper struct { actualStore FilerStore } -func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper{ +func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { return &FilerStoreWrapper{ - actualStore:store, + actualStore: store, } } diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 68632b791..9018c36ed 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -29,9 +29,9 @@ type Option struct { DirListingLimit int EntryCacheTtl time.Duration - MountUid uint32 - MountGid uint32 - MountMode os.FileMode + MountUid uint32 + MountGid uint32 + MountMode os.FileMode MountCtime time.Time MountMtime time.Time } diff --git a/weed/pb/filer_pb/filer_pb_helper_test.go b/weed/pb/filer_pb/filer_pb_helper_test.go index 64ed72ed7..02c7842c9 100644 --- a/weed/pb/filer_pb/filer_pb_helper_test.go +++ b/weed/pb/filer_pb/filer_pb_helper_test.go @@ -15,4 +15,3 @@ func TestFileIdSize(t *testing.T) { println(len(fileIdStr)) println(len(bytes)) } - diff --git a/weed/security/guard.go b/weed/security/guard.go index d8427997e..5d25d8327 100644 --- a/weed/security/guard.go +++ b/weed/security/guard.go @@ -49,7 +49,7 @@ type Guard struct { } func NewGuard(whiteList []string, signingKey string, expiresAfterSec int) *Guard { - g := &Guard{whiteList: whiteList, SigningKey: SigningKey(signingKey), ExpiresAfterSec:expiresAfterSec} + g := &Guard{whiteList: whiteList, SigningKey: SigningKey(signingKey), ExpiresAfterSec: expiresAfterSec} g.isActive = len(g.whiteList) != 0 || len(g.SigningKey) != 0 return g } diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index e5a3d6edf..186c378ef 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -53,11 +53,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // println("source:", volFileInfoResp.String()) // copy ecx file - if err:=vs.doCopyFile(ctx, client, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx"); err!=nil{ + if err := vs.doCopyFile(ctx, client, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx"); err != nil { return err } - if err:=vs.doCopyFile(ctx, client, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat"); err!=nil{ + if err := vs.doCopyFile(ctx, client, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat"); err != nil { return err } diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 6cb826d30..2ef89a040 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -160,35 +160,42 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId) } - buffer := make([]byte, BufferSizeLimit) + bufSize := req.Size + if bufSize > BufferSizeLimit { + bufSize = BufferSizeLimit + } + buffer := make([]byte, bufSize) + startOffset, bytesToRead := req.Offset, req.Size for bytesToRead > 0 { bytesread, err := ecShard.ReadAt(buffer, startOffset) // println(fileName, "read", bytesread, "bytes, with target", bytesToRead) + if bytesread > 0 { - if err != nil { - if err != io.EOF { + if int64(bytesread) > bytesToRead { + bytesread = int(bytesToRead) + } + err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{ + Data: buffer[:bytesread], + }) + if err != nil { + // println("sending", bytesread, "bytes err", err.Error()) return err } - // println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error()) - break - } - if int64(bytesread) > bytesToRead { - bytesread = int(bytesToRead) + bytesToRead -= int64(bytesread) + } - err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{ - Data: buffer[:bytesread], - }) + if err != nil { - // println("sending", bytesread, "bytes err", err.Error()) - return err + if err != io.EOF { + return err + } + return nil } - bytesToRead -= int64(bytesread) - } return nil diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 499c8a32e..ac42b520d 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -8,6 +8,7 @@ import ( "sort" "sync" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -196,9 +197,10 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di targetServer *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) { + var shardIdsToCopy []uint32 for shardId := startFromShardId; shardId < startFromShardId+shardCount; shardId++ { fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.Id) - copiedShardIds = append(copiedShardIds, shardId) + shardIdsToCopy = append(shardIdsToCopy, shardId) } err = operation.WithVolumeServerClient(targetServer.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -208,7 +210,7 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, - ShardIds: copiedShardIds, + ShardIds: shardIdsToCopy, SourceDataNode: existingLocation.Url, }) if copyErr != nil { @@ -219,12 +221,17 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, - ShardIds: copiedShardIds, + ShardIds: shardIdsToCopy, }) if mountErr != nil { return mountErr } + if targetServer.Id != existingLocation.Url { + copiedShardIds = shardIdsToCopy + glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds) + } + return nil }) @@ -243,7 +250,7 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), - ShardIds: toBeDeletedShardIds, + ShardIds: toBeDeletedShardIds, ShouldDeleteEcx: shouldDeleteEcx, }) return deleteErr diff --git a/weed/storage/disk_location_ec_test.go b/weed/storage/disk_location_ec_test.go index a65eb906b..3ce04c6c0 100644 --- a/weed/storage/disk_location_ec_test.go +++ b/weed/storage/disk_location_ec_test.go @@ -11,7 +11,7 @@ func TestLoadingEcShards(t *testing.T) { t.Errorf("load all ec shards: %v", err) } - if len(dl.ecVolumes)!=1 { + if len(dl.ecVolumes) != 1 { t.Errorf("loading err") } -} \ No newline at end of file +} diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 28f26d683..32f66260a 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -111,7 +111,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V return } -func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle) (offset types.Offset, size uint32, intervals []Interval, err error) { +func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle, version needle.Version) (offset types.Offset, size uint32, intervals []Interval, err error) { // find the needle from ecx file offset, size, err = ev.findNeedleFromEcx(n.Id) @@ -122,7 +122,7 @@ func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle) (offset types.Offset, shard := ev.Shards[0] // calculate the locations in the ec shards - intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), size) + intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), uint32(needle.GetActualSize(size, version))) return } diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index 48f4713d9..c9e85c662 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -40,7 +40,7 @@ func (ecInfo *EcVolumeInfo) ShardIdCount() (count int) { return ecInfo.ShardBits.ShardIdCount() } -func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) (*EcVolumeInfo) { +func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo { ret := &EcVolumeInfo{ VolumeId: ecInfo.VolumeId, Collection: ecInfo.Collection, @@ -88,10 +88,10 @@ func (b ShardBits) ShardIdCount() (count int) { return } -func (b ShardBits) Minus(other ShardBits) (ShardBits) { +func (b ShardBits) Minus(other ShardBits) ShardBits { return b &^ other } -func (b ShardBits) Plus(other ShardBits) (ShardBits) { +func (b ShardBits) Plus(other ShardBits) ShardBits { return b | other } diff --git a/weed/storage/idx/walk.go b/weed/storage/idx/walk.go index c674a2aeb..90efb75e6 100644 --- a/weed/storage/idx/walk.go +++ b/weed/storage/idx/walk.go @@ -51,4 +51,3 @@ func IdxFileEntry(bytes []byte) (key types.NeedleId, offset types.Offset, size u const ( RowsToRead = 1024 ) - diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go index 8aee51ec9..4089e65f7 100644 --- a/weed/storage/needle_map/compact_map.go +++ b/weed/storage/needle_map/compact_map.go @@ -248,16 +248,16 @@ func (cm *CompactMap) AscendingVisit(visit func(NeedleValue) error) error { for _, cs := range cm.list { cs.RLock() var i, j int - for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values) && j 1 { + glog.V(4).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals) + } + bytes, err := s.readEcShardIntervals(ctx, vid, localEcVolume, intervals) if err != nil { return 0, fmt.Errorf("ReadEcShardIntervals: %v", err) } @@ -124,14 +125,14 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n return 0, fmt.Errorf("ec shard %d not found", vid) } -func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, version needle.Version, intervals []erasure_coding.Interval) (data []byte, err error) { +func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, err error) { if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil { return nil, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err) } for i, interval := range intervals { - if d, e := s.readOneEcShardInterval(ctx, ecVolume, version, interval); e != nil { + if d, e := s.readOneEcShardInterval(ctx, ecVolume, interval); e != nil { return nil, e } else { if i == 0 { @@ -144,11 +145,10 @@ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, e return } -func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, version needle.Version, interval erasure_coding.Interval) (data []byte, err error) { +func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, err error) { shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) - data = make([]byte, int(needle.GetActualSize(interval.Size, version))) + data = make([]byte, interval.Size) if shard, found := ecVolume.FindEcVolumeShard(shardId); found { - glog.V(3).Infof("read local ec shard %d.%d", ecVolume.VolumeId, shardId) if _, err = shard.ReadAt(data, actualOffset); err != nil { glog.V(0).Infof("read local ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err) return @@ -160,7 +160,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_co if !found || len(sourceDataNodes) == 0 { return nil, fmt.Errorf("failed to find ec shard %d.%d", ecVolume.VolumeId, shardId) } - glog.V(3).Infof("read remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNodes[0]) + glog.V(4).Infof("read remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNodes[0]) _, err = s.readOneRemoteEcShardInterval(ctx, sourceDataNodes[0], ecVolume.VolumeId, shardId, data, actualOffset) if err != nil { glog.V(1).Infof("failed to read from %s for ec shard %d.%d : %v", sourceDataNodes[0], ecVolume.VolumeId, shardId, err) @@ -195,6 +195,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url) } } + ecVolume.ShardLocationsRefreshTime = time.Now() ecVolume.ShardLocationsLock.Unlock() return nil diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go index 63cd3ce35..1ead27a81 100644 --- a/weed/topology/topology_ec.go +++ b/weed/topology/topology_ec.go @@ -123,7 +123,7 @@ func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, } } -func (t *Topology) LookupEcShards(vid needle.VolumeId)(locations *EcShardLocations, found bool) { +func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) { t.ecShardMapLock.RLock() defer t.ecShardMapLock.RUnlock()