package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"path/filepath"
	"slices"
	"strconv"
	"sync"
	"time"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

func init() {
	Commands = append(Commands, &commandVolumeFixReplication{})
}

type commandVolumeFixReplication struct {
	collectionPattern *string
}

func (c *commandVolumeFixReplication) Name() string {
	return "volume.fix.replication"
}

func (c *commandVolumeFixReplication) Help() string {
	return `add or remove replicas to volumes that are missing replicas or over-replicated

	This command finds all over-replicated volumes. If found, it purges the oldest copies and stops.
	This command also finds all under-replicated volumes, and finds volume servers with free slots.
	If the free slots satisfy the replication requirement, the volume content is copied over and mounted.

	volume.fix.replication                                 # report what would change, but do not take action
	volume.fix.replication -force                          # actually delete or copy the volume files and mount the volume
	volume.fix.replication -collectionPattern=important*   # fix only collections with prefix "important"

	Note:
		* Each run only adds back one replica for each under-replicated volume id.
		  If multiple replicas are missing, e.g. the replica count is > 2, you may need to run this multiple times.
		* Do not re-run this within a few seconds, since a new volume replica may take a few seconds
		  to register itself with the master.

`
}

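// Illustrative `weed shell` session (not part of the original file; assumes the
// shell's lock/unlock commands and omits output):
//
//	> lock
//	> volume.fix.replication          # report what would be deleted or copied
//	> volume.fix.replication -force   # actually apply the fix
//	> unlock
//
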
func (c *commandVolumeFixReplication) HasTag(tag CommandTag) bool {
	return false && tag == ResourceHeavy // resource intensive only when deleting and checking with replicas.
}

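// Do repeatedly collects the cluster topology, classifies volumes as
// under-replicated, misplaced, or over-replicated, deletes surplus or misplaced
// copies (when -doDelete is set), and copies under-replicated volumes to data nodes
// with free slots, looping until no under-replicated volumes remain. Without -force
// it only reports what it would do.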
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
	applyChanges := volFixReplicationCommand.Bool("force", false, "apply the fix")
	doDelete := volFixReplicationCommand.Bool("doDelete", true, "Also delete over-replicated volumes besides fixing under-replication")
	doCheck := volFixReplicationCommand.Bool("doCheck", true, "Also check synchronization before deleting")
	retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry")
	volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")
	maxParallelization := volFixReplicationCommand.Int("maxParallelization", 1, "run up to X tasks in parallel, whenever possible")
	if err = volFixReplicationCommand.Parse(args); err != nil {
		return nil
	}

	infoAboutSimulationMode(writer, *applyChanges, "-force")

	commandEnv.noLock = !*applyChanges
	if err = commandEnv.confirmIsLocked(args); *applyChanges && err != nil {
		return
	}

	underReplicatedVolumeIdsCount := 1
	for underReplicatedVolumeIdsCount > 0 {
		fixedVolumeReplicas := map[string]int{}

		// collect topology information
		topologyInfo, _, err := collectTopologyInfo(commandEnv, 15*time.Second)
		if err != nil {
			return err
		}

		// find all volumes that need replication
		// collect all data nodes
		volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)

		if len(allLocations) == 0 {
			return fmt.Errorf("no data nodes at all")
		}

		// classify volumes as under-replicated, misplaced, or over-replicated
		var underReplicatedVolumeIds, overReplicatedVolumeIds, misplacedVolumeIds []uint32
		for vid, replicas := range volumeReplicas {
			replica := replicas[0]
			replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
			switch {
			case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas):
				underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
			case isMisplaced(replicas, replicaPlacement):
				misplacedVolumeIds = append(misplacedVolumeIds, vid)
				fmt.Fprintf(writer, "volume %d replication %s is not well placed %s\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id)
			case replicaPlacement.GetCopyCount() < len(replicas):
				overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
				fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
			}
		}

		if !commandEnv.isLocked() {
			return fmt.Errorf("lock is lost")
		}

		if len(overReplicatedVolumeIds) > 0 && *doDelete {
			if err := c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil {
				return err
			}
		}

		if len(misplacedVolumeIds) > 0 && *doDelete {
			if err := c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume); err != nil {
				return err
			}
		}

		underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
		if underReplicatedVolumeIdsCount > 0 {
			// find the most underpopulated data nodes
			fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep, *maxParallelization)
			if err != nil {
				return err
			}
		}

		if !*applyChanges {
			break
		}

		// check that the topology has been updated
		if len(fixedVolumeReplicas) > 0 {
			fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
			for k := range fixedVolumeReplicas {
				fixedVolumes = append(fixedVolumes, k)
			}
			volumeIdLocations, err := lookupVolumeIds(commandEnv, fixedVolumes)
			if err != nil {
				return err
			}
			for _, volumeIdLocation := range volumeIdLocations {
				volumeId := volumeIdLocation.VolumeOrFileId
				volumeIdLocationCount := len(volumeIdLocation.Locations)
				i := 0
				for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
					fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
					time.Sleep(time.Duration(i+1) * time.Second * 7)
					volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId})
					if err != nil {
						return err
					}
					volumeIdLocationCount = len(volumeLocIds[0].Locations)
					if *retryCount <= i {
						return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
					}
					i += 1
				}
			}
		}
	}

	return nil
}

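// collectVolumeReplicaLocations walks every data node in the topology, grouping the
// discovered volume replicas by volume id and returning the list of all data node
// locations so callers can pick placement targets.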
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
	volumeReplicas := make(map[uint32][]*VolumeReplica)
	var allLocations []location
	eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		loc := newLocation(string(dc), string(rack), dn)
		for _, diskInfo := range dn.DiskInfos {
			for _, v := range diskInfo.VolumeInfos {
				volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
					location: &loc,
					info:     v,
				})
			}
		}
		allLocations = append(allLocations, loc)
	})
	return volumeReplicas, allLocations
}

type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica

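// checkOneVolume loads the needle index of both replicas into in-memory maps and
// compares them via doVolumeCheckDisk, so that deletion of a surplus copy only
// proceeds when the replicas agree.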
func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDialOption grpc.DialOption) (err error) {
	aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
	defer func() {
		aDB.Close()
		bDB.Close()
	}()

	// read index db
	readIndexDbCutoffFrom := uint64(time.Now().UnixNano())
	if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), false, writer, grpcDialOption); err != nil {
		return fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
	}
	if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), false, writer, grpcDialOption); err != nil {
		return fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
	}

	if _, err = doVolumeCheckDisk(aDB, bDB, a, b, false, writer, true, false, float64(1), readIndexDbCutoffFrom, grpcDialOption); err != nil {
		return fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
	}

	return
}

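// deleteOneVolume removes one surplus replica per listed volume id: selectOneVolumeFn
// picks the victim, volumes whose collection does not match -collectionPattern or
// whose replicas disagree on the collection are skipped, the remaining copies are
// optionally verified with checkOneVolume, and only then is the chosen copy deleted.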
func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error {
	for _, vid := range overReplicatedVolumeIds {
		replicas := volumeReplicas[vid]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))

		replica := selectOneVolumeFn(replicas, replicaPlacement)

		// check collection name pattern
		if *c.collectionPattern != "" {
			matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
			if err != nil {
				return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
			}
			if !matched {
				continue
			}
		}

		collectionIsMismatch := false
		for _, volumeReplica := range replicas {
			if volumeReplica.info.Collection != replica.info.Collection {
				fmt.Fprintf(writer, "skip deleting volume %d since collection %s is mismatched: %s\n", replica.info.Id, replica.info.Collection, volumeReplica.info.Collection)
				collectionIsMismatch = true
			}
		}
		if collectionIsMismatch {
			continue
		}

		fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)

		if !applyChanges {
			break
		}

		if doCheck {
			var checkErr error
			for _, replicaB := range replicas {
				if replicaB.location.dataNode == replica.location.dataNode {
					continue
				}
				if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv.option.GrpcDialOption); checkErr != nil {
					fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", replica.info.Id, replica.location.dataNode.Id, replicaB.location.dataNode.Id, checkErr)
					break
				}
			}
			if checkErr != nil {
				continue
			}
		}

		if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id),
			pb.NewServerAddressFromDataNode(replica.location.dataNode), false); err != nil {
			fmt.Fprintf(writer, "deleting volume %d from %s: %v\n", replica.info.Id, replica.location.dataNode.Id, err)
		}
	}
	return nil
}

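// fixUnderReplicatedVolumes repairs up to volumesPerStep under-replicated volumes,
// running at most maxParallelization repairs concurrently via a buffered-channel
// semaphore and retrying each volume up to retryCount times. It returns the ids of
// the fixed volumes together with their replica counts before the fix.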
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep, maxParallelization int) (fixedVolumes map[string]int, err error) {
	fixedVolumes = map[string]int{}
	if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 {
		underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep]
	}

	var (
		wg        sync.WaitGroup
		mu        sync.Mutex // guards fixedVolumes, which is written by multiple goroutines
		semaphore = make(chan struct{}, maxParallelization)
	)

	for _, vid := range underReplicatedVolumeIds {
		wg.Add(1)
		semaphore <- struct{}{} // Acquire semaphore
		go func(vid uint32) {
			defer wg.Done()
			defer func() { <-semaphore }() // Release semaphore

			for attempt := 0; attempt <= retryCount; attempt++ {
				fixErr := c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations)
				if fixErr != nil {
					fmt.Fprintf(writer, "Failed to fix volume %d (attempt %d): %v\n", vid, attempt+1, fixErr)
					continue
				}
				if applyChanges {
					mu.Lock()
					fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
					mu.Unlock()
				}
				break
			}
		}(vid)
	}
	wg.Wait()

	return fixedVolumes, nil
}

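// fixOneUnderReplicatedVolume picks the most recently modified replica as the copy
// source, sorts candidate data nodes by free volume slots, and asks the first node
// that satisfies the replica placement to copy and mount the volume via VolumeCopy.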
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {
	replicas := volumeReplicas[vid]
	replica := pickOneReplicaToCopyFrom(replicas)
	replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
	foundNewLocation := false
	hasSkippedCollection := false
	keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
	fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
	for _, dst := range allLocations {
		// check whether data nodes satisfy the constraints
		if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
			// check collection name pattern
			if *c.collectionPattern != "" {
				matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
				if err != nil {
					return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
				}
				if !matched {
					hasSkippedCollection = true
					break
				}
			}

			// ask the volume server to replicate the volume
			foundNewLocation = true
			fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)

			if !applyChanges {
				// adjust volume count
				addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
				break
			}

			err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
				stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
					VolumeId:       replica.info.Id,
					SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
				})
				if replicateErr != nil {
					return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
				}
				for {
					resp, recvErr := stream.Recv()
					if recvErr != nil {
						if recvErr == io.EOF {
							break
						} else {
							return recvErr
						}
					}
					if resp.ProcessedBytes > 0 {
						fmt.Fprintf(writer, "volume %d processed %d bytes\n", replica.info.Id, resp.ProcessedBytes)
					}
				}

				return nil
			})

			if err != nil {
				return err
			}

			// adjust volume count
			addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
			break
		}
	}

	if !foundNewLocation && !hasSkippedCollection {
		fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
	}

	return nil
}

func addVolumeCount(info *master_pb.DiskInfo, count int) {
	if info == nil {
		return
	}
	info.VolumeCount += int64(count)
	info.FreeVolumeCount -= int64(count)
}

func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
	fn := capacityByFreeVolumeCount(diskType)
	slices.SortFunc(dataNodes, func(a, b location) int {
		return int(fn(b.dataNode) - fn(a.dataNode))
	})
}

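// satisfyReplicaCurrentLocation reports whether the existing replicas already span
// enough data centers and racks (and stack enough copies on a single rack) to meet
// the replica placement; if not, the volume is treated as under-replicated.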
func satisfyReplicaCurrentLocation(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica) bool {
	existingDataCenters, existingRacks, _ := countReplicas(replicas)

	if replicaPlacement.DiffDataCenterCount+1 > len(existingDataCenters) {
		return false
	}
	if replicaPlacement.DiffRackCount+1 > len(existingRacks) {
		return false
	}
	if replicaPlacement.SameRackCount > 0 {
		foundSatisfyRack := false
		for _, rackCount := range existingRacks {
			if rackCount >= replicaPlacement.SameRackCount+1 {
				foundSatisfyRack = true
			}
		}
		return foundSatisfyRack
	}
	return true
}

/*
	if on an existing data node {
		return false
	}
	if different from existing dcs {
		if lack on different dcs {
			return true
		} else {
			return false
		}
	}
	if not on primary dc {
		return false
	}
	if different from existing racks {
		if lack on different racks {
			return true
		} else {
			return false
		}
	}
	if not on primary rack {
		return false
	}
	if lacks on same rack {
		return true
	} else {
		return false
	}
*/
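// For illustration (not part of the original file), assuming the usual SeaweedFS
// "xyz" replication string where x, y, z are the extra copies on other data centers,
// other racks, and other servers in the same rack: with placement "010" and a single
// existing replica on dc1/rack1, a candidate on dc1/rack2 is accepted (only one rack
// so far, and DiffRackCount+1 = 2 are allowed), while a candidate on dc2 is rejected
// because len(existingDataCenters) already equals DiffDataCenterCount+1 = 1.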
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {

	existingDataCenters, _, existingDataNodes := countReplicas(replicas)

	if _, found := existingDataNodes[possibleLocation.String()]; found {
		// avoid duplicated volume on the same data node
		return false
	}

	primaryDataCenters, _ := findTopKeys(existingDataCenters)

	// ensure data center count is within limit
	if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
		// different from existing dcs
		if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
			// lack on different dcs
			return true
		} else {
			// adding this would go over the different dcs limit
			return false
		}
	}
	// now this is same as one of the existing data centers
	if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
		// not on one of the primary dcs
		return false
	}

	// now this is one of the primary dcs
	primaryDcRacks := make(map[string]int)
	for _, replica := range replicas {
		if replica.location.DataCenter() != possibleLocation.DataCenter() {
			continue
		}
		primaryDcRacks[replica.location.Rack()] += 1
	}
	primaryRacks, _ := findTopKeys(primaryDcRacks)
	sameRackCount := primaryDcRacks[possibleLocation.Rack()]

	// ensure rack count is within limit
	if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
		// different from existing racks
		if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
			// lack on different racks
			return true
		} else {
			// adding this would go over the different racks limit
			return false
		}
	}

	// now this is same as one of the existing racks
	if !isAmong(possibleLocation.Rack(), primaryRacks) {
		// not on the primary rack
		return false
	}

	// now this is on the primary rack

	// different from existing data nodes
	if sameRackCount < replicaPlacement.SameRackCount+1 {
		// lack on same rack
		return true
	} else {
		// adding this would go over the same data node limit
		return false
	}
}

func findTopKeys(m map[string]int) (topKeys []string, max int) {
	for k, c := range m {
		if max < c {
			topKeys = topKeys[:0]
			topKeys = append(topKeys, k)
			max = c
		} else if max == c {
			topKeys = append(topKeys, k)
		}
	}
	return
}

func isAmong(key string, keys []string) bool {
	for _, k := range keys {
		if k == key {
			return true
		}
	}
	return false
}

type VolumeReplica struct {
	location *location
	info     *master_pb.VolumeInformationMessage
}

type location struct {
	dc       string
	rack     string
	dataNode *master_pb.DataNodeInfo
}

func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
	return location{
		dc:       dc,
		rack:     rack,
		dataNode: dataNode,
	}
}

func (l location) String() string {
	return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
}

func (l location) Rack() string {
	return fmt.Sprintf("%s %s", l.dc, l.rack)
}

func (l location) DataCenter() string {
	return l.dc
}

func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
	mostRecent := replicas[0]
	for _, replica := range replicas {
		if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
			mostRecent = replica
		}
	}
	return mostRecent
}

func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
	diffDc = make(map[string]int)
	diffRack = make(map[string]int)
	diffNode = make(map[string]int)
	for _, replica := range replicas {
		diffDc[replica.location.DataCenter()] += 1
		diffRack[replica.location.Rack()] += 1
		diffNode[replica.location.String()] += 1
	}
	return
}

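// pickOneReplicaToDelete chooses the replica that is cheapest to lose: the smallest,
// then least recently modified, then lowest compact revision copy sorts first and is
// returned as the deletion candidate.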
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {

	slices.SortFunc(replicas, func(a, b *VolumeReplica) int {
		if a.info.Size != b.info.Size {
			return int(a.info.Size - b.info.Size)
		}
		if a.info.ModifiedAtSecond != b.info.ModifiedAtSecond {
			return int(a.info.ModifiedAtSecond - b.info.ModifiedAtSecond)
		}
		if a.info.CompactRevision != b.info.CompactRevision {
			// convert before subtracting so the uint32 difference cannot wrap around
			return int(a.info.CompactRevision) - int(b.info.CompactRevision)
		}
		return 0
	})

	return replicas[0]
}

// isMisplaced reports whether any replica sits at a location that the replica
// placement would not allow, given where the other replicas already are.
func isMisplaced(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) bool {
	for i := 0; i < len(replicas); i++ {
		others := otherThan(replicas, i)
		if !satisfyReplicaPlacement(replicaPlacement, others, *replicas[i].location) {
			return true
		}
	}
	return false
}

func otherThan(replicas []*VolumeReplica, index int) (others []*VolumeReplica) {
	for i := 0; i < len(replicas); i++ {
		if index != i {
			others = append(others, replicas[i])
		}
	}
	return
}

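// pickOneMisplacedVolume prefers to delete a replica whose removal leaves the
// remaining copies correctly placed; if no such candidate exists, it falls back to
// the generic pickOneReplicaToDelete ordering.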
func pickOneMisplacedVolume(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) (toDelete *VolumeReplica) {
	var deletionCandidates []*VolumeReplica
	for i := 0; i < len(replicas); i++ {
		others := otherThan(replicas, i)
		if !isMisplaced(others, replicaPlacement) {
			deletionCandidates = append(deletionCandidates, replicas[i])
		}
	}
	if len(deletionCandidates) > 0 {
		return pickOneReplicaToDelete(deletionCandidates, replicaPlacement)
	}
	return pickOneReplicaToDelete(replicas, replicaPlacement)
}