package shell

import (
	"context"
	"fmt"
	"math"
	"sort"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"google.golang.org/grpc"
)
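// moveMountedShardToEcNode moves one mounted ec shard of a volume from its existing
// location to the destination ec node. When applyBalancing is true it performs the
// copy, mount, unmount and delete RPCs against the volume servers; in both cases it
// updates the in-memory shard bookkeeping of the two nodes.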
func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {

	copiedShardIds := []uint32{uint32(shardId)}

	if applyBalancing {

		// ask destination node to copy shard and the ecx file from source node, and mount it
		copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
		if err != nil {
			return err
		}

		// unmount the to be deleted shards
		err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
		if err != nil {
			return err
		}

		// ask source node to delete the shard, and maybe the ecx file
		err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
		if err != nil {
			return err
		}

		fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
	}

	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
	existingLocation.deleteEcVolumeShards(vid, copiedShardIds)

	return nil
}
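// oneServerCopyAndMountEcShardsFromSource asks targetServer to copy shardCount consecutive
// ec shards (starting at startFromShardId) of the given volume from existingLocation and
// to mount them. The copy step is skipped when the target already is the source; the
// returned copiedShardIds only contains shards that were actually copied from another server.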
func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
	targetServer *EcNode, startFromShardId uint32, shardCount int,
	volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {

	var shardIdsToCopy []uint32
	for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
		shardIdsToCopy = append(shardIdsToCopy, shardId)
	}
	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)

	err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

		if targetServer.info.Id != existingLocation {

			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
			_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(volumeId),
				Collection:     collection,
				ShardIds:       shardIdsToCopy,
				CopyEcxFile:    true,
				CopyEcjFile:    true,
				SourceDataNode: existingLocation,
			})
			if copyErr != nil {
				return fmt.Errorf("copy %d.%v %s => %s : %v", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
			}
		}

		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
		_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   shardIdsToCopy,
		})
		if mountErr != nil {
			return fmt.Errorf("mount %d.%v on %s : %v", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
		}

		if targetServer.info.Id != existingLocation {
			copiedShardIds = shardIdsToCopy
			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
		}

		return nil
	})

	if err != nil {
		return
	}

	return
}
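// eachDataNode walks the topology and invokes fn for every data node, passing its data
// center id and rack id along with the node info.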
func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(dc.Id, RackId(rack.Id), dn)
			}
		}
	}
}
func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) {
	sort.Slice(ecNodes, func(i, j int) bool {
		return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
	})
}

func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
	sort.Slice(ecNodes, func(i, j int) bool {
		return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot
	})
}
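// CandidateEcNode pairs an EcNode with the number of shards it currently holds, so that
// candidates can be kept sorted while shards are being redistributed.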
type CandidateEcNode struct {
	ecNode     *EcNode
	shardCount int
}
// ensureSortedEcNodes restores the sort order after the element at index has changed:
// it bubbles that element toward the front or the back until lessThan holds again for
// its neighbors.
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
	for i := index - 1; i >= 0; i-- {
		if lessThan(i+1, i) {
			swap(data, i, i+1)
		} else {
			break
		}
	}
	for i := index + 1; i < len(data); i++ {
		if lessThan(i, i-1) {
			swap(data, i, i-1)
		} else {
			break
		}
	}
}
func swap(data []*CandidateEcNode, i, j int) {
	t := data[i]
	data[i] = data[j]
	data[j] = t
}
func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for _, ecShardInfo := range ecShardInfos {
		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
		count += shardBits.ShardIdCount()
	}
	return
}
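// countFreeShardSlots estimates how many more ec shards a data node can accept: every
// unused volume slot is counted as room for DataShardsCount ec shards, minus the shards
// the node already carries.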
func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
	return int(dn.MaxVolumeCount-dn.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(dn.EcShardInfos)
}
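// Types used by the ec shell commands to track racks, volume servers and their remaining
// ec shard capacity.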
type RackId string
type EcNodeId string

type EcNode struct {
	info       *master_pb.DataNodeInfo
	dc         string
	rack       RackId
	freeEcSlot int
}

type EcRack struct {
	ecNodes    map[EcNodeId]*EcNode
	freeEcSlot int
}
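// collectEcNodes asks the master for the volume topology and returns all data nodes
// (optionally restricted to one data center) together with their free ec shard slots,
// sorted by free slots in descending order, plus the total number of free slots.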
func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {

	// list all possible locations
	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return nil, 0, err
	}

	// find out all volume servers and their free ec shard slots
	eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
		if selectedDataCenter != "" && selectedDataCenter != dc {
			return
		}

		freeEcSlots := countFreeShardSlots(dn)
		ecNodes = append(ecNodes, &EcNode{
			info:       dn,
			dc:         dc,
			rack:       rack,
			freeEcSlot: int(freeEcSlots),
		})
		totalFreeEcSlots += freeEcSlots
	})

	sortEcNodesByFreeslotsDecending(ecNodes)

	return
}
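// sourceServerDeleteEcShards asks the source volume server to delete the given ec shards
// of a volume.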
func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
	collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {

	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeDeletedShardIds,
		})
		return deleteErr
	})
}
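// unmountEcShards asks the volume server at sourceLocation to unmount the given ec shards.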
func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
	volumeId needle.VolumeId, sourceLocation string, toBeUnmountedShardIds []uint32) error {

	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, unmountErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
			VolumeId: uint32(volumeId),
			ShardIds: toBeUnmountedShardIds,
		})
		return unmountErr
	})
}
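// mountEcShards asks the volume server at sourceLocation to mount the given ec shards of
// a volume.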
func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
	collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedShardIds []uint32) error {

	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeMountedShardIds,
		})
		return mountErr
	})
}
func ceilDivide(total, n int) int {
	return int(math.Ceil(float64(total) / float64(n)))
}
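// findEcVolumeShards returns the shard bits that the node holds for the given volume, or
// zero if the node has no shards of that volume.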
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
	for _, shardInfo := range ecNode.info.EcShardInfos {
		if needle.VolumeId(shardInfo.Id) == vid {
			return erasure_coding.ShardBits(shardInfo.EcIndexBits)
		}
	}
	return 0
}
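// addEcVolumeShards records the given shard ids in the node's in-memory shard info and
// decreases its free ec slot count accordingly, creating a new shard info entry when the
// node did not yet hold any shard of the volume.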
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {

	foundVolume := false
	for _, shardInfo := range ecNode.info.EcShardInfos {
		if needle.VolumeId(shardInfo.Id) == vid {
			oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
			newShardBits := oldShardBits
			for _, shardId := range shardIds {
				newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
			}
			shardInfo.EcIndexBits = uint32(newShardBits)
			ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
			foundVolume = true
			break
		}
	}

	if !foundVolume {
		var newShardBits erasure_coding.ShardBits
		for _, shardId := range shardIds {
			newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
		}
		ecNode.info.EcShardInfos = append(ecNode.info.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
			Id:          uint32(vid),
			Collection:  collection,
			EcIndexBits: uint32(newShardBits),
		})
		ecNode.freeEcSlot -= len(shardIds)
	}

	return ecNode
}
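// deleteEcVolumeShards removes the given shard ids from the node's in-memory shard info
// and increases its free ec slot count accordingly.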
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {

	for _, shardInfo := range ecNode.info.EcShardInfos {
		if needle.VolumeId(shardInfo.Id) == vid {
			oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
			newShardBits := oldShardBits
			for _, shardId := range shardIds {
				newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
			}
			shardInfo.EcIndexBits = uint32(newShardBits)
			ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
		}
	}

	return ecNode
}
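// groupByCount aggregates a per-node count into a map keyed by the identifier returned by
// identifierFn.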
func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
	countMap := make(map[string]int)
	for _, d := range data {
		id, count := identifierFn(d)
		countMap[id] += count
	}
	return countMap
}
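// groupBy buckets ec nodes into a map keyed by the identifier returned by identifierFn.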
func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
	groupMap := make(map[string][]*EcNode)
	for _, d := range data {
		id := identifierFn(d)
		groupMap[id] = append(groupMap[id], d)
	}
	return groupMap
}