Browse Source

1. The volume server now sends the master server its max file key, so that
the master server no longer needs to store the sequence on disk.
2. Fix the raft server's failure to initialize the cluster during bootstrapping.
pull/2/head
Chris Lu 11 years ago
parent
commit
51939efeac
  1. 30
      go/metastore/backing_test.go
  2. 34
      go/metastore/file_backing.go
  3. 36
      go/metastore/memory_backing.go
  4. 33
      go/metastore/metastore.go
  5. 19
      go/sequence/memory_sequencer.go
  6. 86
      go/sequence/sequence.go
  7. 4
      go/storage/cdb_map.go
  8. 13
      go/storage/needle_map.go
  9. 8
      go/storage/store.go
  10. 9
      go/topology/topology.go
  11. 3
      go/weed/master.go
  12. 3
      go/weed/server.go
  13. 5
      go/weed/weed_server/master_server.go
  14. 3
      go/weed/weed_server/master_server_handlers_admin.go
  15. 20
      go/weed/weed_server/raft_server.go
  16. 2
      go/weed/weed_server/raft_server_handlers.go
  17. 4
      go/weed/weed_server/volume_server.go

30
go/metastore/backing_test.go

@ -1,30 +0,0 @@
package metastore
import (
"testing"
)
func TestMemoryBacking(t *testing.T) {
ms := &MetaStore{NewMetaStoreMemoryBacking()}
verifySetGet(t, ms)
}
func TestFileBacking(t *testing.T) {
ms := &MetaStore{NewMetaStoreFileBacking()}
verifySetGet(t, ms)
}
// verifySetGet stores a uint64 under a fixed key through ms, then checks that
// the key is reported as present and that the same number is read back.
// Every step is checked, including the write itself, so a failing Set is
// reported as such instead of surfacing later as a missing or wrong value.
func verifySetGet(t *testing.T, ms *MetaStore) {
	const key = "/tmp/sequence"
	data := uint64(234234)

	if err := ms.SetUint64(key, data); err != nil {
		t.Errorf("Failed to set data:%s", err)
		return
	}
	if !ms.Has(key) {
		t.Errorf("Failed to set data")
	}

	val, err := ms.GetUint64(key)
	if err != nil {
		t.Errorf("Failed to get back data:%s", err)
		return
	}
	if val != data {
		t.Errorf("Set %d, but read back %d", data, val)
	}
}

34
go/metastore/file_backing.go

@ -1,34 +0,0 @@
package metastore
import (
"io/ioutil"
"os"
)
// MetaStoreFileBacking stores data on disk, one file per key, with the key
// used directly as the file path. This is enough for most cases.
type MetaStoreFileBacking struct{}

// NewMetaStoreFileBacking returns a new file-based backing.
func NewMetaStoreFileBacking() *MetaStoreFileBacking {
	return &MetaStoreFileBacking{}
}

// Set writes val as the whole content of the file at path, creating the file
// with mode 0644 if needed, and returns any write error.
func (fb *MetaStoreFileBacking) Set(path, val string) error {
	content := []byte(val)
	return ioutil.WriteFile(path, content, 0644)
}

// Get returns the whole content of the file at path as a string, together
// with any read error.
func (fb *MetaStoreFileBacking) Get(path string) (string, error) {
	content, err := ioutil.ReadFile(path)
	return string(content), err
}

// Has reports whether the file at path can be opened for reading.
func (fb *MetaStoreFileBacking) Has(path string) (ok bool) {
	f, err := os.OpenFile(path, os.O_RDONLY, 0644)
	if err == nil {
		// The handle was only needed to prove the file is readable.
		f.Close()
	}
	return err == nil
}

36
go/metastore/memory_backing.go

@ -1,36 +0,0 @@
package metastore
import (
"fmt"
)
// MetaStoreMemoryBacking keeps values in an in-process map.
// This is for testing only.
type MetaStoreMemoryBacking struct {
	m map[string]string
}

// NewMetaStoreMemoryBacking returns a backing with an empty, initialized map.
func NewMetaStoreMemoryBacking() *MetaStoreMemoryBacking {
	return &MetaStoreMemoryBacking{m: make(map[string]string)}
}

