You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

261 lines
7.8 KiB

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. "github.com/chrislusf/seaweedfs/weed/wdclient"
  15. "google.golang.org/grpc"
  16. )
  17. func init() {
  18. commands = append(commands, &commandEcEncode{})
  19. }
  // commandEcEncode implements the "ec.encode" shell command. It carries no state.
  type commandEcEncode struct{}
  22. func (c *commandEcEncode) Name() string {
  23. return "ec.encode"
  24. }
  25. func (c *commandEcEncode) Help() string {
  26. return `apply erasure coding to a volume
  27. This command will:
  28. 1. freeze one volume
  29. 2. apply erasure coding to the volume
  30. 3. move the encoded shards to multiple volume servers
  31. The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford
  32. to lose 4 volume servers.
  33. If the number of volumes are not high, the worst case is that you only have 4 volume servers,
  34. and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.
  35. If you only have less than 4 volume servers, with erasure coding, at least you can afford to
  36. have 4 corrupted shard files.
  37. `
  38. }
  39. func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
  40. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  41. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  42. collection := encodeCommand.String("collection", "", "the collection name")
  43. quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period")
  44. if err = encodeCommand.Parse(args); err != nil {
  45. return nil
  46. }
  47. ctx := context.Background()
  48. vid := needle.VolumeId(*volumeId)
  49. // volumeId is provided
  50. if vid != 0 {
  51. return doEcEncode(ctx, commandEnv, *collection, vid)
  52. }
  53. // apply to all volumes in the collection
  54. volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *quietPeriod)
  55. if err != nil {
  56. return err
  57. }
  58. fmt.Printf("ec encode volumes: %v\n", volumeIds)
  59. for _, vid := range volumeIds {
  60. if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
  61. return err
  62. }
  63. }
  64. return nil
  65. }
  66. func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId) (err error) {
  67. // find volume location
  68. locations := commandEnv.masterClient.GetLocations(uint32(vid))
  69. if len(locations) == 0 {
  70. return fmt.Errorf("volume %d not found", vid)
  71. }
  72. // generate ec shards
  73. err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
  74. if err != nil {
  75. return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
  76. }
  77. // balance the ec shards to current cluster
  78. err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
  79. if err != nil {
  80. return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
  81. }
  82. return nil
  83. }
  84. func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
  85. err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  86. _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
  87. VolumeId: uint32(volumeId),
  88. Collection: collection,
  89. })
  90. return genErr
  91. })
  92. return err
  93. }
  94. func spreadEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
  95. allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv)
  96. if err != nil {
  97. return err
  98. }
  99. if totalFreeEcSlots < erasure_coding.TotalShardsCount {
  100. return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
  101. }
  102. allocatedDataNodes := allEcNodes
  103. if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
  104. allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
  105. }
  106. // calculate how many shards to allocate for these servers
  107. allocated := balancedEcDistribution(allocatedDataNodes)
  108. // ask the data nodes to copy from the source volume server
  109. copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0])
  110. if err != nil {
  111. return err
  112. }
  113. // unmount the to be deleted shards
  114. err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
  115. if err != nil {
  116. return err
  117. }
  118. // ask the source volume server to clean up copied ec shards
  119. err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
  120. if err != nil {
  121. return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
  122. }
  123. // ask the source volume server to delete the original volume
  124. for _, location := range existingLocations {
  125. err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
  126. if err != nil {
  127. return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
  128. }
  129. }
  130. return err
  131. }
  132. func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
  133. targetServers []*EcNode, allocated []int,
  134. volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
  135. // parallelize
  136. shardIdChan := make(chan []uint32, len(targetServers))
  137. var wg sync.WaitGroup
  138. startFromShardId := uint32(0)
  139. for i, server := range targetServers {
  140. if allocated[i] <= 0 {
  141. continue
  142. }
  143. wg.Add(1)
  144. go func(server *EcNode, startFromShardId uint32, shardCount int) {
  145. defer wg.Done()
  146. copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server,
  147. startFromShardId, shardCount, volumeId, collection, existingLocation.Url)
  148. if copyErr != nil {
  149. err = copyErr
  150. } else {
  151. shardIdChan <- copiedShardIds
  152. server.freeEcSlot -= len(copiedShardIds)
  153. }
  154. }(server, startFromShardId, allocated[i])
  155. startFromShardId += uint32(allocated[i])
  156. }
  157. wg.Wait()
  158. close(shardIdChan)
  159. if err != nil {
  160. return nil, err
  161. }
  162. for shardIds := range shardIdChan {
  163. actuallyCopied = append(actuallyCopied, shardIds...)
  164. }
  165. return
  166. }
  167. func balancedEcDistribution(servers []*EcNode) (allocated []int) {
  168. allocated = make([]int, len(servers))
  169. allocatedCount := 0
  170. for allocatedCount < erasure_coding.TotalShardsCount {
  171. for i, server := range servers {
  172. if server.freeEcSlot-allocated[i] > 0 {
  173. allocated[i] += 1
  174. allocatedCount += 1
  175. }
  176. if allocatedCount >= erasure_coding.TotalShardsCount {
  177. break
  178. }
  179. }
  180. }
  181. return allocated
  182. }
  183. func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
  184. var resp *master_pb.VolumeListResponse
  185. err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  186. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  187. return err
  188. })
  189. if err != nil {
  190. return
  191. }
  192. quietSeconds := int64(quietPeriod / time.Second)
  193. nowUnixSeconds := time.Now().Unix()
  194. fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
  195. vidMap := make(map[uint32]bool)
  196. for _, dc := range resp.TopologyInfo.DataCenterInfos {
  197. for _, r := range dc.RackInfos {
  198. for _, dn := range r.DataNodeInfos {
  199. for _, v := range dn.VolumeInfos {
  200. if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
  201. vidMap[v.Id] = true
  202. }
  203. }
  204. }
  205. }
  206. }
  207. for vid, _ := range vidMap {
  208. vids = append(vids, needle.VolumeId(vid))
  209. }
  210. return
  211. }