You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

380 lines
12 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "math"
  8. "sort"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  15. "google.golang.org/grpc"
  16. )
  17. func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
  18. copiedShardIds := []uint32{uint32(shardId)}
  19. if applyBalancing {
  20. existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
  21. // ask destination node to copy shard and the ecx file from source node, and mount it
  22. copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
  23. if err != nil {
  24. return err
  25. }
  26. // unmount the to be deleted shards
  27. err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
  28. if err != nil {
  29. return err
  30. }
  31. // ask source node to delete the shard, and maybe the ecx file
  32. err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
  33. if err != nil {
  34. return err
  35. }
  36. fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
  37. }
  38. destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
  39. existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
  40. return nil
  41. }
  42. func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
  43. targetServer *EcNode, shardIdsToCopy []uint32,
  44. volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
  45. fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
  46. targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
  47. err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  48. if targetAddress != existingLocation {
  49. fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
  50. _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
  51. VolumeId: uint32(volumeId),
  52. Collection: collection,
  53. ShardIds: shardIdsToCopy,
  54. CopyEcxFile: true,
  55. CopyEcjFile: true,
  56. CopyVifFile: true,
  57. SourceDataNode: string(existingLocation),
  58. })
  59. if copyErr != nil {
  60. return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
  61. }
  62. }
  63. fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
  64. _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
  65. VolumeId: uint32(volumeId),
  66. Collection: collection,
  67. ShardIds: shardIdsToCopy,
  68. })
  69. if mountErr != nil {
  70. return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
  71. }
  72. if targetAddress != existingLocation {
  73. copiedShardIds = shardIdsToCopy
  74. glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
  75. }
  76. return nil
  77. })
  78. if err != nil {
  79. return
  80. }
  81. return
  82. }
  83. func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
  84. for _, dc := range topo.DataCenterInfos {
  85. for _, rack := range dc.RackInfos {
  86. for _, dn := range rack.DataNodeInfos {
  87. fn(dc.Id, RackId(rack.Id), dn)
  88. }
  89. }
  90. }
  91. }
  92. func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) {
  93. sort.Slice(ecNodes, func(i, j int) bool {
  94. return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
  95. })
  96. }
  97. func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
  98. sort.Slice(ecNodes, func(i, j int) bool {
  99. return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot
  100. })
  101. }
// CandidateEcNode pairs an EC node with a shard count. Slices of it are the
// element type kept ordered by ensureSortedEcNodes.
type CandidateEcNode struct {
	// ecNode is the node being considered.
	ecNode *EcNode
	// shardCount is a shard tally associated with ecNode.
	// NOTE(review): presumably the number of shards of the volume under
	// consideration that ecNode holds — confirm against the callers.
	shardCount int
}
  106. // if the index node changed the freeEcSlot, need to keep every EcNode still sorted
  107. func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
  108. for i := index - 1; i >= 0; i-- {
  109. if lessThan(i+1, i) {
  110. swap(data, i, i+1)
  111. } else {
  112. break
  113. }
  114. }
  115. for i := index + 1; i < len(data); i++ {
  116. if lessThan(i, i-1) {
  117. swap(data, i, i-1)
  118. } else {
  119. break
  120. }
  121. }
  122. }
  123. func swap(data []*CandidateEcNode, i, j int) {
  124. t := data[i]
  125. data[i] = data[j]
  126. data[j] = t
  127. }
  128. func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
  129. for _, ecShardInfo := range ecShardInfos {
  130. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  131. count += shardBits.ShardIdCount()
  132. }
  133. return
  134. }
  135. func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
  136. if dn.DiskInfos == nil {
  137. return 0
  138. }
  139. diskInfo := dn.DiskInfos[string(diskType)]
  140. if diskInfo == nil {
  141. return 0
  142. }
  143. return int(diskInfo.MaxVolumeCount-diskInfo.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
  144. }
  145. type RackId string
  146. type EcNodeId string
// EcNode is the shell's working view of one volume server for EC shard
// placement.
type EcNode struct {
	// info is the data node description taken from the master topology; the
	// add/delete shard helpers update its EcShardInfos in place.
	info *master_pb.DataNodeInfo
	// dc is the id of the data center that contains the node.
	dc string
	// rack is the id of the rack that contains the node.
	rack RackId
	// freeEcSlot is the number of EC shards the node can still accept; it is
	// seeded from countFreeShardSlots and adjusted as shards are added or
	// removed.
	freeEcSlot int
}
  153. func (ecNode *EcNode) localShardIdCount(vid uint32) int {
  154. for _, diskInfo := range ecNode.info.DiskInfos {
  155. for _, ecShardInfo := range diskInfo.EcShardInfos {
  156. if vid == ecShardInfo.Id {
  157. shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
  158. return shardBits.ShardIdCount()
  159. }
  160. }
  161. }
  162. return 0
  163. }
