You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

377 lines
11 KiB

6 years ago
4 years ago
6 years ago
4 years ago
6 years ago
6 years ago
6 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  6. "io"
  7. "sort"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  12. )
  13. func init() {
  14. Commands = append(Commands, &commandVolumeFixReplication{})
  15. }
  16. type commandVolumeFixReplication struct {
  17. }
  18. func (c *commandVolumeFixReplication) Name() string {
  19. return "volume.fix.replication"
  20. }
  21. func (c *commandVolumeFixReplication) Help() string {
  22. return `add replicas to volumes that are missing replicas
  23. This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.
  24. This command also finds all under-replicated volumes, and finds volume servers with free slots.
  25. If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
  26. volume.fix.replication -n # do not take action
  27. volume.fix.replication # actually deleting or copying the volume files and mount the volume
  28. Note:
  29. * each time this will only add back one replica for one volume id. If there are multiple replicas
  30. are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
  31. * do not run this too quickly within seconds, since the new volume replica may take a few seconds
  32. to register itself to the master.
  33. `
  34. }
  35. func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  36. if err = commandEnv.confirmIsLocked(); err != nil {
  37. return
  38. }
  39. takeAction := true
  40. if len(args) > 0 && args[0] == "-n" {
  41. takeAction = false
  42. }
  43. var resp *master_pb.VolumeListResponse
  44. err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
  45. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  46. return err
  47. })
  48. if err != nil {
  49. return err
  50. }
  51. // find all volumes that needs replication
  52. // collect all data nodes
  53. volumeReplicas := make(map[uint32][]*VolumeReplica)
  54. var allLocations []location
  55. eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  56. loc := newLocation(dc, string(rack), dn)
  57. for _, v := range dn.VolumeInfos {
  58. volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
  59. location: &loc,
  60. info: v,
  61. })
  62. }
  63. allLocations = append(allLocations, loc)
  64. })
  65. if len(allLocations) == 0 {
  66. return fmt.Errorf("no data nodes at all")
  67. }
  68. // find all under replicated volumes
  69. var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
  70. for vid, replicas := range volumeReplicas {
  71. replica := replicas[0]
  72. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
  73. if replicaPlacement.GetCopyCount() > len(replicas) {
  74. underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
  75. } else if replicaPlacement.GetCopyCount() < len(replicas) {
  76. overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
  77. fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
  78. }
  79. }
  80. if len(overReplicatedVolumeIds) > 0 {
  81. return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
  82. }
  83. if len(underReplicatedVolumeIds) == 0 {
  84. return nil
  85. }
  86. // find the most under populated data nodes
  87. keepDataNodesSorted(allLocations)
  88. return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
  89. }
  90. func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
  91. for _, vid := range overReplicatedVolumeIds {
  92. replicas := volumeReplicas[vid]
  93. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
  94. replica := pickOneReplicaToDelete(replicas, replicaPlacement)
  95. fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
  96. if !takeAction {
  97. break
  98. }
  99. if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
  100. return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
  101. }
  102. }
  103. return nil
  104. }
  105. func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
  106. for _, vid := range underReplicatedVolumeIds {
  107. replicas := volumeReplicas[vid]
  108. replica := pickOneReplicaToCopyFrom(replicas)
  109. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
  110. foundNewLocation := false
  111. for _, dst := range allLocations {
  112. // check whether data nodes satisfy the constraints
  113. if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
  114. // ask the volume server to replicate the volume
  115. foundNewLocation = true
  116. fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
  117. if !takeAction {
  118. break
  119. }
  120. err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  121. _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
  122. VolumeId: replica.info.Id,
  123. SourceDataNode: replica.location.dataNode.Id,
  124. })
  125. if replicateErr != nil {
  126. return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
  127. }
  128. return nil
  129. })
  130. if err != nil {
  131. return err
  132. }
  133. // adjust free volume count
  134. dst.dataNode.FreeVolumeCount--
  135. keepDataNodesSorted(allLocations)
  136. break
  137. }
  138. }
  139. if !foundNewLocation {
  140. fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
  141. }
  142. }
  143. return nil
  144. }
  145. func keepDataNodesSorted(dataNodes []location) {
  146. sort.Slice(dataNodes, func(i, j int) bool {
  147. return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
  148. })
  149. }
  150. /*
  151. if on an existing data node {
  152. return false
  153. }
  154. if different from existing dcs {
  155. if lack on different dcs {
  156. return true
  157. }else{
  158. return false
  159. }
  160. }
  161. if not on primary dc {
  162. return false
  163. }
  164. if different from existing racks {
  165. if lack on different racks {
  166. return true
  167. }else{
  168. return false
  169. }
  170. }
  171. if not on primary rack {
  172. return false
  173. }
  174. if lacks on same rack {
  175. return true
  176. } else {
  177. return false
  178. }
  179. */
  180. func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {
  181. existingDataCenters, _, existingDataNodes := countReplicas(replicas)
  182. if _, found := existingDataNodes[possibleLocation.String()]; found {
  183. // avoid duplicated volume on the same data node
  184. return false
  185. }
  186. primaryDataCenters, _ := findTopKeys(existingDataCenters)
  187. // ensure data center count is within limit
  188. if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
  189. // different from existing dcs
  190. if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
  191. // lack on different dcs
  192. return true
  193. } else {
  194. // adding this would go over the different dcs limit
  195. return false
  196. }
  197. }
  198. // now this is same as one of the existing data center
  199. if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
  200. // not on one of the primary dcs
  201. return false
  202. }
  203. // now this is one of the primary dcs
  204. primaryDcRacks := make(map[string]int)
  205. for _, replica := range replicas {
  206. if replica.location.DataCenter() != possibleLocation.DataCenter() {
  207. continue
  208. }
  209. primaryDcRacks[replica.location.Rack()] += 1
  210. }
  211. primaryRacks, _ := findTopKeys(primaryDcRacks)
  212. sameRackCount := primaryDcRacks[possibleLocation.Rack()]
  213. // ensure rack count is within limit
  214. if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
  215. // different from existing racks
  216. if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
  217. // lack on different racks
  218. return true
  219. } else {
  220. // adding this would go over the different racks limit
  221. return false
  222. }
  223. }
  224. // now this is same as one of the existing racks
  225. if !isAmong(possibleLocation.Rack(), primaryRacks) {
  226. // not on the primary rack
  227. return false
  228. }
  229. // now this is on the primary rack
  230. // different from existing data nodes
  231. if sameRackCount < replicaPlacement.SameRackCount+1 {
  232. // lack on same rack
  233. return true
  234. } else {
  235. // adding this would go over the same data node limit
  236. return false
  237. }
  238. }
  239. func findTopKeys(m map[string]int) (topKeys []string, max int) {
  240. for k, c := range m {
  241. if max < c {
  242. topKeys = topKeys[:0]
  243. topKeys = append(topKeys, k)
  244. max = c
  245. } else if max == c {
  246. topKeys = append(topKeys, k)
  247. }
  248. }
  249. return
  250. }
  251. func isAmong(key string, keys []string) bool {
  252. for _, k := range keys {
  253. if k == key {
  254. return true
  255. }
  256. }
  257. return false
  258. }
  259. type VolumeReplica struct {
  260. location *location
  261. info *master_pb.VolumeInformationMessage
  262. }
  263. type location struct {
  264. dc string
  265. rack string
  266. dataNode *master_pb.DataNodeInfo
  267. }
  268. func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
  269. return location{
  270. dc: dc,
  271. rack: rack,
  272. dataNode: dataNode,
  273. }
  274. }
  275. func (l location) String() string {
  276. return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
  277. }
  278. func (l location) Rack() string {
  279. return fmt.Sprintf("%s %s", l.dc, l.rack)
  280. }
  281. func (l location) DataCenter() string {
  282. return l.dc
  283. }
  284. func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
  285. mostRecent := replicas[0]
  286. for _, replica := range replicas {
  287. if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
  288. mostRecent = replica
  289. }
  290. }
  291. return mostRecent
  292. }
  293. func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
  294. diffDc = make(map[string]int)
  295. diffRack = make(map[string]int)
  296. diffNode = make(map[string]int)
  297. for _, replica := range replicas {
  298. diffDc[replica.location.DataCenter()] += 1
  299. diffRack[replica.location.Rack()] += 1
  300. diffNode[replica.location.String()] += 1
  301. }
  302. return
  303. }
  304. func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
  305. allSame := true
  306. oldest := replicas[0]
  307. for _, replica := range replicas {
  308. if replica.info.ModifiedAtSecond < oldest.info.ModifiedAtSecond {
  309. oldest = replica
  310. allSame = false
  311. }
  312. }
  313. if !allSame {
  314. return oldest
  315. }
  316. // TODO what if all the replicas have the same timestamp?
  317. return oldest
  318. }