diff --git a/weed/command/scaffold/master.toml b/weed/command/scaffold/master.toml index 020f48e36..363493db3 100644 --- a/weed/command/scaffold/master.toml +++ b/weed/command/scaffold/master.toml @@ -23,10 +23,7 @@ default = "localhost:8888" # used by maintenance scripts if the scripts needs [master.sequencer] -type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence -# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence -# example : http://127.0.0.1:2379,http://127.0.0.1:2389 -sequencer_etcd_urls = "http://127.0.0.1:2379" +type = "raft" # Choose [raft|snowflake] type for storing the file id sequence # when sequencer.type = snowflake, the snowflake id must be different from other masters sequencer_snowflake_id = 0 # any number between 1~1023 diff --git a/weed/sequence/etcd_sequencer.go b/weed/sequence/etcd_sequencer.go deleted file mode 100644 index a9f2bb97f..000000000 --- a/weed/sequence/etcd_sequencer.go +++ /dev/null @@ -1,296 +0,0 @@ -package sequence - -/* -Note : -(1) store the sequence in the ETCD cluster, and local file(sequence.dat) -(2) batch get the sequences from ETCD cluster, and store the max sequence id in the local file -(3) the sequence range is : [currentSeqId, maxSeqId), when the currentSeqId >= maxSeqId, fetch the new maxSeqId. -*/ - -import ( - "context" - "fmt" - "go.etcd.io/etcd/client" - "sync" - "time" - - "io" - "os" - "strconv" - "strings" - - "github.com/chrislusf/seaweedfs/weed/glog" -) - -const ( - // EtcdKeyPrefix = "/seaweedfs" - EtcdKeySequence = "/master/sequence" - EtcdContextTimeoutSecond = 100 * time.Second - DefaultEtcdSteps uint64 = 500 // internal counter - SequencerFileName = "sequencer.dat" - FileMaxSequenceLength = 128 -) - -type EtcdSequencer struct { - sequenceLock sync.Mutex - - // available sequence range : [currentSeqId, maxSeqId) - currentSeqId uint64 - maxSeqId uint64 - - keysAPI client.KeysAPI - seqFile *os.File -} - -func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) { - file, err := openSequenceFile(metaFolder + "/" + SequencerFileName) - if nil != err { - return nil, fmt.Errorf("open sequence file fialed, %v", err) - } - - cli, err := client.New(client.Config{ - Endpoints: strings.Split(etcdUrls, ","), - Username: "", - Password: "", - }) - if err != nil { - return nil, err - } - keysApi := client.NewKeysAPI(cli) - - // TODO: the current sequence id in local file is not used - maxValue, _, err := readSequenceFile(file) - if err != nil { - return nil, fmt.Errorf("read sequence from file failed, %v", err) - } - glog.V(4).Infof("read sequence from file : %d", maxValue) - - newSeq, err := setMaxSequenceToEtcd(keysApi, maxValue) - if err != nil { - return nil, err - } - - sequencer := &EtcdSequencer{maxSeqId: newSeq, - currentSeqId: newSeq, - keysAPI: keysApi, - seqFile: file, - } - return sequencer, nil -} - -func (es *EtcdSequencer) NextFileId(count uint64) uint64 { - es.sequenceLock.Lock() - defer es.sequenceLock.Unlock() - - if (es.currentSeqId + count) >= es.maxSeqId { - reqSteps := DefaultEtcdSteps - if count > DefaultEtcdSteps { - reqSteps += count - } - maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps) - glog.V(4).Infof("get max sequence id from etcd, %d", maxId) - if err != nil { - glog.Error(err) - return 0 - } - es.currentSeqId, es.maxSeqId = maxId-reqSteps, maxId - glog.V(4).Infof("current id : %d, max id : %d", es.currentSeqId, es.maxSeqId) - - if err := writeSequenceFile(es.seqFile, es.maxSeqId, es.currentSeqId); err != nil { - glog.Errorf("flush sequence to file failed, %v", err) - } - } - - ret := es.currentSeqId - es.currentSeqId += count - return ret -} - -/** -instead of collecting the max value from volume server, -the max value should be saved in local config file and ETCD cluster -*/ -func (es *EtcdSequencer) SetMax(seenValue uint64) { - es.sequenceLock.Lock() - defer es.sequenceLock.Unlock() - if seenValue > es.maxSeqId { - maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue) - if err != nil { - glog.Errorf("set Etcd Max sequence failed : %v", err) - return - } - es.currentSeqId, es.maxSeqId = maxId, maxId - - if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil { - glog.Errorf("flush sequence to file failed, %v", err) - } - } -} - -func (es *EtcdSequencer) GetMax() uint64 { - return es.maxSeqId -} - -func (es *EtcdSequencer) Peek() uint64 { - return es.currentSeqId -} - -func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) { - if step <= 0 { - return 0, fmt.Errorf("the step must be large than 1") - } - - ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) - var endSeqValue uint64 = 0 - defer cancel() - for { - getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true}) - if err != nil { - return 0, err - } - if getResp.Node == nil { - continue - } - - prevValue := getResp.Node.Value - prevSeqValue, err := strconv.ParseUint(prevValue, 10, 64) - if err != nil { - return 0, fmt.Errorf("get sequence from etcd failed, %v", err) - } - endSeqValue = prevSeqValue + step - endSeqStr := strconv.FormatUint(endSeqValue, 10) - - _, err = kvApi.Set(ctx, EtcdKeySequence, endSeqStr, &client.SetOptions{PrevValue: prevValue}) - if err == nil { - break - } - glog.Error(err) - } - - return endSeqValue, nil -} - -/** -update the value of the key EtcdKeySequence in ETCD cluster with the parameter of maxSeq, -when the value of the key EtcdKeySequence is equal to or large than the parameter maxSeq, -return the value of EtcdKeySequence in the ETCD cluster; -when the value of the EtcdKeySequence is less than the parameter maxSeq, -return the value of the parameter maxSeq -*/ -func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) { - maxSeqStr := strconv.FormatUint(maxSeq, 10) - ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) - defer cancel() - - for { - getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true}) - if err != nil { - if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) { - _, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr) - if err == nil { - continue - } - if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) { - continue - } - return 0, err - } else { - return 0, err - } - } - - if getResp.Node == nil { - continue - } - prevSeqStr := getResp.Node.Value - prevSeq, err := strconv.ParseUint(prevSeqStr, 10, 64) - if err != nil { - return 0, err - } - if prevSeq >= maxSeq { - return prevSeq, nil - } - - _, err = kvApi.Set(ctx, EtcdKeySequence, maxSeqStr, &client.SetOptions{PrevValue: prevSeqStr}) - if err != nil { - return 0, err - } - } -} - -func openSequenceFile(file string) (*os.File, error) { - _, err := os.Stat(file) - if os.IsNotExist(err) { - fid, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - return nil, err - } - if err := writeSequenceFile(fid, 1, 0); err != nil { - return nil, err - } - return fid, nil - } else { - return os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644) - } -} - -/* -read sequence and step from sequence file -*/ -func readSequenceFile(file *os.File) (uint64, uint64, error) { - sequence := make([]byte, FileMaxSequenceLength) - size, err := file.ReadAt(sequence, 0) - if (err != nil) && (err != io.EOF) { - err := fmt.Errorf("cannot read file %s, %v", file.Name(), err) - return 0, 0, err - } - sequence = sequence[0:size] - seqs := strings.Split(string(sequence), ":") - maxId, err := strconv.ParseUint(seqs[0], 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err) - } - - if len(seqs) > 1 { - step, err := strconv.ParseUint(seqs[1], 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err) - } - return maxId, step, nil - } - - return maxId, 0, nil -} - -/** -write the sequence and step to sequence file -*/ -func writeSequenceFile(file *os.File, sequence, step uint64) error { - _ = step - seqStr := fmt.Sprintf("%d:%d", sequence, sequence) - if _, err := file.Seek(0, 0); err != nil { - err = fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err) - return err - } - if err := file.Truncate(0); err != nil { - return fmt.Errorf("truncate sequence file faield : %v", err) - } - if _, err := file.WriteString(seqStr); err != nil { - return fmt.Errorf("write file %s failed, %v", file.Name(), err) - } - if err := file.Sync(); err != nil { - return fmt.Errorf("flush file %s failed, %v", file.Name(), err) - } - return nil -} - -// the UT helper method -// func deleteEtcdKey(kvApi client.KeysAPI, key string) error { -// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond) -// defer cancel() -// _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false}) -// if err != nil { -// return err -// } -// return nil -// } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 8de01abf7..3b3b1c94b 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -28,7 +28,6 @@ import ( const ( SequencerType = "master.sequencer.type" - SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls" SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" ) @@ -286,15 +285,6 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer seqType := strings.ToLower(v.GetString(SequencerType)) glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType) switch strings.ToLower(seqType) { - case "etcd": - var err error - urls := v.GetString(SequencerEtcdUrls) - glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls) - seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder) - if err != nil { - glog.Error(err) - seq = nil - } case "snowflake": var err error snowflakeId := v.GetInt(SequencerSnowflakeId)