package shell

import (
	"context"
	"fmt"
	"io"
	"sort"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

func init() {
	Commands = append(Commands, &commandVolumeFixReplication{})
}

type commandVolumeFixReplication struct {
}

func (c *commandVolumeFixReplication) Name() string {
	return "volume.fix.replication"
}

func (c *commandVolumeFixReplication) Help() string {
	return `add replicas to volumes that are missing replicas

	This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.
	This command also finds all under-replicated volumes, and finds volume servers with free slots.
	If the free slots satisfy the replication requirement, the volume content is copied over and mounted.

	volume.fix.replication -n # do not take action
	volume.fix.replication    # actually delete or copy the volume files and mount the volume

	Note:
		* Each run only adds back one replica for one volume id. If multiple replicas are missing,
		  e.g. when multiple volume servers are new, you may need to run this multiple times.
		* Do not re-run this within seconds, since a new volume replica may take a few seconds
		  to register itself with the master.

`
}

func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	if err = commandEnv.confirmIsLocked(); err != nil {
		return
	}

	takeAction := true
	if len(args) > 0 && args[0] == "-n" {
		takeAction = false
	}

	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return err
	}

	// find all volumes that need replication
	// collect all data nodes
	volumeReplicas, allLocations := collectVolumeReplicaLocations(resp)

	if len(allLocations) == 0 {
		return fmt.Errorf("no data nodes at all")
	}

	// find all under-replicated and over-replicated volumes
	var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
	for vid, replicas := range volumeReplicas {
		replica := replicas[0]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
		if replicaPlacement.GetCopyCount() > len(replicas) {
			underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
		} else if replicaPlacement.GetCopyCount() < len(replicas) {
			overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
			fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
		}
	}

	if len(overReplicatedVolumeIds) > 0 {
		return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
	}

	if len(underReplicatedVolumeIds) == 0 {
		return nil
	}

	// find the most under-populated data nodes
	keepDataNodesSorted(allLocations)

	return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
}

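// collectVolumeReplicaLocations walks the topology in the master's volume list
// response, grouping every volume replica by volume id and collecting the flat
// list of all data node locations.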
func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) {
	volumeReplicas := make(map[uint32][]*VolumeReplica)
	var allLocations []location
	eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
		loc := newLocation(dc, string(rack), dn)
		for _, v := range dn.VolumeInfos {
			volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
				location: &loc,
				info:     v,
			})
		}
		allLocations = append(allLocations, loc)
	})
	return volumeReplicas, allLocations
}

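// fixOverReplicatedVolumes picks one replica of each over-replicated volume,
// preferring the one with the oldest modification time, and deletes it from
// its volume server.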
func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
	for _, vid := range overReplicatedVolumeIds {
		replicas := volumeReplicas[vid]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))

		replica := pickOneReplicaToDelete(replicas, replicaPlacement)

		fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)

		if !takeAction {
			break
		}

		if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
			return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
		}
	}
	return nil
}

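// fixUnderReplicatedVolumes copies each under-replicated volume from its most
// recently modified replica to the first data node with a free slot that
// satisfies the volume's replica placement, adding one new replica per volume
// id per run.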
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
	for _, vid := range underReplicatedVolumeIds {
		replicas := volumeReplicas[vid]
		replica := pickOneReplicaToCopyFrom(replicas)
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
		foundNewLocation := false
		for _, dst := range allLocations {
			// check whether this data node satisfies the placement constraints
			if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
				// ask the volume server to replicate the volume
				foundNewLocation = true
				fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)

				if !takeAction {
					break
				}

				err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
					_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
						VolumeId:       replica.info.Id,
						SourceDataNode: replica.location.dataNode.Id,
					})
					if replicateErr != nil {
						return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
					}
					return nil
				})

				if err != nil {
					return err
				}

				// adjust free volume count
				dst.dataNode.FreeVolumeCount--
				keepDataNodesSorted(allLocations)
				break
			}
		}
		if !foundNewLocation {
			fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
		}
	}
	return nil
}

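// keepDataNodesSorted orders the data nodes by free volume slots, most free
// first, so that new replicas land on the least loaded volume servers.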
func keepDataNodesSorted(dataNodes []location) {
	sort.Slice(dataNodes, func(i, j int) bool {
		return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
	})
}

/*
	if on an existing data node {
		return false
	}
	if different from existing dcs {
		if lacking on different dcs {
			return true
		} else {
			return false
		}
	}
	if not on primary dc {
		return false
	}
	if different from existing racks {
		if lacking on different racks {
			return true
		} else {
			return false
		}
	}
	if not on primary rack {
		return false
	}
	if lacking on same rack {
		return true
	} else {
		return false
	}
*/
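// For example, a replica placement of "110" asks for one copy on a different
// data center, one copy on a different rack within the primary data center,
// and no extra copy on the same rack: 1+1+0 plus the original, 3 copies total.
// Under that placement, a candidate node in a third data center is rejected
// because the existing replicas already meet the different-data-center quota.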
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {

	existingDataCenters, _, existingDataNodes := countReplicas(replicas)

	if _, found := existingDataNodes[possibleLocation.String()]; found {
		// avoid duplicated volume on the same data node
		return false
	}

	primaryDataCenters, _ := findTopKeys(existingDataCenters)

	// ensure the data center count is within the limit
	if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
		// different from existing dcs
		if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
			// lacking on different dcs
			return true
		} else {
			// adding this would go over the different dcs limit
			return false
		}
	}
	// now this is the same as one of the existing data centers
	if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
		// not on one of the primary dcs
		return false
	}

	// now this is on one of the primary dcs
	primaryDcRacks := make(map[string]int)
	for _, replica := range replicas {
		if replica.location.DataCenter() != possibleLocation.DataCenter() {
			continue
		}
		primaryDcRacks[replica.location.Rack()] += 1
	}
	primaryRacks, _ := findTopKeys(primaryDcRacks)
	sameRackCount := primaryDcRacks[possibleLocation.Rack()]

	// ensure the rack count is within the limit
	if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
		// different from existing racks
		if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
			// lacking on different racks
			return true
		} else {
			// adding this would go over the different racks limit
			return false
		}
	}
	// now this is the same as one of the existing racks
	if !isAmong(possibleLocation.Rack(), primaryRacks) {
		// not on the primary rack
		return false
	}

	// now this is on the primary rack

	// different from existing data nodes
	if sameRackCount < replicaPlacement.SameRackCount+1 {
		// lacking on the same rack
		return true
	} else {
		// adding this would go over the same data node limit
		return false
	}
}

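// findTopKeys returns the keys with the highest count in m, along with that count.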
func findTopKeys(m map[string]int) (topKeys []string, max int) {
	for k, c := range m {
		if max < c {
			topKeys = topKeys[:0]
			topKeys = append(topKeys, k)
			max = c
		} else if max == c {
			topKeys = append(topKeys, k)
		}
	}
	return
}

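// isAmong reports whether key is present in keys.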
func isAmong(key string, keys []string) bool {
	for _, k := range keys {
		if k == key {
			return true
		}
	}
	return false
}

type VolumeReplica struct {
	location *location
	info     *master_pb.VolumeInformationMessage
}

type location struct {
	dc       string
	rack     string
	dataNode *master_pb.DataNodeInfo
}

func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
	return location{
		dc:       dc,
		rack:     rack,
		dataNode: dataNode,
	}
}

func (l location) String() string {
	return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
}

func (l location) Rack() string {
	return fmt.Sprintf("%s %s", l.dc, l.rack)
}

func (l location) DataCenter() string {
	return l.dc
}

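// pickOneReplicaToCopyFrom returns the most recently modified replica, which
// is the most likely to be complete and up to date.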
func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
	mostRecent := replicas[0]
	for _, replica := range replicas {
		if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
			mostRecent = replica
		}
	}
	return mostRecent
}

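// countReplicas tallies how many of the given replicas fall on each data
// center, each rack, and each data node.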
func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
	diffDc = make(map[string]int)
	diffRack = make(map[string]int)
	diffNode = make(map[string]int)
	for _, replica := range replicas {
		diffDc[replica.location.DataCenter()] += 1
		diffRack[replica.location.Rack()] += 1
		diffNode[replica.location.String()] += 1
	}
	return
}

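// pickOneReplicaToDelete returns the replica with the oldest modification
// time; when all replicas carry the same timestamp it falls back to the first one.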
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {

	allSame := true
	oldest := replicas[0]
	for _, replica := range replicas {
		if replica.info.ModifiedAtSecond < oldest.info.ModifiedAtSecond {
			oldest = replica
			allSame = false
		}
	}
	if !allSame {
		return oldest
	}

	// TODO what if all the replicas have the same timestamp?
	return oldest
}