Browse Source

Merge pull request #79 from aszxqw/master

use sync.RWMutex when masterNode changes
pull/96/head
chrislusf 10 years ago
parent
commit
75f4964c48
  1. 23
      go/weed/weed_server/volume_server.go
  2. 6
      go/weed/weed_server/volume_server_handlers.go

23
go/weed/weed_server/volume_server.go

@ -3,6 +3,7 @@ package weed_server
import ( import (
"math/rand" "math/rand"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/glog"
@ -12,6 +13,7 @@ import (
type VolumeServer struct { type VolumeServer struct {
masterNode string masterNode string
mnLock sync.RWMutex
pulseSeconds int pulseSeconds int
dataCenter string dataCenter string
rack string rack string
@ -29,12 +31,12 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
whiteList []string, whiteList []string,
fixJpgOrientation bool) *VolumeServer { fixJpgOrientation bool) *VolumeServer {
vs := &VolumeServer{ vs := &VolumeServer{
masterNode: masterNode,
pulseSeconds: pulseSeconds, pulseSeconds: pulseSeconds,
dataCenter: dataCenter, dataCenter: dataCenter,
rack: rack, rack: rack,
FixJpgOrientation: fixJpgOrientation, FixJpgOrientation: fixJpgOrientation,
} }
vs.SetMasterNode(masterNode)
vs.store = storage.NewStore(port, adminPort, ip, publicUrl, folders, maxCounts) vs.store = storage.NewStore(port, adminPort, ip, publicUrl, folders, maxCounts)
vs.guard = security.NewGuard(whiteList, "") vs.guard = security.NewGuard(whiteList, "")
@ -54,7 +56,8 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
go func() { go func() {
connected := true connected := true
vs.store.SetBootstrapMaster(vs.masterNode)
vs.store.SetBootstrapMaster(vs.GetMasterNode())
vs.store.SetDataCenter(vs.dataCenter) vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack) vs.store.SetRack(vs.rack)
for { for {
@ -62,8 +65,8 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
if err == nil { if err == nil {
if !connected { if !connected {
connected = true connected = true
vs.masterNode = master
glog.V(0).Infoln("Volume Server Connected with master at", master)
vs.SetMasterNode(master)
glog.V(0).Infoln("Volume Server Connected with master at", master, "and set it as masterNode")
} }
} else { } else {
glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error()) glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error())
@ -82,6 +85,18 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
return vs return vs
} }
func (vs *VolumeServer) GetMasterNode() string {
vs.mnLock.RLock()
defer vs.mnLock.RUnlock()
return vs.masterNode
}
func (vs *VolumeServer) SetMasterNode(masterNode string) {
vs.mnLock.Lock()
defer vs.mnLock.Unlock()
vs.masterNode = masterNode
}
func (vs *VolumeServer) Shutdown() { func (vs *VolumeServer) Shutdown() {
glog.V(0).Infoln("Shutting down volume server...") glog.V(0).Infoln("Shutting down volume server...")
vs.store.Close() vs.store.Close()

6
go/weed/weed_server/volume_server_handlers.go

@ -58,7 +58,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
glog.V(4).Infoln("volume", volumeId, "reading", n) glog.V(4).Infoln("volume", volumeId, "reading", n)
if !vs.store.HasVolume(volumeId) { if !vs.store.HasVolume(volumeId) {
lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String())
lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 { if err == nil && len(lookupResult.Locations) > 0 {
http.Redirect(w, r, "http://"+lookupResult.Locations[0].Url+r.URL.Path, http.StatusMovedPermanently) http.Redirect(w, r, "http://"+lookupResult.Locations[0].Url+r.URL.Path, http.StatusMovedPermanently)
@ -253,7 +253,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
ret := operation.UploadResult{} ret := operation.UploadResult{}
size, errorStatus := topology.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r)
size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(), vs.store, volumeId, needle, r)
httpStatus := http.StatusCreated httpStatus := http.StatusCreated
if errorStatus != "" { if errorStatus != "" {
httpStatus = http.StatusInternalServerError httpStatus = http.StatusInternalServerError
@ -290,7 +290,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} }
n.Size = 0 n.Size = 0
ret := topology.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r)
ret := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r)
if ret != 0 { if ret != 0 {
m := make(map[string]uint32) m := make(map[string]uint32)

Loading…
Cancel
Save