|  |  | @ -63,6 +63,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr | 
			
		
	
		
			
				
					|  |  |  | 	collection := encodeCommand.String("collection", "", "the collection name") | 
			
		
	
		
			
				
					|  |  |  | 	fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size") | 
			
		
	
		
			
				
					|  |  |  | 	quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period") | 
			
		
	
		
			
				
					|  |  |  | 	parallelCopy := encodeCommand.Bool("parallelCopy", true, "copy shards in parallel") | 
			
		
	
		
			
				
					|  |  |  | 	if err = encodeCommand.Parse(args); err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		return nil | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
	
		
			
				
					|  |  | @ -71,7 +72,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// volumeId is provided
 | 
			
		
	
		
			
				
					|  |  |  | 	if vid != 0 { | 
			
		
	
		
			
				
					|  |  |  | 		return doEcEncode(commandEnv, *collection, vid) | 
			
		
	
		
			
				
					|  |  |  | 		return doEcEncode(commandEnv, *collection, vid, *parallelCopy) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// apply to all volumes in the collection
 | 
			
		
	
	
		
			
				
					|  |  | @ -81,7 +82,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	fmt.Printf("ec encode volumes: %v\n", volumeIds) | 
			
		
	
		
			
				
					|  |  |  | 	for _, vid := range volumeIds { | 
			
		
	
		
			
				
					|  |  |  | 		if err = doEcEncode(commandEnv, *collection, vid); err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		if err = doEcEncode(commandEnv, *collection, vid, *parallelCopy); err != nil { | 
			
		
	
		
			
				
					|  |  |  | 			return err | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
	
		
			
				
					|  |  | @ -89,7 +90,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr | 
			
		
	
		
			
				
					|  |  |  | 	return nil | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { | 
			
		
	
		
			
				
					|  |  |  | func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) { | 
			
		
	
		
			
				
					|  |  |  | 	// find volume location
 | 
			
		
	
		
			
				
					|  |  |  | 	locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) | 
			
		
	
		
			
				
					|  |  |  | 	if !found { | 
			
		
	
	
		
			
				
					|  |  | @ -111,7 +112,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// balance the ec shards to current cluster
 | 
			
		
	
		
			
				
					|  |  |  | 	err = spreadEcShards(commandEnv, vid, collection, locations) | 
			
		
	
		
			
				
					|  |  |  | 	err = spreadEcShards(commandEnv, vid, collection, locations, parallelCopy) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
	
		
			
				
					|  |  | @ -157,7 +158,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { | 
			
		
	
		
			
				
					|  |  |  | func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location, parallelCopy bool) (err error) { | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "") | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
	
		
			
				
					|  |  | @ -176,7 +177,7 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection | 
			
		
	
		
			
				
					|  |  |  | 	allocatedEcIds := balancedEcDistribution(allocatedDataNodes) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// ask the data nodes to copy from the source volume server
 | 
			
		
	
		
			
				
					|  |  |  | 	copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0]) | 
			
		
	
		
			
				
					|  |  |  | 	copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0], parallelCopy) | 
			
		
	
		
			
				
					|  |  |  | 	if err != nil { | 
			
		
	
		
			
				
					|  |  |  | 		return err | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
	
		
			
				
					|  |  | @ -206,30 +207,36 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { | 
			
		
	
		
			
				
					|  |  |  | func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location, parallelCopy bool) (actuallyCopied []uint32, err error) { | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url) | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// parallelize
 | 
			
		
	
		
			
				
					|  |  |  | 	shardIdChan := make(chan []uint32, len(targetServers)) | 
			
		
	
		
			
				
					|  |  |  | 	var wg sync.WaitGroup | 
			
		
	
		
			
				
					|  |  |  | 	shardIdChan := make(chan []uint32, len(targetServers)) | 
			
		
	
		
			
				
					|  |  |  | 	copyFunc := func(server *EcNode, allocatedEcShardIds []uint32) { | 
			
		
	
		
			
				
					|  |  |  | 		defer wg.Done() | 
			
		
	
		
			
				
					|  |  |  | 		copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server, | 
			
		
	
		
			
				
					|  |  |  | 			allocatedEcShardIds, volumeId, collection, existingLocation.Url) | 
			
		
	
		
			
				
					|  |  |  | 		if copyErr != nil { | 
			
		
	
		
			
				
					|  |  |  | 			err = copyErr | 
			
		
	
		
			
				
					|  |  |  | 		} else { | 
			
		
	
		
			
				
					|  |  |  | 			shardIdChan <- copiedShardIds | 
			
		
	
		
			
				
					|  |  |  | 			server.addEcVolumeShards(volumeId, collection, copiedShardIds) | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	// maybe parallelize
 | 
			
		
	
		
			
				
					|  |  |  | 	for i, server := range targetServers { | 
			
		
	
		
			
				
					|  |  |  | 		if len(allocatedEcIds[i]) <= 0 { | 
			
		
	
		
			
				
					|  |  |  | 			continue | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 		wg.Add(1) | 
			
		
	
		
			
				
					|  |  |  | 		go func(server *EcNode, allocatedEcShardIds []uint32) { | 
			
		
	
		
			
				
					|  |  |  | 			defer wg.Done() | 
			
		
	
		
			
				
					|  |  |  | 			copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server, | 
			
		
	
		
			
				
					|  |  |  | 				allocatedEcShardIds, volumeId, collection, existingLocation.Url) | 
			
		
	
		
			
				
					|  |  |  | 			if copyErr != nil { | 
			
		
	
		
			
				
					|  |  |  | 				err = copyErr | 
			
		
	
		
			
				
					|  |  |  | 			} else { | 
			
		
	
		
			
				
					|  |  |  | 				shardIdChan <- copiedShardIds | 
			
		
	
		
			
				
					|  |  |  | 				server.addEcVolumeShards(volumeId, collection, copiedShardIds) | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 		}(server, allocatedEcIds[i]) | 
			
		
	
		
			
				
					|  |  |  | 		if parallelCopy { | 
			
		
	
		
			
				
					|  |  |  | 			go copyFunc(server, allocatedEcIds[i]) | 
			
		
	
		
			
				
					|  |  |  | 		} else { | 
			
		
	
		
			
				
					|  |  |  | 			copyFunc(server, allocatedEcIds[i]) | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	wg.Wait() | 
			
		
	
		
			
				
					|  |  |  | 	close(shardIdChan) | 
			
		
	
	
		
			
				
					|  |  | 
 |