From 0d989491993a98753bf34569f74791c87cecb900 Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Fri, 27 Jul 2018 01:54:45 -0700
Subject: [PATCH 01/11] tmp commit

---
 weed/pb/seaweed.proto                         |  8 ++
 weed/util/http_util.go                        |  1 +
 .../topolisenter/client_grpc_to_master.go     | 85 +++++++++++++++++++
 .../wdclient/topolisenter/cluster_listener.go | 56 ++++++++++++
 weed/wdclient/wdclient.go                     | 42 +++++++++
 5 files changed, 192 insertions(+)
 create mode 100644 weed/wdclient/topolisenter/client_grpc_to_master.go
 create mode 100644 weed/wdclient/topolisenter/cluster_listener.go
 create mode 100644 weed/wdclient/wdclient.go

diff --git a/weed/pb/seaweed.proto b/weed/pb/seaweed.proto
index 8f0fed72c..6ed580b00 100644
--- a/weed/pb/seaweed.proto
+++ b/weed/pb/seaweed.proto
@@ -9,6 +9,8 @@ service Seaweed {
     }
     rpc KeepConnected (stream Empty) returns (stream Empty) {
     }
+    rpc ListenForTopoChange (stream Empty) returns (stream VolumeLocation) {
+    }
 }
 
 //////////////////////////////////////////////////
@@ -55,3 +57,9 @@ message SuperBlockExtra {
     }
     ErasureCoding erasure_coding = 1;
 }
