You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

106 lines
3.0 KiB

  1. package topology
  2. import (
  3. "math"
  4. "sort"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. )
  10. const shardCount = erasure_coding.DataShardsCount + erasure_coding.ParityShardsCount
  11. type EcShardLocations struct {
  12. Collection string
  13. locations [shardCount][]*DataNode
  14. }
  15. func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  16. // convert into in memory struct storage.VolumeInfo
  17. var shards []*erasure_coding.EcVolumeInfo
  18. sort.Slice(shardInfos, func(i, j int) bool {
  19. return shardInfos[i].Id < shardInfos[j].Id
  20. })
  21. prevVolumeId := uint32(math.MaxUint32)
  22. var ecVolumeInfo *erasure_coding.EcVolumeInfo
  23. for _, shardInfo := range shardInfos {
  24. if shardInfo.Id != prevVolumeId {
  25. ecVolumeInfo = erasure_coding.NewEcVolumeInfo(shardInfo.Collection, needle.VolumeId(shardInfo.Id))
  26. shards = append(shards, ecVolumeInfo)
  27. }
  28. prevVolumeId = shardInfo.Id
  29. ecVolumeInfo.AddShardId(erasure_coding.ShardId(shardInfo.EcIndex))
  30. }
  31. // find out the delta volumes
  32. newShards, deletedShards = dn.UpdateEcShards(shards)
  33. for _, v := range newShards {
  34. t.RegisterEcShards(v, dn)
  35. }
  36. for _, v := range deletedShards {
  37. t.UnRegisterEcShards(v, dn)
  38. }
  39. return
  40. }
  41. func NewEcShardLocations(collection string) *EcShardLocations {
  42. return &EcShardLocations{
  43. Collection: collection,
  44. }
  45. }
  46. func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
  47. dataNodes := loc.locations[shardId]
  48. for _, n := range dataNodes {
  49. if n.Id() == dn.Id() {
  50. return false
  51. }
  52. }
  53. loc.locations[shardId] = append(dataNodes, dn)
  54. return true
  55. }
  56. func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
  57. dataNodes := loc.locations[shardId]
  58. foundIndex := -1
  59. for index, n := range dataNodes {
  60. if n.Id() == dn.Id() {
  61. foundIndex = index
  62. }
  63. }
  64. if foundIndex < 0 {
  65. return false
  66. }
  67. loc.locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
  68. return true
  69. }
  70. func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
  71. t.ecShardMapLock.Lock()
  72. defer t.ecShardMapLock.Unlock()
  73. locations, found := t.ecShardMap[ecShardInfos.VolumeId]
  74. if !found {
  75. locations = NewEcShardLocations(ecShardInfos.Collection)
  76. t.ecShardMap[ecShardInfos.VolumeId] = locations
  77. }
  78. for _, shardId := range ecShardInfos.ShardIds() {
  79. locations.AddShard(shardId, dn)
  80. }
  81. }
  82. func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
  83. glog.Infof("removing ec shard info:%+v", ecShardInfos)
  84. t.ecShardMapLock.Lock()
  85. defer t.ecShardMapLock.Unlock()
  86. locations, found := t.ecShardMap[ecShardInfos.VolumeId]
  87. if !found {
  88. return
  89. }
  90. for _, shardId := range ecShardInfos.ShardIds() {
  91. locations.DeleteShard(shardId, dn)
  92. }
  93. }