Browse Source

Add read only public port on volume server

Add read only public port on volume server
pull/114/head
chrislusf 10 years ago
parent
commit
f511b507a5
  1. 3
      go/operation/assign_file_id.go
  2. 2
      go/operation/submit.go
  3. 6
      go/storage/store.go
  4. 2
      go/topology/allocate_volume.go
  5. 6
      go/topology/data_node.go
  6. 3
      go/topology/rack.go
  7. 6
      go/topology/topology.go
  8. 12
      go/topology/topology_vacuum.go
  9. 35
      go/weed/server.go
  10. 36
      go/weed/volume.go
  11. 9
      go/weed/weed_server/volume_server.go
  12. 25
      go/weed/weed_server/volume_server_handlers.go

3
go/operation/assign_file_id.go

@ -3,6 +3,7 @@ package operation
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"net/url" "net/url"
"strconv" "strconv"
@ -38,7 +39,7 @@ func Assign(server string, count int, replication string, collection string, ttl
var ret AssignResult var ret AssignResult
err = json.Unmarshal(jsonBlob, &ret) err = json.Unmarshal(jsonBlob, &ret)
if err != nil { if err != nil {
return nil, err
return nil, fmt.Errorf("JSON unmarshal error:%v, json:%s", err, string(jsonBlob))
} }
if ret.Count <= 0 { if ret.Count <= 0 {
return nil, errors.New(ret.Error) return nil, errors.New(ret.Error)

2
go/operation/submit.go

@ -55,7 +55,7 @@ func SubmitFiles(master string, files []FilePart,
if index > 0 { if index > 0 {
file.Fid = file.Fid + "_" + strconv.Itoa(index) file.Fid = file.Fid + "_" + strconv.Itoa(index)
} }
file.Server = ret.PublicUrl
file.Server = ret.Url
file.Replication = replication file.Replication = replication
file.Collection = collection file.Collection = collection
results[index].Size, err = file.Upload(maxMB, master, secret) results[index].Size, err = file.Upload(maxMB, master, secret)

6
go/storage/store.go

@ -76,7 +76,6 @@ func (mn *MasterNodes) findMaster() (string, error) {
type Store struct { type Store struct {
Ip string Ip string
Port int Port int
AdminPort int
PublicUrl string PublicUrl string
Locations []*DiskLocation Locations []*DiskLocation
dataCenter string //optional informaton, overwriting master setting if exists dataCenter string //optional informaton, overwriting master setting if exists
@ -91,8 +90,8 @@ func (s *Store) String() (str string) {
return return
} }
func NewStore(port, adminPort int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
s = &Store{Port: port, AdminPort: adminPort, Ip: ip, PublicUrl: publicUrl}
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
s.Locations = make([]*DiskLocation, 0) s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ { for i := 0; i < len(dirnames); i++ {
location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]} location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
@ -310,7 +309,6 @@ func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
DataCenter: proto.String(s.dataCenter), DataCenter: proto.String(s.dataCenter),
Rack: proto.String(s.rack), Rack: proto.String(s.rack),
Volumes: volumeMessages, Volumes: volumeMessages,
AdminPort: proto.Uint32(uint32(s.AdminPort)),
} }
data, err := proto.Marshal(joinMessage) data, err := proto.Marshal(joinMessage)

2
go/topology/allocate_volume.go

@ -20,7 +20,7 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption
values.Add("collection", option.Collection) values.Add("collection", option.Collection)
values.Add("replication", option.ReplicaPlacement.String()) values.Add("replication", option.ReplicaPlacement.String())
values.Add("ttl", option.Ttl.String()) values.Add("ttl", option.Ttl.String())
jsonBlob, err := util.Post("http://"+dn.AdminUrl()+"/admin/assign_volume", values)
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
if err != nil { if err != nil {
return err return err
} }

6
go/topology/data_node.go

@ -13,7 +13,6 @@ type DataNode struct {
volumes map[storage.VolumeId]storage.VolumeInfo volumes map[storage.VolumeId]storage.VolumeInfo
Ip string Ip string
Port int Port int
AdminPort int
PublicUrl string PublicUrl string
LastSeen int64 // unix time in seconds LastSeen int64 // unix time in seconds
Dead bool Dead bool
@ -90,10 +89,6 @@ func (dn *DataNode) Url() string {
return dn.Ip + ":" + strconv.Itoa(dn.Port) return dn.Ip + ":" + strconv.Itoa(dn.Port)
} }
func (dn *DataNode) AdminUrl() string {
return dn.Ip + ":" + strconv.Itoa(dn.AdminPort)
}
func (dn *DataNode) ToMap() interface{} { func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{}) ret := make(map[string]interface{})
ret["Url"] = dn.Url() ret["Url"] = dn.Url()
@ -101,6 +96,5 @@ func (dn *DataNode) ToMap() interface{} {
ret["Max"] = dn.GetMaxVolumeCount() ret["Max"] = dn.GetMaxVolumeCount()
ret["Free"] = dn.FreeSpace() ret["Free"] = dn.FreeSpace()
ret["PublicUrl"] = dn.PublicUrl ret["PublicUrl"] = dn.PublicUrl
ret["AdminUrl"] = dn.AdminUrl()
return ret return ret
} }

3
go/topology/rack.go

@ -27,7 +27,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
} }
return nil return nil
} }
func (r *Rack) GetOrCreateDataNode(ip string, port, adminPort int, publicUrl string, maxVolumeCount int) *DataNode {
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
for _, c := range r.Children() { for _, c := range r.Children() {
dn := c.(*DataNode) dn := c.(*DataNode)
if dn.MatchLocation(ip, port) { if dn.MatchLocation(ip, port) {
@ -43,7 +43,6 @@ func (r *Rack) GetOrCreateDataNode(ip string, port, adminPort int, publicUrl str
dn := NewDataNode(ip + ":" + strconv.Itoa(port)) dn := NewDataNode(ip + ":" + strconv.Itoa(port))
dn.Ip = ip dn.Ip = ip
dn.Port = port dn.Port = port
dn.AdminPort = adminPort
dn.PublicUrl = publicUrl dn.PublicUrl = publicUrl
dn.maxVolumeCount = maxVolumeCount dn.maxVolumeCount = maxVolumeCount
dn.LastSeen = time.Now().Unix() dn.LastSeen = time.Now().Unix()

6
go/topology/topology.go

@ -157,12 +157,8 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
if *joinMessage.IsInit && dn != nil { if *joinMessage.IsInit && dn != nil {
t.UnRegisterDataNode(dn) t.UnRegisterDataNode(dn)
} }
adminPort := *joinMessage.Port
if joinMessage.AdminPort != nil {
adminPort = *joinMessage.AdminPort
}
dn = rack.GetOrCreateDataNode(*joinMessage.Ip, dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
int(*joinMessage.Port), int(adminPort), *joinMessage.PublicUrl,
int(*joinMessage.Port), *joinMessage.PublicUrl,
int(*joinMessage.MaxVolumeCount)) int(*joinMessage.MaxVolumeCount))
var volumeInfos []storage.VolumeInfo var volumeInfos []storage.VolumeInfo
for _, v := range joinMessage.Volumes { for _, v := range joinMessage.Volumes {

12
go/topology/topology_vacuum.go

@ -23,7 +23,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
//glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret) //glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
ch <- ret ch <- ret
} }
}(index, dn.AdminUrl(), vid)
}(index, dn.Url(), vid)
} }
isCheckSuccess := true isCheckSuccess := true
for _ = range locationlist.list { for _ = range locationlist.list {
@ -50,7 +50,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url) glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
ch <- true ch <- true
} }
}(index, dn.AdminUrl(), vid)
}(index, dn.Url(), vid)
} }
isVacuumSuccess := true isVacuumSuccess := true
for _ = range locationlist.list { for _ = range locationlist.list {
@ -66,12 +66,12 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true isCommitSuccess := true
for _, dn := range locationlist.list { for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.AdminUrl())
if e := vacuumVolume_Commit(dn.AdminUrl(), vid); e != nil {
glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.AdminUrl(), e)
glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
isCommitSuccess = false isCommitSuccess = false
} else { } else {
glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.AdminUrl())
glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url())
} }
if isCommitSuccess { if isCommitSuccess {
vl.SetVolumeAvailable(dn, vid) vl.SetVolumeAvailable(dn, vid)

35
go/weed/server.go

@ -64,7 +64,7 @@ var (
masterConfFile = cmdServer.Flag.String("master.conf", "/etc/weedfs/weedfs.conf", "xml configuration file") masterConfFile = cmdServer.Flag.String("master.conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.") masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.")
volumePort = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") volumePort = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
volumeAdminPort = cmdServer.Flag.Int("volume.port.admin", 0, "volume server admin port to talk with master and other volume servers")
volumePublicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
@ -109,8 +109,8 @@ func runServer(cmd *Command, args []string) bool {
*filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement *filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement
} }
if *volumeAdminPort == 0 {
*volumeAdminPort = *volumePort
if *volumePublicPort == 0 {
*volumePublicPort = *volumePort
} }
if *serverMaxCpu < 1 { if *serverMaxCpu < 1 {
@ -223,9 +223,17 @@ func runServer(cmd *Command, args []string) bool {
volumeWait.Wait() volumeWait.Wait()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
r := http.NewServeMux()
volumeServer := weed_server.NewVolumeServer(r, r,
*serverIp, *volumePort, *volumeAdminPort, *serverPublicUrl,
if *volumePublicPort == 0 {
*volumePublicPort = *volumePort
}
isSeperatedPublicPort := *volumePublicPort != *volumePort
volumeMux := http.NewServeMux()
publicVolumeMux := volumeMux
if isSeperatedPublicPort {
publicVolumeMux = http.NewServeMux()
}
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*serverIp, *volumePort, *serverPublicUrl,
folders, maxCounts, folders, maxCounts,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation, serverWhiteList, *volumeFixJpgOrientation,
@ -239,13 +247,26 @@ func runServer(cmd *Command, args []string) bool {
if eListen != nil { if eListen != nil {
glog.Fatalf("Volume server listener error: %v", eListen) glog.Fatalf("Volume server listener error: %v", eListen)
} }
if isSeperatedPublicPort {
publicListeningAddress := *serverIp + ":" + strconv.Itoa(*volumePublicPort)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*serverTimeout)*time.Second)
if e != nil {
glog.Fatalf("Volume server listener error:%v", e)
}
go func() {
if e := http.Serve(publicListener, publicVolumeMux); e != nil {
glog.Fatalf("Volume server fail to serve public: %v", e)
}
}()
}
OnInterrupt(func() { OnInterrupt(func() {
volumeServer.Shutdown() volumeServer.Shutdown()
pprof.StopCPUProfile() pprof.StopCPUProfile()
}) })
if e := http.Serve(volumeListener, r); e != nil {
if e := http.Serve(volumeListener, volumeMux); e != nil {
glog.Fatalf("Volume server fail to serve:%v", e) glog.Fatalf("Volume server fail to serve:%v", e)
} }

36
go/weed/volume.go

@ -19,7 +19,7 @@ var (
type VolumeServerOptions struct { type VolumeServerOptions struct {
port *int port *int
adminPort *int
publicPort *int
folders []string folders []string
folderMaxLimits []int folderMaxLimits []int
ip *string ip *string
@ -38,7 +38,7 @@ type VolumeServerOptions struct {
func init() { func init() {
cmdVolume.Run = runVolume // break init cycle cmdVolume.Run = runVolume // break init cycle
v.port = cmdVolume.Flag.Int("port", 8080, "http listen port") v.port = cmdVolume.Flag.Int("port", 8080, "http listen port")
v.adminPort = cmdVolume.Flag.Int("port.admin", 0, "admin port to talk with master and other volume servers")
v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public")
v.ip = cmdVolume.Flag.String("ip", "", "ip or server name") v.ip = cmdVolume.Flag.String("ip", "", "ip or server name")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
@ -102,19 +102,19 @@ func runVolume(cmd *Command, args []string) bool {
*v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.port) *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.port)
} }
if *v.adminPort == 0 {
*v.adminPort = *v.port
if *v.publicPort == 0 {
*v.publicPort = *v.port
} }
isSeperatedAdminPort := *v.adminPort != *v.port
isSeperatedPublicPort := *v.publicPort != *v.port
publicMux := http.NewServeMux()
adminMux := publicMux
if isSeperatedAdminPort {
adminMux = http.NewServeMux()
volumeMux := http.NewServeMux()
publicVolumeMux := volumeMux
if isSeperatedPublicPort {
publicVolumeMux = http.NewServeMux()
} }
volumeServer := weed_server.NewVolumeServer(publicMux, adminMux,
*v.ip, *v.port, *v.adminPort, *v.publicUrl,
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl,
v.folders, v.folderMaxLimits, v.folders, v.folderMaxLimits,
*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList, v.whiteList,
@ -127,16 +127,16 @@ func runVolume(cmd *Command, args []string) bool {
if e != nil { if e != nil {
glog.Fatalf("Volume server listener error:%v", e) glog.Fatalf("Volume server listener error:%v", e)
} }
if isSeperatedAdminPort {
adminListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.adminPort)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "admin at", adminListeningAddress)
adminListener, e := util.NewListener(adminListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if isSeperatedPublicPort {
publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if e != nil { if e != nil {
glog.Fatalf("Volume server listener error:%v", e) glog.Fatalf("Volume server listener error:%v", e)
} }
go func() { go func() {
if e := http.Serve(adminListener, adminMux); e != nil {
glog.Fatalf("Volume server fail to serve admin: %v", e)
if e := http.Serve(publicListener, publicVolumeMux); e != nil {
glog.Fatalf("Volume server fail to serve public: %v", e)
} }
}() }()
} }
@ -145,7 +145,7 @@ func runVolume(cmd *Command, args []string) bool {
volumeServer.Shutdown() volumeServer.Shutdown()
}) })
if e := http.Serve(listener, publicMux); e != nil {
if e := http.Serve(listener, volumeMux); e != nil {
glog.Fatalf("Volume server fail to serve: %v", e) glog.Fatalf("Volume server fail to serve: %v", e)
} }
return true return true

9
go/weed/weed_server/volume_server.go

@ -23,8 +23,8 @@ type VolumeServer struct {
FixJpgOrientation bool FixJpgOrientation bool
} }
func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
port, adminPort int, publicUrl string,
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
folders []string, maxCounts []int, folders []string, maxCounts []int,
masterNode string, pulseSeconds int, masterNode string, pulseSeconds int,
dataCenter string, rack string, dataCenter string, rack string,
@ -37,7 +37,7 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
FixJpgOrientation: fixJpgOrientation, FixJpgOrientation: fixJpgOrientation,
} }
vs.SetMasterNode(masterNode) vs.SetMasterNode(masterNode)
vs.store = storage.NewStore(port, adminPort, ip, publicUrl, folders, maxCounts)
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts)
vs.guard = security.NewGuard(whiteList, "") vs.guard = security.NewGuard(whiteList, "")
@ -56,8 +56,7 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
adminMux.HandleFunc("/delete", vs.guard.WhiteList(vs.batchDeleteHandler)) adminMux.HandleFunc("/delete", vs.guard.WhiteList(vs.batchDeleteHandler))
adminMux.HandleFunc("/", vs.privateStoreHandler) adminMux.HandleFunc("/", vs.privateStoreHandler)
} }
publicMux.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler))
publicMux.HandleFunc("/", vs.publicStoreHandler)
publicMux.HandleFunc("/", vs.publicReadOnlyHandler)
go func() { go func() {
connected := true connected := true

25
go/weed/weed_server/volume_server_handlers.go

@ -8,19 +8,17 @@ import (
/* /*
Public port supports reads. Writes on public port can have one of the 3
If volume server is started with a separated public port, the public port will
be more "secure".
Public port currently only supports reads.
Later writes on public port can have one of the 3
security settings: security settings:
1. not secured 1. not secured
2. secured by white list 2. secured by white list
3. secured by JWT(Json Web Token) 3. secured by JWT(Json Web Token)
If volume server is started with a separated admin port, the admin port will
have less "security" for easier implementation.
Admin port always supports reads. Writes on admin port can have one of
the 2 security settings:
1. not secured
2. secured by white list
*/ */
func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
@ -43,7 +41,7 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
} }
} }
func (vs *VolumeServer) publicStoreHandler(w http.ResponseWriter, r *http.Request) {
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method { switch r.Method {
case "GET": case "GET":
stats.ReadRequest() stats.ReadRequest()
@ -51,14 +49,5 @@ func (vs *VolumeServer) publicStoreHandler(w http.ResponseWriter, r *http.Reques
case "HEAD": case "HEAD":
stats.ReadRequest() stats.ReadRequest()
vs.GetOrHeadHandler(w, r) vs.GetOrHeadHandler(w, r)
case "DELETE":
stats.DeleteRequest()
vs.guard.Secure(vs.DeleteHandler)(w, r)
case "PUT":
stats.WriteRequest()
vs.guard.Secure(vs.PostHandler)(w, r)
case "POST":
stats.WriteRequest()
vs.guard.Secure(vs.PostHandler)(w, r)
} }
} }
Loading…
Cancel
Save