From 7b4a53b2c1195a459198e2ee9217b1b8b958e727 Mon Sep 17 00:00:00 2001
From: Chris Lu <chris.lu@gmail.com>
Date: Sun, 18 Jan 2015 17:03:38 -0800
Subject: [PATCH] Add optional admin port to volume server, to seperate admin
 operations from normal file operations.

---
 go/operation/system_message.pb.go    | 10 +++++++-
 go/proto/system_message.proto        |  1 +
 go/storage/store.go                  |  8 ++++---
 go/topology/allocate_volume.go       |  2 +-
 go/topology/data_node.go             |  7 +++++-
 go/topology/rack.go                  |  3 ++-
 go/topology/topology.go              |  8 ++++++-
 go/topology/topology_vacuum.go       | 12 +++++-----
 go/topology/volume_growth.go         |  2 +-
 go/weed/server.go                    |  5 +++-
 go/weed/volume.go                    | 34 +++++++++++++++++++++++-----
 go/weed/weed_server/volume_server.go |  6 +++--
 12 files changed, 74 insertions(+), 24 deletions(-)

diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go
index 6f0f974c5..32e6a6640 100644
--- a/go/operation/system_message.pb.go
+++ b/go/operation/system_message.pb.go
@@ -14,7 +14,7 @@ It has these top-level messages:
 */
 package operation
 
-import "github.com/golang/protobuf/proto"
+import proto "code.google.com/p/goprotobuf/proto"
 import math "math"
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -121,6 +121,7 @@ type JoinMessage struct {
 	DataCenter       *string                     `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"`
 	Rack             *string                     `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
 	Volumes          []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
+	AdminPort        *uint32                     `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"`
 	XXX_unrecognized []byte                      `json:"-"`
 }
 
@@ -191,5 +192,12 @@ func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage {
 	return nil
 }
 
+func (m *JoinMessage) GetAdminPort() uint32 {
+	if m != nil && m.AdminPort != nil {
+		return *m.AdminPort
+	}
+	return 0
+}
+
 func init() {
 }
diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto
index ecd4973f7..548360b27 100644
--- a/go/proto/system_message.proto
+++ b/go/proto/system_message.proto
@@ -23,4 +23,5 @@ message JoinMessage {
   optional string data_center = 7;
   optional string rack = 8;
   repeated VolumeInformationMessage volumes = 9;
+  optional uint32 admin_port = 10;
 }
