Browse Source

delete the var etcdClient and comments

refactor the code
add sequencer cmd-line
delete nerver used codes
pull/1102/head
stlpmo 5 years ago
parent
commit
1c8bed3810
  1. 6
      weed/command/master.go
  2. 198
      weed/sequence/etcd_sequencer.go
  3. 28
      weed/server/master_server.go

6
weed/command/master.go

@ -37,6 +37,9 @@ type MasterOptions struct {
disableHttp *bool disableHttp *bool
metricsAddress *string metricsAddress *string
metricsIntervalSec *int metricsIntervalSec *int
sequencerType *string
etcdUrls *string
} }
func init() { func init() {
@ -55,6 +58,9 @@ func init() {
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address") m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address")
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
m.sequencerType = cmdMaster.Flag.String("sequencerType", "memory", "Choose [memory|etcd] type for store the file sequence")
m.etcdUrls = cmdMaster.Flag.String("etcdUrls", "",
"when sequencerType=etcd, set etcdUrls for etcd cluster that store file sequence, example : http://127.0.0.1:2379,http://127.0.0.1:2389")
} }
var cmdMaster = &Command{ var cmdMaster = &Command{

198
weed/sequence/etcd_sequencer.go

@ -1,21 +1,30 @@
package sequence package sequence
/*
Note :
(1) store the sequence in the ETCD cluster, and local file(sequence.dat)
(2) batch get the sequences from ETCD cluster, and store the max sequence id in the local file
(3) the sequence range is : [currentSeqId, maxSeqId), when the currentSeqId >= maxSeqId, fetch the new maxSeqId.
*/
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"go.etcd.io/etcd/client"
"sync"
"time"
"io" "io"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"go.etcd.io/etcd/client"
) )
const ( const (
// EtcdKeyPrefix = "/seaweedfs"
EtcdKeySequence = "/master/sequence" EtcdKeySequence = "/master/sequence"
EtcdKeyPrefix = "/seaweedfs"
EtcdContextTimeoutSecond = 100 * time.Second EtcdContextTimeoutSecond = 100 * time.Second
DefaultEtcdSteps uint64 = 500 // internal counter DefaultEtcdSteps uint64 = 500 // internal counter
SequencerFileName = "sequencer.dat" SequencerFileName = "sequencer.dat"
@ -25,13 +34,12 @@ const (
type EtcdSequencer struct { type EtcdSequencer struct {
sequenceLock sync.Mutex sequenceLock sync.Mutex
// available sequence range : [steps, maxCounter)
maxCounter uint64
steps uint64
// available sequence range : [currentSeqId, maxSeqId)
currentSeqId uint64
maxSeqId uint64
etcdClient client.Client
keysAPI client.KeysAPI
seqFile *os.File
keysAPI client.KeysAPI
seqFile *os.File
} }
func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) { func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) {
@ -50,6 +58,7 @@ func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error
} }
keysApi := client.NewKeysAPI(cli) keysApi := client.NewKeysAPI(cli)
// TODO: the current sequence id in local file is not used
maxValue, _, err := readSequenceFile(file) maxValue, _, err := readSequenceFile(file)
if err != nil { if err != nil {
return nil, fmt.Errorf("read sequence from file failed, %v", err) return nil, fmt.Errorf("read sequence from file failed, %v", err)
@ -61,22 +70,19 @@ func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error
return nil, err return nil, err
} }
// make the step and max the same, and then they are fake,
// after invoking the NextFileId(), they are different and real
maxCounter, steps := newSeq, newSeq
sequencer := &EtcdSequencer{maxCounter: maxCounter,
steps: steps,
etcdClient: cli,
keysAPI: keysApi,
seqFile: file,
sequencer := &EtcdSequencer{maxSeqId: newSeq,
currentSeqId: newSeq,
keysAPI: keysApi,
seqFile: file,
} }
return sequencer, nil return sequencer, nil
} }
func (es *EtcdSequencer) NextFileId(count uint64) (new uint64, cnt uint64) {
func (es *EtcdSequencer) NextFileId(count uint64) uint64 {
es.sequenceLock.Lock() es.sequenceLock.Lock()
defer es.sequenceLock.Unlock() defer es.sequenceLock.Unlock()
if (es.steps + count) >= es.maxCounter {
if (es.currentSeqId + count) >= es.maxSeqId {
reqSteps := DefaultEtcdSteps reqSteps := DefaultEtcdSteps
if count > DefaultEtcdSteps { if count > DefaultEtcdSteps {
reqSteps += count reqSteps += count
@ -85,18 +91,19 @@ func (es *EtcdSequencer) NextFileId(count uint64) (new uint64, cnt uint64) {
glog.V(4).Infof("get max sequence id from etcd, %d", maxId) glog.V(4).Infof("get max sequence id from etcd, %d", maxId)
if err != nil { if err != nil {
glog.Error(err) glog.Error(err)
return 0, 0
return 0
} }
es.steps, es.maxCounter = maxId-reqSteps, maxId
glog.V(4).Infof("current id : %d, max id : %d", es.steps, es.maxCounter)
es.currentSeqId, es.maxSeqId = maxId-reqSteps, maxId
glog.V(4).Infof("current id : %d, max id : %d", es.currentSeqId, es.maxSeqId)
if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil {
if err := writeSequenceFile(es.seqFile, es.maxSeqId, es.currentSeqId); err != nil {
glog.Errorf("flush sequence to file failed, %v", err) glog.Errorf("flush sequence to file failed, %v", err)
} }
} }
ret := es.steps
es.steps += count
return ret, count
ret := es.currentSeqId
es.currentSeqId += count
return ret
} }
/** /**
@ -106,13 +113,13 @@ the max value should be saved in local config file and ETCD cluster
func (es *EtcdSequencer) SetMax(seenValue uint64) { func (es *EtcdSequencer) SetMax(seenValue uint64) {
es.sequenceLock.Lock() es.sequenceLock.Lock()
defer es.sequenceLock.Unlock() defer es.sequenceLock.Unlock()
if seenValue > es.maxCounter {
if seenValue > es.maxSeqId {
maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue) maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue)
if err != nil { if err != nil {
glog.Errorf("set Etcd Max sequence failed : %v", err) glog.Errorf("set Etcd Max sequence failed : %v", err)
return return
} }
es.steps, es.maxCounter = maxId, maxId
es.currentSeqId, es.maxSeqId = maxId, maxId
if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil { if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil {
glog.Errorf("flush sequence to file failed, %v", err) glog.Errorf("flush sequence to file failed, %v", err)
@ -121,11 +128,11 @@ func (es *EtcdSequencer) SetMax(seenValue uint64) {
} }
func (es *EtcdSequencer) GetMax() uint64 { func (es *EtcdSequencer) GetMax() uint64 {
return es.maxCounter
return es.maxSeqId
} }
func (es *EtcdSequencer) Peek() uint64 { func (es *EtcdSequencer) Peek() uint64 {
return es.steps
return es.currentSeqId
} }
func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) { func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) {
@ -164,8 +171,11 @@ func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error)
} }
/** /**
update the key of EtcdKeySequence in ETCD cluster with the parameter of maxSeq,
until the value of EtcdKeySequence is equal to or larger than the maxSeq
update the value of the key EtcdKeySequence in ETCD cluster with the parameter of maxSeq,
when the value of the key EtcdKeySequence is equal to or large than the parameter maxSeq,
return the value of EtcdKeySequence in the ETCD cluster;
when the value of the EtcdKeySequence is less than the parameter maxSeq,
return the value of the parameter maxSeq
*/ */
func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) { func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) {
maxSeqStr := strconv.FormatUint(maxSeq, 10) maxSeqStr := strconv.FormatUint(maxSeq, 10)
@ -178,10 +188,10 @@ func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) {
if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) { if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) {
_, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr) _, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr)
if err == nil { if err == nil {
continue // create ETCD key success, retry get ETCD value
continue
} }
if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) { if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) {
continue // ETCD key exist, retry get ETCD value
continue
} }
return 0, err return 0, err
} else { } else {
@ -206,8 +216,6 @@ func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) {
return 0, err return 0, err
} }
} }
return maxSeq, nil
} }
func openSequenceFile(file string) (*os.File, error) { func openSequenceFile(file string) (*os.File, error) {
@ -227,7 +235,7 @@ func openSequenceFile(file string) (*os.File, error) {
} }
/* /*
sequence : step 以冒号分割
read sequence and step from sequence file
*/ */
func readSequenceFile(file *os.File) (uint64, uint64, error) { func readSequenceFile(file *os.File) (uint64, uint64, error) {
sequence := make([]byte, FileMaxSequenceLength) sequence := make([]byte, FileMaxSequenceLength)
@ -255,7 +263,7 @@ func readSequenceFile(file *os.File) (uint64, uint64, error) {
} }
/** /**
先不存放step到文件中
write the sequence and step to sequence file
*/ */
func writeSequenceFile(file *os.File, sequence, step uint64) error { func writeSequenceFile(file *os.File, sequence, step uint64) error {
_ = step _ = step
@ -276,103 +284,13 @@ func writeSequenceFile(file *os.File, sequence, step uint64) error {
return nil return nil
} }
func deleteEtcdKey(kvApi client.KeysAPI, key string) error {
ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
defer cancel()
_, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false})
if err != nil {
return err
}
return nil
}
//func (es *EtcdSequencer) Load() error {
// es.sequenceLock.Lock()
// defer es.sequenceLock.Unlock()
// reqSteps := DefaultEtcdSteps
// maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps)
// glog.V(4).Infof("get max sequence id from etcd, %d", maxId)
// if err != nil {
// glog.Error(err)
// return err
// }
// es.steps, es.maxCounter = maxId-reqSteps, maxId
// glog.V(4).Infof("current id : %d, max id : %d", es.steps, es.maxCounter)
//
// if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil {
// glog.Errorf("flush sequence to file failed, %v", err)
// return err
// }
// return nil
//}
//func getEtcdKey(kv client.KeysAPI, key string) (string, error) {
// resp, err := kv.Get(context.Background(), key, &client.GetOptions{Recursive: false, Quorum: true})
// if err != nil {
// glog.Warningf("key:%s result:%v", EtcdKeySequence, err)
// return "", err
// }
// if resp.Node == nil {
// return "", fmt.Errorf("the key is not exist, %s", key)
// }
// return resp.Node.Value, nil
//}
//func (es *EtcdSequencer) setLocalSequence(maxValue uint64) {
// es.sequenceLock.Lock()
// defer es.sequenceLock.Unlock()
// if maxValue > es.maxCounter {
// es.maxCounter, es.steps = maxValue, maxValue-DefaultEtcdSteps
//
// if err := writeSequenceFile(es.seqFile, es.maxCounter, es.steps); err != nil {
// glog.Errorf("flush sequence to file failed, %v", err)
// }
// }
//}
//func getEtcdKeysApi(etcdUrls, user, passwd string) (client.KeysAPI, error) {
// cli, err := client.New(client.Config{
// Endpoints: strings.Split(etcdUrls, ","),
// Username: user,
// Password: passwd,
// })
// if err != nil {
// return nil, err
// }
// keysApi := client.NewKeysAPI(cli)
// return keysApi, nil
//}
//func (es *EtcdSequencer) asyncStartWatcher() {
// es.startWatcher(es.keysAPI, EtcdKeySequence, func(value string, index uint64) {
// newValue, err := strconv.ParseUint(value, 10, 64)
// if err != nil {
// glog.Warning(err)
// }
// es.setLocalSequence(newValue)
// })
//}
//func (es *EtcdSequencer) startWatcher(kvApi client.KeysAPI, key string, callback func(value string, index uint64)) {
// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
// defer cancel()
// ctx.Done()
//
// getResp, err := kvApi.Get(ctx, key, &client.GetOptions{Recursive: false, Quorum: true})
// if err != nil {
// return
// }
//
// watcher := kvApi.Watcher(key, &client.WatcherOptions{AfterIndex: getResp.Index, Recursive: false})
// go func(w client.Watcher) {
// for {
// resp, err := w.Next(context.Background())
// if err != nil {
// glog.Error(err)
// continue
// }
// callback(resp.Node.Value, resp.Index)
// }
// }(watcher)
// return
//}
// the UT helper method
// func deleteEtcdKey(kvApi client.KeysAPI, key string) error {
// ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
// defer cancel()
// _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false})
// if err != nil {
// return err
// }
// return nil
// }

28
weed/server/master_server.go

@ -39,6 +39,9 @@ type MasterOption struct {
DisableHttp bool DisableHttp bool
MetricsAddress string MetricsAddress string
MetricsIntervalSec int MetricsIntervalSec int
sequencerType string
etcdUrls string
} }
type MasterServer struct { type MasterServer struct {
@ -87,7 +90,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers), MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
} }
ms.bounedLeaderChan = make(chan int, 16) ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
seq := ms.createSequencer(option)
if nil == seq {
glog.Fatalf("create sequencer failed.")
}
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds) ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth() ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
@ -230,3 +237,22 @@ func (ms *MasterServer) startAdminScripts() {
} }
}() }()
} }
func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
var seq sequence.Sequencer
glog.V(0).Infof("sequencer type [%s]", option.sequencerType)
switch strings.ToLower(option.sequencerType) {
case "memory":
seq = sequence.NewMemorySequencer()
case "etcd":
var err error
seq, err = sequence.NewEtcdSequencer(option.etcdUrls, option.MetaFolder)
if err != nil {
glog.Error(err)
seq = nil
}
default:
seq = sequence.NewMemorySequencer()
}
return seq
}
Loading…
Cancel
Save