diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 1c599b8a0..6d4227a3b 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -84,7 +84,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer } volumeServers := collectVolumeServersByDc(topologyInfo, *dc) - volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) + volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo, nil) diskTypes := collectVolumeDiskTypes(topologyInfo) if *collection == "EACH_COLLECTION" { diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go index 5bd170e71..b0d857b38 100644 --- a/weed/shell/command_volume_balance_test.go +++ b/weed/shell/command_volume_balance_test.go @@ -175,7 +175,7 @@ func TestIsGoodMove(t *testing.T) { func TestBalance(t *testing.T) { topologyInfo := parseOutput(topoData) volumeServers := collectVolumeServersByDc(topologyInfo, "") - volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) + volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo, nil) diskTypes := collectVolumeDiskTypes(topologyInfo) if err := balanceVolumeServers(nil, diskTypes, volumeReplicas, volumeServers, 30*1024*1024*1024, "ALL_COLLECTIONS", false); err != nil { diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 6a1634f8a..3325cf256 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -69,7 +69,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if err != nil { return err } - volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) + volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo, nil) // pick 1 pairs of volume replica fileCount := func(replica *VolumeReplica) uint64 { diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 783b2cce8..b0a9b5c27 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -60,6 +60,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, noDelete := volFixReplicationCommand.Bool("noDelete", false, "Do not delete over-replicated volumes, only fix under-replication") retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry") volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle") + replicaPlacement := volFixReplicationCommand.String("replicaPlacement", "", "override the default replicaPlacement of volume") if err = volFixReplicationCommand.Parse(args); err != nil { return nil @@ -72,6 +73,15 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, takeAction := !*skipChange doDeletes := !*noDelete + var rp *super_block.ReplicaPlacement + if len(*replicaPlacement) > 0 { + rp, err = super_block.NewReplicaPlacementFromString(*replicaPlacement) + if err != nil { + return err + } + fmt.Fprintf(writer, "override replicaPlacement: %s", rp.String()) + } + underReplicatedVolumeIdsCount := 1 for underReplicatedVolumeIdsCount > 0 { fixedVolumeReplicas := map[string]int{} @@ -84,7 +94,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find all volumes that needs replication // collect all data nodes - volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) + volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo, rp) if len(allLocations) == 0 { return fmt.Errorf("no data nodes at all") @@ -94,9 +104,19 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, var underReplicatedVolumeIds, overReplicatedVolumeIds, misplacedVolumeIds []uint32 for vid, replicas := range volumeReplicas { replica := replicas[0] + if len(*c.collectionPattern) > 0 { + matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) + if err != nil { + return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) + } + if !matched { + continue + } + } replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) if replicaPlacement.GetCopyCount() > len(replicas) { underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) + fmt.Fprintf(writer, "volume %d replication %s, current only %+d replicas\n", replica.info.Id, replicaPlacement, len(replicas)) } else if replicaPlacement.GetCopyCount() < len(replicas) { overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) @@ -150,7 +170,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, volumeIdLocationCount := len(volumeIdLocation.Locations) i := 0 for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount { - fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId) + fmt.Fprintf(writer, "the number of locations(current:%d, before:%d) for volume %s has not increased yet, let's wait\n", fixedVolumeReplicas[volumeId], volumeIdLocationCount, volumeId) time.Sleep(time.Duration(i+1) * time.Second * 7) volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId}) if err != nil { @@ -168,13 +188,16 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, return nil } -func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) { +func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo, rp *super_block.ReplicaPlacement) (map[uint32][]*VolumeReplica, []location) { volumeReplicas := make(map[uint32][]*VolumeReplica) var allLocations []location eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { + if rp != nil { + v.ReplicaPlacement = uint32(rp.Byte()) + } volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ location: &loc, info: v, @@ -237,6 +260,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep] } for _, vid := range underReplicatedVolumeIds { + fmt.Fprintf(writer, "begin fix volume: %d\n", vid) for i := 0; i < retryCount+1; i++ { if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { if takeAction { @@ -267,6 +291,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) } if !matched { + fmt.Fprintf(writer, "collection(%s) skipped for volume: %d, filer collection pattern:%s", replica.info.Collection, vid, *c.collectionPattern) hasSkippedCollection = true break } @@ -290,6 +315,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co if replicateErr != nil { return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) } + var bytesCount int64 for { resp, recvErr := stream.Recv() if recvErr != nil { @@ -300,10 +326,10 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co } } if resp.ProcessedBytes > 0 { - fmt.Fprintf(writer, "volume %d processed %d bytes\n", replica.info.Id, resp.ProcessedBytes) + bytesCount += resp.ProcessedBytes } } - + fmt.Fprintf(writer, "volume %d processed completed! total %d bytes\n", replica.info.Id, bytesCount) return nil }) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 94c457689..a1c736ab7 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -124,7 +124,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE } } } - volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) + volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo, nil) for _, vol := range diskInfo.VolumeInfos { hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index a59119a40..511b757d6 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -94,7 +94,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer } fmt.Printf("tier move volumes: %v\n", volumeIds) - _, allLocations := collectVolumeReplicaLocations(topologyInfo) + _, allLocations := collectVolumeReplicaLocations(topologyInfo, nil) allLocations = filterLocationsByDiskType(allLocations, toDiskType) keepDataNodesSorted(allLocations, toDiskType)