diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index f53256c37..cef2198aa 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -4,17 +4,21 @@ import ( "context" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/storage/types" "io" "time" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/pb" "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + + "github.com/seaweedfs/seaweedfs/weed/wdclient" ) func init() { @@ -110,9 +114,33 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) { // find volume location - existingLocations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid)) - if !found { - return fmt.Errorf("volume %d not found", vid) + topoInfo, _, err := collectTopologyInfo(commandEnv, 0) + if err != nil { + return fmt.Errorf("collect topology info: %v", err) + } + + var existingLocations []wdclient.Location + eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { + for _, disk := range dn.DiskInfos { + for _, vi := range disk.VolumeInfos { + if needle.VolumeId(vi.Id) == vid && (collection == "" || vi.Collection == collection) { + fmt.Printf("find volume %d from Url:%s, GrpcPort:%d, DC:%s\n", vid, dn.Id, dn.GrpcPort, string(dc)) + existingLocations = append(existingLocations, wdclient.Location{ + Url: dn.Id, + PublicUrl: dn.Id, + GrpcPort: int(dn.GrpcPort), + DataCenter: string(dc), + }) + } + } + } + }) + + if len(existingLocations) == 0 { + if collection == "" { + return fmt.Errorf("volume %d not found", vid) + } + return fmt.Errorf("volume %d not found in collection %s", vid, collection) } err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false, false) @@ -135,7 +163,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str if i == 0 { continue } - fmt.Printf("delete volume %d from %s\n", vid, location.Url) + fmt.Printf("delete volume %d from Url:%s\n", vid, location.Url) err = deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false) if err != nil { return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)