142 lines
3.7 KiB

  1. package topology
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  4. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  5. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  6. )
  7. func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
  8. dn.RLock()
  9. for _, c := range dn.children {
  10. disk := c.(*Disk)
  11. ret = append(ret, disk.GetEcShards()...)
  12. }
  13. dn.RUnlock()
  14. return ret
  15. }
  16. func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  17. // prepare the new ec shard map
  18. actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
  19. for _, ecShards := range actualShards {
  20. actualEcShardMap[ecShards.VolumeId] = ecShards
  21. }
  22. existingEcShards := dn.GetEcShards()
  23. // find out the newShards and deletedShards
  24. var newShardCount, deletedShardCount int
  25. for _, ecShards := range existingEcShards {
  26. disk := dn.getOrCreateDisk(ecShards.DiskType)
  27. deltaDiskUsages := newDiskUsages()
  28. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
  29. vid := ecShards.VolumeId
  30. if actualEcShards, ok := actualEcShardMap[vid]; !ok {
  31. // dn registered ec shards not found in the new set of ec shards
  32. deletedShards = append(deletedShards, ecShards)
  33. deletedShardCount += ecShards.ShardIdCount()
  34. } else {
  35. // found, but maybe the actual shard could be missing
  36. a := actualEcShards.Minus(ecShards)
  37. if a.ShardIdCount() > 0 {
  38. newShards = append(newShards, a)
  39. newShardCount += a.ShardIdCount()
  40. }
  41. d := ecShards.Minus(actualEcShards)
  42. if d.ShardIdCount() > 0 {
  43. deletedShards = append(deletedShards, d)
  44. deletedShardCount += d.ShardIdCount()
  45. }
  46. }
  47. deltaDiskUsage.ecShardCount = int64(newShardCount - deletedShardCount)
  48. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  49. }
  50. for _, ecShards := range actualShards {
  51. if dn.HasEcShards(ecShards.VolumeId) {
  52. continue
  53. }
  54. newShards = append(newShards, ecShards)
  55. disk := dn.getOrCreateDisk(ecShards.DiskType)
  56. deltaDiskUsages := newDiskUsages()
  57. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(ecShards.DiskType))
  58. deltaDiskUsage.ecShardCount = int64(ecShards.ShardIdCount())
  59. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  60. }
  61. if len(newShards) > 0 || len(deletedShards) > 0 {
  62. // if changed, set to the new ec shard map
  63. dn.doUpdateEcShards(actualShards)
  64. }
  65. return
  66. }
  67. func (dn *DataNode) HasEcShards(volumeId needle.VolumeId) (found bool) {
  68. dn.RLock()
  69. defer dn.RUnlock()
  70. for _, c := range dn.children {
  71. disk := c.(*Disk)
  72. _, found = disk.ecShards[volumeId]
  73. if found {
  74. return
  75. }
  76. }
  77. return
  78. }
  79. func (dn *DataNode) doUpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) {
  80. dn.Lock()
  81. for _, c := range dn.children {
  82. disk := c.(*Disk)
  83. disk.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
  84. }
  85. for _, shard := range actualShards {
  86. disk := dn.getOrCreateDisk(shard.DiskType)
  87. disk.ecShards[shard.VolumeId] = shard
  88. }
  89. dn.Unlock()
  90. }
  91. func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  92. for _, newShard := range newShards {
  93. dn.AddOrUpdateEcShard(newShard)
  94. }
  95. for _, deletedShard := range deletedShards {
  96. dn.DeleteEcShard(deletedShard)
  97. }
  98. }
  99. func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
  100. disk := dn.getOrCreateDisk(s.DiskType)
  101. disk.AddOrUpdateEcShard(s)
  102. }
  103. func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
  104. disk := dn.getOrCreateDisk(s.DiskType)
  105. disk.DeleteEcShard(s)
  106. }
  107. func (dn *DataNode) HasVolumesById(volumeId needle.VolumeId) (hasVolumeId bool) {
  108. dn.RLock()
  109. defer dn.RUnlock()
  110. for _, c := range dn.children {
  111. disk := c.(*Disk)
  112. if disk.HasVolumesById(volumeId) {
  113. return true
  114. }
  115. }
  116. return false
  117. }