diff --git a/go/metastore/backing_test.go b/go/metastore/backing_test.go index 2a2c23323..c8497fe4b 100644 --- a/go/metastore/backing_test.go +++ b/go/metastore/backing_test.go @@ -14,13 +14,18 @@ func TestFileBacking(t *testing.T) { verifySetGet(t, ms) } +func TestEtcdBacking(t *testing.T) { + ms := &MetaStore{NewMetaStoreEtcdBacking("http://localhost:4001")} + verifySetGet(t, ms) +} + func verifySetGet(t *testing.T, ms *MetaStore) { data := uint64(234234) - ms.SetUint64(data, "/tmp", "sequence") - if !ms.Has("/tmp", "sequence") { + ms.SetUint64("/tmp/sequence", data) + if !ms.Has("/tmp/sequence") { t.Errorf("Failed to set data") } - if val, err := ms.GetUint64("/tmp", "sequence"); err == nil { + if val, err := ms.GetUint64("/tmp/sequence"); err == nil { if val != data { t.Errorf("Set %d, but read back %d", data, val) } diff --git a/go/metastore/etcd_backing.go b/go/metastore/etcd_backing.go new file mode 100644 index 000000000..ac5f2b1a5 --- /dev/null +++ b/go/metastore/etcd_backing.go @@ -0,0 +1,51 @@ +package metastore + +import ( + "code.google.com/p/weed-fs/go/glog" + "errors" + "github.com/coreos/go-etcd/etcd" + "strings" +) + +// store data on etcd + +type MetaStoreEtcdBacking struct { + client *etcd.Client +} + +func NewMetaStoreEtcdBacking(etcdCluster string) *MetaStoreEtcdBacking { + m := &MetaStoreEtcdBacking{} + m.client = etcd.NewClient(strings.Split(etcdCluster, ",")) + return m +} + +func (m MetaStoreEtcdBacking) Set(path, val string) error { + res, e := m.client.Set(path, val, 0) + glog.V(0).Infof("etcd set response: %+v\n", res) + return e +} + +func (m MetaStoreEtcdBacking) Get(path string) (string, error) { + results, err := m.client.Get(path) + for i, res := range results { + glog.V(0).Infof("[%d] get response: %+v\n", i, res) + } + if err != nil { + return "", err + } + if results[0].Key != path { + return "", errors.New("Key Not Found:" + path) + } + return results[0].Value, nil +} + +func (m MetaStoreEtcdBacking) Has(path string) (ok bool) { + results, err := m.client.Get(path) + if err != nil { + return false + } + if results[0].Key != path { + return false + } + return true +} diff --git a/go/metastore/file_backing.go b/go/metastore/file_backing.go index 5fb3b39cc..1dc0c963f 100644 --- a/go/metastore/file_backing.go +++ b/go/metastore/file_backing.go @@ -3,7 +3,6 @@ package metastore import ( "io/ioutil" "os" - "path" ) // store data on disk, enough for most cases @@ -11,21 +10,22 @@ import ( type MetaStoreFileBacking struct { } -func NewMetaStoreFileBacking() MetaStoreFileBacking { - mms := MetaStoreFileBacking{} +func NewMetaStoreFileBacking() *MetaStoreFileBacking { + mms := &MetaStoreFileBacking{} return mms } -func (mms MetaStoreFileBacking) Set(val []byte, elem ...string) error { - return ioutil.WriteFile(path.Join(elem...), val, 0644) +func (mms *MetaStoreFileBacking) Set(path, val string) error { + return ioutil.WriteFile(path, []byte(val), 0644) } -func (mms MetaStoreFileBacking) Get(elem ...string) (val []byte, err error) { - return ioutil.ReadFile(path.Join(elem...)) +func (mms *MetaStoreFileBacking) Get(path string) (string, error) { + val, e := ioutil.ReadFile(path) + return string(val), e } -func (mms MetaStoreFileBacking) Has(elem ...string) (ok bool) { - seqFile, se := os.OpenFile(path.Join(elem...), os.O_RDONLY, 0644) +func (mms *MetaStoreFileBacking) Has(path string) (ok bool) { + seqFile, se := os.OpenFile(path, os.O_RDONLY, 0644) if se != nil { return false } diff --git a/go/metastore/memory_backing.go b/go/metastore/memory_backing.go index 86957225a..4f45c2e5f 100644 --- a/go/metastore/memory_backing.go +++ b/go/metastore/memory_backing.go @@ -2,36 +2,35 @@ package metastore import ( "fmt" - "path" ) //this is for testing only type MetaStoreMemoryBacking struct { - m map[string][]byte + m map[string]string } -func NewMetaStoreMemoryBacking() MetaStoreMemoryBacking { - mms := MetaStoreMemoryBacking{} - mms.m = make(map[string][]byte) +func NewMetaStoreMemoryBacking() *MetaStoreMemoryBacking { + mms := &MetaStoreMemoryBacking{} + mms.m = make(map[string]string) return mms } -func (mms MetaStoreMemoryBacking) Set(val []byte, elem ...string) error { - mms.m[path.Join(elem...)] = val +func (mms MetaStoreMemoryBacking) Set(path, val string) error { + mms.m[path] = val return nil } -func (mms MetaStoreMemoryBacking) Get(elem ...string) (val []byte, err error) { +func (mms MetaStoreMemoryBacking) Get(path string) (val string, err error) { var ok bool - val, ok = mms.m[path.Join(elem...)] + val, ok = mms.m[path] if !ok { - return nil, fmt.Errorf("Missing value for %s", path.Join(elem...)) + return "", fmt.Errorf("Missing value for %s", path) } return } -func (mms MetaStoreMemoryBacking) Has(elem ...string) (ok bool) { - _, ok = mms.m[path.Join(elem...)] +func (mms MetaStoreMemoryBacking) Has(path string) (ok bool) { + _, ok = mms.m[path] return } diff --git a/go/metastore/metastore.go b/go/metastore/metastore.go index da87dbe85..2b1fdc6d8 100644 --- a/go/metastore/metastore.go +++ b/go/metastore/metastore.go @@ -1,35 +1,33 @@ package metastore import ( - "code.google.com/p/weed-fs/go/util" "errors" - "path" + "strconv" ) type MetaStoreBacking interface { - Get(elem ...string) ([]byte, error) - Set(val []byte, elem ...string) error - Has(elem ...string) bool + Get(path string) (string, error) + Set(path, val string) error + Has(path string) bool } type MetaStore struct { MetaStoreBacking } -func (m *MetaStore) SetUint64(val uint64, elem ...string) error { - b := make([]byte, 8) - util.Uint64toBytes(b, val) - return m.Set(b, elem...) +func (m *MetaStore) SetUint64(path string, val uint64) error { + return m.Set(path, strconv.FormatUint(val, 10)) } -func (m *MetaStore) GetUint64(elem ...string) (val uint64, err error) { - if b, e := m.Get(elem...); e == nil && len(b) == 8 { - val = util.BytesToUint64(b) +func (m *MetaStore) GetUint64(path string) (val uint64, err error) { + if b, e := m.Get(path); e == nil { + val, err = strconv.ParseUint(b, 10, 64) + return } else { if e != nil { return 0, e } - err = errors.New("Not found value for " + path.Join(elem...)) + err = errors.New("Not found value for " + path) } return } diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go index 031972bee..99f82a7fa 100644 --- a/go/replication/volume_growth_test.go +++ b/go/replication/volume_growth_test.go @@ -1,6 +1,7 @@ package replication import ( + "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/topology" "encoding/json" @@ -79,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology { //need to connect all nodes first before server adding volumes topo, err := topology.NewTopology("weedfs", "/etc/weedfs/weedfs.conf", - "/tmp", "testing", 32*1024, 5) + sequence.NewMemorySequencer(), 32*1024, 5) if err != nil { panic("error: " + err.Error()) } diff --git a/go/sequence/memory_sequencer.go b/go/sequence/memory_sequencer.go new file mode 100644 index 000000000..d72952ff4 --- /dev/null +++ b/go/sequence/memory_sequencer.go @@ -0,0 +1,19 @@ +package sequence + +import () + +// just for testing +type MemorySequencer struct { + counter uint64 +} + +func NewMemorySequencer() (m *MemorySequencer) { + m = &MemorySequencer{counter: 1} + return +} + +func (m *MemorySequencer) NextFileId(count int) (uint64, int) { + ret := m.counter + m.counter += uint64(count) + return ret, count +} diff --git a/go/sequence/sequence.go b/go/sequence/sequence.go index 774607e54..bbc4bdf82 100644 --- a/go/sequence/sequence.go +++ b/go/sequence/sequence.go @@ -5,7 +5,6 @@ import ( "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/metastore" "encoding/gob" - "path" "sync" ) @@ -17,8 +16,7 @@ type Sequencer interface { NextFileId(count int) (uint64, int) } type SequencerImpl struct { - dir string - fileName string + fileFullPath string volumeLock sync.Mutex sequenceLock sync.Mutex @@ -29,19 +27,30 @@ type SequencerImpl struct { metaStore *metastore.MetaStore } -func NewSequencer(dirname string, filename string) (m *SequencerImpl) { - m = &SequencerImpl{dir: dirname, fileName: filename} +func NewFileSequencer(filepath string) (m *SequencerImpl) { + m = &SequencerImpl{fileFullPath: filepath} m.metaStore = &metastore.MetaStore{metastore.NewMetaStoreFileBacking()} + m.initilize() + return +} + +func NewEtcdSequencer(etcdCluster string) (m *SequencerImpl) { + m = &SequencerImpl{fileFullPath: "/weedfs/default/sequence"} + m.metaStore = &metastore.MetaStore{metastore.NewMetaStoreEtcdBacking(etcdCluster)} + m.initilize() + return +} - if !m.metaStore.Has(m.dir, m.fileName+".seq") { +func (m *SequencerImpl) initilize() { + if !m.metaStore.Has(m.fileFullPath) { m.FileIdSequence = FileIdSaveInterval glog.V(0).Infoln("Setting file id sequence", m.FileIdSequence) } else { var err error - if m.FileIdSequence, err = m.metaStore.GetUint64(m.dir, m.fileName+".seq"); err != nil { - if data, err := m.metaStore.Get(m.dir, m.fileName+".seq"); err == nil { + if m.FileIdSequence, err = m.metaStore.GetUint64(m.fileFullPath); err != nil { + if data, err := m.metaStore.Get(m.fileFullPath); err == nil { m.FileIdSequence = decode(data) - glog.V(0).Infoln("Decoding old version of FileIdSequence", m.FileIdSequence) + glog.V(0).Infoln("Decoding old version of FileIdSequence", m.FileIdSequence) } else { glog.V(0).Infof("No existing FileIdSequence: %s", err) } @@ -69,16 +78,16 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) { return m.FileIdSequence - m.fileIdCounter - uint64(count), count } func (m *SequencerImpl) saveSequence() { - glog.V(0).Infoln("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) - if e := m.metaStore.SetUint64(m.FileIdSequence, m.dir, m.fileName+".seq"); e != nil { + glog.V(0).Infoln("Saving file id sequence", m.FileIdSequence, "to", m.fileFullPath) + if e := m.metaStore.SetUint64(m.fileFullPath, m.FileIdSequence); e != nil { glog.Fatalf("Sequence id Save [ERROR] %s", e) } } //decode are for backward compatible purpose -func decode(input []byte) uint64 { +func decode(input string) uint64 { var x uint64 - b := bytes.NewReader(input) + b := bytes.NewReader([]byte(input)) decoder := gob.NewDecoder(b) if e := decoder.Decode(&x); e == nil { return x diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go index c7b165ea6..c526f55f8 100644 --- a/go/topology/node_list_test.go +++ b/go/topology/node_list_test.go @@ -1,13 +1,14 @@ package topology import ( + "code.google.com/p/weed-fs/go/sequence" _ "fmt" "strconv" "testing" ) func TestXYZ(t *testing.T) { - topo, err := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) + topo, err := NewTopology("topo", "/etc/weed.conf", sequence.NewMemorySequencer(), 234, 5) if err != nil { t.Error("cannot create new topology:", err) t.FailNow() diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go index d5ea08086..36f4963db 100644 --- a/go/topology/topo_test.go +++ b/go/topology/topo_test.go @@ -1,6 +1,7 @@ package topology import ( + "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/storage" "encoding/json" "fmt" @@ -78,7 +79,7 @@ func setup(topologyLayout string) *Topology { } //need to connect all nodes first before server adding volumes - topo, err := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) + topo, err := NewTopology("mynetwork", "/etc/weed.conf", sequence.NewMemorySequencer(), 234, 5) if err != nil { fmt.Println("error:", err) } diff --git a/go/topology/topology.go b/go/topology/topology.go index d0e9fb42b..b21601210 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -28,7 +28,7 @@ type Topology struct { configuration *Configuration } -func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) (*Topology, error) { +func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) { t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" @@ -38,7 +38,7 @@ func NewTopology(id string, confFile string, dirname string, sequenceFilename st t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit - t.sequence = sequence.NewSequencer(dirname, sequenceFilename) + t.sequence = seq t.chanDeadDataNodes = make(chan *DataNode) t.chanRecoveredDataNodes = make(chan *DataNode) diff --git a/go/weed/master.go b/go/weed/master.go index c1ada76fb..950aaca6d 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -5,12 +5,14 @@ import ( "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/operation" "code.google.com/p/weed-fs/go/replication" + "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/topology" "encoding/json" "errors" "net/http" "os" + "path" "runtime" "strconv" "strings" @@ -43,6 +45,7 @@ var ( mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + etcdCluster = cmdMaster.Flag.String("etcd", "", "comma separated etcd urls, e.g., http://localhost:4001, See github.com/coreos/go-etcd/etcd") masterWhiteList []string ) @@ -215,8 +218,14 @@ func runMaster(cmd *Command, args []string) bool { if *masterWhiteListOption != "" { masterWhiteList = strings.Split(*masterWhiteListOption, ",") } + var seq sequence.Sequencer + if len(*etcdCluster) == 0 { + seq = sequence.NewFileSequencer(path.Join(*metaFolder, "weed.seq")) + } else { + seq = sequence.NewEtcdSequencer(*etcdCluster) + } var e error - if topo, e = topology.NewTopology("topo", *confFile, *metaFolder, "weed", + if topo, e = topology.NewTopology("topo", *confFile, seq, uint64(*volumeSizeLimitMB)*1024*1024, *mpulse); e != nil { glog.Fatalf("cannot create topology:%s", e) }