You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

376 lines
12 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/storage/types"
  6. "math"
  7. "sort"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. "google.golang.org/grpc"
  15. )
  16. func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
  17. copiedShardIds := []uint32{uint32(shardId)}
  18. if applyBalancing {
  19. // ask destination node to copy shard and the ecx file from source node, and mount it
  20. copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
  21. if err != nil {
  22. return err
  23. }
  24. // unmount the to be deleted shards
  25. err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
  26. if err != nil {
  27. return err
  28. }
  29. // ask source node to delete the shard, and maybe the ecx file
  30. err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
  31. if err != nil {
  32. return err
  33. }
  34. fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
  35. }
  36. destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
  37. existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
  38. return nil
  39. }
  40. func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
  41. targetServer *EcNode, shardIdsToCopy []uint32,
  42. volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
  43. fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
  44. err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  45. if targetServer.info.Id != existingLocation {
  46. fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
  47. _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
  48. VolumeId: uint32(volumeId),
  49. Collection: collection,
  50. ShardIds: shardIdsToCopy,
  51. CopyEcxFile: true,
  52. CopyEcjFile: true,
  53. CopyVifFile: true,
  54. SourceDataNode: existingLocation,
  55. })
  56. if copyErr != nil {
  57. return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
  58. }
  59. }
  60. fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
  61. _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
  62. VolumeId: uint32(volumeId),
  63. Collection: collection,
  64. ShardIds: shardIdsToCopy,
  65. })
  66. if mountErr != nil {
  67. return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
  68. }
  69. if targetServer.info.Id != existingLocation {
  70. copiedShardIds = shardIdsToCopy
  71. glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
  72. }
  73. return nil
  74. })
  75. if err != nil {
  76. return
  77. }
  78. return
  79. }
  80. func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
  81. for _, dc := range topo.DataCenterInfos {
  82. for _, rack := range dc.RackInfos {
  83. for _, dn := range rack.DataNodeInfos {
  84. fn(dc.Id, RackId(rack.Id), dn)
  85. }
  86. }
  87. }
  88. }
  89. func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) {
  90. sort.Slice(ecNodes, func(i, j int) bool {
  91. return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
  92. })
  93. }
  94. func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
  95. sort.Slice(ecNodes, func(i, j int) bool {
  96. return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot
  97. })
  98. }
  99. type CandidateEcNode struct {
  100. ecNode *EcNode
  101. shardCount int
  102. }
  103. // if the index node changed the freeEcSlot, need to keep every EcNode still sorted
  104. func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
  105. for i := index - 1; i >= 0; i-- {
  106. if lessThan(i+1, i) {
  107. swap(data, i, i+1)
  108. } else {
  109. break
  110. }
  111. }
  112. for i := index + 1; i < len(data); i++ {
  113. if lessThan(i, i-1) {
  114. swap(data, i, i-1)
  115. } else {
  116. break
  117. }
  118. }
  119. }
  120. func swap(data []*CandidateEcNode, i, j int) {
  121. t := data[i]
  122. data[i] = data[j]
  123. data[j] = t
  124. }
  125. func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
  126. for _, ecShardInfo := range ecShardInfos {
  127. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  128. count += shardBits.ShardIdCount()
  129. }
  130. return
  131. }
  132. func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
  133. if dn.DiskInfos == nil {
  134. return 0
  135. }
  136. diskInfo := dn.DiskInfos[string(diskType)]
  137. if diskInfo == nil {
  138. return 0
  139. }
  140. return int(diskInfo.MaxVolumeCount-diskInfo.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
  141. }
  142. type RackId string
  143. type EcNodeId string
  144. type EcNode struct {
  145. info *master_pb.DataNodeInfo
  146. dc string
  147. rack RackId
  148. freeEcSlot int
  149. }
  150. func (ecNode *EcNode) localShardIdCount(vid uint32) int {
  151. for _, diskInfo := range ecNode.info.DiskInfos {
  152. for _, ecShardInfo := range diskInfo.EcShardInfos {
  153. if vid == ecShardInfo.Id {
  154. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  155. return shardBits.ShardIdCount()
  156. }
  157. }
  158. }
  159. return 0
  160. }
  161. type EcRack struct {
  162. ecNodes map[EcNodeId]*EcNode
  163. freeEcSlot int
  164. }
  165. func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
  166. // list all possible locations
  167. // collect topology information
  168. topologyInfo, _, err := collectTopologyInfo(commandEnv)
  169. if err != nil {
  170. return
  171. }
  172. // find out all volume servers with one slot left.
  173. ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
  174. sortEcNodesByFreeslotsDecending(ecNodes)
  175. return
  176. }
  177. func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
  178. eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  179. if selectedDataCenter != "" && selectedDataCenter != dc {
  180. return
  181. }
  182. freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
  183. ecNodes = append(ecNodes, &EcNode{
  184. info: dn,
  185. dc: dc,
  186. rack: rack,
  187. freeEcSlot: int(freeEcSlots),
  188. })
  189. totalFreeEcSlots += freeEcSlots
  190. })
  191. return
  192. }
  193. func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
  194. fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
  195. return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  196. _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
  197. VolumeId: uint32(volumeId),
  198. Collection: collection,
  199. ShardIds: toBeDeletedShardIds,
  200. })
  201. return deleteErr
  202. })
  203. }
  204. func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
  205. fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
  206. return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  207. _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
  208. VolumeId: uint32(volumeId),
  209. ShardIds: toBeUnmountedhardIds,
  210. })
  211. return deleteErr
  212. })
  213. }
  214. func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
  215. fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
  216. return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  217. _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
  218. VolumeId: uint32(volumeId),
  219. Collection: collection,
  220. ShardIds: toBeMountedhardIds,
  221. })
  222. return mountErr
  223. })
  224. }
  225. func divide(total, n int) float64 {
  226. return float64(total) / float64(n)
  227. }
  228. func ceilDivide(total, n int) int {
  229. return int(math.Ceil(float64(total) / float64(n)))
  230. }
  231. func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
  232. if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
  233. for _, shardInfo := range diskInfo.EcShardInfos {
  234. if needle.VolumeId(shardInfo.Id) == vid {
  235. return erasure_coding.ShardBits(shardInfo.EcIndexBits)
  236. }
  237. }
  238. }
  239. return 0
  240. }
  241. func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
  242. foundVolume := false
  243. diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
  244. if found {
  245. for _, shardInfo := range diskInfo.EcShardInfos {
  246. if needle.VolumeId(shardInfo.Id) == vid {
  247. oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
  248. newShardBits := oldShardBits
  249. for _, shardId := range shardIds {
  250. newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
  251. }
  252. shardInfo.EcIndexBits = uint32(newShardBits)
  253. ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
  254. foundVolume = true
  255. break
  256. }
  257. }
  258. } else {
  259. diskInfo = &master_pb.DiskInfo{
  260. Type: string(types.HardDriveType),
  261. }
  262. ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
  263. }
  264. if !foundVolume {
  265. var newShardBits erasure_coding.ShardBits
  266. for _, shardId := range shardIds {
  267. newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
  268. }
  269. diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
  270. Id: uint32(vid),
  271. Collection: collection,
  272. EcIndexBits: uint32(newShardBits),
  273. DiskType: string(types.HardDriveType),
  274. })
  275. ecNode.freeEcSlot -= len(shardIds)
  276. }
  277. return ecNode
  278. }
  279. func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
  280. if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
  281. for _, shardInfo := range diskInfo.EcShardInfos {
  282. if needle.VolumeId(shardInfo.Id) == vid {
  283. oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
  284. newShardBits := oldShardBits
  285. for _, shardId := range shardIds {
  286. newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
  287. }
  288. shardInfo.EcIndexBits = uint32(newShardBits)
  289. ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
  290. }
  291. }
  292. }
  293. return ecNode
  294. }
  295. func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
  296. countMap := make(map[string]int)
  297. for _, d := range data {
  298. id, count := identifierFn(d)
  299. countMap[id] += count
  300. }
  301. return countMap
  302. }
  303. func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
  304. groupMap := make(map[string][]*EcNode)
  305. for _, d := range data {
  306. id := identifierFn(d)
  307. groupMap[id] = append(groupMap[id], d)
  308. }
  309. return groupMap
  310. }