You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

131 lines
3.1 KiB

  1. package topology
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  4. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  5. )
  6. func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
  7. dn.RLock()
  8. for _, ecVolumeInfo := range dn.ecShards {
  9. ret = append(ret, ecVolumeInfo)
  10. }
  11. dn.RUnlock()
  12. return ret
  13. }
  14. func (dn *DataNode) GetEcShardsCount() (count int) {
  15. dn.RLock()
  16. defer dn.RUnlock()
  17. for _, ecVolumeInfo := range dn.ecShards {
  18. count += ecVolumeInfo.ShardBits.ShardIdCount()
  19. }
  20. return count
  21. }
  22. func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  23. // prepare the new ec shard map
  24. actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
  25. for _, ecShards := range actualShards {
  26. actualEcShardMap[ecShards.VolumeId] = ecShards
  27. }
  28. // found out the newShards and deletedShards
  29. dn.ecShardsLock.RLock()
  30. for vid, ecShards := range dn.ecShards {
  31. if actualEcShards, ok := actualEcShardMap[vid]; !ok {
  32. // dn registered ec shards not found in the new set of ec shards
  33. deletedShards = append(deletedShards, ecShards)
  34. } else {
  35. // found, but maybe the actual shard could be missing
  36. a := actualEcShards.Minus(ecShards)
  37. if a.ShardIdCount() > 0 {
  38. newShards = append(newShards, a)
  39. }
  40. d := ecShards.Minus(actualEcShards)
  41. if d.ShardIdCount() > 0 {
  42. deletedShards = append(deletedShards, d)
  43. }
  44. }
  45. }
  46. for _, ecShards := range actualShards {
  47. if _, found := dn.ecShards[ecShards.VolumeId]; !found {
  48. newShards = append(newShards, ecShards)
  49. }
  50. }
  51. dn.ecShardsLock.RUnlock()
  52. if len(newShards) > 0 || len(deletedShards) > 0 {
  53. // if changed, set to the new ec shard map
  54. dn.ecShardsLock.Lock()
  55. dn.ecShards = actualEcShardMap
  56. dn.ecShardsLock.Unlock()
  57. }
  58. return
  59. }
  60. func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  61. for _, newShard := range newShards {
  62. dn.AddOrUpdateEcShard(newShard)
  63. }
  64. for _, deletedShard := range deletedShards {
  65. dn.DeleteEcShard(deletedShard)
  66. }
  67. }
  68. func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
  69. dn.ecShardsLock.Lock()
  70. defer dn.ecShardsLock.Unlock()
  71. if existing, ok := dn.ecShards[s.VolumeId]; !ok {
  72. dn.ecShards[s.VolumeId] = s
  73. } else {
  74. existing.ShardBits = existing.ShardBits.Plus(s.ShardBits)
  75. }
  76. }
  77. func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
  78. dn.ecShardsLock.Lock()
  79. defer dn.ecShardsLock.Unlock()
  80. if existing, ok := dn.ecShards[s.VolumeId]; ok {
  81. existing.ShardBits = existing.ShardBits.Minus(s.ShardBits)
  82. if existing.ShardBits.ShardIdCount() == 0 {
  83. delete(dn.ecShards, s.VolumeId)
  84. }
  85. }
  86. }
  87. func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) {
  88. // check whether normal volumes has this volume id
  89. dn.RLock()
  90. _, ok := dn.volumes[id]
  91. if ok {
  92. hasVolumeId = true
  93. }
  94. dn.RUnlock()
  95. if hasVolumeId {
  96. return
  97. }
  98. // check whether ec shards has this volume id
  99. dn.ecShardsLock.RLock()
  100. _, ok = dn.ecShards[id]
  101. if ok {
  102. hasVolumeId = true
  103. }
  104. dn.ecShardsLock.RUnlock()
  105. return
  106. }