// Set stores val under path and always returns nil.
func (b MetaStoreMemoryBacking) Set(path, val string) error {
	b.m[path] = val
	return nil
}

// Get returns the value stored under path, or an error when nothing has been
// stored there.
func (b MetaStoreMemoryBacking) Get(path string) (val string, err error) {
	if stored, found := b.m[path]; found {
		return stored, nil
	}
	return "", fmt.Errorf("Missing value for %s", path)
}

// Has reports whether a value has been stored under path.
func (b MetaStoreMemoryBacking) Has(path string) (ok bool) {
	_, ok = b.m[path]
	return ok
}

33
go/metastore/metastore.go

@ -1,33 +0,0 @@
package metastore
import (
"errors"
"strconv"
)
// MetaStoreBacking is the string key/value storage that a MetaStore is built
// on. Keys are called paths.
type MetaStoreBacking interface {
	// Get returns the value stored under path.
	Get(path string) (string, error)
	// Set stores val under path.
	Set(path, val string) error
	// Has reports whether a value exists under path.
	Has(path string) bool
}

// MetaStore adds typed accessors on top of an embedded MetaStoreBacking.
type MetaStore struct {
	MetaStoreBacking
}

// SetUint64 stores val under path as a base-10 string.
func (m *MetaStore) SetUint64(path string, val uint64) error {
	return m.Set(path, strconv.FormatUint(val, 10))
}

// GetUint64 reads the value under path and parses it as a base-10 uint64.
//
// It returns a "Not found value" error when the backing reports that path is
// absent, the backing's own error when the read fails, and the parse error
// when the stored text is not a valid uint64. The returned value is 0
// whenever the error is non-nil.
func (m *MetaStore) GetUint64(path string) (val uint64, err error) {
	if !m.Has(path) {
		return 0, errors.New("Not found value for " + path)
	}
	raw, getErr := m.Get(path)
	if getErr != nil {
		return 0, getErr
	}
	parsed, parseErr := strconv.ParseUint(raw, 10, 64)
	if parseErr != nil {
		// ParseUint may hand back a clamped value with a range error;
		// never expose it next to a non-nil error.
		return 0, parseErr
	}
	return parsed, nil
}

19
go/sequence/memory_sequencer.go

