Browse Source

Merge pull request #1102 from stlpmo-jn/add_ETCD_sequencer

Add etcd sequencer
pull/1115/head
Chris Lu 5 years ago
committed by GitHub
parent
commit
d7852ebb3b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      weed/command/scaffold.go
  2. 296
      weed/sequence/etcd_sequencer.go
  3. 38
      weed/server/master_server.go
  4. 4
      weed/util/config.go

7
weed/command/scaffold.go

@ -346,5 +346,12 @@ scripts = """
"""
sleep_minutes = 17 # sleep minutes between each script execution
sequencer_type = memory # Choose [memory|etcd] type for storing the file id sequence
# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
# example : http://127.0.0.1:2379,http://127.0.0.1:2389
sequencer_etcd_urls = http://127.0.0.1:2379
`
)

296
weed/sequence/etcd_sequencer.go

@ -0,0 +1,296 @@
package sequence
/*
Note :
(1) store the sequence in the ETCD cluster, and local file(sequence.dat)
(2) batch get the sequences from ETCD cluster, and store the max sequence id in the local file
(3) the sequence range is : [currentSeqId, maxSeqId), when the currentSeqId >= maxSeqId, fetch the new maxSeqId.
*/
import (
"context"
"fmt"
"sync"
"time"
"io"
"os"
"strconv"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"go.etcd.io/etcd/client"
)
const (
// EtcdKeyPrefix = "/seaweedfs"
EtcdKeySequence = "/master/sequence"
EtcdContextTimeoutSecond = 100 * time.Second
DefaultEtcdSteps uint64 = 500 // internal counter
SequencerFileName = "sequencer.dat"
FileMaxSequenceLength = 128
)
type EtcdSequencer struct {
sequenceLock sync.Mutex
// available sequence range : [currentSeqId, maxSeqId)
currentSeqId uint64
maxSeqId uint64
keysAPI client.KeysAPI
seqFile *os.File
}
func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) {
file, err := openSequenceFile(metaFolder + "/" + SequencerFileName)
if nil != err {
return nil, fmt.Errorf("open sequence file fialed, %v", err)
}
cli, err := client.New(client.Config{
Endpoints: strings.Split(etcdUrls, ","),
Username: "",
Password: "",
})
if err != nil {
return nil, err
}
keysApi := client.NewKeysAPI(cli)
// TODO: the current sequence id in local file is not used
maxValue, _, err := readSequenceFile(file)
if err != nil {
return nil, fmt.Errorf("read sequence from file failed, %v", err)
}
glog.V(4).Infof("read sequence from file : %d", maxValue)
newSeq, err := setMaxSequenceToEtcd(keysApi, maxValue)
if err != nil {
return nil, err
}
sequencer := &EtcdSequencer{maxSeqId: newSeq,
currentSeqId: newSeq,
keysAPI: keysApi,
seqFile: file,
}
return sequencer, nil
}
func (es *EtcdSequencer) NextFileId(count uint64) uint64 {
es.sequenceLock.Lock()
defer es.sequenceLock.Unlock()
if (es.currentSeqId + count) >= es.maxSeqId {
reqSteps := DefaultEtcdSteps
if count > DefaultEtcdSteps {
reqSteps += count
}
maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps)
glog.V(4).Infof("get max sequence id from etcd, %d", maxId)
if err != nil {
glog.Error(err)
return 0
}
es.currentSeqId, es.maxSeqId = maxId-reqSteps, maxId
glog.V(4).Infof("current id : %d, max id : %d", es.currentSeqId, es.maxSeqId)
if err := writeSequenceFile(es.seqFile, es.maxSeqId, es.currentSeqId); err != nil {
glog.Errorf("flush sequence to file failed, %v", err)
}
}
ret := es.currentSeqId
es.currentSeqId += count
return ret
}
/**
instead of collecting the max value from volume server,
the max value should be saved in local config file and ETCD cluster
*/
func (es *EtcdSequencer) SetMax(seenValue uint64) {
es.sequenceLock.Lock()
defer es.sequenceLock.Unlock()
if seenValue > es.maxSeqId {
maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue)
if err != nil {
glog.Errorf("set Etcd Max sequence failed : %v", err)
return
}
es.currentSeqId, es.maxSeqId = maxId, maxId
if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil {
glog.Errorf("flush sequence to file failed, %v", err)
}
}
}
func (es *EtcdSequencer) GetMax() uint64 {
return es.maxSeqId
}
func (es *EtcdSequencer) Peek() uint64 {
return es.currentSeqId
}
func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) {
if step <= 0 {
return 0, fmt.Errorf("the step must be large than 1")
}
ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
var endSeqValue uint64 = 0
defer cancel()
for {
getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true})
if err != nil {
return 0, err
}
if getResp.Node == nil {
continue
}
prevValue := getResp.Node.Value
prevSeqValue, err := strconv.ParseUint(prevValue, 10, 64)
if err != nil {
return 0, fmt.Errorf("get sequence from etcd failed, %v", err)
}
endSeqValue = prevSeqValue + step
endSeqStr := strconv.FormatUint(endSeqValue, 10)
_, err = kvApi.Set(ctx, EtcdKeySequence, endSeqStr, &client.SetOptions{PrevValue: prevValue})
if err == nil {
break
}
glog.Error(err)
}
return endSeqValue, nil
}
/**
update the value of the key EtcdKeySequence in ETCD cluster with the parameter of maxSeq,
when the value of the key EtcdKeySequence is equal to or large than the parameter maxSeq,
return the value of EtcdKeySequence in the ETCD cluster;
when the value of the EtcdKeySequence is less than the parameter maxSeq,
return the value of the parameter maxSeq
*/
func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) {
maxSeqStr := strconv.FormatUint(maxSeq, 10)
ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
defer cancel()
for {
getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true})
if err != nil {
if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) {
_, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr)
if err == nil {
continue
}
if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) {
continue
}
return 0, err
} else {
return 0, err
}
}
if getResp.Node == nil {
continue
}
prevSeqStr := getResp.Node.Value
prevSeq, err := strconv.ParseUint(prevSeqStr, 10, 64)
if err != nil {
return 0, err
}
if prevSeq >= maxSeq {
return prevSeq, nil
}
_, err = kvApi.Set(ctx, EtcdKeySequence, maxSeqStr, &client.SetOptions{PrevValue: prevSeqStr})
if err != nil {
return 0, err
}
}
}
func openSequenceFile(file string) (*os.File, error) {
_, err := os.Stat(file)
if os.IsNotExist(err) {
fid, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
if err := writeSequenceFile(fid, 1, 0); err != nil {
return nil, err
}
return fid, nil
} else {
return os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644)
}
}
/*
read sequence and step from sequence file
*/
func readSequenceFile(file *os.File) (uint64, uint64, error) {
sequence := make([]byte, FileMaxSequenceLength)
size, err := file.ReadAt(sequence, 0)
if (err != nil) && (err != io.EOF) {
err := fmt.Errorf("cannot read file %s, %v", file.Name(), err)
return 0, 0, err
}
sequence = sequence[0:size]
seqs := strings.Split(string(sequence), ":")
maxId, err := strconv.ParseUint(seqs[0], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err)
}
if len(seqs) > 1 {
step, err := strconv.ParseUint(seqs[1], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err)
}
return maxId, step, nil
}
return maxId, 0, nil
}
/**
write the sequence and step to sequence file
*/
func writeSequenceFile(file *os.File, sequence, step uint64) error {
_ = step
seqStr := fmt.Sprintf("%d:%d", sequence, sequence)
if _, err := file.Seek(0, 0); err != nil {
err = fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err)
return err
}
if err := file.Truncate(0); err != nil {
return fmt.Errorf("truncate sequence file faield : %v", err)
}
if _, err := file.WriteString(seqStr); err != nil {
return fmt.Errorf("write file %s failed, %v", file.Name(), err)
}
if err := file.Sync(); err != nil {
return fmt.Errorf("flush file %s failed, %v", file.Name(), err)
}
return nil
}
// the UT helper method
// func deleteEtcdKey(kvApi client.KeysAPI, key string) error {
// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
// defer cancel()
// _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false})
// if err != nil {
// return err
// }
// return nil
// }

38
weed/server/master_server.go

@ -27,6 +27,12 @@ import (
"google.golang.org/grpc"
)
const (
MasterPrefix = "master.maintenance"
SequencerType = MasterPrefix + ".sequencer_type"
SequencerEtcdUrls = MasterPrefix + ".sequencer_etcd_urls"
)
type MasterOption struct {
Port int
MetaFolder string
@ -87,7 +93,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
seq := ms.createSequencer(option)
if nil == seq {
glog.Fatalf("create sequencer failed.")
}
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
@ -165,8 +175,8 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
} else {
//drop it to the floor
//writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
// drop it to the floor
// writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
}
}
}
@ -230,3 +240,25 @@ func (ms *MasterServer) startAdminScripts() {
}
}()
}
func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
var seq sequence.Sequencer
seqType := strings.ToLower(util.Config().GetString(SequencerType))
glog.V(0).Infof("[%s] : [%s]", SequencerType, seqType)
switch strings.ToLower(seqType) {
case "memory":
seq = sequence.NewMemorySequencer()
case "etcd":
var err error
urls := util.Config().GetString(SequencerEtcdUrls)
glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
if err != nil {
glog.Error(err)
seq = nil
}
default:
seq = sequence.NewMemorySequencer()
}
return seq
}

4
weed/util/config.go

@ -40,5 +40,9 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
}
return true
}
func Config() Configuration {
return viper.GetViper()
}
Loading…
Cancel
Save