package shell

import (
	"context"
	"fmt"
	"sort"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"google.golang.org/grpc"
)
// moveMountedShardToEcNode moves one mounted ec shard to the destination ec
// node: copy and mount on the destination, then unmount and delete on the
// source. When applyBalancing is false it only prints the planned move.
func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {

	fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)

	if !applyBalancing {
		return nil
	}

	// ask the destination node to copy the shard and the ecx file from the source node, and mount it
	copiedShardIds, err := oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
	if err != nil {
		return err
	}

	// unmount the to-be-deleted shards on the source node
	err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
	if err != nil {
		return err
	}

	// ask the source node to delete the shard, and maybe the ecx file
	err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
	if err != nil {
		return err
	}

	// update the in-memory bookkeeping for the source node
	deleteEcVolumeShards(existingLocation, vid, copiedShardIds)

	return nil
}
// oneServerCopyAndMountEcShardsFromSource asks the target volume server to
// copy shardCount consecutive shards, starting at startFromShardId, from the
// existing location and mount them. If the target already is the existing
// location, only the mount is performed and no shard ids are reported as
// copied.
func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
	targetServer *EcNode, startFromShardId uint32, shardCount int,
	volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {

	var shardIdsToCopy []uint32
	for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
		shardIdsToCopy = append(shardIdsToCopy, shardId)
	}
	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)

	err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

		if targetServer.info.Id != existingLocation {

			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
			_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(volumeId),
				Collection:     collection,
				ShardIds:       shardIdsToCopy,
				CopyEcxFile:    true,
				SourceDataNode: existingLocation,
			})
			if copyErr != nil {
				return fmt.Errorf("copy %d.%v %s => %s : %v", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
			}
		}

		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
		_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   shardIdsToCopy,
		})
		if mountErr != nil {
			return fmt.Errorf("mount %d.%v on %s : %v", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
		}

		if targetServer.info.Id != existingLocation {
			copiedShardIds = shardIdsToCopy
			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
		}

		return nil
	})

	return
}
// eachDataNode walks the topology and invokes fn for every data node,
// together with its data center and rack ids.
func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc, rack string, dn *master_pb.DataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(dc.Id, rack.Id, dn)
			}
		}
	}
}
// sortEcNodes orders ec nodes by free ec shard slots, descending, so the
// node with the most room comes first.
func sortEcNodes(ecNodes []*EcNode) {
	sort.Slice(ecNodes, func(i, j int) bool {
		return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
	})
}
// countShards counts the ec shards present across the given shard
// information messages, using each volume's shard bitmap.
func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for _, ecShardInfo := range ecShardInfos {
		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
		count += shardBits.ShardIdCount()
	}
	return
}
// countFreeShardSlots estimates how many more ec shards a data node can
// hold: each free volume slot is counted as 10 ec shard slots (the number
// of data shards per ec volume), minus the shards already stored.
func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
	return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
}
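
// Worked example (illustrative; the numbers are hypothetical): a data node
// with FreeVolumeCount = 2 that already stores 5 ec shards is counted as
// having 2*10 - 5 = 15 free ec shard slots.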
// EcNode pairs a data node with its location in the topology and the number
// of free ec shard slots it has.
type EcNode struct {
	info       *master_pb.DataNodeInfo
	dc         string
	rack       string
	freeEcSlot int
}
// collectEcNodes asks the master for the volume topology and returns all
// volume servers that have at least one free ec shard slot, sorted by free
// slots in descending order, together with the total number of free slots.
func collectEcNodes(ctx context.Context, commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {

	// list all possible locations
	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return nil, 0, err
	}

	// find all volume servers with at least one free ec shard slot
	eachDataNode(resp.TopologyInfo, func(dc, rack string, dn *master_pb.DataNodeInfo) {
		if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
			ecNodes = append(ecNodes, &EcNode{
				info:       dn,
				dc:         dc,
				rack:       rack,
				freeEcSlot: freeEcSlots,
			})
			totalFreeEcSlots += freeEcSlots
		}
	})

	sortEcNodes(ecNodes)

	return
}
// sourceServerDeleteEcShards asks the source volume server to delete the
// given ec shards of a volume.
func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
	collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {

	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeDeletedShardIds,
		})
		return deleteErr
	})
}
// unmountEcShards asks a volume server to unmount the given ec shards of a
// volume without deleting the underlying shard files.
func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
	volumeId needle.VolumeId, sourceLocation string, toBeUnmountedShardIds []uint32) error {

	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, unmountErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
			VolumeId: uint32(volumeId),
			ShardIds: toBeUnmountedShardIds,
		})
		return unmountErr
	})
}
// mountEcShards asks a volume server to mount the given ec shards of a
// volume.
func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
	collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedShardIds []uint32) error {

	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeMountedShardIds,
		})
		return mountErr
	})
}
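
// The helpers above compose into a simple move: collect candidate nodes,
// pick the one with the most free slots, and let moveMountedShardToEcNode
// drive the copy, mount, unmount, and delete steps. A minimal sketch, not
// part of the original file; the collection name, volume id, and shard id
// below are hypothetical:
//
//	func exampleMoveOneShard(ctx context.Context, commandEnv *CommandEnv, source *EcNode) error {
//		ecNodes, _, err := collectEcNodes(ctx, commandEnv)
//		if err != nil {
//			return err
//		}
//		if len(ecNodes) == 0 {
//			return fmt.Errorf("no ec nodes with free slots")
//		}
//		// ecNodes is sorted by free slots, descending, so ecNodes[0] has the most room.
//		return moveMountedShardToEcNode(ctx, commandEnv, source, "example_collection",
//			needle.VolumeId(5), erasure_coding.ShardId(3), ecNodes[0], true)
//	}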