Browse Source

fix ec.encode not finding the local ec shards

pull/991/head
Chris Lu 6 years ago
parent
commit
d85b41b904
  1. 15
      weed/server/volume_grpc_copy.go
  2. 2
      weed/server/volume_grpc_erasure_coding.go
  3. 4
      weed/shell/command_ec_common.go
  4. 2
      weed/shell/command_ec_encode.go

15
weed/server/volume_grpc_copy.go

@ -6,12 +6,14 @@ import (
"io" "io"
"math" "math"
"os" "os"
"path"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -199,11 +201,16 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
} }
fileName = v.FileName() + req.Ext fileName = v.FileName() + req.Ext
} else { } else {
ecv, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
if !found {
return fmt.Errorf("not found ec volume id %d", req.VolumeId)
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
for _, location := range vs.store.Locations {
tName := path.Join(location.Directory, baseFileName)
if util.FileExists(tName){
fileName = tName
}
}
if fileName == "" {
return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
} }
fileName = ecv.FileName() + req.Ext
} }
bytesToRead := int64(req.StopOffset) bytesToRead := int64(req.StopOffset)

2
weed/server/volume_grpc_erasure_coding.go

@ -200,7 +200,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId)) ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
if !found { if !found {
return fmt.Errorf("not found ec volume id %d", req.VolumeId)
return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
} }
ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId)) ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
if !found { if !found {

4
weed/shell/command_ec_common.go

@ -62,7 +62,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
SourceDataNode: existingLocation, SourceDataNode: existingLocation,
}) })
if copyErr != nil { if copyErr != nil {
return copyErr
return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
} }
} }
@ -73,7 +73,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
ShardIds: shardIdsToCopy, ShardIds: shardIdsToCopy,
}) })
if mountErr != nil { if mountErr != nil {
return mountErr
return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
} }
if targetServer.info.Id != existingLocation { if targetServer.info.Id != existingLocation {

2
weed/shell/command_ec_encode.go

@ -97,7 +97,7 @@ func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string,
// balance the ec shards to current cluster // balance the ec shards to current cluster
err = spreadEcShards(ctx, commandEnv, vid, collection, locations) err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
if err != nil { if err != nil {
return fmt.Errorf("spread ec shards for volume %d to %s: %v", vid, locations[0].Url, err)
return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
} }
return nil return nil

Loading…
Cancel
Save