+
+message VolumeLocation {
+    string url = 1;
+    string public_url = 2;
+    repeated uint32 vid = 3;
+}
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 51bedcdfd..7b28d3d91 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -83,6 +83,7 @@ func Head(url string) (http.Header, error) {
 	if err != nil {
 		return nil, err
 	}
+	defer r.Body.Close()
 	if r.StatusCode >= 400 {
 		return nil, fmt.Errorf("%s: %s", url, r.Status)
 	}
diff --git a/weed/wdclient/topolisenter/client_grpc_to_master.go b/weed/wdclient/topolisenter/client_grpc_to_master.go
new file mode 100644
index 000000000..863f79a1d
--- /dev/null
+++ b/weed/wdclient/topolisenter/client_grpc_to_master.go
@@ -0,0 +1,85 @@
+package clusterlistener
+
+import (
+	"context"
+	"fmt"
+	"io"
+
+	"google.golang.org/grpc"
+
+	"code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
+	"github.com/golang/glog"
+)
+
+func (clusterListener *ClusterListener) establishConnectionWithMaster(
+	master string, msgChan chan *pb.ClusterStatusMessage) error {
+	grpcConnection, err := grpc.Dial(master, grpc.WithInsecure())
+	if err != nil {
+		return fmt.Errorf("%s fail to dial %s: %v", clusterListener.clientName, master, err)
+	}
+	defer func() { _ = grpcConnection.Close() }()
+
+	masterClient := pb.NewAlpineMasterClient(grpcConnection)
+
+	stream, err := masterClient.RegisterClient(context.Background())
+	if err != nil {
+		return fmt.Errorf("%s register client on master %v: %v", clusterListener.clientName, master, err)
+	}
+
+	// TODO possible goroutine leaks if retry happens
+	go func() {
+		for keyspace := range clusterListener.clusters {
+			// glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v)", clusterListener.clientName, keyspace, dataCenter)
+			if err := registerForClusterAtMaster(stream, string(keyspace), false, clusterListener.clientName); err != nil {
+				// glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v): %v", clusterListener.clientName, keyspace, dataCenter, err)
+				return
+			}
+		}
+
+		for {
+			msg := <-clusterListener.keyspaceFollowMessageChan
+			if err := registerForClusterAtMaster(stream, string(msg.keyspace), msg.isUnfollow, clusterListener.clientName); err != nil {
+				if msg.isUnfollow {
+					glog.V(2).Infof("%s unfollow cluster keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err)
+				} else {
+					glog.V(2).Infof("%s register cluster new keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err)
+				}
+				return
+			}
+		}
+
+	}()
+
+	// glog.V(2).Infof("Reporting allocated %v", as.allocatedResource)
+
+	// glog.V(2).Infof("%s from %s register client to master %s", clusterListener.clientName, dataCenter, master)
+
+	for {
+		msg, err := stream.Recv()
+		if err == io.EOF {
+			// read done.
+			return nil
+		}
+		if err != nil {
+			return fmt.Errorf("client receive topology : %v", err)
+		}
+		msgChan <- msg
+		// glog.V(2).Infof("%s client received message %v", clusterListener.clientName, msg)
+	}
+
+}
+
+func registerForClusterAtMaster(stream pb.AlpineMaster_RegisterClientClient, keyspace string, isUnfollow bool, clientName string) error {
+	clientHeartbeat := &pb.ClientHeartbeat{
+		ClientName: clientName,
+		ClusterFollow: &pb.ClientHeartbeat_ClusterFollowMessage{
+			Keyspace:   keyspace,
+			IsUnfollow: isUnfollow,
+		},
+	}
+
+	if err := stream.Send(clientHeartbeat); err != nil {
+		return fmt.Errorf("%s client send heartbeat: %v", clientName, err)
+	}
+	return nil
+}
diff --git a/weed/wdclient/topolisenter/cluster_listener.go b/weed/wdclient/topolisenter/cluster_listener.go
new file mode 100644
index 000000000..91ca6fb6f
--- /dev/null
+++ b/weed/wdclient/topolisenter/cluster_listener.go
@@ -0,0 +1,56 @@
+package clusterlistener
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
+	"code.uber.internal/fraud/alpine/server/util"
+	"github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+type Location struct {
+	Url       string
+	PublicUrl string
+}
+
+type ClusterListener struct {
+	sync.RWMutex
+	vid2locations map[storage.VolumeId][]*Location
+	clientName    string
+}
+
+func NewClusterListener(clientName string) *ClusterListener {
+	return &ClusterListener{
+		vid2locations: make(map[storage.VolumeId][]*Location),
+		clientName:    clientName,
+	}
+}
+
+// StartListener keeps the listener connected to the master.
+func (clusterListener *ClusterListener) StartListener(ctx context.Context, master string) {
+
+	clusterUpdatesChan := make(chan *pb.ClusterStatusMessage)
+
+	go util.RetryForever(ctx, clusterListener.clientName+" cluster listener", func() error {
+		return clusterListener.establishConnectionWithMaster(master, clusterUpdatesChan)
+	}, 2*time.Second)
+
+	go func() {
+		for {
+			select {
+			case msg := <-clusterUpdatesChan:
+				clusterListener.processClusterStatusMessage(msg)
+			}
+		}
+	}()
+
+	// println("client is connected to master", master, "data center", dataCenter)
+
+	return
+
+}
+
+func (clusterListener *ClusterListener) processClusterStatusMessage(msg *pb.ClusterStatusMessage) {
+}
diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go
new file mode 100644
index 000000000..cbe03359f
--- /dev/null
+++ b/weed/wdclient/wdclient.go
@@ -0,0 +1,42 @@
+package wdclient
+
+import (
+	"context"
+	"code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
+)
+
+type SeaweedClient struct {
+	ctx             context.Context
+	Master          string
+	ClientName      string
+	ClusterListener *clusterlistener.ClusterListener
+}
+
+// NewSeaweedClient creates a SeaweedFS client which contains a listener for the Seaweed system topology changes
+func NewSeaweedClient(ctx context.Context, clientName, master string) *SeaweedClient {
+	c := &SeaweedClient{
+		ctx:             ctx,
+		Master:          master,
+		ClusterListener: clusterlistener.NewClusterListener(clientName),
+		ClientName:      clientName,
+	}
+	c.ClusterListener.StartListener(ctx, c.Master)
+
+	conn, err := grpc.Dial(c.Master, grpc.WithInsecure())
+	if err != nil {
+		glog.Fatalf("%s fail to dial %v: %v", c.ClientName, c.Master, err)
+	}
+	c.MasterClient = pb.NewAlpineMasterClient(conn)
+
+	return c
+}
+
+// NewClusterClient create a lightweight client to access a specific cluster
+// TODO The call will block if the keyspace is not created in this data center.
+func (c *SeaweedClient) NewClusterClient(keyspace string) (clusterClient *ClusterClient) {
+
+	return &ClusterClient{
+		keyspace: keyspace,
+	}
+
+}

From a12c7b86b0ca7ebd35f8763ebc89c675a49f8c59 Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Fri, 27 Jul 2018 23:09:55 -0700
Subject: [PATCH 02/11] broadcast messages of new and deleted volumes

---
 weed/filer2/filer_master.go       |  26 +++--
 weed/pb/master_pb/seaweed.pb.go   | 167 +++++++++++++++++++++---------
 weed/pb/seaweed.proto             |  11 +-
 weed/server/master_grpc_server.go | 102 ++++++++++++++++--
 weed/server/master_server.go      |   6 ++
 weed/topology/data_node.go        |  11 +-
 weed/topology/topology.go         |   5 +-
 7 files changed, 251 insertions(+), 77 deletions(-)

diff --git a/weed/filer2/filer_master.go b/weed/filer2/filer_master.go
index 63c3ef452..bbac17940 100644
--- a/weed/filer2/filer_master.go
+++ b/weed/filer2/filer_master.go
@@ -3,11 +3,10 @@ package filer2
 import (
 	"context"
 	"fmt"
-	"time"
-
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
+	"time"
 )
 
 func (fs *Filer) GetMaster() string {
@@ -16,6 +15,13 @@ func (fs *Filer) GetMaster() string {
 
 func (fs *Filer) KeepConnectedToMaster() {
 	glog.V(0).Infof("Filer bootstraps with masters %v", fs.masters)
+	for {
+		fs.tryAllMasters()
+		time.Sleep(time.Second)
+	}
+}
+
+func (fs *Filer) tryAllMasters() {
 	for _, master := range fs.masters {
 		glog.V(0).Infof("Connecting to %v", master)
 		withMasterClient(master, func(client master_pb.SeaweedClient) error {
@@ -28,17 +34,17 @@ func (fs *Filer) KeepConnectedToMaster() {
 			glog.V(0).Infof("Connected to %v", master)
 			fs.currentMaster = master
 
-			for {
-				time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
-
-				if err = stream.Send(&master_pb.Empty{}); err != nil {
-					glog.V(0).Infof("failed to send to %s: %v", master, err)
-					return err
-				}
+			if err = stream.Send(&master_pb.ClientListenRequest{Name: "filer"}); err != nil {
+				glog.V(0).Infof("failed to send to %s: %v", master, err)
+				return err
+			}
 
-				if _, err = stream.Recv(); err != nil {
+			for {
+				if volumeLocation, err := stream.Recv(); err != nil {
 					glog.V(0).Infof("failed to receive from %s: %v", master, err)
 					return err
+				} else {
+					glog.V(0).Infof("volume location: %+v", volumeLocation)
 				}
 			}
 		})
diff --git a/weed/pb/master_pb/seaweed.pb.go b/weed/pb/master_pb/seaweed.pb.go
index 5416e19b0..15a72f0bf 100644
--- a/weed/pb/master_pb/seaweed.pb.go
+++ b/weed/pb/master_pb/seaweed.pb.go
@@ -14,6 +14,8 @@ It has these top-level messages:
 	VolumeInformationMessage
 	Empty
 	SuperBlockExtra
+	ClientListenRequest
+	VolumeLocation
 */
 package master_pb
 
@@ -295,6 +297,62 @@ func (m *SuperBlockExtra_ErasureCoding) GetVolumeIds() []uint32 {
 	return nil
 }
 
+type ClientListenRequest struct {
+	Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
+}
+
+func (m *ClientListenRequest) Reset()                    { *m = ClientListenRequest{} }
+func (m *ClientListenRequest) String() string            { return proto.CompactTextString(m) }
+func (*ClientListenRequest) ProtoMessage()               {}
+func (*ClientListenRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
+
+func (m *ClientListenRequest) GetName() string {
+	if m != nil {
+		return m.Name
+	}
+	return ""
+}
+
+type VolumeLocation struct {
+	Url         string   `protobuf:"bytes,1,opt,name=url" json:"url,omitempty"`
+	PublicUrl   string   `protobuf:"bytes,2,opt,name=public_url,json=publicUrl" json:"public_url,omitempty"`
+	NewVids     []uint32 `protobuf:"varint,3,rep,packed,name=new_vids,json=newVids" json:"new_vids,omitempty"`
+	DeletedVids []uint32 `protobuf:"varint,4,rep,packed,name=deleted_vids,json=deletedVids" json:"deleted_vids,omitempty"`
+}
+
+func (m *VolumeLocation) Reset()                    { *m = VolumeLocation{} }
+func (m *VolumeLocation) String() string            { return proto.CompactTextString(m) }
+func (*VolumeLocation) ProtoMessage()               {}
+func (*VolumeLocation) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
+
+func (m *VolumeLocation) GetUrl() string {
+	if m != nil {
+		return m.Url
+	}
+	return ""
+}
+
+func (m *VolumeLocation) GetPublicUrl() string {
+	if m != nil {
+		return m.PublicUrl
+	}
+	return ""
+}
+
+func (m *VolumeLocation) GetNewVids() []uint32 {
+	if m != nil {
+		return m.NewVids
+	}
+	return nil
+}
+
+func (m *VolumeLocation) GetDeletedVids() []uint32 {
+	if m != nil {
+		return m.DeletedVids
+	}
+	return nil
+}
+
 func init() {
 	proto.RegisterType((*Heartbeat)(nil), "master_pb.Heartbeat")
 	proto.RegisterType((*HeartbeatResponse)(nil), "master_pb.HeartbeatResponse")
@@ -302,6 +360,8 @@ func init() {
 	proto.RegisterType((*Empty)(nil), "master_pb.Empty")
 	proto.RegisterType((*SuperBlockExtra)(nil), "master_pb.SuperBlockExtra")
 	proto.RegisterType((*SuperBlockExtra_ErasureCoding)(nil), "master_pb.SuperBlockExtra.ErasureCoding")
+	proto.RegisterType((*ClientListenRequest)(nil), "master_pb.ClientListenRequest")
+	proto.RegisterType((*VolumeLocation)(nil), "master_pb.VolumeLocation")
 }
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -368,8 +428,8 @@ func (c *seaweedClient) KeepConnected(ctx context.Context, opts ...grpc.CallOpti
 }
 
 type Seaweed_KeepConnectedClient interface {
-	Send(*Empty) error
-	Recv() (*Empty, error)
+	Send(*ClientListenRequest) error
+	Recv() (*VolumeLocation, error)
 	grpc.ClientStream
 }
 
@@ -377,12 +437,12 @@ type seaweedKeepConnectedClient struct {
 	grpc.ClientStream
 }
 
-func (x *seaweedKeepConnectedClient) Send(m *Empty) error {
+func (x *seaweedKeepConnectedClient) Send(m *ClientListenRequest) error {
 	return x.ClientStream.SendMsg(m)
 }
 
-func (x *seaweedKeepConnectedClient) Recv() (*Empty, error) {
-	m := new(Empty)
+func (x *seaweedKeepConnectedClient) Recv() (*VolumeLocation, error) {
+	m := new(VolumeLocation)
 	if err := x.ClientStream.RecvMsg(m); err != nil {
 		return nil, err
 	}
@@ -431,8 +491,8 @@ func _Seaweed_KeepConnected_Handler(srv interface{}, stream grpc.ServerStream) e
 }
 
 type Seaweed_KeepConnectedServer interface {
-	Send(*Empty) error
-	Recv() (*Empty, error)
+	Send(*VolumeLocation) error
+	Recv() (*ClientListenRequest, error)
 	grpc.ServerStream
 }
 
@@ -440,12 +500,12 @@ type seaweedKeepConnectedServer struct {
 	grpc.ServerStream
 }
 
-func (x *seaweedKeepConnectedServer) Send(m *Empty) error {
+func (x *seaweedKeepConnectedServer) Send(m *VolumeLocation) error {
 	return x.ServerStream.SendMsg(m)
 }
 
-func (x *seaweedKeepConnectedServer) Recv() (*Empty, error) {
-	m := new(Empty)
+func (x *seaweedKeepConnectedServer) Recv() (*ClientListenRequest, error) {
+	m := new(ClientListenRequest)
 	if err := x.ServerStream.RecvMsg(m); err != nil {
 		return nil, err
 	}
@@ -476,45 +536,50 @@ var _Seaweed_serviceDesc = grpc.ServiceDesc{
 func init() { proto.RegisterFile("seaweed.proto", fileDescriptor0) }
 
 var fileDescriptor0 = []byte{
-	// 627 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x94, 0xcf, 0x6e, 0xd3, 0x40,
-	0x10, 0xc6, 0x71, 0x92, 0x26, 0xf5, 0xa4, 0x6e, 0xd3, 0x15, 0x42, 0x56, 0x29, 0x10, 0xc2, 0xc5,
-	0x12, 0x28, 0x42, 0xe5, 0xc4, 0x81, 0x4b, 0xa3, 0x22, 0xaa, 0x82, 0x5a, 0x39, 0x82, 0x03, 0x17,
-	0x6b, 0xe3, 0x9d, 0x56, 0xab, 0xae, 0xff, 0x68, 0x77, 0x53, 0xe2, 0xbe, 0x04, 0x4f, 0xc2, 0x2b,
-	0x70, 0xe2, 0xc1, 0xd0, 0x8e, 0xed, 0x34, 0x14, 0xb8, 0xcd, 0xfc, 0x66, 0xc6, 0x3b, 0xf9, 0xbe,
-	0xdd, 0x40, 0x60, 0x90, 0x7f, 0x43, 0x14, 0xd3, 0x52, 0x17, 0xb6, 0x60, 0x7e, 0xc6, 0x8d, 0x45,
-	0x9d, 0x94, 0x8b, 0xc9, 0x8f, 0x0e, 0xf8, 0x1f, 0x90, 0x6b, 0xbb, 0x40, 0x6e, 0xd9, 0x2e, 0x74,
-	0x64, 0x19, 0x7a, 0x63, 0x2f, 0xf2, 0xe3, 0x8e, 0x2c, 0x19, 0x83, 0x5e, 0x59, 0x68, 0x1b, 0x76,
-	0xc6, 0x5e, 0x14, 0xc4, 0x14, 0xb3, 0x27, 0x00, 0xe5, 0x72, 0xa1, 0x64, 0x9a, 0x2c, 0xb5, 0x0a,
-	0xbb, 0xd4, 0xeb, 0xd7, 0xe4, 0xb3, 0x56, 0x2c, 0x82, 0x51, 0xc6, 0x57, 0xc9, 0x4d, 0xa1, 0x96,
-	0x19, 0x26, 0x69, 0xb1, 0xcc, 0x6d, 0xd8, 0xa3, 0xf1, 0xdd, 0x8c, 0xaf, 0xbe, 0x10, 0x9e, 0x39,
-	0xca, 0xc6, 0xb0, 0xe3, 0x3a, 0x2f, 0xa5, 0xc2, 0xe4, 0x1a, 0xab, 0x70, 0x6b, 0xec, 0x45, 0xbd,
-	0x18, 0x32, 0xbe, 0x7a, 0x2f, 0x15, 0x9e, 0x61, 0xc5, 0x9e, 0xc1, 0x50, 0x70, 0xcb, 0x93, 0x14,
-	0x73, 0x8b, 0x3a, 0xec, 0xd3, 0x59, 0xe0, 0xd0, 0x8c, 0x88, 0xdb, 0x4f, 0xf3, 0xf4, 0x3a, 0x1c,
-	0x50, 0x85, 0x62, 0xb7, 0x1f, 0x17, 0x99, 0xcc, 0x13, 0xda, 0x7c, 0x9b, 0x8e, 0xf6, 0x89, 0x5c,
-	0xb8, 0xf5, 0xdf, 0xc1, 0xa0, 0xde, 0xcd, 0x84, 0xfe, 0xb8, 0x1b, 0x0d, 0x8f, 0x5e, 0x4c, 0xd7,
-	0x6a, 0x4c, 0xeb, 0xf5, 0x4e, 0xf3, 0xcb, 0x42, 0x67, 0xdc, 0xca, 0x22, 0xff, 0x84, 0xc6, 0xf0,
-	0x2b, 0x8c, 0xdb, 0x99, 0x89, 0x81, 0xfd, 0xb5, 0x5c, 0x31, 0x9a, 0xb2, 0xc8, 0x0d, 0xb2, 0x08,
-	0xf6, 0xea, 0xfa, 0x5c, 0xde, 0xe2, 0x47, 0x99, 0x49, 0x4b, 0x1a, 0xf6, 0xe2, 0xfb, 0x98, 0x1d,
-	0x82, 0x6f, 0x30, 0xd5, 0x68, 0xcf, 0xb0, 0x22, 0x55, 0xfd, 0xf8, 0x0e, 0xb0, 0x47, 0xd0, 0x57,
-	0xc8, 0x05, 0xea, 0x46, 0xd6, 0x26, 0x9b, 0xfc, 0xea, 0x40, 0xf8, 0xbf, 0xd5, 0xc8, 0x33, 0x41,
-	0xe7, 0x05, 0x71, 0x47, 0x0a, 0xa7, 0x89, 0x91, 0xb7, 0x48, 0x5f, 0xef, 0xc5, 0x14, 0xb3, 0xa7,
-	0x00, 0x69, 0xa1, 0x14, 0xa6, 0x6e, 0xb0, 0xf9, 0xf8, 0x06, 0x71, 0x9a, 0x91, 0x0d, 0x77, 0x76,
-	0xf5, 0x62, 0xdf, 0x91, 0xda, 0xa9, 0xe7, 0xb0, 0x23, 0x50, 0xa1, 0x6d, 0x1b, 0x6a, 0xa7, 0x86,
-	0x35, 0xab, 0x5b, 0x5e, 0x01, 0xab, 0x53, 0x91, 0x2c, 0xaa, 0x75, 0x63, 0x9f, 0x1a, 0x47, 0x4d,
-	0xe5, 0xb8, 0x6a, 0xbb, 0x1f, 0x83, 0xaf, 0x91, 0x8b, 0xa4, 0xc8, 0x55, 0x45, 0xe6, 0x6d, 0xc7,
-	0xdb, 0x0e, 0x9c, 0xe7, 0xaa, 0x62, 0x2f, 0x61, 0x5f, 0x63, 0xa9, 0x64, 0xca, 0x93, 0x52, 0xf1,
-	0x14, 0x33, 0xcc, 0x5b, 0x1f, 0x47, 0x4d, 0xe1, 0xa2, 0xe5, 0x2c, 0x84, 0xc1, 0x0d, 0x6a, 0xe3,
-	0x7e, 0x96, 0x4f, 0x2d, 0x6d, 0xca, 0x46, 0xd0, 0xb5, 0x56, 0x85, 0x40, 0xd4, 0x85, 0x93, 0x01,
-	0x6c, 0x9d, 0x64, 0xa5, 0xad, 0x26, 0x3f, 0x3d, 0xd8, 0x9b, 0x2f, 0x4b, 0xd4, 0xc7, 0xaa, 0x48,
-	0xaf, 0x4f, 0x56, 0x56, 0x73, 0x76, 0x0e, 0xbb, 0xa8, 0xb9, 0x59, 0x6a, 0xb7, 0xbb, 0x90, 0xf9,
-	0x15, 0x49, 0x3a, 0x3c, 0x8a, 0x36, 0xae, 0xc7, 0xbd, 0x99, 0xe9, 0x49, 0x3d, 0x30, 0xa3, 0xfe,
-	0x38, 0xc0, 0xcd, 0xf4, 0xe0, 0x2b, 0x04, 0x7f, 0xd4, 0x9d, 0x31, 0xee, 0xea, 0x36, 0x56, 0x51,
-	0xec, 0x1c, 0x2f, 0xb9, 0x96, 0xb6, 0x6a, 0x9e, 0x58, 0x93, 0x39, 0x43, 0x9a, 0x17, 0x24, 0x85,
-	0x09, 0xbb, 0xe3, 0xae, 0xbb, 0xc4, 0x35, 0x39, 0x15, 0xe6, 0xe8, 0xbb, 0x07, 0x83, 0x79, 0xfd,
-	0xa4, 0xd9, 0x29, 0x04, 0x73, 0xcc, 0xc5, 0xdd, 0x23, 0x7e, 0xb8, 0xb1, 0xf1, 0x9a, 0x1e, 0x1c,
-	0xfe, 0x8b, 0xb6, 0x37, 0x78, 0xf2, 0x20, 0xf2, 0x5e, 0x7b, 0xec, 0x2d, 0x04, 0x67, 0x88, 0xe5,
-	0xac, 0xc8, 0x73, 0x4c, 0x2d, 0x0a, 0x36, 0xda, 0x18, 0x22, 0xe9, 0x0e, 0xfe, 0x22, 0xf5, 0xe8,
-	0xa2, 0x4f, 0xff, 0x2c, 0x6f, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, 0x73, 0x2b, 0x68, 0x15, 0x6a,
-	0x04, 0x00, 0x00,
+	// 708 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x54, 0x41, 0x6f, 0xf3, 0x44,
+	0x10, 0xc5, 0x49, 0xbe, 0x24, 0x9e, 0x7c, 0xc9, 0x97, 0x2e, 0x08, 0xb9, 0xa5, 0x94, 0x60, 0x2e,
+	0x46, 0xa0, 0x08, 0x95, 0x33, 0x97, 0x46, 0x45, 0x54, 0x2d, 0x6a, 0xe5, 0x88, 0x1e, 0xb8, 0x58,
+	0x1b, 0xef, 0xb4, 0x5a, 0x75, 0xbd, 0x36, 0xbb, 0x9b, 0x36, 0xee, 0x85, 0x7f, 0xc3, 0x8d, 0xbf,
+	0xc0, 0x89, 0x1f, 0x86, 0x76, 0xd7, 0x4e, 0x42, 0x28, 0xb7, 0xd9, 0x37, 0x6f, 0xbc, 0xe3, 0xf7,
+	0x66, 0x16, 0xc6, 0x1a, 0xe9, 0x0b, 0x22, 0x9b, 0x57, 0xaa, 0x34, 0x25, 0x09, 0x0b, 0xaa, 0x0d,
+	0xaa, 0xac, 0x5a, 0xc5, 0x7f, 0x76, 0x20, 0xfc, 0x09, 0xa9, 0x32, 0x2b, 0xa4, 0x86, 0x4c, 0xa0,
+	0xc3, 0xab, 0x28, 0x98, 0x05, 0x49, 0x98, 0x76, 0x78, 0x45, 0x08, 0xf4, 0xaa, 0x52, 0x99, 0xa8,
+	0x33, 0x0b, 0x92, 0x71, 0xea, 0x62, 0xf2, 0x39, 0x40, 0xb5, 0x5e, 0x09, 0x9e, 0x67, 0x6b, 0x25,
+	0xa2, 0xae, 0xe3, 0x86, 0x1e, 0xf9, 0x45, 0x09, 0x92, 0xc0, 0xb4, 0xa0, 0x9b, 0xec, 0xb9, 0x14,
+	0xeb, 0x02, 0xb3, 0xbc, 0x5c, 0x4b, 0x13, 0xf5, 0x5c, 0xf9, 0xa4, 0xa0, 0x9b, 0x7b, 0x07, 0x2f,
+	0x2c, 0x4a, 0x66, 0xf0, 0xde, 0x32, 0x1f, 0xb8, 0xc0, 0xec, 0x09, 0xeb, 0xe8, 0xdd, 0x2c, 0x48,
+	0x7a, 0x29, 0x14, 0x74, 0xf3, 0x23, 0x17, 0x78, 0x8d, 0x35, 0xf9, 0x02, 0x46, 0x8c, 0x1a, 0x9a,
+	0xe5, 0x28, 0x0d, 0xaa, 0xa8, 0xef, 0xee, 0x02, 0x0b, 0x2d, 0x1c, 0x62, 0xfb, 0x53, 0x34, 0x7f,
+	0x8a, 0x06, 0x2e, 0xe3, 0x62, 0xdb, 0x1f, 0x65, 0x05, 0x97, 0x99, 0xeb, 0x7c, 0xe8, 0xae, 0x0e,
+	0x1d, 0x72, 0x67, 0xdb, 0xff, 0x01, 0x06, 0xbe, 0x37, 0x1d, 0x85, 0xb3, 0x6e, 0x32, 0x3a, 0xff,
+	0x6a, 0xbe, 0x55, 0x63, 0xee, 0xdb, 0xbb, 0x92, 0x0f, 0xa5, 0x2a, 0xa8, 0xe1, 0xa5, 0xfc, 0x19,
+	0xb5, 0xa6, 0x8f, 0x98, 0xb6, 0x35, 0xb1, 0x86, 0xa3, 0xad, 0x5c, 0x29, 0xea, 0xaa, 0x94, 0x1a,
+	0x49, 0x02, 0x1f, 0x7c, 0x7e, 0xc9, 0x5f, 0xf1, 0x86, 0x17, 0xdc, 0x38, 0x0d, 0x7b, 0xe9, 0x21,
+	0x4c, 0x4e, 0x21, 0xd4, 0x98, 0x2b, 0x34, 0xd7, 0x58, 0x3b, 0x55, 0xc3, 0x74, 0x07, 0x90, 0x4f,
+	0xa1, 0x2f, 0x90, 0x32, 0x54, 0x8d, 0xac, 0xcd, 0x29, 0xfe, 0xbb, 0x03, 0xd1, 0xff, 0xb5, 0xe6,
+	0x3c, 0x63, 0xee, 0xbe, 0x71, 0xda, 0xe1, 0xcc, 0x6a, 0xa2, 0xf9, 0x2b, 0xba, 0xaf, 0xf7, 0x52,
+	0x17, 0x93, 0x33, 0x80, 0xbc, 0x14, 0x02, 0x73, 0x5b, 0xd8, 0x7c, 0x7c, 0x0f, 0xb1, 0x9a, 0x39,
+	0x1b, 0x76, 0x76, 0xf5, 0xd2, 0xd0, 0x22, 0xde, 0xa9, 0x2f, 0xe1, 0x3d, 0x43, 0x81, 0xa6, 0x25,
+	0x78, 0xa7, 0x46, 0x1e, 0xf3, 0x94, 0x6f, 0x81, 0xf8, 0x23, 0xcb, 0x56, 0xf5, 0x96, 0xd8, 0x77,
+	0xc4, 0x69, 0x93, 0xb9, 0xa8, 0x5b, 0xf6, 0x67, 0x10, 0x2a, 0xa4, 0x2c, 0x2b, 0xa5, 0xa8, 0x9d,
+	0x79, 0xc3, 0x74, 0x68, 0x81, 0x5b, 0x29, 0x6a, 0xf2, 0x0d, 0x1c, 0x29, 0xac, 0x04, 0xcf, 0x69,
+	0x56, 0x09, 0x9a, 0x63, 0x81, 0xb2, 0xf5, 0x71, 0xda, 0x24, 0xee, 0x5a, 0x9c, 0x44, 0x30, 0x78,
+	0x46, 0xa5, 0xed, 0x6f, 0x85, 0x8e, 0xd2, 0x1e, 0xc9, 0x14, 0xba, 0xc6, 0x88, 0x08, 0x1c, 0x6a,
+	0xc3, 0x78, 0x00, 0xef, 0x2e, 0x8b, 0xca, 0xd4, 0xf1, 0x5f, 0x01, 0x7c, 0x58, 0xae, 0x2b, 0x54,
+	0x17, 0xa2, 0xcc, 0x9f, 0x2e, 0x37, 0x46, 0x51, 0x72, 0x0b, 0x13, 0x54, 0x54, 0xaf, 0x95, 0xed,
+	0x9d, 0x71, 0xf9, 0xe8, 0x24, 0x1d, 0x9d, 0x27, 0x7b, 0xe3, 0x71, 0x50, 0x33, 0xbf, 0xf4, 0x05,
+	0x0b, 0xc7, 0x4f, 0xc7, 0xb8, 0x7f, 0x3c, 0xf9, 0x15, 0xc6, 0xff, 0xca, 0x5b, 0x63, 0xec, 0xe8,
+	0x36, 0x56, 0xb9, 0xd8, 0x3a, 0x5e, 0x51, 0xc5, 0x4d, 0xdd, 0xac, 0x58, 0x73, 0xb2, 0x86, 0x34,
+	0x1b, 0xc4, 0x99, 0x8e, 0xba, 0xb3, 0xae, 0x1d, 0x62, 0x8f, 0x5c, 0x31, 0x1d, 0x7f, 0x0d, 0x1f,
+	0x2f, 0x04, 0x47, 0x69, 0x6e, 0xb8, 0x36, 0x28, 0x53, 0xfc, 0x6d, 0x8d, 0xda, 0xd8, 0x1b, 0x24,
+	0x2d, 0xb0, 0x59, 0x60, 0x17, 0xc7, 0xbf, 0xc3, 0xc4, 0x8f, 0xce, 0x4d, 0x99, 0xbb, 0xb9, 0xb1,
+	0xc2, 0xd8, 0xcd, 0xf5, 0x24, 0x1b, 0x1e, 0xac, 0x74, 0xe7, 0x70, 0xa5, 0x8f, 0x61, 0x28, 0xf1,
+	0x25, 0x7b, 0xde, 0xb5, 0x32, 0x90, 0xf8, 0x72, 0xcf, 0x99, 0xde, 0x4d, 0x06, 0xf3, 0xe9, 0x9e,
+	0x4b, 0x37, 0x93, 0xc1, 0x2c, 0xe5, 0xfc, 0x8f, 0x00, 0x06, 0x4b, 0xff, 0xfc, 0x90, 0x2b, 0x18,
+	0x2f, 0x51, 0xb2, 0xdd, 0x83, 0xf3, 0xc9, 0x9e, 0xba, 0x5b, 0xf4, 0xe4, 0xf4, 0x2d, 0xb4, 0xdd,
+	0xb6, 0xf8, 0xa3, 0x24, 0xf8, 0x2e, 0x20, 0x77, 0x30, 0xbe, 0x46, 0xac, 0x16, 0xa5, 0x94, 0x98,
+	0x1b, 0x64, 0xe4, 0x6c, 0xaf, 0xe8, 0x0d, 0x71, 0x4e, 0x8e, 0xff, 0xb3, 0xe7, 0xad, 0x22, 0xfe,
+	0x8b, 0xab, 0xbe, 0x7b, 0x1c, 0xbf, 0xff, 0x27, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x83, 0xd9, 0x6b,
+	0x2d, 0x05, 0x00, 0x00,
 }
diff --git a/weed/pb/seaweed.proto b/weed/pb/seaweed.proto
index 6ed580b00..e7b757bd8 100644
--- a/weed/pb/seaweed.proto
+++ b/weed/pb/seaweed.proto
@@ -7,9 +7,7 @@ package master_pb;
 service Seaweed {
     rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
     }
-    rpc KeepConnected (stream Empty) returns (stream Empty) {
-    }
-    rpc ListenForTopoChange (stream Empty) returns (stream VolumeLocation) {
+    rpc KeepConnected (stream ClientListenRequest) returns (stream VolumeLocation) {
     }
 }
 
@@ -58,8 +56,13 @@ message SuperBlockExtra {
     ErasureCoding erasure_coding = 1;
 }
 
+message ClientListenRequest {
+    string name = 1;
+}
+
 message VolumeLocation {
     string url = 1;
     string public_url = 2;
-    repeated uint32 vid = 3;
+    repeated uint32 new_vids = 3;
+    repeated uint32 deleted_vids = 4;
 }
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index f24cea619..c12938374 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -8,6 +8,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/topology"
 	"google.golang.org/grpc/peer"
+	"fmt"
 )
 
 func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
@@ -16,8 +17,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 
 	defer func() {
 		if dn != nil {
+
 			glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
 			t.UnRegisterDataNode(dn)
+
+			message := &master_pb.VolumeLocation{
+				Url:       dn.Url(),
+				PublicUrl: dn.PublicUrl,
+			}
+			for _, v := range dn.GetVolumes() {
+				message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
+			}
+
+			if len(message.DeletedVids) > 0 {
+				ms.clientChansLock.RLock()
+				for _, ch := range ms.clientChans {
+					ch <- message
+				}
+				ms.clientChansLock.RUnlock()
+			}
+
 		}
 	}()
 
@@ -49,7 +68,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 				}
 			}
 
-			t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+			newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+
+			message := &master_pb.VolumeLocation{
+				Url:       dn.Url(),
+				PublicUrl: dn.PublicUrl,
+			}
+			for _, v := range newVolumes {
+				message.NewVids = append(message.NewVids, uint32(v.Id))
+			}
+			for _, v := range deletedVolumes {
+				message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
+			}
+
+			if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
+				ms.clientChansLock.RLock()
+				for _, ch := range ms.clientChans {
+					ch <- message
+				}
+				ms.clientChansLock.RUnlock()
+			}
 
 		} else {
 			return err
@@ -69,13 +107,63 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 
 // KeepConnected keep a stream gRPC call to the master. Used by filer to know the master is up.
 func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error {
-	for {
-		_, err := stream.Recv()
-		if err != nil {
-			return err
+
+	req, err := stream.Recv()
+	if err != nil {
+		return err
+	}
+
+	// remember client address
+	ctx := stream.Context()
+	// fmt.Printf("FromContext %+v\n", ctx)
+	pr, ok := peer.FromContext(ctx)
+	if !ok {
+		glog.Error("failed to get peer from ctx")
+		return fmt.Errorf("failed to get peer from ctx")
+	}
+	if pr.Addr == net.Addr(nil) {
+		glog.Error("failed to get peer address")
+		return fmt.Errorf("failed to get peer address")
+	}
+
+	clientName := req.Name + pr.Addr.String()
+	glog.V(0).Infof("+ client %v", clientName)
+
+	messageChan := make(chan *master_pb.VolumeLocation)
+	stopChan := make(chan bool)
+
+	ms.clientChansLock.Lock()
+	ms.clientChans[clientName] = messageChan
+	ms.clientChansLock.Unlock()
+
+	defer func() {
+		glog.V(0).Infof("- client %v", clientName)
+		ms.clientChansLock.Lock()
+		delete(ms.clientChans, clientName)
+		ms.clientChansLock.Unlock()
+	}()
+
+	go func() {
+		for {
+			_, err := stream.Recv()
+			if err != nil {
+				glog.V(2).Infof("- client %v: %v", clientName, err)
+				stopChan <- true
+				break
+			}
 		}
-		if err := stream.Send(&master_pb.Empty{}); err != nil {
-			return err
+	}()
+
+	for {
+		select {
+		case message := <-messageChan:
+			if err := stream.Send(message); err != nil {
+				return err
+			}
+		case <-stopChan:
+			return nil
 		}
 	}
+
+	return nil
 }
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index efa0ae104..5b6bc8509 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -14,6 +14,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/topology"
 	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/gorilla/mux"
+	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 )
 
 type MasterServer struct {
@@ -31,6 +32,10 @@ type MasterServer struct {
 	vgLock sync.Mutex
 
 	bounedLeaderChan chan int
+
+	// notifying clients
+	clientChansLock sync.RWMutex
+	clientChans     map[string]chan *master_pb.VolumeLocation
 }
 
 func NewMasterServer(r *mux.Router, port int, metaFolder string,
@@ -54,6 +59,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
 		pulseSeconds:            pulseSeconds,
 		defaultReplicaPlacement: defaultReplicaPlacement,
 		garbageThreshold:        garbageThreshold,
+		clientChans:             make(map[string]chan *master_pb.VolumeLocation),
 	}
 	ms.bounedLeaderChan = make(chan int, 16)
 	seq := sequence.NewMemorySequencer()
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 0ef8ae14e..6ee9a8a03 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -32,7 +32,7 @@ func (dn *DataNode) String() string {
 	return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
 }
 
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
+func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
 	dn.Lock()
 	defer dn.Unlock()
 	if _, ok := dn.volumes[v.Id]; !ok {
@@ -42,12 +42,14 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
 			dn.UpAdjustActiveVolumeCountDelta(1)
 		}
 		dn.UpAdjustMaxVolumeId(v.Id)
+		isNew = true
 	} else {
 		dn.volumes[v.Id] = v
 	}
+	return
 }
 
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) {
 	actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
 	for _, v := range actualVolumes {
 		actualVolumeMap[v.Id] = v
@@ -64,7 +66,10 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
 	}
 	dn.Unlock()
 	for _, v := range actualVolumes {
-		dn.AddOrUpdateVolume(v)
+		isNew := dn.AddOrUpdateVolume(v)
+		if isNew {
+			newVolumes = append(newVolumes, v)
+		}
 	}
 	return
 }
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 177c2a181..4242bfa05 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -151,7 +151,7 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
 	return dc
 }
 
-func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) {
+func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) {
 	var volumeInfos []storage.VolumeInfo
 	for _, v := range volumes {
 		if vi, err := storage.NewVolumeInfo(v); err == nil {
@@ -160,11 +160,12 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
 			glog.V(0).Infof("Fail to convert joined volume information: %v", err)
 		}
 	}
-	deletedVolumes := dn.UpdateVolumes(volumeInfos)
+	newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos)
 	for _, v := range volumeInfos {
 		t.RegisterVolumeLayout(v, dn)
 	}
 	for _, v := range deletedVolumes {
 		t.UnRegisterVolumeLayout(v, dn)
 	}
+	return
 }

From e8d4be579de16f334a13edd70afcbe95c60f6703 Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sat, 28 Jul 2018 01:17:35 -0700
Subject: [PATCH 03/11] send initial list of volume location

---
 weed/server/master_grpc_server.go |  6 ++++++
 weed/topology/topology_map.go     | 23 +++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index c12938374..815bcfba3 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -143,6 +143,12 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
 		ms.clientChansLock.Unlock()
 	}()
 
+	for _, message := range ms.Topo.ToVolumeLocations() {
+		if err := stream.Send(message); err != nil {
+			return err
+		}
+	}
+
 	go func() {
 		for {
 			_, err := stream.Recv()
diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go
index ce8e9e663..769ba0e2a 100644
--- a/weed/topology/topology_map.go
+++ b/weed/topology/topology_map.go
@@ -1,5 +1,7 @@
 package topology
 
+import "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+
 func (t *Topology) ToMap() interface{} {
 	m := make(map[string]interface{})
 	m["Max"] = t.GetMaxVolumeCount()
@@ -51,3 +53,24 @@ func (t *Topology) ToVolumeMap() interface{} {
 	m["DataCenters"] = dcs
 	return m
 }
+
+func (t *Topology) ToVolumeLocations() (volumeLocations []*master_pb.VolumeLocation) {
+	for _, c := range t.Children() {
+		dc := c.(*DataCenter)
+		for _, r := range dc.Children() {
+			rack := r.(*Rack)
+			for _, d := range rack.Children() {
+				dn := d.(*DataNode)
+				volumeLocation := &master_pb.VolumeLocation{
+					Url:       dn.Url(),
+					PublicUrl: dn.PublicUrl,
+				}
+				for _, v := range dn.GetVolumes() {
+					volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(v.Id))
+				}
+				volumeLocations = append(volumeLocations, volumeLocation)
+			}
+		}
+	}
+	return
+}

From 1ab8232b55e4572a4b456f02854640f74acbceef Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sat, 28 Jul 2018 01:30:03 -0700
Subject: [PATCH 04/11] filer only talk to the master leader

---
 weed/server/master_grpc_server.go | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 815bcfba3..53e4b59e1 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -9,6 +9,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/topology"
 	"google.golang.org/grpc/peer"
 	"fmt"
+	"github.com/chrislusf/raft"
 )
 
 func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
@@ -105,7 +106,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 	}
 }
 
-// KeepConnected keep a stream gRPC call to the master. Used by filer to know the master is up.
+// KeepConnected keep a stream gRPC call to the master. Used by clients to know the master is up.
+// And clients gets the up-to-date list of volume locations
 func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error {
 
 	req, err := stream.Recv()
@@ -113,6 +115,10 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
 		return err
 	}
 
+	if !ms.Topo.IsLeader() {
+		return raft.NotLeaderError
+	}
+
 	// remember client address
 	ctx := stream.Context()
 	// fmt.Printf("FromContext %+v\n", ctx)

From 01bcc89803b5caefe6d1809d4a85bc8a1d19918e Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sat, 28 Jul 2018 02:10:32 -0700
Subject: [PATCH 05/11] refactor into MasterClient

---
 weed/filer2/filer.go                          | 16 +++-
 .../masterclient.go}                          | 42 ++++++---
 .../topolisenter/client_grpc_to_master.go     | 85 -------------------
 .../wdclient/topolisenter/cluster_listener.go | 56 ------------
 weed/wdclient/wdclient.go                     | 37 ++------
 5 files changed, 48 insertions(+), 188 deletions(-)
 rename weed/{filer2/filer_master.go => wdclient/masterclient.go} (61%)
 delete mode 100644 weed/wdclient/topolisenter/client_grpc_to_master.go
 delete mode 100644 weed/wdclient/topolisenter/cluster_listener.go

diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 2deb8ffd5..f5c2849fe 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -11,20 +11,20 @@ import (
 	"path/filepath"
 	"strings"
 	"time"
+	"github.com/chrislusf/seaweedfs/weed/wdclient"
+	"context"
 )
 
 type Filer struct {
-	masters        []string
 	store          FilerStore
 	directoryCache *ccache.Cache
-
-	currentMaster string
+	masterClient   *wdclient.MasterClient
 }
 
 func NewFiler(masters []string) *Filer {
 	return &Filer{
-		masters:        masters,
 		directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
+		masterClient:   wdclient.NewMasterClient(context.Background(), "filer", masters),
 	}
 }
 
@@ -36,6 +36,14 @@ func (f *Filer) DisableDirectoryCache() {
 	f.directoryCache = nil
 }
 
+func (fs *Filer) GetMaster() string {
+	return fs.masterClient.GetMaster()
+}
+
+func (fs *Filer) KeepConnectedToMaster() {
+	fs.masterClient.KeepConnectedToMaster()
+}
+
 func (f *Filer) CreateEntry(entry *Entry) error {
 
 	dirParts := strings.Split(string(entry.FullPath), "/")
diff --git a/weed/filer2/filer_master.go b/weed/wdclient/masterclient.go
similarity index 61%
rename from weed/filer2/filer_master.go
rename to weed/wdclient/masterclient.go
index bbac17940..fb634d0f0 100644
--- a/weed/filer2/filer_master.go
+++ b/weed/wdclient/masterclient.go
@@ -1,28 +1,44 @@
-package filer2
+package wdclient
 
 import (
 	"context"
+	"time"
 	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/glog"
+
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
-	"time"
+	"github.com/chrislusf/seaweedfs/weed/glog"
 )
 
-func (fs *Filer) GetMaster() string {
-	return fs.currentMaster
+type MasterClient struct {
+	ctx           context.Context
+	name          string
+	currentMaster string
+	masters       []string
+}
+
+func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
+	return &MasterClient{
+		ctx:     ctx,
+		name:    clientName,
+		masters: masters,
+	}
+}
+
+func (mc *MasterClient) GetMaster() string {
+	return mc.currentMaster
 }
 
-func (fs *Filer) KeepConnectedToMaster() {
-	glog.V(0).Infof("Filer bootstraps with masters %v", fs.masters)
+func (mc *MasterClient) KeepConnectedToMaster() {
+	glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
 	for {
-		fs.tryAllMasters()
+		mc.tryAllMasters()
 		time.Sleep(time.Second)
 	}
 }
 
-func (fs *Filer) tryAllMasters() {
-	for _, master := range fs.masters {
+func (mc *MasterClient) tryAllMasters() {
+	for _, master := range mc.masters {
 		glog.V(0).Infof("Connecting to %v", master)
 		withMasterClient(master, func(client master_pb.SeaweedClient) error {
 			stream, err := client.KeepConnected(context.Background())
@@ -32,9 +48,9 @@ func (fs *Filer) tryAllMasters() {
 			}
 
 			glog.V(0).Infof("Connected to %v", master)
-			fs.currentMaster = master
+			mc.currentMaster = master
 
-			if err = stream.Send(&master_pb.ClientListenRequest{Name: "filer"}); err != nil {
+			if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil {
 				glog.V(0).Infof("failed to send to %s: %v", master, err)
 				return err
 			}
@@ -48,7 +64,7 @@ func (fs *Filer) tryAllMasters() {
 				}
 			}
 		})
-		fs.currentMaster = ""
+		mc.currentMaster = ""
 	}
 }
 
diff --git a/weed/wdclient/topolisenter/client_grpc_to_master.go b/weed/wdclient/topolisenter/client_grpc_to_master.go
deleted file mode 100644
index 863f79a1d..000000000
--- a/weed/wdclient/topolisenter/client_grpc_to_master.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package clusterlistener
-
-import (
-	"context"
-	"fmt"
-	"io"
-
-	"google.golang.org/grpc"
-
-	"code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
-	"github.com/golang/glog"
-)
-
-func (clusterListener *ClusterListener) establishConnectionWithMaster(
-	master string, msgChan chan *pb.ClusterStatusMessage) error {
-	grpcConnection, err := grpc.Dial(master, grpc.WithInsecure())
-	if err != nil {
-		return fmt.Errorf("%s fail to dial %s: %v", clusterListener.clientName, master, err)
-	}
-	defer func() { _ = grpcConnection.Close() }()
-
-	masterClient := pb.NewAlpineMasterClient(grpcConnection)
-
-	stream, err := masterClient.RegisterClient(context.Background())
-	if err != nil {
-		return fmt.Errorf("%s register client on master %v: %v", clusterListener.clientName, master, err)
-	}
-
-	// TODO possible goroutine leaks if retry happens
-	go func() {
-		for keyspace := range clusterListener.clusters {
-			// glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v)", clusterListener.clientName, keyspace, dataCenter)
-			if err := registerForClusterAtMaster(stream, string(keyspace), false, clusterListener.clientName); err != nil {
-				// glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v): %v", clusterListener.clientName, keyspace, dataCenter, err)
-				return
-			}
-		}
-
-		for {
-			msg := <-clusterListener.keyspaceFollowMessageChan
-			if err := registerForClusterAtMaster(stream, string(msg.keyspace), msg.isUnfollow, clusterListener.clientName); err != nil {
-				if msg.isUnfollow {
-					glog.V(2).Infof("%s unfollow cluster keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err)
-				} else {
-					glog.V(2).Infof("%s register cluster new keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err)
-				}
-				return
-			}
-		}
-
-	}()
-
-	// glog.V(2).Infof("Reporting allocated %v", as.allocatedResource)
-
-	// glog.V(2).Infof("%s from %s register client to master %s", clusterListener.clientName, dataCenter, master)
-
-	for {
-		msg, err := stream.Recv()
-		if err == io.EOF {
-			// read done.
-			return nil
-		}
-		if err != nil {
-			return fmt.Errorf("client receive topology : %v", err)
-		}
-		msgChan <- msg
-		// glog.V(2).Infof("%s client received message %v", clusterListener.clientName, msg)
-	}
-
-}
-
-func registerForClusterAtMaster(stream pb.AlpineMaster_RegisterClientClient, keyspace string, isUnfollow bool, clientName string) error {
-	clientHeartbeat := &pb.ClientHeartbeat{
-		ClientName: clientName,
-		ClusterFollow: &pb.ClientHeartbeat_ClusterFollowMessage{
-			Keyspace:   keyspace,
-			IsUnfollow: isUnfollow,
-		},
-	}
-
-	if err := stream.Send(clientHeartbeat); err != nil {
-		return fmt.Errorf("%s client send heartbeat: %v", clientName, err)
-	}
-	return nil
-}
diff --git a/weed/wdclient/topolisenter/cluster_listener.go b/weed/wdclient/topolisenter/cluster_listener.go
deleted file mode 100644
index 91ca6fb6f..000000000
--- a/weed/wdclient/topolisenter/cluster_listener.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package clusterlistener
-
-import (
-	"context"
-	"sync"
-	"time"
-
-	"code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
-	"code.uber.internal/fraud/alpine/server/util"
-	"github.com/chrislusf/seaweedfs/weed/storage"
-)
-
-type Location struct {
-	Url       string
-	PublicUrl string
-}
-
-type ClusterListener struct {
-	sync.RWMutex
-	vid2locations map[storage.VolumeId][]*Location
-	clientName    string
-}
-
-func NewClusterListener(clientName string) *ClusterListener {
-	return &ClusterListener{
-		vid2locations: make(map[storage.VolumeId][]*Location),
-		clientName:    clientName,
-	}
-}
-
-// StartListener keeps the listener connected to the master.
-func (clusterListener *ClusterListener) StartListener(ctx context.Context, master string) {
-
-	clusterUpdatesChan := make(chan *pb.ClusterStatusMessage)
-
-	go util.RetryForever(ctx, clusterListener.clientName+" cluster listener", func() error {
-		return clusterListener.establishConnectionWithMaster(master, clusterUpdatesChan)
-	}, 2*time.Second)
-
-	go func() {
-		for {
-			select {
-			case msg := <-clusterUpdatesChan:
-				clusterListener.processClusterStatusMessage(msg)
-			}
-		}
-	}()
-
-	// println("client is connected to master", master, "data center", dataCenter)
-
-	return
-
-}
-
-func (clusterListener *ClusterListener) processClusterStatusMessage(msg *pb.ClusterStatusMessage) {
-}
diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go
index cbe03359f..b16e239fb 100644
--- a/weed/wdclient/wdclient.go
+++ b/weed/wdclient/wdclient.go
@@ -2,41 +2,18 @@ package wdclient
 
 import (
 	"context"
-	"code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
 )
 
 type SeaweedClient struct {
-	ctx             context.Context
-	Master          string
-	ClientName      string
-	ClusterListener *clusterlistener.ClusterListener
+	ctx        context.Context
+	Master     string
+	ClientName string
 }
 
-// NewSeaweedClient creates a SeaweedFS client which contains a listener for the Seaweed system topology changes
-func NewSeaweedClient(ctx context.Context, clientName, master string) *SeaweedClient {
-	c := &SeaweedClient{
-		ctx:             ctx,
-		Master:          master,
-		ClusterListener: clusterlistener.NewClusterListener(clientName),
-		ClientName:      clientName,
-	}
-	c.ClusterListener.StartListener(ctx, c.Master)
+func NewSeaweedClient(ctx context.Context, clientName string, masters []string) *SeaweedClient {
+	return &SeaweedClient{
+		ctx:        ctx,
+		ClientName: clientName,
 
-	conn, err := grpc.Dial(c.Master, grpc.WithInsecure())
-	if err != nil {
-		glog.Fatalf("%s fail to dial %v: %v", c.ClientName, c.Master, err)
 	}
-	c.MasterClient = pb.NewAlpineMasterClient(conn)
-
-	return c
-}
-
-// NewClusterClient create a lightweight client to access a specific cluster
-// TODO The call will block if the keyspace is not created in this data center.
-func (c *SeaweedClient) NewClusterClient(keyspace string) (clusterClient *ClusterClient) {
-
-	return &ClusterClient{
-		keyspace: keyspace,
-	}
-
 }

From 1d779389cbf9e5cf19de5ceba136d862c49b9d8f Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sat, 28 Jul 2018 14:22:46 -0700
Subject: [PATCH 06/11] MasterClient replicates all vid locations

---
 weed/filer2/filer.go             |  8 ++---
 weed/server/filer_grpc_server.go | 16 ++++-----
 weed/wdclient/masterclient.go    | 12 +++++++
 weed/wdclient/vid_map.go         | 59 ++++++++++++++++++++++++++++++++
 4 files changed, 83 insertions(+), 12 deletions(-)
 create mode 100644 weed/wdclient/vid_map.go

diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index f5c2849fe..e1c3fea84 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -18,13 +18,13 @@ import (
 type Filer struct {
 	store          FilerStore
 	directoryCache *ccache.Cache
-	masterClient   *wdclient.MasterClient
+	MasterClient   *wdclient.MasterClient
 }
 
 func NewFiler(masters []string) *Filer {
 	return &Filer{
 		directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
-		masterClient:   wdclient.NewMasterClient(context.Background(), "filer", masters),
+		MasterClient:   wdclient.NewMasterClient(context.Background(), "filer", masters),
 	}
 }
 
@@ -37,11 +37,11 @@ func (f *Filer) DisableDirectoryCache() {
 }
 
 func (fs *Filer) GetMaster() string {
-	return fs.masterClient.GetMaster()
+	return fs.MasterClient.GetMaster()
 }
 
 func (fs *Filer) KeepConnectedToMaster() {
-	fs.masterClient.KeepConnectedToMaster()
+	fs.MasterClient.KeepConnectedToMaster()
 }
 
 func (f *Filer) CreateEntry(entry *Entry) error {
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index b2f2d7a2d..f66a6eca8 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -99,24 +99,24 @@ func (fs *FilerServer) GetEntryAttributes(ctx context.Context, req *filer_pb.Get
 
 func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
 
-	lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster(), req.VolumeIds)
-	if err != nil {
-		return nil, err
-	}
-
 	resp := &filer_pb.LookupVolumeResponse{
 		LocationsMap: make(map[string]*filer_pb.Locations),
 	}
 
-	for vid, locations := range lookupResult {
+	for _, vidString := range req.VolumeIds {
+		vid, err := strconv.Atoi(vidString)
+		if err != nil {
+			glog.V(1).Infof("Unknown volume id %s", vid)
+			return nil, err
+		}
 		var locs []*filer_pb.Location
-		for _, loc := range locations.Locations {
+		for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) {
 			locs = append(locs, &filer_pb.Location{
 				Url:       loc.Url,
 				PublicUrl: loc.PublicUrl,
 			})
 		}
-		resp.LocationsMap[vid] = &filer_pb.Locations{
+		resp.LocationsMap[vidString] = &filer_pb.Locations{
 			Locations: locs,
 		}
 	}
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index fb634d0f0..13383c9f1 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -15,6 +15,8 @@ type MasterClient struct {
 	name          string
 	currentMaster string
 	masters       []string
+
+	VidMap
 }
 
 func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
@@ -61,6 +63,16 @@ func (mc *MasterClient) tryAllMasters() {
 					return err
 				} else {
 					glog.V(0).Infof("volume location: %+v", volumeLocation)
+					loc := Location{
+						Url:       volumeLocation.Url,
+						PublicUrl: volumeLocation.PublicUrl,
+					}
+					for _, newVid := range volumeLocation.NewVids {
+						mc.AddLocation(newVid, loc)
+					}
+					for _, deletedVid := range volumeLocation.DeletedVids {
+						mc.DeleteLocation(deletedVid, loc)
+					}
 				}
 			}
 		})
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
new file mode 100644
index 000000000..0eb5ee5d0
--- /dev/null
+++ b/weed/wdclient/vid_map.go
@@ -0,0 +1,59 @@
+package wdclient
+
+import (
+	"sync"
+)
+
+type Location struct {
+	Url       string `json:"url,omitempty"`
+	PublicUrl string `json:"publicUrl,omitempty"`
+}
+
+type VidMap struct {
+	sync.RWMutex
+	vid2Locations map[uint32][]Location
+}
+
+func (vc *VidMap) GetLocations(vid uint32) (locations []Location) {
+	vc.RLock()
+	defer vc.RUnlock()
+
+	return vc.vid2Locations[vid]
+}
+
+func (vc *VidMap) AddLocation(vid uint32, location Location) {
+	vc.Lock()
+	defer vc.Unlock()
+
+	locations, found := vc.vid2Locations[vid]
+	if !found {
+		vc.vid2Locations[vid] = []Location{location}
+		return
+	}
+
+	for _, loc := range locations {
+		if loc.Url == location.Url {
+			return
+		}
+	}
+
+	vc.vid2Locations[vid] = append(locations, location)
+
+}
+
+func (vc *VidMap) DeleteLocation(vid uint32, location Location) {
+	vc.Lock()
+	defer vc.Unlock()
+
+	locations, found := vc.vid2Locations[vid]
+	if !found {
+		return
+	}
+
+	for i, loc := range locations {
+		if loc.Url == location.Url {
+			vc.vid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
+		}
+	}
+
+}

From 888eb2abb58413d52eb0a053f3bd7f94149f8f49 Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sat, 28 Jul 2018 14:51:36 -0700
Subject: [PATCH 07/11] filer read write all via locations from MasterClient

---
 weed/filer2/filer.go                       | 14 +++++++--
 weed/operation/delete_content.go           |  8 ++++++
 weed/server/filer_grpc_server.go           |  4 +--
 weed/server/filer_server_handlers_read.go  |  5 ++--
 weed/server/filer_server_handlers_write.go |  6 ++--
 weed/wdclient/vid_map.go                   | 33 ++++++++++++++++++++++
 6 files changed, 59 insertions(+), 11 deletions(-)

diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index e1c3fea84..100117e6a 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -206,9 +206,17 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
 
 func (f *Filer) deleteChunks(chunks []*filer_pb.FileChunk) {
 	for _, chunk := range chunks {
-		if err := operation.DeleteFile(f.GetMaster(), chunk.FileId, ""); err != nil {
-			glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err)
-		}
+		f.DeleteFileByFileId(chunk.FileId)
+	}
+}
+
+func (f *Filer) DeleteFileByFileId(fileId string) {
+	fileUrlOnVolume, err := f.MasterClient.LookupFileId(fileId)
+	if err != nil {
+		glog.V(0).Infof("can not find file %s: %v", fileId, err)
+	}
+	if err := operation.DeleteFromVolumeServer(fileUrlOnVolume, ""); err != nil {
+		glog.V(0).Infof("deleting file %s: %v", fileId, err)
 	}
 }
 
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index b78221da1..2d7e71c6e 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -21,6 +21,14 @@ type DeleteResult struct {
 	Error  string `json:"error,omitempty"`
 }
 
+func DeleteFromVolumeServer(fileUrlOnVolume string, jwt security.EncodedJwt) error {
+	err = util.Delete(fileUrlOnVolume, jwt)
+	if err != nil {
+		return fmt.Errorf("Failed to delete %s:%v", fileUrlOnVolume, err)
+	}
+	return nil
+}
+
 func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
 	fileUrl, err := LookupFileId(master, fileId)
 	if err != nil {
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index f66a6eca8..0155eeccf 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -176,11 +176,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
 	if err = fs.filer.UpdateEntry(newEntry); err == nil {
 		for _, garbage := range unusedChunks {
 			glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
-			operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
+			fs.filer.DeleteFileByFileId(garbage.FileId)
 		}
 		for _, garbage := range garbages {
 			glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
-			operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
+			fs.filer.DeleteFileByFileId(garbage.FileId)
 		}
 	}
 
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 4e20be5da..e17cd776d 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -8,7 +8,6 @@ import (
 
 	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/util"
 	"mime"
 	"mime/multipart"
@@ -63,7 +62,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request,
 
 	fileId := entry.Chunks[0].FileId
 
-	urlString, err := operation.LookupFileId(fs.filer.GetMaster(), fileId)
+	urlString, err := fs.filer.MasterClient.LookupFileId(fileId)
 	if err != nil {
 		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
 		w.WriteHeader(http.StatusNotFound)
@@ -223,7 +222,7 @@ func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int
 
 	for _, chunkView := range chunkViews {
 
-		urlString, err := operation.LookupFileId(fs.filer.GetMaster(), chunkView.FileId)
+		urlString, err := fs.filer.MasterClient.LookupFileId(chunkView.FileId)
 		if err != nil {
 			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 			return err
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index c1b7e3826..8a19f3fdb 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -49,7 +49,7 @@ func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Reques
 		w.WriteHeader(http.StatusNoContent)
 	} else {
 		fileId = entry.Chunks[0].FileId
-		urlLocation, err = operation.LookupFileId(fs.filer.GetMaster(), fileId)
+		urlLocation, err = fs.filer.MasterClient.LookupFileId(fileId)
 		if err != nil {
 			glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error())
 			w.WriteHeader(http.StatusNotFound)
@@ -176,7 +176,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		if ret.Name != "" {
 			path += ret.Name
 		} else {
-			operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up
+			fs.filer.DeleteFileByFileId(fileId)
 			glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
 			writeJsonError(w, r, http.StatusInternalServerError,
 				errors.New("Can not to write to folder "+path+" without a file name"))
@@ -205,7 +205,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		}},
 	}
 	if db_err := fs.filer.CreateEntry(entry); db_err != nil {
-		operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up
+		fs.filer.DeleteFileByFileId(fileId)
 		glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
 		writeJsonError(w, r, http.StatusInternalServerError, db_err)
 		return
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 0eb5ee5d0..93884c53d 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -2,6 +2,12 @@ package wdclient
 
 import (
 	"sync"
+	"strings"
+	"math/rand"
+	"errors"
+	"strconv"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"fmt"
 )
 
 type Location struct {
@@ -14,6 +20,33 @@ type VidMap struct {
 	vid2Locations map[uint32][]Location
 }
 
+func (vc *VidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) {
+	id, err := strconv.Atoi(vid)
+	if err != nil {
+		glog.V(1).Infof("Unknown volume id %s", vid)
+		return "", err
+	}
+
+	locations := vc.GetLocations(uint32(id))
+	if len(locations) == 0 {
+		return "", fmt.Errorf("volume %d not found", id)
+	}
+
+	return locations[rand.Intn(len(locations))].Url, nil
+}
+
+func (vc *VidMap) LookupFileId(fileId string) (fullUrl string, err error) {
+	parts := strings.Split(fileId, ",")
+	if len(parts) != 2 {
+		return "", errors.New("Invalid fileId " + fileId)
+	}
+	serverUrl, lookupError := LookupVolumeServerUrl(parts[0])
+	if lookupError != nil {
+		return "", lookupError
+	}
+	return "http://" + serverUrl + "/" + fileId, nil
+}
+
 func (vc *VidMap) GetLocations(vid uint32) (locations []Location) {
 	vc.RLock()
 	defer vc.RUnlock()

From 4e0522a80ca5933f9a4c89b274fcef65e4bef4d9 Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sat, 28 Jul 2018 18:17:31 -0700
Subject: [PATCH 08/11] adjust API

---
 weed/wdclient/wdclient.go | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go
index b16e239fb..722f4d061 100644
--- a/weed/wdclient/wdclient.go
+++ b/weed/wdclient/wdclient.go
@@ -5,15 +5,11 @@ import (
 )
 
 type SeaweedClient struct {
-	ctx        context.Context
-	Master     string
-	ClientName string
+	*MasterClient
 }
 
 func NewSeaweedClient(ctx context.Context, clientName string, masters []string) *SeaweedClient {
 	return &SeaweedClient{
-		ctx:        ctx,
-		ClientName: clientName,
-
+		MasterClient: NewMasterClient(ctx, clientName, masters),
 	}
 }

From cfbfc7cb67db224bdf6b0b533162d16039245f52 Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sat, 28 Jul 2018 18:34:15 -0700
Subject: [PATCH 09/11] fix compilation error

---
 weed/operation/delete_content.go | 2 +-
 weed/wdclient/masterclient.go    | 4 ++--
 weed/wdclient/vid_map.go         | 6 +++---
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 2d7e71c6e..41f98e4d4 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -22,7 +22,7 @@ type DeleteResult struct {
 }
 
 func DeleteFromVolumeServer(fileUrlOnVolume string, jwt security.EncodedJwt) error {
-	err = util.Delete(fileUrlOnVolume, jwt)
+	err := util.Delete(fileUrlOnVolume, jwt)
 	if err != nil {
 		return fmt.Errorf("Failed to delete %s:%v", fileUrlOnVolume, err)
 	}
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 13383c9f1..d2d5954e4 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -68,10 +68,10 @@ func (mc *MasterClient) tryAllMasters() {
 						PublicUrl: volumeLocation.PublicUrl,
 					}
 					for _, newVid := range volumeLocation.NewVids {
-						mc.AddLocation(newVid, loc)
+						mc.addLocation(newVid, loc)
 					}
 					for _, deletedVid := range volumeLocation.DeletedVids {
-						mc.DeleteLocation(deletedVid, loc)
+						mc.deleteLocation(deletedVid, loc)
 					}
 				}
 			}
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 93884c53d..7299606af 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -40,7 +40,7 @@ func (vc *VidMap) LookupFileId(fileId string) (fullUrl string, err error) {
 	if len(parts) != 2 {
 		return "", errors.New("Invalid fileId " + fileId)
 	}
-	serverUrl, lookupError := LookupVolumeServerUrl(parts[0])
+	serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0])
 	if lookupError != nil {
 		return "", lookupError
 	}
@@ -54,7 +54,7 @@ func (vc *VidMap) GetLocations(vid uint32) (locations []Location) {
 	return vc.vid2Locations[vid]
 }
 
-func (vc *VidMap) AddLocation(vid uint32, location Location) {
+func (vc *VidMap) addLocation(vid uint32, location Location) {
 	vc.Lock()
 	defer vc.Unlock()
 
@@ -74,7 +74,7 @@ func (vc *VidMap) AddLocation(vid uint32, location Location) {
 
 }
 
-func (vc *VidMap) DeleteLocation(vid uint32, location Location) {
+func (vc *VidMap) deleteLocation(vid uint32, location Location) {
 	vc.Lock()
 	defer vc.Unlock()
 

From 7214a8e265544c1e507351789409f75abe485acc Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sat, 28 Jul 2018 18:40:31 -0700
Subject: [PATCH 10/11] fix init error

---
 weed/wdclient/masterclient.go |  3 ++-
 weed/wdclient/vid_map.go      | 18 ++++++++++++------
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index d2d5954e4..e3ffd4834 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -16,7 +16,7 @@ type MasterClient struct {
 	currentMaster string
 	masters       []string
 
-	VidMap
+	vidMap
 }
 
 func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
@@ -24,6 +24,7 @@ func NewMasterClient(ctx context.Context, clientName string, masters []string) *
 		ctx:     ctx,
 		name:    clientName,
 		masters: masters,
+		vidMap:  newVidMap(),
 	}
 }
 
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 7299606af..0b0cba178 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -15,12 +15,18 @@ type Location struct {
 	PublicUrl string `json:"publicUrl,omitempty"`
 }
 
-type VidMap struct {
+type vidMap struct {
 	sync.RWMutex
 	vid2Locations map[uint32][]Location
 }
 
-func (vc *VidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) {
+func newVidMap() vidMap {
+	return vidMap{
+		vid2Locations: make(map[uint32][]Location),
+	}
+}
+
+func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) {
 	id, err := strconv.Atoi(vid)
 	if err != nil {
 		glog.V(1).Infof("Unknown volume id %s", vid)
@@ -35,7 +41,7 @@ func (vc *VidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error
 	return locations[rand.Intn(len(locations))].Url, nil
 }
 
-func (vc *VidMap) LookupFileId(fileId string) (fullUrl string, err error) {
+func (vc *vidMap) LookupFileId(fileId string) (fullUrl string, err error) {
 	parts := strings.Split(fileId, ",")
 	if len(parts) != 2 {
 		return "", errors.New("Invalid fileId " + fileId)
@@ -47,14 +53,14 @@ func (vc *VidMap) LookupFileId(fileId string) (fullUrl string, err error) {
 	return "http://" + serverUrl + "/" + fileId, nil
 }
 
-func (vc *VidMap) GetLocations(vid uint32) (locations []Location) {
+func (vc *vidMap) GetLocations(vid uint32) (locations []Location) {
 	vc.RLock()
 	defer vc.RUnlock()
 
 	return vc.vid2Locations[vid]
 }
 
-func (vc *VidMap) addLocation(vid uint32, location Location) {
+func (vc *vidMap) addLocation(vid uint32, location Location) {
 	vc.Lock()
 	defer vc.Unlock()
 
@@ -74,7 +80,7 @@ func (vc *VidMap) addLocation(vid uint32, location Location) {
 
 }
 
-func (vc *VidMap) deleteLocation(vid uint32, location Location) {
+func (vc *vidMap) deleteLocation(vid uint32, location Location) {
 	vc.Lock()
 	defer vc.Unlock()
 

From d3205a007071f26587affb416f71b5c63854b863 Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sat, 28 Jul 2018 21:02:56 -0700
Subject: [PATCH 11/11] go fmt

---
 weed/filer2/filer.go              | 12 ++++++------
 weed/server/master_grpc_server.go |  4 ++--
 weed/server/master_server.go      |  2 +-
 weed/wdclient/masterclient.go     |  4 ++--
 weed/wdclient/vid_map.go          |  9 +++++----
 5 files changed, 16 insertions(+), 15 deletions(-)

diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 100117e6a..1f2697cda 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -1,18 +1,18 @@
 package filer2
 
 import (
+	"context"
 	"fmt"
-
-	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/operation"
-	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"github.com/karlseguin/ccache"
 	"os"
 	"path/filepath"
 	"strings"
 	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
-	"context"
+	"github.com/karlseguin/ccache"
 )
 
 type Filer struct {
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 53e4b59e1..2952a8071 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -1,15 +1,15 @@
 package weed_server
 
 import (
+	"fmt"
 	"net"
 	"strings"
 
+	"github.com/chrislusf/raft"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/topology"
 	"google.golang.org/grpc/peer"
-	"fmt"
-	"github.com/chrislusf/raft"
 )
 
 func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 5b6bc8509..07a398ead 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -9,12 +9,12 @@ import (
 
 	"github.com/chrislusf/raft"
 	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/security"
 	"github.com/chrislusf/seaweedfs/weed/sequence"
 	"github.com/chrislusf/seaweedfs/weed/topology"
 	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/gorilla/mux"
-	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 )
 
 type MasterServer struct {
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index e3ffd4834..347901f1a 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -2,12 +2,12 @@ package wdclient
 
 import (
 	"context"
-	"time"
 	"fmt"
+	"time"
 
+	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
-	"github.com/chrislusf/seaweedfs/weed/glog"
 )
 
 type MasterClient struct {
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 0b0cba178..2f56b8fce 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -1,13 +1,14 @@
 package wdclient
 
 import (
-	"sync"
-	"strings"
-	"math/rand"
 	"errors"
+	"fmt"
+	"math/rand"
 	"strconv"
+	"strings"
+	"sync"
+
 	"github.com/chrislusf/seaweedfs/weed/glog"
-	"fmt"
 )
 
 type Location struct {