From b58e25e5883c3a53f65cf669d0ac1fc7d3354d35 Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sun, 26 May 2019 00:49:15 -0700
Subject: [PATCH] broadcast vid->location map to master clients for ec chards

---
 weed/server/master_grpc_server.go | 25 ++++++++++++++++++++-----
 weed/topology/data_node_ec.go     | 26 ++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 69bd56df0..1bc28fdf6 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -9,6 +9,7 @@ import (
 	"github.com/chrislusf/raft"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/topology"
 	"google.golang.org/grpc/peer"
 )
@@ -108,18 +109,32 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 
 			// update master internal volume layouts
 			t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+
+			for _, s := range heartbeat.NewEcShards {
+				message.NewVids = append(message.NewVids, s.Id)
+			}
+			for _, s := range heartbeat.DeletedEcShards {
+				if dn.HasVolumesById(needle.VolumeId(s.Id)) {
+					continue
+				}
+				message.DeletedVids = append(message.DeletedVids, s.Id)
+			}
+
 		}
 
 		if len(heartbeat.EcShards) > 0 {
 			glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
 			newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
 
-			//TODO broadcast the ec vid
-			if len(newShards)>0{
-
+			// broadcast the ec vid changes to master clients
+			for _, s := range newShards {
+				message.NewVids = append(message.NewVids, uint32(s.VolumeId))
 			}
-			if len(deletedShards)>0{
-
+			for _, s := range deletedShards {
+				if dn.HasVolumesById(s.VolumeId) {
+					continue
+				}
+				message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId))
 			}
 
 		}
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
index 95635331b..2ea7fc6ad 100644
--- a/weed/topology/data_node_ec.go
+++ b/weed/topology/data_node_ec.go
@@ -92,3 +92,29 @@ func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
 	}
 
 }
+
+func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) {
+
+	// check whether normal volumes has this volume id
+	dn.RLock()
+	_, ok := dn.volumes[id]
+	if ok {
+		hasVolumeId = true
+	}
+	dn.RUnlock()
+
+	if hasVolumeId {
+		return
+	}
+
+	// check whether ec shards has this volume id
+	dn.ecShardsLock.RLock()
+	_, ok = dn.ecShards[id]
+	if ok {
+		hasVolumeId = true
+	}
+	dn.ecShardsLock.RUnlock()
+
+	return
+
+}