@ -60,6 +60,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
noDelete := volFixReplicationCommand . Bool ( "noDelete" , false , "Do not delete over-replicated volumes, only fix under-replication" )
noDelete := volFixReplicationCommand . Bool ( "noDelete" , false , "Do not delete over-replicated volumes, only fix under-replication" )
retryCount := volFixReplicationCommand . Int ( "retry" , 5 , "how many times to retry" )
retryCount := volFixReplicationCommand . Int ( "retry" , 5 , "how many times to retry" )
volumesPerStep := volFixReplicationCommand . Int ( "volumesPerStep" , 0 , "how many volumes to fix in one cycle" )
volumesPerStep := volFixReplicationCommand . Int ( "volumesPerStep" , 0 , "how many volumes to fix in one cycle" )
replicaPlacement := volFixReplicationCommand . String ( "replicaPlacement" , "" , "override the default replicaPlacement of volume" )
if err = volFixReplicationCommand . Parse ( args ) ; err != nil {
if err = volFixReplicationCommand . Parse ( args ) ; err != nil {
return nil
return nil
@ -72,6 +73,15 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
takeAction := ! * skipChange
takeAction := ! * skipChange
doDeletes := ! * noDelete
doDeletes := ! * noDelete
var rp * super_block . ReplicaPlacement
if len ( * replicaPlacement ) > 0 {
rp , err = super_block . NewReplicaPlacementFromString ( * replicaPlacement )
if err != nil {
return err
}
fmt . Fprintf ( writer , "override replicaPlacement: %s" , rp . String ( ) )
}
underReplicatedVolumeIdsCount := 1
underReplicatedVolumeIdsCount := 1
for underReplicatedVolumeIdsCount > 0 {
for underReplicatedVolumeIdsCount > 0 {
fixedVolumeReplicas := map [ string ] int { }
fixedVolumeReplicas := map [ string ] int { }
@ -84,7 +94,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find all volumes that needs replication
// find all volumes that needs replication
// collect all data nodes
// collect all data nodes
volumeReplicas , allLocations := collectVolumeReplicaLocations ( topologyInfo )
volumeReplicas , allLocations := collectVolumeReplicaLocations ( topologyInfo , rp )
if len ( allLocations ) == 0 {
if len ( allLocations ) == 0 {
return fmt . Errorf ( "no data nodes at all" )
return fmt . Errorf ( "no data nodes at all" )
@ -94,9 +104,19 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
var underReplicatedVolumeIds , overReplicatedVolumeIds , misplacedVolumeIds [ ] uint32
var underReplicatedVolumeIds , overReplicatedVolumeIds , misplacedVolumeIds [ ] uint32
for vid , replicas := range volumeReplicas {
for vid , replicas := range volumeReplicas {
replica := replicas [ 0 ]
replica := replicas [ 0 ]
if len ( * c . collectionPattern ) > 0 {
matched , err := filepath . Match ( * c . collectionPattern , replica . info . Collection )
if err != nil {
return fmt . Errorf ( "match pattern %s with collection %s: %v" , * c . collectionPattern , replica . info . Collection , err )
}
if ! matched {
continue
}
}
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( replica . info . ReplicaPlacement ) )
replicaPlacement , _ := super_block . NewReplicaPlacementFromByte ( byte ( replica . info . ReplicaPlacement ) )
if replicaPlacement . GetCopyCount ( ) > len ( replicas ) {
if replicaPlacement . GetCopyCount ( ) > len ( replicas ) {
underReplicatedVolumeIds = append ( underReplicatedVolumeIds , vid )
underReplicatedVolumeIds = append ( underReplicatedVolumeIds , vid )
fmt . Fprintf ( writer , "volume %d replication %s, current only %+d replicas\n" , replica . info . Id , replicaPlacement , len ( replicas ) )
} else if replicaPlacement . GetCopyCount ( ) < len ( replicas ) {
} else if replicaPlacement . GetCopyCount ( ) < len ( replicas ) {
overReplicatedVolumeIds = append ( overReplicatedVolumeIds , vid )
overReplicatedVolumeIds = append ( overReplicatedVolumeIds , vid )
fmt . Fprintf ( writer , "volume %d replication %s, but over replicated %+d\n" , replica . info . Id , replicaPlacement , len ( replicas ) )
fmt . Fprintf ( writer , "volume %d replication %s, but over replicated %+d\n" , replica . info . Id , replicaPlacement , len ( replicas ) )
@ -150,7 +170,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
volumeIdLocationCount := len ( volumeIdLocation . Locations )
volumeIdLocationCount := len ( volumeIdLocation . Locations )
i := 0
i := 0
for fixedVolumeReplicas [ volumeId ] >= volumeIdLocationCount {
for fixedVolumeReplicas [ volumeId ] >= volumeIdLocationCount {
fmt . Fprintf ( writer , "the number of locations for volume %s has not increased yet, let's wait\n" , volumeId )
fmt . Fprintf ( writer , "the number of locations(current:%d, before:%d) for volume %s has not increased yet, let's wait\n" , fixedVolumeReplicas [ volumeId ] , volumeIdLocationCount , volumeId )
time . Sleep ( time . Duration ( i + 1 ) * time . Second * 7 )
time . Sleep ( time . Duration ( i + 1 ) * time . Second * 7 )
volumeLocIds , err := lookupVolumeIds ( commandEnv , [ ] string { volumeId } )
volumeLocIds , err := lookupVolumeIds ( commandEnv , [ ] string { volumeId } )
if err != nil {
if err != nil {
@ -168,13 +188,16 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
return nil
return nil
}
}
func collectVolumeReplicaLocations ( topologyInfo * master_pb . TopologyInfo ) ( map [ uint32 ] [ ] * VolumeReplica , [ ] location ) {
func collectVolumeReplicaLocations ( topologyInfo * master_pb . TopologyInfo , rp * super_block . ReplicaPlacement ) ( map [ uint32 ] [ ] * VolumeReplica , [ ] location ) {
volumeReplicas := make ( map [ uint32 ] [ ] * VolumeReplica )
volumeReplicas := make ( map [ uint32 ] [ ] * VolumeReplica )
var allLocations [ ] location
var allLocations [ ] location
eachDataNode ( topologyInfo , func ( dc string , rack RackId , dn * master_pb . DataNodeInfo ) {
eachDataNode ( topologyInfo , func ( dc string , rack RackId , dn * master_pb . DataNodeInfo ) {
loc := newLocation ( dc , string ( rack ) , dn )
loc := newLocation ( dc , string ( rack ) , dn )
for _ , diskInfo := range dn . DiskInfos {
for _ , diskInfo := range dn . DiskInfos {
for _ , v := range diskInfo . VolumeInfos {
for _ , v := range diskInfo . VolumeInfos {
if rp != nil {
v . ReplicaPlacement = uint32 ( rp . Byte ( ) )
}
volumeReplicas [ v . Id ] = append ( volumeReplicas [ v . Id ] , & VolumeReplica {
volumeReplicas [ v . Id ] = append ( volumeReplicas [ v . Id ] , & VolumeReplica {
location : & loc ,
location : & loc ,
info : v ,
info : v ,
@ -237,6 +260,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
underReplicatedVolumeIds = underReplicatedVolumeIds [ 0 : volumesPerStep ]
underReplicatedVolumeIds = underReplicatedVolumeIds [ 0 : volumesPerStep ]
}
}
for _ , vid := range underReplicatedVolumeIds {
for _ , vid := range underReplicatedVolumeIds {
fmt . Fprintf ( writer , "begin fix volume: %d\n" , vid )
for i := 0 ; i < retryCount + 1 ; i ++ {
for i := 0 ; i < retryCount + 1 ; i ++ {
if err = c . fixOneUnderReplicatedVolume ( commandEnv , writer , takeAction , volumeReplicas , vid , allLocations ) ; err == nil {
if err = c . fixOneUnderReplicatedVolume ( commandEnv , writer , takeAction , volumeReplicas , vid , allLocations ) ; err == nil {
if takeAction {
if takeAction {
@ -267,6 +291,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
return fmt . Errorf ( "match pattern %s with collection %s: %v" , * c . collectionPattern , replica . info . Collection , err )
return fmt . Errorf ( "match pattern %s with collection %s: %v" , * c . collectionPattern , replica . info . Collection , err )
}
}
if ! matched {
if ! matched {
fmt . Fprintf ( writer , "collection(%s) skipped for volume: %d, filer collection pattern:%s" , replica . info . Collection , vid , * c . collectionPattern )
hasSkippedCollection = true
hasSkippedCollection = true
break
break
}
}
@ -290,6 +315,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
if replicateErr != nil {
if replicateErr != nil {
return fmt . Errorf ( "copying from %s => %s : %v" , replica . location . dataNode . Id , dst . dataNode . Id , replicateErr )
return fmt . Errorf ( "copying from %s => %s : %v" , replica . location . dataNode . Id , dst . dataNode . Id , replicateErr )
}
}
var bytesCount int64
for {
for {
resp , recvErr := stream . Recv ( )
resp , recvErr := stream . Recv ( )
if recvErr != nil {
if recvErr != nil {
@ -300,10 +326,10 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
}
}
}
}
if resp . ProcessedBytes > 0 {
if resp . ProcessedBytes > 0 {
fmt . Fprintf ( writer , "volume %d processed %d bytes\n" , replica . info . Id , resp . ProcessedBytes )
bytesCount += resp . ProcessedBytes
}
}
}
}
fmt . Fprintf ( writer , "volume %d processed completed! total %d bytes\n" , replica . info . Id , bytesCount )
return nil
return nil
} )
} )