package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"path/filepath"
	"sort"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func init() {
	Commands = append(Commands, &commandVolumeFixReplication{})
}

type commandVolumeFixReplication struct {
	collectionPattern *string
}

func (c *commandVolumeFixReplication) Name() string {
	return "volume.fix.replication"
}
func (c *commandVolumeFixReplication) Help() string {
	return `add replicas to volumes that are missing replicas

	This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.

	This command also finds all under-replicated volumes, and finds volume servers with free slots.
	If the free slots satisfy the replication requirement, the volume content is copied over and mounted.

	volume.fix.replication -n                             # do not take action
	volume.fix.replication                                # actually delete or copy the volume files and mount the volume
	volume.fix.replication -collectionPattern=important*  # fix any collections with prefix "important"

	Note:
		* each run only adds back one replica per volume id. If multiple replicas are missing,
		  e.g. multiple volume servers are new, you may need to run this multiple times.
		* do not re-run this within seconds, since a new volume replica may take a few seconds
		  to register itself with the master.

`
}
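// Do fetches the cluster topology from the master and classifies each volume
// against its replica placement. If any volume is over-replicated, it purges
// surplus copies and stops; otherwise it adds missing replicas to
// under-replicated volumes, one replica per volume id per run.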
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	if err = commandEnv.confirmIsLocked(); err != nil {
		return
	}

	volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
	skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
	if err = volFixReplicationCommand.Parse(args); err != nil {
		return nil
	}

	takeAction := !*skipChange

	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return err
	}

	// collect all data nodes and all volumes that need replication
	volumeReplicas, allLocations := collectVolumeReplicaLocations(resp)

	if len(allLocations) == 0 {
		return fmt.Errorf("no data nodes at all")
	}

	// find all under-replicated and over-replicated volumes
	var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
	for vid, replicas := range volumeReplicas {
		replica := replicas[0]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
		if replicaPlacement.GetCopyCount() > len(replicas) {
			underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
		} else if replicaPlacement.GetCopyCount() < len(replicas) {
			overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
			fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
		}
	}

	if len(overReplicatedVolumeIds) > 0 {
		return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
	}

	if len(underReplicatedVolumeIds) == 0 {
		return nil
	}

	// sort the data nodes so the most under-populated ones come first
	keepDataNodesSorted(allLocations)

	return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
}
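// collectVolumeReplicaLocations walks the topology and groups every volume's
// replicas by volume id, also returning the flat list of all data node locations.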
func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) {
	volumeReplicas := make(map[uint32][]*VolumeReplica)
	var allLocations []location
	eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
		loc := newLocation(dc, string(rack), dn)
		for _, v := range dn.VolumeInfos {
			volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
				location: &loc,
				info:     v,
			})
		}
		allLocations = append(allLocations, loc)
	})
	return volumeReplicas, allLocations
}
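// fixOverReplicatedVolumes deletes one surplus replica per over-replicated
// volume, picking the most stale copy (see pickOneReplicaToDelete).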
func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
	for _, vid := range overReplicatedVolumeIds {
		replicas := volumeReplicas[vid]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))

		replica := pickOneReplicaToDelete(replicas, replicaPlacement)

		// skip volumes whose collection does not match the pattern
		if *c.collectionPattern != "" {
			matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
			if err != nil {
				return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
			}
			if !matched {
				continue
			}
		}

		fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)

		if !takeAction {
			break
		}

		if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
			return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
		}
	}
	return nil
}
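// fixUnderReplicatedVolumes copies one missing replica per under-replicated
// volume, choosing a destination with free slots that keeps the placement valid.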
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
	for _, vid := range underReplicatedVolumeIds {
		replicas := volumeReplicas[vid]
		replica := pickOneReplicaToCopyFrom(replicas)
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
		foundNewLocation := false
		hasSkippedCollection := false
		for _, dst := range allLocations {
			// check whether this data node satisfies the placement constraints
			if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
				// check collection name pattern
				if *c.collectionPattern != "" {
					matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
					if err != nil {
						return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
					}
					if !matched {
						hasSkippedCollection = true
						break
					}
				}

				// ask the destination volume server to copy the volume from the source
				foundNewLocation = true
				fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)

				if !takeAction {
					break
				}

				err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
					_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
						VolumeId:       replica.info.Id,
						SourceDataNode: replica.location.dataNode.Id,
					})
					if replicateErr != nil {
						return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
					}
					return nil
				})
				if err != nil {
					return err
				}

				// adjust the free volume count and re-sort, so the next volume sees fresh capacity
				dst.dataNode.FreeVolumeCount--
				keepDataNodesSorted(allLocations)
				break
			}
		}
		if !foundNewLocation && !hasSkippedCollection {
			fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%d\n", replica.info.Id, replicaPlacement, len(replicas))
		}
	}
	return nil
}
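// keepDataNodesSorted orders data nodes by free volume slots, most free first,
// so new replicas are placed on the least loaded servers.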
func keepDataNodesSorted(dataNodes []location) {
	sort.Slice(dataNodes, func(i, j int) bool {
		return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
	})
}
/*
	if on an existing data node {
		return false
	}
	if different from existing dcs {
		if lack on different dcs {
			return true
		} else {
			return false
		}
	}
	if not on primary dc {
		return false
	}
	if different from existing racks {
		if lack on different racks {
			return true
		} else {
			return false
		}
	}
	if not on primary rack {
		return false
	}
	if lack on same rack {
		return true
	} else {
		return false
	}
*/
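// satisfyReplicaPlacement reports whether adding a replica at possibleLocation
// keeps the existing replicas within the placement limits (different data
// centers, different racks, and same-rack servers), following the decision
// tree sketched above.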
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {

	existingDataCenters, _, existingDataNodes := countReplicas(replicas)

	if _, found := existingDataNodes[possibleLocation.String()]; found {
		// avoid a duplicated volume on the same data node
		return false
	}

	primaryDataCenters, _ := findTopKeys(existingDataCenters)

	// ensure the data center count is within limit
	if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
		// different from existing dcs
		if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
			// lack on different dcs
			return true
		} else {
			// adding this would go over the different dcs limit
			return false
		}
	}

	// now this is the same as one of the existing data centers
	if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
		// not on one of the primary dcs
		return false
	}

	// now this is one of the primary dcs
	primaryDcRacks := make(map[string]int)
	for _, replica := range replicas {
		if replica.location.DataCenter() != possibleLocation.DataCenter() {
			continue
		}
		primaryDcRacks[replica.location.Rack()] += 1
	}
	primaryRacks, _ := findTopKeys(primaryDcRacks)
	sameRackCount := primaryDcRacks[possibleLocation.Rack()]

	// ensure the rack count is within limit
	if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
		// different from existing racks
		if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
			// lack on different racks
			return true
		} else {
			// adding this would go over the different racks limit
			return false
		}
	}

	// now this is the same as one of the existing racks
	if !isAmong(possibleLocation.Rack(), primaryRacks) {
		// not on the primary rack
		return false
	}

	// now this is on the primary rack
	if sameRackCount < replicaPlacement.SameRackCount+1 {
		// lack on same rack
		return true
	} else {
		// adding this would go over the same rack limit
		return false
	}
}
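// findTopKeys returns all keys that share the highest count in the map,
// e.g. {"dc1": 2, "dc2": 2, "dc3": 1} yields (["dc1", "dc2"], 2).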
func findTopKeys(m map[string]int) (topKeys []string, max int) {
	for k, c := range m {
		if max < c {
			topKeys = topKeys[:0]
			topKeys = append(topKeys, k)
			max = c
		} else if max == c {
			topKeys = append(topKeys, k)
		}
	}
	return
}
func isAmong(key string, keys []string) bool {
	for _, k := range keys {
		if k == key {
			return true
		}
	}
	return false
}
type VolumeReplica struct {
	location *location
	info     *master_pb.VolumeInformationMessage
}

type location struct {
	dc       string
	rack     string
	dataNode *master_pb.DataNodeInfo
}

func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
	return location{
		dc:       dc,
		rack:     rack,
		dataNode: dataNode,
	}
}

func (l location) String() string {
	return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
}
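// Rack qualifies the rack name with its data center, so racks with the same
// name in different data centers are treated as distinct.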
func (l location) Rack() string {
	return fmt.Sprintf("%s %s", l.dc, l.rack)
}

func (l location) DataCenter() string {
	return l.dc
}
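// pickOneReplicaToCopyFrom selects the most recently modified replica as the
// copy source, since it is the most likely to be up to date.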
func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
	mostRecent := replicas[0]
	for _, replica := range replicas {
		if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
			mostRecent = replica
		}
	}
	return mostRecent
}
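// countReplicas tallies how many replicas sit in each data center, rack, and
// data node; the returned maps are keyed by the respective location strings.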
func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
	diffDc = make(map[string]int)
	diffRack = make(map[string]int)
	diffNode = make(map[string]int)
	for _, replica := range replicas {
		diffDc[replica.location.DataCenter()] += 1
		diffRack[replica.location.Rack()] += 1
		diffNode[replica.location.String()] += 1
	}
	return
}
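// pickOneReplicaToDelete sorts the replicas so the most stale copy comes
// first: lowest compact revision, then least recently modified, then smallest
// size. The first element is returned for deletion.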
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {

	sort.Slice(replicas, func(i, j int) bool {
		a, b := replicas[i], replicas[j]
		if a.info.CompactRevision != b.info.CompactRevision {
			return a.info.CompactRevision < b.info.CompactRevision
		}
		if a.info.ModifiedAtSecond != b.info.ModifiedAtSecond {
			return a.info.ModifiedAtSecond < b.info.ModifiedAtSecond
		}
		if a.info.Size != b.info.Size {
			return a.info.Size < b.info.Size
		}
		return false
	})

	return replicas[0]
}