From 14dd97189011d2a8802d5c9cc1726802cf19f2b2 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 4 Apr 2022 17:51:51 +0500 Subject: [PATCH] hashicorp raft with state machine --- docker/Makefile | 3 ++ .../compose/local-hashicorp-raft-compose.yml | 20 ++++---- weed/command/master.go | 29 ++++++----- weed/server/master_server.go | 42 ++++++++++++---- weed/server/raft_hashicorp.go | 49 ++++++++++++------- weed/server/raft_server.go | 2 +- weed/topology/cluster_commands.go | 1 + weed/topology/topology.go | 25 ++++++++-- 8 files changed, 117 insertions(+), 54 deletions(-) diff --git a/docker/Makefile b/docker/Makefile index 446bb5b47..76cdf75c8 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -55,6 +55,9 @@ cluster: build 2clusters: build docker-compose -f compose/local-clusters-compose.yml -p seaweedfs up +hashicorp_raft: build + docker-compose -f compose/local-hashicorp-raft-compose.yml -p seaweedfs up + s3tests: build s3tests_build docker-compose -f compose/local-s3tests-compose.yml -p seaweedfs up diff --git a/docker/compose/local-hashicorp-raft-compose.yml b/docker/compose/local-hashicorp-raft-compose.yml index 5ed591027..850c1ae6f 100644 --- a/docker/compose/local-hashicorp-raft-compose.yml +++ b/docker/compose/local-hashicorp-raft-compose.yml @@ -6,9 +6,9 @@ services: ports: - 9333:9333 - 19333:19333 - command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -ip=master0 -port=9333 master1:9334,master2:9335 -mdir=/data" - volumes: - - ./master/0:/data + command: "-v=3 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -raftBootstrap=true -ip=master0 -port=9333 master1:9334,master2:9335 -mdir=/data" + #volumes: + # - ./master/0:/data environment: WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 WEED_MASTER_VOLUME_GROWTH_COPY_2: 2 @@ -18,9 +18,9 @@ services: ports: - 9334:9334 - 19334:19334 - command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -ip=master1 -port=9334 
-peers=master0:9333,master2:9335 -mdir=/data" - volumes: - - ./master/0:/data + command: "-v=3 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -raftBootstrap=true -ip=master1 -port=9334 -peers=master0:9333,master2:9335 -mdir=/data" + #volumes: + # - ./master/0:/data environment: WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 WEED_MASTER_VOLUME_GROWTH_COPY_2: 2 @@ -30,9 +30,9 @@ services: ports: - 9335:9335 - 19335:19335 - command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -ip=master2 -port=9335 -peers=master0:9333,master1:9334 -mdir=/data" - volumes: - - ./master/0:/data + command: "-v=3 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -raftBootstrap=true -ip=master2 -port=9335 -peers=master0:9333,master1:9334 -mdir=/data" + #volumes: + # - ./master/0:/data environment: WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 WEED_MASTER_VOLUME_GROWTH_COPY_2: 2 @@ -91,4 +91,4 @@ services: - master2 - volume1 - volume2 - - filer + - filer \ No newline at end of file diff --git a/weed/command/master.go b/weed/command/master.go index 459d3e1cb..bbf9b1de2 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -1,7 +1,6 @@ package command import ( - "context" "net/http" "os" "sort" @@ -166,13 +165,14 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { var raftServer *weed_server.RaftServer var err error if *m.raftHashicorp { - ctx := context.Background() - raftServer, err = weed_server.NewHashicorpRaftServer(ctx, raftServerOption) + if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil { + glog.Fatalf("NewHashicorpRaftServer: %s", err) + } } else { raftServer, err = weed_server.NewRaftServer(raftServerOption) - } - if raftServer == nil { - glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + if raftServer == nil { + glog.Fatalf("please verify %s is writable, see 
https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + } } ms.SetRaftServer(raftServer) r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") @@ -199,14 +199,17 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } go grpcS.Serve(grpcL) - go func() { - time.Sleep(1500 * time.Millisecond) - if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { - if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { - raftServer.DoJoinCommand() + timeSleep := 1500 * time.Millisecond + if !*m.raftHashicorp { + go func() { + time.Sleep(timeSleep) + if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { + if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { + raftServer.DoJoinCommand() + } } - } - }() + }() + } go ms.MasterClient.KeepConnectedToMaster() diff --git a/weed/server/master_server.go b/weed/server/master_server.go index b63e3a418..37e7f245c 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -160,19 +160,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se } func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { - ms.Topo.RaftServer = raftServer.raftServer - ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) - stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() - if ms.Topo.RaftServer.Leader() != "" { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") - } - }) + var raftServerName string + if raftServer.raftServer != nil { + ms.Topo.RaftServer = raftServer.raftServer + ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { + glog.V(0).Infof("leader change 
event: %+v => %+v", e.PrevValue(), e.Value()) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() + if ms.Topo.RaftServer.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") + } + }) + raftServerName = ms.Topo.RaftServer.Name() + } else if raftServer.RaftHashicorp != nil { + ms.Topo.HashicorpRaft = raftServer.RaftHashicorp + leaderCh := raftServer.RaftHashicorp.LeaderCh() + prevLeader := ms.Topo.HashicorpRaft.Leader() + go func() { + for { + select { + case isLeader := <-leaderCh: + leader := ms.Topo.HashicorpRaft.Leader() + glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() + prevLeader = leader + } + } + }() + raftServerName = ms.Topo.HashicorpRaft.String() + } if ms.Topo.IsLeader() { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") + glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!") } else { - if ms.Topo.RaftServer.Leader() != "" { + if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") + } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.") } } } diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go index caef42f62..2ee92331b 100644 --- a/weed/server/raft_hashicorp.go +++ b/weed/server/raft_hashicorp.go @@ -4,7 +4,6 @@ package weed_server // https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18 import ( - "context" "fmt" transport "github.com/Jille/raft-grpc-transport" "github.com/chrislusf/seaweedfs/weed/glog" @@ -17,7 +16,7 @@ import ( "time" ) -func NewHashicorpRaftServer(ctx 
context.Context, option *RaftServerOption) (*RaftServer, error) { +func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, serverAddr: option.ServerAddr, @@ -27,18 +26,20 @@ func NewHashicorpRaftServer(ctx context.Context, option *RaftServerOption) (*Raf c := raft.DefaultConfig() c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change + c.NoSnapshotRestoreOnStart = option.RaftResumeState c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) c.ElectionTimeout = option.ElectionTimeout + if c.LeaderLeaseTimeout > c.HeartbeatTimeout { + c.LeaderLeaseTimeout = c.HeartbeatTimeout + } if glog.V(4) { - c.Logger.SetLevel(1) - } else if glog.V(3) { - c.Logger.SetLevel(2) + c.LogLevel = "Debug" } else if glog.V(2) { - c.Logger.SetLevel(3) + c.LogLevel = "Info" } else if glog.V(1) { - c.Logger.SetLevel(4) + c.LogLevel = "Warn" } else if glog.V(0) { - c.Logger.SetLevel(5) + c.LogLevel = "Error" } baseDir := s.dataDir @@ -58,41 +59,55 @@ func NewHashicorpRaftServer(ctx context.Context, option *RaftServerOption) (*Raf return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) } - // s.GrpcServer = raft.NewGrpcServer(s.raftServer) s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption}) stateMachine := StateMachine{topo: option.Topo} - r, err := raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport()) + s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport()) if err != nil { return nil, fmt.Errorf("raft.NewRaft: %v", err) } - if option.RaftBootstrap { cfg := raft.Configuration{ Servers: []raft.Server{ { Suffrage: raft.Voter, ID: c.LocalID, - Address: raft.ServerAddress(s.serverAddr), + Address: raft.ServerAddress(s.serverAddr.ToGrpcAddress()), }, }, } // Add known peers to bootstrap - for _, node := range 
option.Peers { - if node == option.ServerAddr { + for _, peer := range option.Peers { + if peer == option.ServerAddr { continue } cfg.Servers = append(cfg.Servers, raft.Server{ Suffrage: raft.Voter, - ID: raft.ServerID(node), - Address: raft.ServerAddress(node), + ID: raft.ServerID(peer), + Address: raft.ServerAddress(peer.ToGrpcAddress()), }) } - f := r.BootstrapCluster(cfg) + f := s.RaftHashicorp.BootstrapCluster(cfg) if err := f.Error(); err != nil { return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) } } + ticker := time.NewTicker(c.HeartbeatTimeout * 10) + if glog.V(4) { + go func() { + for { + select { + case <-ticker.C: + cfuture := s.RaftHashicorp.GetConfiguration() + if err = cfuture.Error(); err != nil { + glog.Fatalf("error getting config: %s", err) + } + configuration := cfuture.Configuration() + glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers) + } + } + }() + } return s, nil } diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 5ed3724e2..3fedc843e 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -36,7 +36,7 @@ type RaftServerOption struct { type RaftServer struct { peers map[string]pb.ServerAddress // initial peers to join with raftServer raft.Server - raftHashicorp hashicorpRaft.Raft + RaftHashicorp *hashicorpRaft.Raft TransportManager *transport.Manager dataDir string serverAddr pb.ServerAddress diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go index 951de0711..1bcc6b449 100644 --- a/weed/topology/cluster_commands.go +++ b/weed/topology/cluster_commands.go @@ -23,6 +23,7 @@ func (c *MaxVolumeIdCommand) CommandName() string { return "MaxVolumeId" } +// Apply implements the old (deprecated) command interface: it applies the command to the given raft server.
func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { topo := server.Context().(*Topology) before := topo.GetMaxVolumeId() diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 207c89ad7..d636e8a9e 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -1,6 +1,7 @@ package topology import ( + "encoding/json" "errors" "fmt" "github.com/chrislusf/seaweedfs/weed/pb" @@ -10,6 +11,7 @@ import ( "time" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -40,7 +42,8 @@ type Topology struct { Configuration *Configuration - RaftServer raft.Server + RaftServer raft.Server + HashicorpRaft *hashicorpRaft.Raft } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -76,6 +79,10 @@ func (t *Topology) IsLeader() bool { return true } } + } else if t.HashicorpRaft != nil { + if t.HashicorpRaft.State() == hashicorpRaft.Leader { + return true + } } return false } @@ -85,6 +92,8 @@ func (t *Topology) Leader() (pb.ServerAddress, error) { for count := 0; count < 3; count++ { if t.RaftServer != nil { l = pb.ServerAddress(t.RaftServer.Leader()) + } else if t.HashicorpRaft != nil { + l = pb.ServerAddress(t.HashicorpRaft.Leader()) } else { return "", errors.New("Raft Server not ready yet!") } @@ -124,8 +133,18 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []* func (t *Topology) NextVolumeId() (needle.VolumeId, error) { vid := t.GetMaxVolumeId() next := vid.Next() - if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { - return 0, err + if t.RaftServer != nil { + if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { + return 0, err + } + } else if t.HashicorpRaft != nil { + b, err := json.Marshal(NewMaxVolumeIdCommand(next)) + if err != nil { + return 0, fmt.Errorf("failed 
marshal NewMaxVolumeIdCommand: %+v", err) + } + if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil { + return 0, future.Error() + } } return next, nil }