From d344e0a035985ce7a28a6f7a4499199ef27aeda3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 5 Jun 2019 23:20:26 -0700 Subject: [PATCH] fix ec related bugs --- Review notes (not part of the commit message): the added helpers addEcVolumeShards and deleteEcVolumeShards walk ecNode.info.EcShardInfos and, for each entry whose Id equals vid, set or clear every listed shard id in EcIndexBits through ShardBits.AddShardId and ShardBits.RemoveShardId. deleteEcVolumeShards is called once sourceServerDeleteEcShards has returned without error (doDeduplicateEcShards, moveMountedShardToEcNode); addEcVolumeShards is called just before rebuildOneEcVolume returns nil. A node that has no EcShardInfos entry for vid is left unchanged by both helpers. This revision only inserts the space before the opening brace in the two helper signatures and their inner range clauses, as gofmt formats them; no line was added or removed, so hunk line counts are unchanged. NOTE: the post-image blob id (4edf94711) on the command_ec_balance.go index line predates this spacing change. weed/server/volume_server_handlers_read.go | 2 +- weed/shell/command_ec_balance.go | 25 ++++++++++++++++++++++ weed/shell/command_ec_common.go | 9 +++++++- weed/shell/command_ec_rebuild.go | 2 ++ weed/storage/disk_location.go | 4 ++-- weed/storage/disk_location_ec.go | 2 +- weed/storage/erasure_coding/ec_volume.go | 4 ++-- weed/storage/store_ec.go | 2 +- weed/topology/topology.go | 10 ++++++++- 9 files changed, 51 insertions(+), 9 deletions(-) diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 839e17fbe..9bc436239 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -76,7 +76,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } glog.V(4).Infoln("read bytes", count, "error", err) if err != nil || count < 0 { - glog.V(0).Infof("read %s error: %v", r.URL.Path, err) + glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err) w.WriteHeader(http.StatusNotFound) return } diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 424b63d9d..4edf94711 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -199,6 +199,7 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collecti if err := sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil { return err } + deleteEcVolumeShards(ecNode, vid, duplicatedShardIds) ecNode.freeEcSlot++ } } @@ -273,3 +274,27 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar return 0 } + +func addEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, shardIds []uint32) { + + for _, shardInfo := range ecNode.info.EcShardInfos { + if 
needle.VolumeId(shardInfo.Id) == vid { + for _, shardId := range shardIds { + shardInfo.EcIndexBits = uint32(erasure_coding.ShardBits(shardInfo.EcIndexBits).AddShardId(erasure_coding.ShardId(shardId))) + } + } + } + +} + +func deleteEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, shardIds []uint32) { + + for _, shardInfo := range ecNode.info.EcShardInfos { + if needle.VolumeId(shardInfo.Id) == vid { + for _, shardId := range shardIds { + shardInfo.EcIndexBits = uint32(erasure_coding.ShardBits(shardInfo.EcIndexBits).RemoveShardId(erasure_coding.ShardId(shardId))) + } + } + } + +} diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 4c53ba43b..041715908 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -35,7 +35,14 @@ func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, exist } // ask source node to delete the shard, and maybe the ecx file - return sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds) + err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds) + if err != nil { + return err + } + + deleteEcVolumeShards(existingLocation, vid, copiedShardIds) + + return nil } diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 9a5ea3ca9..20929d76c 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -163,6 +163,8 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder * return err } + addEcVolumeShards(rebuilder, volumeId, generatedShardIds) + return nil } diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 234d1a5f4..e61623fc7 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -223,8 +223,8 @@ func (l *DiskLocation) Close() { l.Unlock() l.ecVolumesLock.Lock() - for _, shards := range 
l.ecVolumes { - shards.Close() + for _, ecVolume := range l.ecVolumes { + ecVolume.Close() } l.ecVolumesLock.Unlock() diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index 7fb5351a4..ba0824c6d 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -87,8 +87,8 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding if _, deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted { if len(ecVolume.Shards) == 0 { delete(l.ecVolumes, vid) + ecVolume.Close() } - ecVolume.Close() return true } diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 17e7f2935..aea53f36e 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -155,7 +155,7 @@ func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle, version needle.Version // find the needle from ecx file offset, size, err = ev.findNeedleFromEcx(n.Id) if err != nil { - return types.Offset{}, 0, nil, err + return types.Offset{}, 0, nil, fmt.Errorf("findNeedleFromEcx: %v", err) } shard := ev.Shards[0] @@ -173,7 +173,7 @@ func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Off for l < h { m := (l + h) / 2 if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { - return types.Offset{}, 0, err + return types.Offset{}, 0, fmt.Errorf("ecx file %d read at %d: %v", ev.ecxFileSize, m*types.NeedleMapEntrySize, err) } key, offset, size = idx.IdxFileEntry(buf) if key == needleId { diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 43fb62ba1..408436376 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -118,7 +118,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n, version) if err != nil { - return 0, err + return 0, fmt.Errorf("locate in local ec volume: %v", err) 
} glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals) diff --git a/weed/topology/topology.go b/weed/topology/topology.go index c6fdd3861..aa01190c9 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -78,7 +78,7 @@ func (t *Topology) Leader() (string, error) { return l, nil } -func (t *Topology) Lookup(collection string, vid needle.VolumeId) []*DataNode { +func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) { //maybe an issue if lots of collections? if collection == "" { for _, c := range t.collectionMap.Items() { @@ -91,6 +91,14 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) []*DataNode { return c.(*Collection).Lookup(vid) } } + + if locations, found := t.LookupEcShards(vid); found { + for _, loc := range locations.Locations { + dataNodes = append(dataNodes, loc...) + } + return dataNodes + } + return nil }