diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index ef395dbdb..e8ead5511 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -18,23 +18,23 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) // prepare the new ec shard map actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) for _, ecShards := range actualShards { - actualEcShardMap[ecShards.VolumeId]= ecShards + actualEcShardMap[ecShards.VolumeId] = ecShards } // found out the newShards and deletedShards dn.ecShardsLock.RLock() - for vid, ecShards := range dn.ecShards{ + for vid, ecShards := range dn.ecShards { if actualEcShards, ok := actualEcShardMap[vid]; !ok { // dn registered ec shards not found in the new set of ec shards deletedShards = append(deletedShards, ecShards) } else { // found, but maybe the actual shard could be missing a := actualEcShards.Minus(ecShards) - if len(a.ShardIds())>0 { + if len(a.ShardIds()) > 0 { newShards = append(newShards, a) } d := ecShards.Minus(actualEcShards) - if len(d.ShardIds())>0 { + if len(d.ShardIds()) > 0 { deletedShards = append(deletedShards, d) } } @@ -46,7 +46,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) } dn.ecShardsLock.RUnlock() - if len(newShards)>0 || len(deletedShards)>0{ + if len(newShards) > 0 || len(deletedShards) > 0 { // if changed, set to the new ec shard map dn.ecShardsLock.Lock() dn.ecShards = actualEcShardMap diff --git a/weed/topology/topology.go b/weed/topology/topology.go index e463aae29..667846f02 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math/rand" + "sync" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" @@ -17,7 +18,9 @@ import ( type Topology struct { NodeImpl - collectionMap *util.ConcurrentReadMap + collectionMap *util.ConcurrentReadMap + ecShardMap map[needle.VolumeId]*EcShardLocations + ecShardMapLock sync.RWMutex pulse int64 @@ -39,6 +42,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls t.NodeImpl.value = t t.children = make(map[NodeId]Node) t.collectionMap = util.NewConcurrentReadMap() + t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations) t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go index 5592b9b64..8c8a0b67b 100644 --- a/weed/topology/topology_ec.go +++ b/weed/topology/topology_ec.go @@ -1,6 +1,7 @@ package topology import ( + "math" "sort" "github.com/chrislusf/seaweedfs/weed/glog" @@ -9,6 +10,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle" ) +const shardCount = erasure_coding.DataShardsCount + erasure_coding.ParityShardsCount + +type EcShardLocations struct { + Collection string + locations [shardCount][]*DataNode +} func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { // convert into in memory struct storage.VolumeInfo @@ -16,7 +23,7 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf sort.Slice(shardInfos, func(i, j int) bool { return shardInfos[i].Id < shardInfos[j].Id }) - var prevVolumeId uint32 + prevVolumeId := uint32(math.MaxUint32) var ecVolumeInfo *erasure_coding.EcVolumeInfo for _, shardInfo := range shardInfos { if shardInfo.Id != prevVolumeId { @@ -36,8 +43,63 @@ func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInf return } +func NewEcShardLocations(collection string) *EcShardLocations { + return &EcShardLocations{ + Collection: collection, + } +} + +func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) { + dataNodes := loc.locations[shardId] + for _, n := range dataNodes { + if n.Id() == dn.Id() { + return false + } + } + loc.locations[shardId] = append(dataNodes, dn) + return true +} + +func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) { + dataNodes := loc.locations[shardId] + foundIndex := -1 + for index, n := range dataNodes { + if n.Id() == dn.Id() { + foundIndex = index + } + } + if foundIndex < 0 { + return false + } + loc.locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...) + return true +} + func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) { + + t.ecShardMapLock.Lock() + defer t.ecShardMapLock.Unlock() + + locations, found := t.ecShardMap[ecShardInfos.VolumeId] + if !found { + locations = NewEcShardLocations(ecShardInfos.Collection) + t.ecShardMap[ecShardInfos.VolumeId] = locations + } + for _, shardId := range ecShardInfos.ShardIds() { + locations.AddShard(shardId, dn) + } } + func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) { glog.Infof("removing ec shard info:%+v", ecShardInfos) + t.ecShardMapLock.Lock() + defer t.ecShardMapLock.Unlock() + + locations, found := t.ecShardMap[ecShardInfos.VolumeId] + if !found { + return + } + for _, shardId := range ecShardInfos.ShardIds() { + locations.DeleteShard(shardId, dn) + } }