@ -1,10 +1,13 @@
package sequence
import ()
import (
"sync"
)
// MemorySequencer hands out file ids from a counter held only in memory;
// nothing is written to disk.
// Originally just for testing; NewMasterServer in this change also
// constructs one via NewMemorySequencer.
type MemorySequencer struct {
// counter is the next file id NextFileId will return.
counter uint64
// sequenceLock is held by NextFileId and SetMax while they update counter.
sequenceLock sync.Mutex
}
func NewMemorySequencer() (m *MemorySequencer) {
@ -13,7 +16,21 @@ func NewMemorySequencer() (m *MemorySequencer) {
}
// NextFileId reserves count consecutive file ids and returns the first one
// together with count. The counter is advanced under sequenceLock.
func (m *MemorySequencer) NextFileId(count int) (uint64, int) {
	m.sequenceLock.Lock()
	first := m.counter
	m.counter = first + uint64(count)
	m.sequenceLock.Unlock()
	return first, count
}
// SetMax makes sure the next id handed out is greater than seenValue: when
// the counter is not already past seenValue it is moved to seenValue + 1.
// The counter is never moved backwards.
func (m *MemorySequencer) SetMax(seenValue uint64) {
	m.sequenceLock.Lock()
	if seenValue >= m.counter {
		m.counter = seenValue + 1
	}
	m.sequenceLock.Unlock()
}
// Peek returns the id that the next NextFileId call would start from,
// without reserving anything.
//
// The read is done under sequenceLock because NextFileId and SetMax modify
// the counter under that same lock; an unlocked read would be a data race.
func (m *MemorySequencer) Peek() uint64 {
	m.sequenceLock.Lock()
	defer m.sequenceLock.Unlock()
	return m.counter
}

86
go/sequence/sequence.go

@ -1,89 +1,9 @@
package sequence
import (
"bytes"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/metastore"
"encoding/gob"
"sync"
)
const (
FileIdSaveInterval = 10000
)
import ()
type Sequencer interface {
NextFileId(count int) (uint64, int)
}
// SequencerImpl is a file id sequencer that saves its sequence value through
// a metastore.MetaStore so it can be reloaded by initilize.
type SequencerImpl struct {
// fileFullPath is the path under which FileIdSequence is stored in metaStore.
fileFullPath string
// NOTE(review): volumeLock is not referenced by any method visible here — confirm whether it is still needed.
volumeLock sync.Mutex
// sequenceLock is held by NextFileId while it updates the two counters below.
sequenceLock sync.Mutex
// FileIdSequence is the value written by saveSequence; NextFileId raises it
// by FileIdSaveInterval each time fileIdCounter runs out.
FileIdSequence uint64
// fileIdCounter is how many ids below FileIdSequence are still unassigned.
fileIdCounter uint64
// metaStore is the store used to load and save FileIdSequence.
metaStore *metastore.MetaStore
}
// NewFileSequencer builds a sequencer that keeps its sequence value at
// filepath through a file-backed MetaStore, then loads the starting value
// by calling initilize.
func NewFileSequencer(filepath string) (m *SequencerImpl) {
	backing := metastore.NewMetaStoreFileBacking()
	m = &SequencerImpl{
		fileFullPath: filepath,
		metaStore:    &metastore.MetaStore{backing},
	}
	m.initilize()
	return m
}
// initilize sets the starting FileIdSequence.
//
// When nothing is stored at fileFullPath it starts at FileIdSaveInterval.
// Otherwise it reads the stored number; if that cannot be read as a number
// it falls back to reading the raw value and decoding it with decode (the
// old format), and only logs when even the raw read fails.
func (m *SequencerImpl) initilize() {
	if !m.metaStore.Has(m.fileFullPath) {
		m.FileIdSequence = FileIdSaveInterval
		glog.V(0).Infoln("Setting file id sequence", m.FileIdSequence)
		return
	}

	stored, numErr := m.metaStore.GetUint64(m.fileFullPath)
	// The field receives the result even when the read failed.
	m.FileIdSequence = stored
	if numErr == nil {
		glog.V(0).Infoln("Loading file id sequence", m.FileIdSequence)
		return
	}

	raw, rawErr := m.metaStore.Get(m.fileFullPath)
	if rawErr != nil {
		glog.V(0).Infof("No existing FileIdSequence: %s", rawErr)
		return
	}
	m.FileIdSequence = decode(raw)
	glog.V(0).Infoln("Decoding old version of FileIdSequence", m.FileIdSequence)
}
// NextFileId reserves count consecutive file ids and returns the first one
// together with count. count should be 1 or more; for count <= 0 it returns
// (0, 0) and reserves nothing.
//
// Ids are reserved in chunks of FileIdSaveInterval: when fewer than count
// ids remain, FileIdSequence is advanced and saved before any id from the
// new range is returned. Enough whole chunks are reserved to cover count, so
// a request larger than FileIdSaveInterval no longer underflows the
// remaining-id counter.
func (m *SequencerImpl) NextFileId(count int) (uint64, int) {
	if count <= 0 {
		return 0, 0
	}
	need := uint64(count)

	m.sequenceLock.Lock()
	defer m.sequenceLock.Unlock()

	if m.fileIdCounter < need {
		// Round up to whole intervals; this is exactly one interval
		// whenever need <= FileIdSaveInterval, as before.
		chunks := (need + FileIdSaveInterval - 1) / FileIdSaveInterval
		m.fileIdCounter = chunks * FileIdSaveInterval
		m.FileIdSequence += chunks * FileIdSaveInterval
		m.saveSequence()
	}
	m.fileIdCounter -= need
	return m.FileIdSequence - m.fileIdCounter - need, count
}
// saveSequence writes the current FileIdSequence to fileFullPath through the
// metastore. A failed write is reported with glog.Fatalf.
func (m *SequencerImpl) saveSequence() {
	glog.V(0).Infoln("Saving file id sequence", m.FileIdSequence, "to", m.fileFullPath)
	err := m.metaStore.SetUint64(m.fileFullPath, m.FileIdSequence)
	if err == nil {
		return
	}
	glog.Fatalf("Sequence id Save [ERROR] %s", err)
}
//decode are for backward compatible purpose
func decode(input string) uint64 {
var x uint64
b := bytes.NewReader([]byte(input))
decoder := gob.NewDecoder(b)
if e := decoder.Decode(&x); e == nil {
return x
}
return 0
SetMax(uint64)
Peek() uint64
}

4
go/storage/cdb_map.go

@ -80,8 +80,8 @@ func (m cdbMap) FileCount() int {
func (m *cdbMap) DeletedCount() int {
return m.DeletionCounter
}
func (m *cdbMap) NextFileKey(count int) uint64 {
return 0
func (m *cdbMap) MaxFileKey() uint64 {
return m.MaximumFileKey
}
func getMetric(c *cdb.Cdb, m *mapMetric) error {

13
go/storage/needle_map.go

@ -19,7 +19,7 @@ type NeedleMapper interface {
FileCount() int
DeletedCount() int
Visit(visit func(NeedleValue) error) (err error)
NextFileKey(count int) uint64
MaxFileKey() uint64
}
type mapMetric struct {
@ -110,6 +110,9 @@ func walkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e
}
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
if key > nm.MaximumFileKey {
nm.MaximumFileKey = key
}
oldSize := nm.m.Set(Key(key), offset, size)
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
@ -172,11 +175,3 @@ func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
func (nm NeedleMap) MaxFileKey() uint64 {
return nm.MaximumFileKey
}
func (nm NeedleMap) NextFileKey(count int) (ret uint64) {
if count <= 0 {
return 0
}
ret = nm.MaximumFileKey
nm.MaximumFileKey += uint64(count)
return
}

8
go/storage/store.go

@ -44,6 +44,9 @@ func (mn *MasterNodes) findMaster() (string, error) {
if mn.lastNode < 0 {
for _, m := range mn.nodes {
if masters, e := operation.ListMasters(m); e == nil {
if len(masters) == 0 {
continue
}
mn.nodes = masters
mn.lastNode = rand.Intn(len(mn.nodes))
glog.V(2).Info("current master node is :", mn.nodes[mn.lastNode])
@ -268,6 +271,7 @@ func (s *Store) Join() error {
}
stats := new([]*VolumeInfo)
maxVolumeCount := 0
var maxFileKey uint64
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
@ -280,6 +284,9 @@ func (s *Store) Join() error {
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.readOnly}
*stats = append(*stats, s)
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
}
}
bytes, _ := json.Marshal(stats)
@ -292,6 +299,7 @@ func (s *Store) Join() error {
values.Add("publicUrl", s.PublicUrl)
values.Add("volumes", string(bytes))
values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
values.Add("maxFileKey", strconv.FormatUint(maxFileKey, 10))
values.Add("dataCenter", s.dataCenter)
values.Add("rack", s.rack)
jsonBlob, err := util.Post("http://"+masterNode+"/dir/join", values)

9
go/topology/topology.go

@ -19,7 +19,7 @@ type Topology struct {
volumeSizeLimit uint64
sequence sequence.Sequencer
Sequence sequence.Sequencer
chanDeadDataNodes chan *DataNode
chanRecoveredDataNodes chan *DataNode
@ -40,7 +40,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.sequence = seq
t.Sequence = seq
t.chanDeadDataNodes = make(chan *DataNode)
t.chanRecoveredDataNodes = make(chan *DataNode)
@ -118,7 +118,7 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
fileId, count := t.sequence.NextFileId(count)
fileId, count := t.Sequence.NextFileId(count)
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
@ -143,7 +143,8 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
}
func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {
func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, maxFileKey uint64, dcName string, rackName string) {
t.Sequence.SetMax(maxFileKey)
dcName, rackName = t.configuration.Locate(ip, dcName, rackName)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)

3
go/weed/master.go

@ -72,11 +72,12 @@ func runMaster(cmd *Command, args []string) bool {
go func() {
time.Sleep(100 * time.Millisecond)
myAddress := *masterIp + ":" + strconv.Itoa(*mport)
var peers []string
if *masterPeers != "" {
peers = strings.Split(*masterPeers, ",")
}
raftServer := weed_server.NewRaftServer(r, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder, ms.Topo, *mpulse)
raftServer := weed_server.NewRaftServer(r, peers, myAddress, *metaFolder, ms.Topo, *mpulse)
ms.SetRaftServer(raftServer)
}()

3
go/weed/server.go

@ -164,11 +164,12 @@ func runServer(cmd *Command, args []string) bool {
go func() {
raftWaitForMaster.Wait()
time.Sleep(100 * time.Millisecond)
myAddress := *serverIp + ":" + strconv.Itoa(*masterPort)
var peers []string
if *serverPeers != "" {
peers = strings.Split(*serverPeers, ",")
}
raftServer := weed_server.NewRaftServer(r, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder, ms.Topo, *volumePulse)
raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *volumePulse)
ms.SetRaftServer(raftServer)
volumeWait.Done()
}()

5
go/weed/weed_server/master_server.go

@ -11,7 +11,6 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"path"
"sync"
)
@ -48,7 +47,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
whiteList: whiteList,
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq"))
seq := sequence.NewMemorySequencer()
var e error
if ms.Topo, e = topology.NewTopology("topo", confFile, seq,
uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
@ -97,7 +96,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
} else if ms.Topo.RaftServer.Leader() != "" {
} else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
ms.bounedLeaderChan <- 1
defer func() { <-ms.bounedLeaderChan }()
targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())

3
go/weed/weed_server/master_server_handlers_admin.go

@ -36,6 +36,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
}
port, _ := strconv.Atoi(r.FormValue("port"))
maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
maxFileKey, _ := strconv.ParseUint(r.FormValue("maxFileKey"), 10, 64)
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
publicUrl := r.FormValue("publicUrl")
volumes := new([]storage.VolumeInfo)
@ -44,7 +45,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
return
}
debug(s, "volumes", r.FormValue("volumes"))
ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, maxFileKey, r.FormValue("dataCenter"), r.FormValue("rack"))
writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
}

20
go/weed/weed_server/raft_server.go

@ -77,13 +77,6 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
return nil
}
}
var err error
for err != nil {
glog.V(0).Infoln("waiting for peers on", strings.Join(s.peers, ","), "...")
time.Sleep(time.Duration(1000+rand.Intn(2000)) * time.Millisecond)
err = s.Join(s.peers)
}
glog.V(0).Infoln("Joined cluster")
}
// Initialize the server by joining itself.
@ -124,14 +117,17 @@ func (s *RaftServer) Join(peers []string) error {
ConnectionString: "http://" + s.httpAddr,
}
var err error
var b bytes.Buffer
json.NewEncoder(&b).Encode(command)
for _, m := range peers {
if m == s.httpAddr {
continue
}
target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m))
glog.V(0).Infoln("Attempting to connect to:", target)
err := postFollowingOneRedirect(target, "application/json", &b)
err = postFollowingOneRedirect(target, "application/json", &b)
if err != nil {
glog.V(0).Infoln("Post returned error: ", err.Error())
@ -139,12 +135,10 @@ func (s *RaftServer) Join(peers []string) error {
// If we receive a network error try the next member
continue
}
return err
}
} else {
return nil
}
}
return errors.New("Could not connect to any cluster peers")
}

2
go/weed/weed_server/raft_server_handlers.go

@ -18,7 +18,7 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
commandText, _ := ioutil.ReadAll(req.Body)
glog.V(0).Info("Command:", string(commandText))
if err := json.NewDecoder(strings.NewReader(string(commandText))).Decode(&command); err != nil {
glog.V(0).Infoln("Error decoding json message:", err)
glog.V(0).Infoln("Error decoding json message:", err, string(commandText))
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

4
go/weed/weed_server/volume_server.go

@ -54,10 +54,10 @@ func NewVolumeServer(r *http.ServeMux, ip string, port int, publicUrl string, fo
if err == nil {
if !connected {
connected = true
glog.V(0).Infoln("Reconnected with master")
glog.V(0).Infoln("Volume Server Connected with master")
}
} else {
glog.V(4).Infoln("Failing to talk with master:", err.Error())
glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error())
if connected {
connected = false
}

Loading…
Cancel
Save