package shell

import (
	"context"
	"fmt"
	"io"
	"math/rand"
	"sort"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

func init() {
	Commands = append(Commands, &commandVolumeFixReplication{})
}
type commandVolumeFixReplication struct {
}

func (c *commandVolumeFixReplication) Name() string {
	return "volume.fix.replication"
}
func (c *commandVolumeFixReplication) Help() string {
	return `add replicas to volumes that are missing replicas

	This command finds all under-replicated volumes, and finds volume servers with free slots.
	If the free slots satisfy the replication requirement, the volume content is copied over and mounted.

	volume.fix.replication -n # do not take action
	volume.fix.replication    # actually copy the volume files and mount the volume

	Note:
		* each time this will only add back one replica for one volume id. If multiple replicas
		  are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
		* do not run this too quickly within seconds, since the new volume replica may take a few seconds
		  to register itself to the master.

`
}
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	if err = commandEnv.confirmIsLocked(); err != nil {
		return
	}

	takeAction := true
	if len(args) > 0 && args[0] == "-n" {
		takeAction = false
	}

	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return err
	}

	// find all volumes that need replication
	// collect all data nodes
	volumeReplicas := make(map[uint32][]*VolumeReplica)
	var allLocations []location
	eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
		loc := newLocation(dc, string(rack), dn)
		for _, v := range dn.VolumeInfos {
			if v.ReplicaPlacement > 0 {
				volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
					location: &loc,
					info:     v,
				})
			}
		}
		allLocations = append(allLocations, loc)
	})

	// find all under-replicated volumes
	var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
	for vid, replicas := range volumeReplicas {
		replica := replicas[rand.Intn(len(replicas))]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
		if replicaPlacement.GetCopyCount() > len(replicas) {
			underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
		} else if replicaPlacement.GetCopyCount() < len(replicas) {
			overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
			fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
		}
	}

	if len(underReplicatedVolumeIds) == 0 {
		return fmt.Errorf("no under replicated volumes")
	}

	if len(allLocations) == 0 {
		return fmt.Errorf("no data nodes at all")
	}

	// find the most under-populated data nodes
	keepDataNodesSorted(allLocations)

	return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
}
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
	for _, vid := range underReplicatedVolumeIds {
		replicas := volumeReplicas[vid]
		replica := replicas[rand.Intn(len(replicas))]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
		foundNewLocation := false
		for _, dst := range allLocations {
			// check whether data nodes satisfy the constraints
			if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
				// ask the volume server to replicate the volume
				foundNewLocation = true
				fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)

				if !takeAction {
					break
				}

				err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
					_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
						VolumeId:       replica.info.Id,
						SourceDataNode: replica.location.dataNode.Id,
					})
					if replicateErr != nil {
						return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
					}
					return nil
				})

				if err != nil {
					return err
				}

				// adjust free volume count
				dst.dataNode.FreeVolumeCount--
				keepDataNodesSorted(allLocations)
				break
			}
		}
		if !foundNewLocation {
			fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
		}
	}
	return nil
}
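
// keepDataNodesSorted orders data nodes by free volume slots, most free first,
// so that the emptiest servers are considered first as copy destinations.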
func keepDataNodesSorted(dataNodes []location) {
	sort.Slice(dataNodes, func(i, j int) bool {
		return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
	})
}
/*
if on an existing data node {
	return false
}
if different from existing dcs {
	if lack on different dcs {
		return true
	} else {
		return false
	}
}
if not on primary dc {
	return false
}
if different from existing racks {
	if lack on different racks {
		return true
	} else {
		return false
	}
}
if not on primary rack {
	return false
}
if lacks on same rack {
	return true
} else {
	return false
}
*/
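/*
Illustrative walk-through (a sketch, not from the original source): assuming the usual
reading of a replication setting such as "010" (DiffDataCenterCount=0, DiffRackCount=1,
SameRackCount=0, i.e. two copies in total), with one existing replica on dc1 / rack1 / nodeA:

	candidate dc1 / rack1 / nodeA -> false (already on that data node)
	candidate dc2 / rack1 / nodeB -> false (a second data center would exceed DiffDataCenterCount=0)
	candidate dc1 / rack2 / nodeC -> true  (a different rack in the same dc is exactly what is missing)
	candidate dc1 / rack1 / nodeD -> false (the primary rack already holds SameRackCount+1 = 1 copy)
*/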
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {

	existingDataNodes := make(map[string]int)
	for _, replica := range replicas {
		existingDataNodes[replica.location.String()] += 1
	}
	sameDataNodeCount := existingDataNodes[possibleLocation.String()]
	// avoid duplicated volume on the same data node
	if sameDataNodeCount > 0 {
		return false
	}

	existingDataCenters := make(map[string]int)
	for _, replica := range replicas {
		existingDataCenters[replica.location.DataCenter()] += 1
	}
	primaryDataCenters, _ := findTopKeys(existingDataCenters)

	// ensure data center count is within limit
	if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
		// different from existing dcs
		if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
			// lack on different dcs
			return true
		} else {
			// adding this would go over the different dcs limit
			return false
		}
	}

	// now this is the same as one of the existing data centers
	if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
		// not on one of the primary dcs
		return false
	}

	// now this is one of the primary dcs
	existingRacks := make(map[string]int)
	for _, replica := range replicas {
		if replica.location.DataCenter() != possibleLocation.DataCenter() {
			continue
		}
		existingRacks[replica.location.Rack()] += 1
	}
	primaryRacks, _ := findTopKeys(existingRacks)
	sameRackCount := existingRacks[possibleLocation.Rack()]

	// ensure rack count is within limit
	if _, found := existingRacks[possibleLocation.Rack()]; !found {
		// different from existing racks
		if len(existingRacks) < replicaPlacement.DiffRackCount+1 {
			// lack on different racks
			return true
		} else {
			// adding this would go over the different racks limit
			return false
		}
	}

	// now this is the same as one of the existing racks
	if !isAmong(possibleLocation.Rack(), primaryRacks) {
		// not on the primary rack
		return false
	}

	// now this is on the primary rack
	// different from existing data nodes
	if sameRackCount < replicaPlacement.SameRackCount+1 {
		// lack on same rack
		return true
	} else {
		// adding this would go over the same data node limit
		return false
	}
}
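
// findTopKeys returns every key holding the maximum count in m, together with that count.
// For example, {"dc1": 2, "dc2": 2, "dc3": 1} yields the top keys "dc1" and "dc2" with max 2.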
func findTopKeys(m map[string]int) (topKeys []string, max int) {
	for k, c := range m {
		if max < c {
			topKeys = topKeys[:0]
			topKeys = append(topKeys, k)
			max = c
		} else if max == c {
			topKeys = append(topKeys, k)
		}
	}
	return
}
func isAmong(key string, keys []string) bool {
	for _, k := range keys {
		if k == key {
			return true
		}
	}
	return false
}
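
// VolumeReplica pairs one replica's volume information with the location that currently holds it.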
type VolumeReplica struct {
	location *location
	info     *master_pb.VolumeInformationMessage
}
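
// location identifies a data node together with the data center and rack it belongs to.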
type location struct {
	dc       string
	rack     string
	dataNode *master_pb.DataNodeInfo
}

func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
	return location{
		dc:       dc,
		rack:     rack,
		dataNode: dataNode,
	}
}

func (l location) String() string {
	return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
}
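
// Rack qualifies the rack name with its data center, so racks that happen to share
// a name across different data centers are still counted as distinct.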
func (l location) Rack() string {
	return fmt.Sprintf("%s %s", l.dc, l.rack)
}

func (l location) DataCenter() string {
	return l.dc
}