// EcRack groups the EC nodes that belong to one rack.
type EcRack struct {
	// ecNodes holds the rack's nodes keyed by node id.
	ecNodes map[EcNodeId]*EcNode
	// freeEcSlot is a free EC slot count kept for the rack.
	// NOTE(review): presumably the sum of the member nodes' freeEcSlot —
	// confirm where the value is populated.
	freeEcSlot int
}
  168. func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
  169. // list all possible locations
  170. // collect topology information
  171. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  172. if err != nil {
  173. return
  174. }
  175. // find out all volume servers with one slot left.
  176. ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
  177. sortEcNodesByFreeslotsDecending(ecNodes)
  178. return
  179. }
  180. func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
  181. eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  182. if selectedDataCenter != "" && selectedDataCenter != dc {
  183. return
  184. }
  185. freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
  186. ecNodes = append(ecNodes, &EcNode{
  187. info: dn,
  188. dc: dc,
  189. rack: rack,
  190. freeEcSlot: int(freeEcSlots),
  191. })
  192. totalFreeEcSlots += freeEcSlots
  193. })
  194. return
  195. }
  196. func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {
  197. fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
  198. return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  199. _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
  200. VolumeId: uint32(volumeId),
  201. Collection: collection,
  202. ShardIds: toBeDeletedShardIds,
  203. })
  204. return deleteErr
  205. })
  206. }
  207. func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error {
  208. fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
  209. return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  210. _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
  211. VolumeId: uint32(volumeId),
  212. ShardIds: toBeUnmountedhardIds,
  213. })
  214. return deleteErr
  215. })
  216. }
  217. func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error {
  218. fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
  219. return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  220. _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
  221. VolumeId: uint32(volumeId),
  222. Collection: collection,
  223. ShardIds: toBeMountedhardIds,
  224. })
  225. return mountErr
  226. })
  227. }
  228. func divide(total, n int) float64 {
  229. return float64(total) / float64(n)
  230. }
  231. func ceilDivide(total, n int) int {
  232. return int(math.Ceil(float64(total) / float64(n)))
  233. }
  234. func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
  235. if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
  236. for _, shardInfo := range diskInfo.EcShardInfos {
  237. if needle.VolumeId(shardInfo.Id) == vid {
  238. return erasure_coding.ShardBits(shardInfo.EcIndexBits)
  239. }
  240. }
  241. }
  242. return 0
  243. }
  244. func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
  245. foundVolume := false
  246. diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
  247. if found {
  248. for _, shardInfo := range diskInfo.EcShardInfos {
  249. if needle.VolumeId(shardInfo.Id) == vid {
  250. oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
  251. newShardBits := oldShardBits
  252. for _, shardId := range shardIds {
  253. newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
  254. }
  255. shardInfo.EcIndexBits = uint32(newShardBits)
  256. ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
  257. foundVolume = true
  258. break
  259. }
  260. }
  261. } else {
  262. diskInfo = &master_pb.DiskInfo{
  263. Type: string(types.HardDriveType),
  264. }
  265. ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
  266. }
  267. if !foundVolume {
  268. var newShardBits erasure_coding.ShardBits
  269. for _, shardId := range shardIds {
  270. newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
  271. }
  272. diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
  273. Id: uint32(vid),
  274. Collection: collection,
  275. EcIndexBits: uint32(newShardBits),
  276. DiskType: string(types.HardDriveType),
  277. })
  278. ecNode.freeEcSlot -= len(shardIds)
  279. }
  280. return ecNode
  281. }
  282. func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
  283. if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
  284. for _, shardInfo := range diskInfo.EcShardInfos {
  285. if needle.VolumeId(shardInfo.Id) == vid {
  286. oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
  287. newShardBits := oldShardBits
  288. for _, shardId := range shardIds {
  289. newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
  290. }
  291. shardInfo.EcIndexBits = uint32(newShardBits)
  292. ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
  293. }
  294. }
  295. }
  296. return ecNode
  297. }
  298. func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
  299. countMap := make(map[string]int)
  300. for _, d := range data {
  301. id, count := identifierFn(d)
  302. countMap[id] += count
  303. }
  304. return countMap
  305. }
  306. func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
  307. groupMap := make(map[string][]*EcNode)
  308. for _, d := range data {
  309. id := identifierFn(d)
  310. groupMap[id] = append(groupMap[id], d)
  311. }
  312. return groupMap
  313. }