diff --git a/go/storage/store.go b/go/storage/store.go
index 7e2b23058..2c4434b81 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -73,8 +73,9 @@ func (mn *MasterNodes) findMaster() (string, error) {
  * A VolumeServer contains one Store
  */
 type Store struct {
-	Port            int
 	Ip              string
+	Port            int
+	AdminPort       int
 	PublicUrl       string
 	Locations       []*DiskLocation
 	dataCenter      string //optional informaton, overwriting master setting if exists
@@ -89,8 +90,8 @@ func (s *Store) String() (str string) {
 	return
 }
 
-func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
-	s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
+func NewStore(port, adminPort int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
+	s = &Store{Port: port, AdminPort: adminPort, Ip: ip, PublicUrl: publicUrl}
 	s.Locations = make([]*DiskLocation, 0)
 	for i := 0; i < len(dirnames); i++ {
 		location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
@@ -308,6 +309,7 @@ func (s *Store) Join() (masterNode string, e error) {
 		DataCenter:     proto.String(s.dataCenter),
 		Rack:           proto.String(s.rack),
 		Volumes:        volumeMessages,
+		AdminPort:      proto.Uint32(uint32(s.AdminPort)),
 	}
 
 	data, err := proto.Marshal(joinMessage)
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
index a791b4c1c..669177c09 100644
--- a/go/topology/allocate_volume.go
+++ b/go/topology/allocate_volume.go
@@ -19,7 +19,7 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption
 	values.Add("collection", option.Collection)
 	values.Add("replication", option.ReplicaPlacement.String())
 	values.Add("ttl", option.Ttl.String())
-	jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
+	jsonBlob, err := util.Post("http://"+dn.AdminUrl()+"/admin/assign_volume", values)
 	if err != nil {
 		return err
 	}
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index 09b9fac6c..2d0a093cc 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -13,6 +13,7 @@ type DataNode struct {
 	volumes   map[storage.VolumeId]storage.VolumeInfo
 	Ip        string
 	Port      int
+	AdminPort int
 	PublicUrl string
 	LastSeen  int64 // unix time in seconds
 	Dead      bool
@@ -28,7 +29,7 @@ func NewDataNode(id string) *DataNode {
 }
 
 func (dn *DataNode) String() string {
-	return fmt.Sprintf("NodeImpl:%s ,volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
+	return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
 }
 
 func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
@@ -89,6 +90,10 @@ func (dn *DataNode) Url() string {
 	return dn.Ip + ":" + strconv.Itoa(dn.Port)
 }
 
+func (dn *DataNode) AdminUrl() string {
+	return dn.Ip + ":" + strconv.Itoa(dn.AdminPort)
+}
+
 func (dn *DataNode) ToMap() interface{} {
 	ret := make(map[string]interface{})
 	ret["Url"] = dn.Url()
diff --git a/go/topology/rack.go b/go/topology/rack.go
index 40e19dd0d..50ad5f009 100644
--- a/go/topology/rack.go
+++ b/go/topology/rack.go
@@ -27,7 +27,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
 	}
 	return nil
 }
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
+func (r *Rack) GetOrCreateDataNode(ip string, port, adminPort int, publicUrl string, maxVolumeCount int) *DataNode {
 	for _, c := range r.Children() {
 		dn := c.(*DataNode)
 		if dn.MatchLocation(ip, port) {
@@ -43,6 +43,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
 	dn := NewDataNode(ip + ":" + strconv.Itoa(port))
 	dn.Ip = ip
 	dn.Port = port
+	dn.AdminPort = adminPort
 	dn.PublicUrl = publicUrl
 	dn.maxVolumeCount = maxVolumeCount
 	dn.LastSeen = time.Now().Unix()
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 4cfd070db..e1f35f7a3 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -157,7 +157,13 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
 	if *joinMessage.IsInit && dn != nil {
 		t.UnRegisterDataNode(dn)
 	}
-	dn = rack.GetOrCreateDataNode(*joinMessage.Ip, int(*joinMessage.Port), *joinMessage.PublicUrl, int(*joinMessage.MaxVolumeCount))
+	adminPort = *joinMessage.Port
+	if joinMessage.AdminPort != nil {
+		adminPort = *joinMessage.AdminPort
+	}
+	dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
+		int(*joinMessage.Port), int(adminPort), *joinMessage.PublicUrl,
+		int(*joinMessage.MaxVolumeCount))
 	var volumeInfos []storage.VolumeInfo
 	for _, v := range joinMessage.Volumes {
 		if vi, err := storage.NewVolumeInfo(v); err == nil {
diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go
index d6fa2213e..0aeb9f132 100644
--- a/go/topology/topology_vacuum.go
+++ b/go/topology/topology_vacuum.go
@@ -23,7 +23,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
 				//glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
 				ch <- ret
 			}
-		}(index, dn.Url(), vid)
+		}(index, dn.AdminUrl(), vid)
 	}
 	isCheckSuccess := true
 	for _ = range locationlist.list {
@@ -50,7 +50,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
 				glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
 				ch <- true
 			}
-		}(index, dn.Url(), vid)
+		}(index, dn.AdminUrl(), vid)
 	}
 	isVacuumSuccess := true
 	for _ = range locationlist.list {
@@ -66,12 +66,12 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
 func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
 	isCommitSuccess := true
 	for _, dn := range locationlist.list {
-		glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
-		if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
-			glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
+		glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.AdminUrl())
+		if e := vacuumVolume_Commit(dn.AdminUrl(), vid); e != nil {
+			glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.AdminUrl(), e)
 			isCommitSuccess = false
 		} else {
-			glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url())
+			glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.AdminUrl())
 		}
 		if isCommitSuccess {
 			vl.SetVolumeAvailable(dn, vid)
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
index 3ad6ad757..9de0eb19f 100644
--- a/go/topology/volume_growth.go
+++ b/go/topology/volume_growth.go
@@ -201,7 +201,7 @@ func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *Volum
 			}
 			server.AddOrUpdateVolume(vi)
 			topo.RegisterVolumeLayout(vi, server)
