You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

142 lines
3.6 KiB

  1. package topology
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  4. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  5. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  6. )
  7. func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
  8. dn.RLock()
  9. for _, c := range dn.children {
  10. disk := c.(*Disk)
  11. ret = append(ret, disk.GetEcShards()...)
  12. }
  13. dn.RUnlock()
  14. return ret
  15. }
  16. func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  17. // prepare the new ec shard map
  18. actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
  19. for _, ecShards := range actualShards {
  20. actualEcShardMap[ecShards.VolumeId] = ecShards
  21. }
  22. existingEcShards := dn.GetEcShards()
  23. // find out the newShards and deletedShards
  24. for _, ecShards := range existingEcShards {
  25. var newShardCount, deletedShardCount int
  26. disk := dn.getOrCreateDisk(ecShards.DiskType)
  27. vid := ecShards.VolumeId
  28. if actualEcShards, ok := actualEcShardMap[vid]; !ok {
  29. // dn registered ec shards not found in the new set of ec shards
  30. deletedShards = append(deletedShards, ecShards)
  31. deletedShardCount += ecShards.ShardIdCount()
  32. } else {
  33. // found, but maybe the actual shard could be missing
  34. a := actualEcShards.Minus(ecShards)
  35. if a.ShardIdCount() > 0 {
  36. newShards = append(newShards, a)
  37. newShardCount += a.ShardIdCount()
  38. }
  39. d := ecShards.Minus(actualEcShards)
  40. if d.ShardIdCount() > 0 {
  41. deletedShards = append(deletedShards, d)
  42. deletedShardCount += d.ShardIdCount()
  43. }
  44. }
  45. if (newShardCount - deletedShardCount) != 0 {
  46. disk.UpAdjustDiskUsageDelta(types.ToDiskType(ecShards.DiskType), &DiskUsageCounts{
  47. ecShardCount: int64(newShardCount - deletedShardCount),
  48. })
  49. }
  50. }
  51. for _, ecShards := range actualShards {
  52. if dn.HasEcShards(ecShards.VolumeId) {
  53. continue
  54. }
  55. newShards = append(newShards, ecShards)
  56. disk := dn.getOrCreateDisk(ecShards.DiskType)
  57. disk.UpAdjustDiskUsageDelta(types.ToDiskType(ecShards.DiskType), &DiskUsageCounts{
  58. ecShardCount: int64(ecShards.ShardIdCount()),
  59. })
  60. }
  61. if len(newShards) > 0 || len(deletedShards) > 0 {
  62. // if changed, set to the new ec shard map
  63. dn.doUpdateEcShards(actualShards)
  64. }
  65. return
  66. }
  67. func (dn *DataNode) HasEcShards(volumeId needle.VolumeId) (found bool) {
  68. dn.RLock()
  69. defer dn.RUnlock()
  70. for _, c := range dn.children {
  71. disk := c.(*Disk)
  72. _, found = disk.ecShards[volumeId]
  73. if found {
  74. return
  75. }
  76. }
  77. return
  78. }
  79. func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) {
  80. dn.Lock()
  81. for _, c := range dn.children {
  82. disk := c.(*Disk)
  83. disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
  84. }
  85. for _, shard := range actualShards {
  86. disk := dn.getOrCreateDisk(shard.DiskType)
  87. disk.ecShards[shard.VolumeId] = shard
  88. }
  89. dn.Unlock()
  90. }
  91. func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  92. for _, newShard := range newShards {
  93. dn.AddOrUpdateEcShard(newShard)
  94. }
  95. for _, deletedShard := range deletedShards {
  96. dn.DeleteEcShard(deletedShard)
  97. }
  98. }
  99. func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
  100. disk := dn.getOrCreateDisk(s.DiskType)
  101. disk.AddOrUpdateEcShard(s)
  102. }
  103. func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
  104. disk := dn.getOrCreateDisk(s.DiskType)
  105. disk.DeleteEcShard(s)
  106. }
  107. func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) {
  108. dn.RLock()
  109. defer dn.RUnlock()
  110. for _, c := range dn.children {
  111. disk := c.(*Disk)
  112. if disk.HasVolumesById(volumeId) {
  113. return true
  114. }
  115. }
  116. return false
  117. }