package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"path/filepath"
	"sort"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)
func init() {
	Commands = append(Commands, &commandVolumeFixReplication{})
}

type commandVolumeFixReplication struct {
	collectionPattern *string
}

func (c *commandVolumeFixReplication) Name() string {
	return "volume.fix.replication"
}
func (c *commandVolumeFixReplication) Help() string {
	return `add replicas to volumes that are missing replicas

	This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.
	This command also finds all under-replicated volumes, and finds volume servers with free slots.
	If the free slots satisfy the replication requirement, the volume content is copied over and mounted.

	volume.fix.replication -n                            # do not take action
	volume.fix.replication                               # actually delete or copy the volume files and mount the volume
	volume.fix.replication -collectionPattern=important* # fix any collections with the prefix "important"

	Note:
		* Each run only adds back one replica for each volume id that is under-replicated.
		  If multiple replicas are missing, e.g. the replica count is > 2, you may need to run this multiple times.
		* Do not re-run this within a few seconds, since a new volume replica may take a few seconds
		  to register itself to the master.

`
}
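// For reference (a sketch, not part of the original file): a SeaweedFS
// replication setting "xyz" reads as x copies on other data centers, y copies
// on other racks within the same data center, and z copies on other servers
// within the same rack, for x+y+z+1 copies in total. Assuming the
// super_block.NewReplicaPlacementFromString helper:
//
//	rp, _ := super_block.NewReplicaPlacementFromString("110")
//	fmt.Println(rp.GetCopyCount()) // 3 = DiffDataCenterCount(1) + DiffRackCount(1) + SameRackCount(0) + 1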
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	if err = commandEnv.confirmIsLocked(); err != nil {
		return
	}

	volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
	skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
	if err = volFixReplicationCommand.Parse(args); err != nil {
		return nil
	}

	takeAction := !*skipChange

	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return err
	}

	// collect all volume replicas and all data node locations from the topology
	volumeReplicas, allLocations := collectVolumeReplicaLocations(resp)

	if len(allLocations) == 0 {
		return fmt.Errorf("no data nodes at all")
	}

	// find all volumes that need replication, split into under- and over-replicated
	var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
	for vid, replicas := range volumeReplicas {
		replica := replicas[0]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
		if replicaPlacement.GetCopyCount() > len(replicas) {
			underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
		} else if replicaPlacement.GetCopyCount() < len(replicas) {
			overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
			fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
		}
	}

	if len(overReplicatedVolumeIds) > 0 {
		return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
	}

	if len(underReplicatedVolumeIds) == 0 {
		return nil
	}

	// find the most under-populated data nodes to place the missing replicas
	return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
}
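// A hypothetical `weed shell` session (commands illustrative, output elided).
// The exclusive shell lock is required because Do calls
// commandEnv.confirmIsLocked() before doing anything:
//
//	> lock
//	> volume.fix.replication -n   # dry run: only report what would change
//	> volume.fix.replication      # actually delete/copy and mount volumes
//	> unlock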
func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) {
	volumeReplicas := make(map[uint32][]*VolumeReplica)
	var allLocations []location
	eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
		loc := newLocation(dc, string(rack), dn)
		for _, diskInfo := range dn.DiskInfos {
			for _, v := range diskInfo.VolumeInfos {
				volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
					location: &loc,
					info:     v,
				})
			}
		}
		allLocations = append(allLocations, loc)
	})
	return volumeReplicas, allLocations
}
func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
	for _, vid := range overReplicatedVolumeIds {
		replicas := volumeReplicas[vid]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))

		replica := pickOneReplicaToDelete(replicas, replicaPlacement)

		// check collection name pattern
		if *c.collectionPattern != "" {
			matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
			if err != nil {
				return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
			}
			if !matched {
				// skip this volume, but keep checking the rest
				continue
			}
		}
		fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)

		if !takeAction {
			// dry run: report the planned deletion and move on
			continue
		}

		if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
			return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
		}
	}
	return nil
}
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
	for _, vid := range underReplicatedVolumeIds {
		replicas := volumeReplicas[vid]
		replica := pickOneReplicaToCopyFrom(replicas)
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
		foundNewLocation := false
		hasSkippedCollection := false
		keepDataNodesSorted(allLocations, replica.info.DiskType)
		for _, dst := range allLocations {
			// check whether this data node has free slots and satisfies the placement constraints
			fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
			if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
				// check collection name pattern
				if *c.collectionPattern != "" {
					matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
					if err != nil {
						return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
					}
					if !matched {
						hasSkippedCollection = true
						break
					}
				}

				// ask the destination volume server to replicate the volume
				foundNewLocation = true
				fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)

				if !takeAction {
					break
				}

				err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
					_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
						VolumeId:       replica.info.Id,
						SourceDataNode: replica.location.dataNode.Id,
					})
					if replicateErr != nil {
						return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
					}
					return nil
				})
				if err != nil {
					return err
				}

				// adjust the free volume count on the destination
				dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
				break
			}
		}
		if !foundNewLocation && !hasSkippedCollection {
			fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
		}
	}
	return nil
}
// keepDataNodesSorted sorts data nodes by free volume slots for the given disk type, most free first.
func keepDataNodesSorted(dataNodes []location, diskType string) {
	fn := capacityByFreeVolumeCount(types.ToDiskType(diskType))
	sort.Slice(dataNodes, func(i, j int) bool {
		return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode)
	})
}
/*
	if on an existing data node {
		return false
	}
	if different from existing dcs {
		if lack on different dcs {
			return true
		} else {
			return false
		}
	}
	if not on primary dc {
		return false
	}
	if different from existing racks {
		if lack on different racks {
			return true
		} else {
			return false
		}
	}
	if not on primary rack {
		return false
	}
	if lack on same rack {
		return true
	} else {
		return false
	}
*/
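// A worked example (illustrative, not from the original source): with
// replication "110" (DiffDataCenterCount=1, DiffRackCount=1, SameRackCount=0,
// i.e. 3 copies total) and existing replicas on dc1/rackA and dc2/rackB:
//   - a candidate in dc3 is rejected: the 2 existing data centers already
//     reach the DiffDataCenterCount+1 = 2 limit;
//   - a candidate on dc1/rackC is accepted: dc1 is a primary dc, and only 1 of
//     the DiffRackCount+1 = 2 allowed racks there is occupied;
//   - another candidate on dc1/rackA is rejected: rackA already holds the
//     SameRackCount+1 = 1 copy allowed per rack.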
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {

	existingDataCenters, _, existingDataNodes := countReplicas(replicas)

	if _, found := existingDataNodes[possibleLocation.String()]; found {
		// avoid duplicated volume on the same data node
		return false
	}

	primaryDataCenters, _ := findTopKeys(existingDataCenters)

	// ensure data center count is within limit
	if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
		// different from existing dcs
		if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
			// lack on different dcs
			return true
		} else {
			// adding this would go over the different dcs limit
			return false
		}
	}

	// now this is the same as one of the existing data centers
	if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
		// not on one of the primary dcs
		return false
	}

	// now this is on one of the primary dcs
	primaryDcRacks := make(map[string]int)
	for _, replica := range replicas {
		if replica.location.DataCenter() != possibleLocation.DataCenter() {
			continue
		}
		primaryDcRacks[replica.location.Rack()] += 1
	}
	primaryRacks, _ := findTopKeys(primaryDcRacks)
	sameRackCount := primaryDcRacks[possibleLocation.Rack()]

	// ensure rack count is within limit
	if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
		// different from existing racks
		if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
			// lack on different racks
			return true
		} else {
			// adding this would go over the different racks limit
			return false
		}
	}

	// now this is the same as one of the existing racks
	if !isAmong(possibleLocation.Rack(), primaryRacks) {
		// not on the primary rack
		return false
	}

	// now this is on the primary rack
	if sameRackCount < replicaPlacement.SameRackCount+1 {
		// lack on same rack
		return true
	} else {
		// adding this would go over the same rack limit
		return false
	}
}
// findTopKeys returns the keys with the highest count, along with that count.
func findTopKeys(m map[string]int) (topKeys []string, max int) {
	for k, c := range m {
		if max < c {
			topKeys = topKeys[:0]
			topKeys = append(topKeys, k)
			max = c
		} else if max == c {
			topKeys = append(topKeys, k)
		}
	}
	return
}

// isAmong reports whether key is present in keys.
func isAmong(key string, keys []string) bool {
	for _, k := range keys {
		if k == key {
			return true
		}
	}
	return false
}
type VolumeReplica struct {
	location *location
	info     *master_pb.VolumeInformationMessage
}

type location struct {
	dc       string
	rack     string
	dataNode *master_pb.DataNodeInfo
}

func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
	return location{
		dc:       dc,
		rack:     rack,
		dataNode: dataNode,
	}
}

func (l location) String() string {
	return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
}

func (l location) Rack() string {
	return fmt.Sprintf("%s %s", l.dc, l.rack)
}

func (l location) DataCenter() string {
	return l.dc
}
// pickOneReplicaToCopyFrom prefers the most recently modified replica as the copy source.
func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
	mostRecent := replicas[0]
	for _, replica := range replicas {
		if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
			mostRecent = replica
		}
	}
	return mostRecent
}

// countReplicas tallies the existing replicas per data center, per rack, and per data node.
func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
	diffDc = make(map[string]int)
	diffRack = make(map[string]int)
	diffNode = make(map[string]int)
	for _, replica := range replicas {
		diffDc[replica.location.DataCenter()] += 1
		diffRack[replica.location.Rack()] += 1
		diffNode[replica.location.String()] += 1
	}
	return
}
// pickOneReplicaToDelete prefers purging the replica with the lowest compact
// revision, then the least recently modified one, then the smallest one.
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
	sort.Slice(replicas, func(i, j int) bool {
		a, b := replicas[i], replicas[j]
		if a.info.CompactRevision != b.info.CompactRevision {
			return a.info.CompactRevision < b.info.CompactRevision
		}
		if a.info.ModifiedAtSecond != b.info.ModifiedAtSecond {
			return a.info.ModifiedAtSecond < b.info.ModifiedAtSecond
		}
		if a.info.Size != b.info.Size {
			return a.info.Size < b.info.Size
		}
		return false
	})
	return replicas[0]
}