-			glog.V(0).Infoln("Created Volume", vid, "on", server)
+			glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String())
 		} else {
 			glog.V(0).Infoln("Failed to assign", vid, "to", servers, "error", err)
 			return fmt.Errorf("Failed to assign %d: %v", vid, err)
diff --git a/go/weed/server.go b/go/weed/server.go
index 980c545b4..16b809c53 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -64,6 +64,7 @@ var (
 	masterConfFile                = cmdServer.Flag.String("master.conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
 	masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.")
 	volumePort                    = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
+	volumeAdminPort               = cmdServer.Flag.Int("volume.port.admin", 0, "volume server admin port to talk with master and other volume servers")
 	volumeDataFolders             = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
 	volumeMaxDataVolumeCounts     = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
 	volumePulse                   = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
@@ -225,7 +226,9 @@ func runServer(cmd *Command, args []string) bool {
 	volumeWait.Wait()
 	time.Sleep(100 * time.Millisecond)
 	r := http.NewServeMux()
-	volumeServer := weed_server.NewVolumeServer(r, r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts,
+	volumeServer := weed_server.NewVolumeServer(r, r,
+		*serverIp, *volumePort, *volumeAdminPort, *serverPublicIp,
+		folders, maxCounts,
 		*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
 		serverWhiteList, *volumeFixJpgOrientation,
 	)
diff --git a/go/weed/volume.go b/go/weed/volume.go
index 1dfd88576..df1d603ac 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -38,7 +38,7 @@ type VolumeServerOptions struct {
 func init() {
 	cmdVolume.Run = runVolume // break init cycle
 	v.port = cmdVolume.Flag.Int("port", 8080, "http listen port")
-	v.adminPort = cmdVolume.Flag.Int("port.admin", 8443, "https admin port, active when SSL certs are specified. Not ready yet.")
+	v.adminPort = cmdVolume.Flag.Int("port.admin", 0, "admin port to talk with master and other volume servers")
 	v.ip = cmdVolume.Flag.String("ip", "", "ip or server name")
 	v.publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>")
 	v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
@@ -105,28 +105,50 @@ func runVolume(cmd *Command, args []string) bool {
 		}
 	}
 
-	r := http.NewServeMux()
+	if *v.adminPort == 0 {
+		*v.adminPort = *v.port
+	}
+	isSeperatedAdminPort := *v.adminPort != *v.port
+
+	publicMux := http.NewServeMux()
+	adminMux := publicMux
+	if isSeperatedAdminPort {
+		adminMux = http.NewServeMux()
+	}
 
-	volumeServer := weed_server.NewVolumeServer(r, r, *v.ip, *v.port, *v.publicIp, v.folders, v.folderMaxLimits,
+	volumeServer := weed_server.NewVolumeServer(publicMux, adminMux,
+		*v.ip, *v.port, *v.adminPort, *v.publicIp,
+		v.folders, v.folderMaxLimits,
 		*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
 		v.whiteList,
 		*v.fixJpgOrientation,
 	)
 
 	listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
-
 	glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress)
-
 	listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
 	if e != nil {
 		glog.Fatalf("Volume server listener error:%v", e)
 	}
+	if isSeperatedAdminPort {
+		adminListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.adminPort)
+		glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "admin at", adminListeningAddress)
+		adminListener, e := util.NewListener(adminListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+		if e != nil {
+			glog.Fatalf("Volume server listener error:%v", e)
+		}
+		go func() {
+			if e := http.Serve(adminListener, adminMux); e != nil {
+				glog.Fatalf("Volume server fail to serve admin: %v", e)
+			}
+		}()
+	}
 
 	OnInterrupt(func() {
 		volumeServer.Shutdown()
 	})
 
-	if e := http.Serve(listener, r); e != nil {
+	if e := http.Serve(listener, publicMux); e != nil {
 		glog.Fatalf("Volume server fail to serve: %v", e)
 	}
 	return true
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index 0eb9daa0e..18856c554 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -22,7 +22,9 @@ type VolumeServer struct {
 	FixJpgOrientation bool
 }
 
-func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int,
+func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
+	port, adminPort int, publicIp string,
+	folders []string, maxCounts []int,
 	masterNode string, pulseSeconds int,
 	dataCenter string, rack string,
 	whiteList []string,
@@ -35,7 +37,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, pu
 		rack:              rack,
 		FixJpgOrientation: fixJpgOrientation,
 	}
-	vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts)
+	vs.store = storage.NewStore(port, adminPort, ip, publicUrl, folders, maxCounts)
 
 	vs.guard = security.NewGuard(whiteList, "")