You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

205 lines
6.9 KiB

  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "sort"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  10. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  11. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  12. "google.golang.org/grpc"
  13. )
  14. func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
  15. fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
  16. if !applyBalancing {
  17. return nil
  18. }
  19. // ask destination node to copy shard and the ecx file from source node, and mount it
  20. copiedShardIds, err := oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
  21. if err != nil {
  22. return err
  23. }
  24. // unmount the to be deleted shards
  25. err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
  26. if err != nil {
  27. return err
  28. }
  29. // ask source node to delete the shard, and maybe the ecx file
  30. err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
  31. if err != nil {
  32. return err
  33. }
  34. deleteEcVolumeShards(existingLocation, vid, copiedShardIds)
  35. return nil
  36. }
  37. func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
  38. targetServer *EcNode, startFromShardId uint32, shardCount int,
  39. volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
  40. var shardIdsToCopy []uint32
  41. for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
  42. shardIdsToCopy = append(shardIdsToCopy, shardId)
  43. }
  44. fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
  45. err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  46. if targetServer.info.Id != existingLocation {
  47. fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
  48. _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
  49. VolumeId: uint32(volumeId),
  50. Collection: collection,
  51. ShardIds: shardIdsToCopy,
  52. CopyEcxFile: true,
  53. SourceDataNode: existingLocation,
  54. })
  55. if copyErr != nil {
  56. return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
  57. }
  58. }
  59. fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
  60. _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
  61. VolumeId: uint32(volumeId),
  62. Collection: collection,
  63. ShardIds: shardIdsToCopy,
  64. })
  65. if mountErr != nil {
  66. return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
  67. }
  68. if targetServer.info.Id != existingLocation {
  69. copiedShardIds = shardIdsToCopy
  70. glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
  71. }
  72. return nil
  73. })
  74. if err != nil {
  75. return
  76. }
  77. return
  78. }
  79. func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) {
  80. for _, dc := range topo.DataCenterInfos {
  81. for _, rack := range dc.RackInfos {
  82. for _, dn := range rack.DataNodeInfos {
  83. fn(dn)
  84. }
  85. }
  86. }
  87. }
  88. func sortEcNodes(ecNodes []*EcNode) {
  89. sort.Slice(ecNodes, func(i, j int) bool {
  90. return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
  91. })
  92. }
  93. func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
  94. for _, ecShardInfo := range ecShardInfos {
  95. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  96. count += shardBits.ShardIdCount()
  97. }
  98. return
  99. }
  100. func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
  101. return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
  102. }
// EcNode pairs a data node from the master topology with the number of EC
// shard slots it still has free.
type EcNode struct {
	info       *master_pb.DataNodeInfo // node info as reported by the master
	freeEcSlot int                     // remaining EC shard capacity — computed by countFreeShardSlots
}
  107. func collectEcNodes(ctx context.Context, commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
  108. // list all possible locations
  109. var resp *master_pb.VolumeListResponse
  110. err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  111. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  112. return err
  113. })
  114. if err != nil {
  115. return nil, 0, err
  116. }
  117. // find out all volume servers with one slot left.
  118. eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) {
  119. if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
  120. ecNodes = append(ecNodes, &EcNode{
  121. info: dn,
  122. freeEcSlot: int(freeEcSlots),
  123. })
  124. totalFreeEcSlots += freeEcSlots
  125. }
  126. })
  127. sortEcNodes(ecNodes)
  128. return
  129. }
  130. func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
  131. collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
  132. fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
  133. return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  134. _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
  135. VolumeId: uint32(volumeId),
  136. Collection: collection,
  137. ShardIds: toBeDeletedShardIds,
  138. })
  139. return deleteErr
  140. })
  141. }
  142. func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
  143. volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
  144. fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
  145. return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  146. _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
  147. VolumeId: uint32(volumeId),
  148. ShardIds: toBeUnmountedhardIds,
  149. })
  150. return deleteErr
  151. })
  152. }
  153. func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
  154. collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
  155. fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
  156. return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  157. _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
  158. VolumeId: uint32(volumeId),
  159. Collection: collection,
  160. ShardIds: toBeMountedhardIds,
  161. })
  162. return mountErr
  163. })
  164. }