You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

370 lines
11 KiB

  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/storage/types"
  6. "math"
  7. "sort"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. "google.golang.org/grpc"
  15. )
  16. func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
  17. copiedShardIds := []uint32{uint32(shardId)}
  18. if applyBalancing {
  19. // ask destination node to copy shard and the ecx file from source node, and mount it
  20. copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
  21. if err != nil {
  22. return err
  23. }
  24. // unmount the to be deleted shards
  25. err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
  26. if err != nil {
  27. return err
  28. }
  29. // ask source node to delete the shard, and maybe the ecx file
  30. err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
  31. if err != nil {
  32. return err
  33. }
  34. fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
  35. }
  36. destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
  37. existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
  38. return nil
  39. }
  40. func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
  41. targetServer *EcNode, shardIdsToCopy []uint32,
  42. volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
  43. fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
  44. err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  45. if targetServer.info.Id != existingLocation {
  46. fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
  47. _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
  48. VolumeId: uint32(volumeId),
  49. Collection: collection,
  50. ShardIds: shardIdsToCopy,
  51. CopyEcxFile: true,
  52. CopyEcjFile: true,
  53. CopyVifFile: true,
  54. SourceDataNode: existingLocation,
  55. })
  56. if copyErr != nil {
  57. return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
  58. }
  59. }
  60. fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
  61. _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
  62. VolumeId: uint32(volumeId),
  63. Collection: collection,
  64. ShardIds: shardIdsToCopy,
  65. })
  66. if mountErr != nil {
  67. return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
  68. }
  69. if targetServer.info.Id != existingLocation {
  70. copiedShardIds = shardIdsToCopy
  71. glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
  72. }
  73. return nil
  74. })
  75. if err != nil {
  76. return
  77. }
  78. return
  79. }
  80. func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
  81. for _, dc := range topo.DataCenterInfos {
  82. for _, rack := range dc.RackInfos {
  83. for _, dn := range rack.DataNodeInfos {
  84. fn(dc.Id, RackId(rack.Id), dn)
  85. }
  86. }
  87. }
  88. }
  89. func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) {
  90. sort.Slice(ecNodes, func(i, j int) bool {
  91. return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
  92. })
  93. }
  94. func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
  95. sort.Slice(ecNodes, func(i, j int) bool {
  96. return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot
  97. })
  98. }
  99. type CandidateEcNode struct {
  100. ecNode *EcNode
  101. shardCount int
  102. }
  103. // if the index node changed the freeEcSlot, need to keep every EcNode still sorted
  104. func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
  105. for i := index - 1; i >= 0; i-- {
  106. if lessThan(i+1, i) {
  107. swap(data, i, i+1)
  108. } else {
  109. break
  110. }
  111. }
  112. for i := index + 1; i < len(data); i++ {
  113. if lessThan(i, i-1) {
  114. swap(data, i, i-1)
  115. } else {
  116. break
  117. }
  118. }
  119. }
  120. func swap(data []*CandidateEcNode, i, j int) {
  121. t := data[i]
  122. data[i] = data[j]
  123. data[j] = t
  124. }
  125. func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
  126. for _, ecShardInfo := range ecShardInfos {
  127. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  128. count += shardBits.ShardIdCount()
  129. }
  130. return
  131. }
  132. func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
  133. if dn.DiskInfos == nil {
  134. return 0
  135. }
  136. diskInfo := dn.DiskInfos[string(diskType)]
  137. if diskInfo == nil {
  138. return 0
  139. }
  140. return int(diskInfo.MaxVolumeCount-diskInfo.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
  141. }
  142. type RackId string
  143. type EcNodeId string
  144. type EcNode struct {
  145. info *master_pb.DataNodeInfo
  146. dc string
  147. rack RackId
  148. freeEcSlot int
  149. }
  150. func (ecNode *EcNode) localShardIdCount(vid uint32) int {
  151. for _, diskInfo := range ecNode.info.DiskInfos {
  152. for _, ecShardInfo := range diskInfo.EcShardInfos {
  153. if vid == ecShardInfo.Id {
  154. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  155. return shardBits.ShardIdCount()
  156. }
  157. }
  158. }
  159. return 0
  160. }
  161. type EcRack struct {
  162. ecNodes map[EcNodeId]*EcNode
  163. freeEcSlot int
  164. }
  165. func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
  166. // list all possible locations
  167. var resp *master_pb.VolumeListResponse
  168. err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
  169. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  170. return err
  171. })
  172. if err != nil {
  173. return nil, 0, err
  174. }
  175. // find out all volume servers with one slot left.
  176. ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(resp.TopologyInfo, selectedDataCenter)
  177. sortEcNodesByFreeslotsDecending(ecNodes)
  178. return
  179. }
  180. func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
  181. eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  182. if selectedDataCenter != "" && selectedDataCenter != dc {
  183. return
  184. }
  185. freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
  186. ecNodes = append(ecNodes, &EcNode{
  187. info: dn,
  188. dc: dc,
  189. rack: rack,
  190. freeEcSlot: int(freeEcSlots),
  191. })
  192. totalFreeEcSlots += freeEcSlots
  193. })
  194. return
  195. }
  196. func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
  197. fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
  198. return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  199. _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
  200. VolumeId: uint32(volumeId),
  201. Collection: collection,
  202. ShardIds: toBeDeletedShardIds,
  203. })
  204. return deleteErr
  205. })
  206. }
  207. func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
  208. fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
  209. return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  210. _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
  211. VolumeId: uint32(volumeId),
  212. ShardIds: toBeUnmountedhardIds,
  213. })
  214. return deleteErr
  215. })
  216. }
  217. func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
  218. fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
  219. return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  220. _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
  221. VolumeId: uint32(volumeId),
  222. Collection: collection,
  223. ShardIds: toBeMountedhardIds,
  224. })
  225. return mountErr
  226. })
  227. }
  228. func divide(total, n int) float64 {
  229. return float64(total) / float64(n)
  230. }
  231. func ceilDivide(total, n int) int {
  232. return int(math.Ceil(float64(total) / float64(n)))
  233. }
  234. func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
  235. diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
  236. for _, shardInfo := range diskInfo.EcShardInfos {
  237. if needle.VolumeId(shardInfo.Id) == vid {
  238. return erasure_coding.ShardBits(shardInfo.EcIndexBits)
  239. }
  240. }
  241. return 0
  242. }
  243. func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
  244. foundVolume := false
  245. diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
  246. for _, shardInfo := range diskInfo.EcShardInfos {
  247. if needle.VolumeId(shardInfo.Id) == vid {
  248. oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
  249. newShardBits := oldShardBits
  250. for _, shardId := range shardIds {
  251. newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
  252. }
  253. shardInfo.EcIndexBits = uint32(newShardBits)
  254. ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
  255. foundVolume = true
  256. break
  257. }
  258. }
  259. if !foundVolume {
  260. var newShardBits erasure_coding.ShardBits
  261. for _, shardId := range shardIds {
  262. newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
  263. }
  264. diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
  265. Id: uint32(vid),
  266. Collection: collection,
  267. EcIndexBits: uint32(newShardBits),
  268. DiskType: string(types.HardDriveType),
  269. })
  270. ecNode.freeEcSlot -= len(shardIds)
  271. }
  272. return ecNode
  273. }
  274. func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
  275. diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
  276. for _, shardInfo := range diskInfo.EcShardInfos {
  277. if needle.VolumeId(shardInfo.Id) == vid {
  278. oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
  279. newShardBits := oldShardBits
  280. for _, shardId := range shardIds {
  281. newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
  282. }
  283. shardInfo.EcIndexBits = uint32(newShardBits)
  284. ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
  285. }
  286. }
  287. return ecNode
  288. }
  289. func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
  290. countMap := make(map[string]int)
  291. for _, d := range data {
  292. id, count := identifierFn(d)
  293. countMap[id] += count
  294. }
  295. return countMap
  296. }
  297. func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
  298. groupMap := make(map[string][]*EcNode)
  299. for _, d := range data {
  300. id := identifierFn(d)
  301. groupMap[id] = append(groupMap[id], d)
  302. }
  303. return groupMap
  304. }