Browse Source

broadcast vid->location map to master clients for ec chards

pull/991/head
Chris Lu 6 years ago
parent
commit
b58e25e588
  1. 25
      weed/server/master_grpc_server.go
  2. 26
      weed/topology/data_node_ec.go

25
weed/server/master_grpc_server.go

@ -9,6 +9,7 @@ import (
"github.com/chrislusf/raft" "github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/topology"
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
) )
@ -108,18 +109,32 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// update master internal volume layouts // update master internal volume layouts
t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn) t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
for _, s := range heartbeat.NewEcShards {
message.NewVids = append(message.NewVids, s.Id)
}
for _, s := range heartbeat.DeletedEcShards {
if dn.HasVolumesById(needle.VolumeId(s.Id)) {
continue
}
message.DeletedVids = append(message.DeletedVids, s.Id)
}
} }
if len(heartbeat.EcShards) > 0 { if len(heartbeat.EcShards) > 0 {
glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn) newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
//TODO broadcast the ec vid
if len(newShards)>0{
// broadcast the ec vid changes to master clients
for _, s := range newShards {
message.NewVids = append(message.NewVids, uint32(s.VolumeId))
} }
if len(deletedShards)>0{
for _, s := range deletedShards {
if dn.HasVolumesById(s.VolumeId) {
continue
}
message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId))
} }
} }

26
weed/topology/data_node_ec.go

@ -92,3 +92,29 @@ func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
} }
} }
func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) {
// check whether normal volumes has this volume id
dn.RLock()
_, ok := dn.volumes[id]
if ok {
hasVolumeId = true
}
dn.RUnlock()
if hasVolumeId {
return
}
// check whether ec shards has this volume id
dn.ecShardsLock.RLock()
_, ok = dn.ecShards[id]
if ok {
hasVolumeId = true
}
dn.ecShardsLock.RUnlock()
return
}
Loading…
Cancel
Save