From 364f7200ad78d83cff5f213965546548227d5da2 Mon Sep 17 00:00:00 2001 From: stlpmo Date: Mon, 4 Nov 2019 16:36:06 +0800 Subject: [PATCH 1/5] Create etcd_sequencer.go the 1st version --- weed/sequence/etcd_sequencer.go | 378 ++++++++++++++++++++++++++++++++ 1 file changed, 378 insertions(+) create mode 100644 weed/sequence/etcd_sequencer.go diff --git a/weed/sequence/etcd_sequencer.go b/weed/sequence/etcd_sequencer.go new file mode 100644 index 000000000..51e0ec93f --- /dev/null +++ b/weed/sequence/etcd_sequencer.go @@ -0,0 +1,378 @@ +package sequence + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "go.etcd.io/etcd/client" + "io" + "os" + "strconv" + "strings" + "sync" + "time" +) + +const ( + EtcdKeySequence = "/master/sequence" + EtcdKeyPrefix = "/seaweedfs" + EtcdContextTimeoutSecond = 100 * time.Second + DefaultEtcdSteps uint64 = 500 // internal counter + SequencerFileName = "sequencer.dat" + FileMaxSequenceLength = 128 +) + +type EtcdSequencer struct { + sequenceLock sync.Mutex + + // available sequence range : [steps, maxCounter) + maxCounter uint64 + steps uint64 + + etcdClient client.Client + keysAPI client.KeysAPI + seqFile *os.File +} + +func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) { + file, err := openSequenceFile(metaFolder + "/" + SequencerFileName) + if nil != err { + return nil, fmt.Errorf("open sequence file fialed, %v", err) + } + + cli, err := client.New(client.Config{ + Endpoints: strings.Split(etcdUrls, ","), + Username: "", + Password: "", + }) + if err != nil { + return nil, err + } + keysApi := client.NewKeysAPI(cli) + + maxValue, _, err := readSequenceFile(file) + if err != nil { + return nil, fmt.Errorf("read sequence from file failed, %v", err) + } + glog.V(4).Infof("read sequence from file : %d", maxValue) + + newSeq, err := setMaxSequenceToEtcd(keysApi, maxValue) + if err != nil { + return nil, err + } + + // make the step and max the same, and then they are fake, + // after invoking the NextFileId(), they are different and real + maxCounter, steps := newSeq, newSeq + sequencer := &EtcdSequencer{maxCounter: maxCounter, + steps: steps, + etcdClient: cli, + keysAPI: keysApi, + seqFile: file, + } + return sequencer, nil +} + +func (es *EtcdSequencer) NextFileId(count uint64) (new uint64, cnt uint64) { + es.sequenceLock.Lock() + defer es.sequenceLock.Unlock() + if (es.steps + count) >= es.maxCounter { + reqSteps := DefaultEtcdSteps + if count > DefaultEtcdSteps { + reqSteps += count + } + maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps) + glog.V(4).Infof("get max sequence id from etcd, %d", maxId) + if err != nil { + glog.Error(err) + return 0, 0 + } + es.steps, es.maxCounter = maxId-reqSteps, maxId + glog.V(4).Infof("current id : %d, max id : %d", es.steps, es.maxCounter) + + if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil { + glog.Errorf("flush sequence to file failed, %v", err) + } + } + ret := es.steps + es.steps += count + return ret, count +} + +/** +instead of collecting the max value from volume server, +the max value should be saved in local config file and ETCD cluster +*/ +func (es *EtcdSequencer) SetMax(seenValue uint64) { + es.sequenceLock.Lock() + defer es.sequenceLock.Unlock() + if seenValue > es.maxCounter { + maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue) + if err != nil { + glog.Errorf("set Etcd Max sequence failed : %v", err) + return + } + es.steps, es.maxCounter = maxId, maxId + + if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil { + glog.Errorf("flush sequence to file failed, %v", err) + } + } +} + +func (es *EtcdSequencer) GetMax() uint64 { + return es.maxCounter +} + +func (es *EtcdSequencer) Peek() uint64 { + return es.steps +} + +func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) { + if step <= 0 { + return 0, fmt.Errorf("the step must be large than 1") + } + + ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) + var endSeqValue uint64 = 0 + defer cancel() + for { + getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true}) + if err != nil { + return 0, err + } + if getResp.Node == nil { + continue + } + + prevValue := getResp.Node.Value + prevSeqValue, err := strconv.ParseUint(prevValue, 10, 64) + if err != nil { + return 0, fmt.Errorf("get sequence from etcd failed, %v", err) + } + endSeqValue = prevSeqValue + step + endSeqStr := strconv.FormatUint(endSeqValue, 10) + + _, err = kvApi.Set(ctx, EtcdKeySequence, endSeqStr, &client.SetOptions{PrevValue: prevValue}) + if err == nil { + break + } + glog.Error(err) + } + + return endSeqValue, nil +} + +/** + update the key of EtcdKeySequence in ETCD cluster with the parameter of maxSeq, +until the value of EtcdKeySequence is equal to or larger than the maxSeq +*/ +func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) { + maxSeqStr := strconv.FormatUint(maxSeq, 10) + ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) + defer cancel() + + for { + getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true}) + if err != nil { + if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) { + _, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr) + if err == nil { + continue // create ETCD key success, retry get ETCD value + } + if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) { + continue // ETCD key exist, retry get ETCD value + } + return 0, err + } else { + return 0, err + } + } + + if getResp.Node == nil { + continue + } + prevSeqStr := getResp.Node.Value + prevSeq, err := strconv.ParseUint(prevSeqStr, 10, 64) + if err != nil { + return 0, err + } + if prevSeq >= maxSeq { + return prevSeq, nil + } + + _, err = kvApi.Set(ctx, EtcdKeySequence, maxSeqStr, &client.SetOptions{PrevValue: prevSeqStr}) + if err != nil { + return 0, err + } + } + + return maxSeq, nil +} + +func openSequenceFile(file string) (*os.File, error) { + _, err := os.Stat(file) + if os.IsNotExist(err) { + fid, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + if err := writeSequenceFile(fid, 1, 0); err != nil { + return nil, err + } + return fid, nil + } else { + return os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644) + } +} + +/* + sequence : step 以冒号分割 +*/ +func readSequenceFile(file *os.File) (uint64, uint64, error) { + sequence := make([]byte, FileMaxSequenceLength) + size, err := file.ReadAt(sequence, 0) + if (err != nil) && (err != io.EOF) { + err := fmt.Errorf("cannot read file %s, %v", file.Name(), err) + return 0, 0, err + } + sequence = sequence[0:size] + seqs := strings.Split(string(sequence), ":") + maxId, err := strconv.ParseUint(seqs[0], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err) + } + + if len(seqs) > 1 { + step, err := strconv.ParseUint(seqs[1], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err) + } + return maxId, step, nil + } + + return maxId, 0, nil +} + +/** +先不存放step到文件中 +*/ +func writeSequenceFile(file *os.File, sequence, step uint64) error { + _ = step + seqStr := fmt.Sprintf("%d:%d", sequence, sequence) + if _, err := file.Seek(0, 0); err != nil { + err = fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err) + return err + } + if err := file.Truncate(0); err != nil { + return fmt.Errorf("truncate sequence file faield : %v", err) + } + if _, err := file.WriteString(seqStr); err != nil { + return fmt.Errorf("write file %s failed, %v", file.Name(), err) + } + if err := file.Sync(); err != nil { + return fmt.Errorf("flush file %s failed, %v", file.Name(), err) + } + return nil +} + +func deleteEtcdKey(kvApi client.KeysAPI, key string) error { + ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) + defer cancel() + _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false}) + if err != nil { + return err + } + return nil +} + +//func (es *EtcdSequencer) Load() error { +// es.sequenceLock.Lock() +// defer es.sequenceLock.Unlock() +// reqSteps := DefaultEtcdSteps +// maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps) +// glog.V(4).Infof("get max sequence id from etcd, %d", maxId) +// if err != nil { +// glog.Error(err) +// return err +// } +// es.steps, es.maxCounter = maxId-reqSteps, maxId +// glog.V(4).Infof("current id : %d, max id : %d", es.steps, es.maxCounter) +// +// if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil { +// glog.Errorf("flush sequence to file failed, %v", err) +// return err +// } +// return nil +//} + +//func getEtcdKey(kv client.KeysAPI, key string) (string, error) { +// resp, err := kv.Get(context.Background(), key, &client.GetOptions{Recursive: false, Quorum: true}) +// if err != nil { +// glog.Warningf("key:%s result:%v", EtcdKeySequence, err) +// return "", err +// } +// if resp.Node == nil { +// return "", fmt.Errorf("the key is not exist, %s", key) +// } +// return resp.Node.Value, nil +//} + +//func (es *EtcdSequencer) setLocalSequence(maxValue uint64) { +// es.sequenceLock.Lock() +// defer es.sequenceLock.Unlock() +// if maxValue > es.maxCounter { +// es.maxCounter, es.steps = maxValue, maxValue-DefaultEtcdSteps +// +// if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil { +// glog.Errorf("flush sequence to file failed, %v", err) +// } +// } +//} + +//func getEtcdKeysApi(etcdUrls, user, passwd string) (client.KeysAPI, error) { +// cli, err := client.New(client.Config{ +// Endpoints: strings.Split(etcdUrls, ","), +// Username: user, +// Password: passwd, +// }) +// if err != nil { +// return nil, err +// } +// keysApi := client.NewKeysAPI(cli) +// return keysApi, nil +//} + +//func (es *EtcdSequencer) asyncStartWatcher() { +// es.startWatcher(es.keysAPI, EtcdKeySequence, func(value string, index uint64) { +// newValue, err := strconv.ParseUint(value, 10, 64) +// if err != nil { +// glog.Warning(err) +// } +// es.setLocalSequence(newValue) +// }) +//} + +//func (es *EtcdSequencer) startWatcher(kvApi client.KeysAPI, key string, callback func(value string, index uint64)) { +// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) +// defer cancel() +// ctx.Done() +// +// getResp, err := kvApi.Get(ctx, key, &client.GetOptions{Recursive: false, Quorum: true}) +// if err != nil { +// return +// } +// +// watcher := kvApi.Watcher(key, &client.WatcherOptions{AfterIndex: getResp.Index, Recursive: false}) +// go func(w client.Watcher) { +// for { +// resp, err := w.Next(context.Background()) +// if err != nil { +// glog.Error(err) +// continue +// } +// callback(resp.Node.Value, resp.Index) +// } +// }(watcher) +// return +//} From 1c8bed381068f3a3bd6858c774c39744f760db73 Mon Sep 17 00:00:00 2001 From: stlpmo Date: Mon, 4 Nov 2019 16:45:38 +0800 Subject: [PATCH 2/5] delete the var etcdClient and comments refactor the code add sequencer cmd-line delete nerver used codes --- weed/command/master.go | 6 + weed/sequence/etcd_sequencer.go | 198 ++++++++++---------------------- weed/server/master_server.go | 28 ++++- 3 files changed, 91 insertions(+), 141 deletions(-) diff --git a/weed/command/master.go b/weed/command/master.go index 3d33f4f7a..55e3409ed 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -37,6 +37,9 @@ type MasterOptions struct { disableHttp *bool metricsAddress *string metricsIntervalSec *int + + sequencerType *string + etcdUrls *string } func init() { @@ -55,6 +58,9 @@ func init() { m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address") m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") + m.sequencerType = cmdMaster.Flag.String("sequencerType", "memory", "Choose [memory|etcd] type for store the file sequence") + m.etcdUrls = cmdMaster.Flag.String("etcdUrls", "", + "when sequencerType=etcd, set etcdUrls for etcd cluster that store file sequence, example : http://127.0.0.1:2379,http://127.0.0.1:2389") } var cmdMaster = &Command{ diff --git a/weed/sequence/etcd_sequencer.go b/weed/sequence/etcd_sequencer.go index 51e0ec93f..1fc378640 100644 --- a/weed/sequence/etcd_sequencer.go +++ b/weed/sequence/etcd_sequencer.go @@ -1,21 +1,30 @@ package sequence +/* +Note : +(1) store the sequence in the ETCD cluster, and local file(sequence.dat) +(2) batch get the sequences from ETCD cluster, and store the max sequence id in the local file +(3) the sequence range is : [currentSeqId, maxSeqId), when the currentSeqId >= maxSeqId, fetch the new maxSeqId. +*/ + import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "go.etcd.io/etcd/client" + "sync" + "time" + "io" "os" "strconv" "strings" - "sync" - "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "go.etcd.io/etcd/client" ) const ( + // EtcdKeyPrefix = "/seaweedfs" EtcdKeySequence = "/master/sequence" - EtcdKeyPrefix = "/seaweedfs" EtcdContextTimeoutSecond = 100 * time.Second DefaultEtcdSteps uint64 = 500 // internal counter SequencerFileName = "sequencer.dat" @@ -25,13 +34,12 @@ const ( type EtcdSequencer struct { sequenceLock sync.Mutex - // available sequence range : [steps, maxCounter) - maxCounter uint64 - steps uint64 + // available sequence range : [currentSeqId, maxSeqId) + currentSeqId uint64 + maxSeqId uint64 - etcdClient client.Client - keysAPI client.KeysAPI - seqFile *os.File + keysAPI client.KeysAPI + seqFile *os.File } func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) { @@ -50,6 +58,7 @@ func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error } keysApi := client.NewKeysAPI(cli) + // TODO: the current sequence id in local file is not used maxValue, _, err := readSequenceFile(file) if err != nil { return nil, fmt.Errorf("read sequence from file failed, %v", err) @@ -61,22 +70,19 @@ func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error return nil, err } - // make the step and max the same, and then they are fake, - // after invoking the NextFileId(), they are different and real - maxCounter, steps := newSeq, newSeq - sequencer := &EtcdSequencer{maxCounter: maxCounter, - steps: steps, - etcdClient: cli, - keysAPI: keysApi, - seqFile: file, + sequencer := &EtcdSequencer{maxSeqId: newSeq, + currentSeqId: newSeq, + keysAPI: keysApi, + seqFile: file, } return sequencer, nil } -func (es *EtcdSequencer) NextFileId(count uint64) (new uint64, cnt uint64) { +func (es *EtcdSequencer) NextFileId(count uint64) uint64 { es.sequenceLock.Lock() defer es.sequenceLock.Unlock() - if (es.steps + count) >= es.maxCounter { + + if (es.currentSeqId + count) >= es.maxSeqId { reqSteps := DefaultEtcdSteps if count > DefaultEtcdSteps { reqSteps += count @@ -85,18 +91,19 @@ func (es *EtcdSequencer) NextFileId(count uint64) (new uint64, cnt uint64) { glog.V(4).Infof("get max sequence id from etcd, %d", maxId) if err != nil { glog.Error(err) - return 0, 0 + return 0 } - es.steps, es.maxCounter = maxId-reqSteps, maxId - glog.V(4).Infof("current id : %d, max id : %d", es.steps, es.maxCounter) + es.currentSeqId, es.maxSeqId = maxId-reqSteps, maxId + glog.V(4).Infof("current id : %d, max id : %d", es.currentSeqId, es.maxSeqId) - if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil { + if err := writeSequenceFile(es.seqFile, es.maxSeqId, es.currentSeqId); err != nil { glog.Errorf("flush sequence to file failed, %v", err) } } - ret := es.steps - es.steps += count - return ret, count + + ret := es.currentSeqId + es.currentSeqId += count + return ret } /** @@ -106,13 +113,13 @@ the max value should be saved in local config file and ETCD cluster func (es *EtcdSequencer) SetMax(seenValue uint64) { es.sequenceLock.Lock() defer es.sequenceLock.Unlock() - if seenValue > es.maxCounter { + if seenValue > es.maxSeqId { maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue) if err != nil { glog.Errorf("set Etcd Max sequence failed : %v", err) return } - es.steps, es.maxCounter = maxId, maxId + es.currentSeqId, es.maxSeqId = maxId, maxId if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil { glog.Errorf("flush sequence to file failed, %v", err) @@ -121,11 +128,11 @@ func (es *EtcdSequencer) SetMax(seenValue uint64) { } func (es *EtcdSequencer) GetMax() uint64 { - return es.maxCounter + return es.maxSeqId } func (es *EtcdSequencer) Peek() uint64 { - return es.steps + return es.currentSeqId } func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) { @@ -164,8 +171,11 @@ func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) } /** - update the key of EtcdKeySequence in ETCD cluster with the parameter of maxSeq, -until the value of EtcdKeySequence is equal to or larger than the maxSeq +update the value of the key EtcdKeySequence in ETCD cluster with the parameter of maxSeq, +when the value of the key EtcdKeySequence is equal to or large than the parameter maxSeq, +return the value of EtcdKeySequence in the ETCD cluster; +when the value of the EtcdKeySequence is less than the parameter maxSeq, +return the value of the parameter maxSeq */ func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) { maxSeqStr := strconv.FormatUint(maxSeq, 10) @@ -178,10 +188,10 @@ func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) { if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) { _, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr) if err == nil { - continue // create ETCD key success, retry get ETCD value + continue } if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) { - continue // ETCD key exist, retry get ETCD value + continue } return 0, err } else { @@ -206,8 +216,6 @@ func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) { return 0, err } } - - return maxSeq, nil } func openSequenceFile(file string) (*os.File, error) { @@ -227,7 +235,7 @@ func openSequenceFile(file string) (*os.File, error) { } /* - sequence : step 以冒号分割 +read sequence and step from sequence file */ func readSequenceFile(file *os.File) (uint64, uint64, error) { sequence := make([]byte, FileMaxSequenceLength) @@ -255,7 +263,7 @@ func readSequenceFile(file *os.File) (uint64, uint64, error) { } /** -先不存放step到文件中 +write the sequence and step to sequence file */ func writeSequenceFile(file *os.File, sequence, step uint64) error { _ = step @@ -276,103 +284,13 @@ func writeSequenceFile(file *os.File, sequence, step uint64) error { return nil } -func deleteEtcdKey(kvApi client.KeysAPI, key string) error { - ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) - defer cancel() - _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false}) - if err != nil { - return err - } - return nil -} - -//func (es *EtcdSequencer) Load() error { -// es.sequenceLock.Lock() -// defer es.sequenceLock.Unlock() -// reqSteps := DefaultEtcdSteps -// maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps) -// glog.V(4).Infof("get max sequence id from etcd, %d", maxId) -// if err != nil { -// glog.Error(err) -// return err -// } -// es.steps, es.maxCounter = maxId-reqSteps, maxId -// glog.V(4).Infof("current id : %d, max id : %d", es.steps, es.maxCounter) -// -// if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil { -// glog.Errorf("flush sequence to file failed, %v", err) -// return err -// } -// return nil -//} - -//func getEtcdKey(kv client.KeysAPI, key string) (string, error) { -// resp, err := kv.Get(context.Background(), key, &client.GetOptions{Recursive: false, Quorum: true}) -// if err != nil { -// glog.Warningf("key:%s result:%v", EtcdKeySequence, err) -// return "", err -// } -// if resp.Node == nil { -// return "", fmt.Errorf("the key is not exist, %s", key) -// } -// return resp.Node.Value, nil -//} - -//func (es *EtcdSequencer) setLocalSequence(maxValue uint64) { -// es.sequenceLock.Lock() -// defer es.sequenceLock.Unlock() -// if maxValue > es.maxCounter { -// es.maxCounter, es.steps = maxValue, maxValue-DefaultEtcdSteps -// -// if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil { -// glog.Errorf("flush sequence to file failed, %v", err) -// } -// } -//} - -//func getEtcdKeysApi(etcdUrls, user, passwd string) (client.KeysAPI, error) { -// cli, err := client.New(client.Config{ -// Endpoints: strings.Split(etcdUrls, ","), -// Username: user, -// Password: passwd, -// }) -// if err != nil { -// return nil, err -// } -// keysApi := client.NewKeysAPI(cli) -// return keysApi, nil -//} - -//func (es *EtcdSequencer) asyncStartWatcher() { -// es.startWatcher(es.keysAPI, EtcdKeySequence, func(value string, index uint64) { -// newValue, err := strconv.ParseUint(value, 10, 64) -// if err != nil { -// glog.Warning(err) -// } -// es.setLocalSequence(newValue) -// }) -//} - -//func (es *EtcdSequencer) startWatcher(kvApi client.KeysAPI, key string, callback func(value string, index uint64)) { -// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) -// defer cancel() -// ctx.Done() -// -// getResp, err := kvApi.Get(ctx, key, &client.GetOptions{Recursive: false, Quorum: true}) -// if err != nil { -// return -// } -// -// watcher := kvApi.Watcher(key, &client.WatcherOptions{AfterIndex: getResp.Index, Recursive: false}) -// go func(w client.Watcher) { -// for { -// resp, err := w.Next(context.Background()) -// if err != nil { -// glog.Error(err) -// continue -// } -// callback(resp.Node.Value, resp.Index) -// } -// }(watcher) -// return -//} +// the UT helper method +// func deleteEtcdKey(kvApi client.KeysAPI, key string) error { +// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) +// defer cancel() +// _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false}) +// if err != nil { +// return err +// } +// return nil +// } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index cde583560..fd3236c53 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -39,6 +39,9 @@ type MasterOption struct { DisableHttp bool MetricsAddress string MetricsIntervalSec int + + sequencerType string + etcdUrls string } type MasterServer struct { @@ -87,7 +90,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers), } ms.bounedLeaderChan = make(chan int, 16) - seq := sequence.NewMemorySequencer() + + seq := ms.createSequencer(option) + if nil == seq { + glog.Fatalf("create sequencer failed.") + } ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds) ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") @@ -230,3 +237,22 @@ func (ms *MasterServer) startAdminScripts() { } }() } + +func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer { + var seq sequence.Sequencer + glog.V(0).Infof("sequencer type [%s]", option.sequencerType) + switch strings.ToLower(option.sequencerType) { + case "memory": + seq = sequence.NewMemorySequencer() + case "etcd": + var err error + seq, err = sequence.NewEtcdSequencer(option.etcdUrls, option.MetaFolder) + if err != nil { + glog.Error(err) + seq = nil + } + default: + seq = sequence.NewMemorySequencer() + } + return seq +} From 802a0eb3fe115cd213d1238912fe431601e8f102 Mon Sep 17 00:00:00 2001 From: stlpmo Date: Mon, 11 Nov 2019 09:15:17 +0800 Subject: [PATCH 3/5] move from cmd-line to scaffold --- weed/command/master.go | 7 +------ weed/command/scaffold.go | 7 +++++++ weed/server/master_server.go | 22 ++++++++++++++-------- weed/util/config.go | 10 ++++++++++ 4 files changed, 32 insertions(+), 14 deletions(-) diff --git a/weed/command/master.go b/weed/command/master.go index 55e3409ed..d4c2b9b16 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -37,9 +37,6 @@ type MasterOptions struct { disableHttp *bool metricsAddress *string metricsIntervalSec *int - - sequencerType *string - etcdUrls *string } func init() { @@ -58,9 +55,6 @@ func init() { m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address") m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") - m.sequencerType = cmdMaster.Flag.String("sequencerType", "memory", "Choose [memory|etcd] type for store the file sequence") - m.etcdUrls = cmdMaster.Flag.String("etcdUrls", "", - "when sequencerType=etcd, set etcdUrls for etcd cluster that store file sequence, example : http://127.0.0.1:2379,http://127.0.0.1:2389") } var cmdMaster = &Command{ @@ -84,6 +78,7 @@ func runMaster(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) util.LoadConfiguration("master", false) + glog.V(0).Infof("%v", viper.GetViper().GetString("master.maintenance.scripts")) runtime.GOMAXPROCS(runtime.NumCPU()) util.SetupProfiling(*masterCpuProfile, *masterMemProfile) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 7a988cdcf..9b266a69d 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -346,5 +346,12 @@ scripts = """ """ sleep_minutes = 17 # sleep minutes between each script execution +sequencer.type = memory # Choose [memory|etcd] type for storing the file id sequence + +# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence +# example : http://127.0.0.1:2379,http://127.0.0.1:2389 +sequencer.etcd.urls = http://127.0.0.1:2379 + + ` ) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index fd3236c53..41764c2e7 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -27,6 +27,12 @@ import ( "google.golang.org/grpc" ) +const ( + MasterPrefix = "master.maintenance." + SequencerType = MasterPrefix + "sequencer_type" + SequencerEtcdUrls = MasterPrefix + "sequencer_etcd_urls" +) + type MasterOption struct { Port int MetaFolder string @@ -39,9 +45,6 @@ type MasterOption struct { DisableHttp bool MetricsAddress string MetricsIntervalSec int - - sequencerType string - etcdUrls string } type MasterServer struct { @@ -172,8 +175,8 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ proxy.Transport = util.Transport proxy.ServeHTTP(w, r) } else { - //drop it to the floor - //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) + // drop it to the floor + // writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) } } } @@ -240,13 +243,16 @@ func (ms *MasterServer) startAdminScripts() { func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer { var seq sequence.Sequencer - glog.V(0).Infof("sequencer type [%s]", option.sequencerType) - switch strings.ToLower(option.sequencerType) { + seqType := strings.ToLower(util.Config().GetString(SequencerType)) + glog.V(0).Infof("sequencer type [%s]", seqType) + switch strings.ToLower(seqType) { case "memory": seq = sequence.NewMemorySequencer() case "etcd": var err error - seq, err = sequence.NewEtcdSequencer(option.etcdUrls, option.MetaFolder) + urls := util.Config().GetString(SequencerEtcdUrls) + glog.V(4).Infof("ETCD urls : %s", urls) + seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder) if err != nil { glog.Error(err) seq = nil diff --git a/weed/util/config.go b/weed/util/config.go index 1ea833d1f..f51955263 100644 --- a/weed/util/config.go +++ b/weed/util/config.go @@ -40,5 +40,15 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) { } return true +} +func Config() Configuration { + return viper.GetViper() } + +func SubConfig(subKey string) Configuration { + if subKey != "" { + return viper.GetViper().Sub(subKey) + } + return viper.GetViper() +} \ No newline at end of file From 62d393d6c90362bd596bd841266ca9390fe3bb86 Mon Sep 17 00:00:00 2001 From: stlpmo Date: Mon, 11 Nov 2019 10:52:21 +0800 Subject: [PATCH 4/5] ut pass --- weed/command/master.go | 1 - weed/command/scaffold.go | 4 ++-- weed/server/master_server.go | 10 +++++----- weed/util/config.go | 12 +++++++++--- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/weed/command/master.go b/weed/command/master.go index d4c2b9b16..3d33f4f7a 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -78,7 +78,6 @@ func runMaster(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) util.LoadConfiguration("master", false) - glog.V(0).Infof("%v", viper.GetViper().GetString("master.maintenance.scripts")) runtime.GOMAXPROCS(runtime.NumCPU()) util.SetupProfiling(*masterCpuProfile, *masterMemProfile) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 9b266a69d..6fa72c730 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -346,11 +346,11 @@ scripts = """ """ sleep_minutes = 17 # sleep minutes between each script execution -sequencer.type = memory # Choose [memory|etcd] type for storing the file id sequence +sequencer_type = memory # Choose [memory|etcd] type for storing the file id sequence # when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence # example : http://127.0.0.1:2379,http://127.0.0.1:2389 -sequencer.etcd.urls = http://127.0.0.1:2379 +sequencer_etcd_urls = http://127.0.0.1:2379 ` diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 41764c2e7..15e6ee51c 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -28,9 +28,9 @@ import ( ) const ( - MasterPrefix = "master.maintenance." - SequencerType = MasterPrefix + "sequencer_type" - SequencerEtcdUrls = MasterPrefix + "sequencer_etcd_urls" + MasterPrefix = "master.maintenance" + SequencerType = MasterPrefix + ".sequencer_type" + SequencerEtcdUrls = MasterPrefix + ".sequencer_etcd_urls" ) type MasterOption struct { @@ -244,14 +244,14 @@ func (ms *MasterServer) startAdminScripts() { func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer { var seq sequence.Sequencer seqType := strings.ToLower(util.Config().GetString(SequencerType)) - glog.V(0).Infof("sequencer type [%s]", seqType) + glog.V(0).Infof("[%s] : [%s]", SequencerType, seqType) switch strings.ToLower(seqType) { case "memory": seq = sequence.NewMemorySequencer() case "etcd": var err error urls := util.Config().GetString(SequencerEtcdUrls) - glog.V(4).Infof("ETCD urls : %s", urls) + glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls) seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder) if err != nil { glog.Error(err) diff --git a/weed/util/config.go b/weed/util/config.go index f51955263..385ef92d7 100644 --- a/weed/util/config.go +++ b/weed/util/config.go @@ -1,6 +1,8 @@ package util import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/spf13/viper" ) @@ -46,9 +48,13 @@ func Config() Configuration { return viper.GetViper() } -func SubConfig(subKey string) Configuration { +func SubConfig(subKey string) (Configuration, error) { if subKey != "" { - return viper.GetViper().Sub(subKey) + sub := viper.GetViper().Sub(subKey) + if sub == nil { + return nil, fmt.Errorf("sub config [%s] not exist", subKey) + } + return sub, nil } - return viper.GetViper() + return viper.GetViper(), nil } \ No newline at end of file From d07701fa757991df6397ffde5887a70e5361d1f4 Mon Sep 17 00:00:00 2001 From: stlpmo Date: Mon, 11 Nov 2019 18:08:48 +0800 Subject: [PATCH 5/5] delete unused function --- weed/util/config.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/weed/util/config.go b/weed/util/config.go index 385ef92d7..7e2f9b373 100644 --- a/weed/util/config.go +++ b/weed/util/config.go @@ -1,8 +1,6 @@ package util import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/spf13/viper" ) @@ -48,13 +46,3 @@ func Config() Configuration { return viper.GetViper() } -func SubConfig(subKey string) (Configuration, error) { - if subKey != "" { - sub := viper.GetViper().Sub(subKey) - if sub == nil { - return nil, fmt.Errorf("sub config [%s] not exist", subKey) - } - return sub, nil - } - return viper.GetViper(), nil -} \ No newline at end of file