Browse Source

hashicorp raft with state machine

pull/2868/head
Konstantin Lebedev 3 years ago
parent
commit
14dd971890
  1. 3
      docker/Makefile
  2. 18
      docker/compose/local-hashicorp-raft-compose.yml
  3. 13
      weed/command/master.go
  4. 26
      weed/server/master_server.go
  5. 49
      weed/server/raft_hashicorp.go
  6. 2
      weed/server/raft_server.go
  7. 1
      weed/topology/cluster_commands.go
  8. 19
      weed/topology/topology.go

3
docker/Makefile

@ -55,6 +55,9 @@ cluster: build
2clusters: build 2clusters: build
docker-compose -f compose/local-clusters-compose.yml -p seaweedfs up docker-compose -f compose/local-clusters-compose.yml -p seaweedfs up
hashicorp_raft: build
docker-compose -f compose/local-hashicorp-raft-compose.yml -p seaweedfs up
s3tests: build s3tests_build s3tests: build s3tests_build
docker-compose -f compose/local-s3tests-compose.yml -p seaweedfs up docker-compose -f compose/local-s3tests-compose.yml -p seaweedfs up

18
docker/compose/local-hashicorp-raft-compose.yml

@ -6,9 +6,9 @@ services:
ports: ports:
- 9333:9333 - 9333:9333
- 19333:19333 - 19333:19333
command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -ip=master0 -port=9333 master1:9334,master2:9335 -mdir=/data"
volumes:
- ./master/0:/data
command: "-v=3 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -raftBootstrap=true -ip=master0 -port=9333 master1:9334,master2:9335 -mdir=/data"
#volumes:
# - ./master/0:/data
environment: environment:
WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
WEED_MASTER_VOLUME_GROWTH_COPY_2: 2 WEED_MASTER_VOLUME_GROWTH_COPY_2: 2
@ -18,9 +18,9 @@ services:
ports: ports:
- 9334:9334 - 9334:9334
- 19334:19334 - 19334:19334
command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -ip=master1 -port=9334 -peers=master0:9333,master2:9335 -mdir=/data"
volumes:
- ./master/0:/data
command: "-v=3 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -raftBootstrap=true -ip=master1 -port=9334 -peers=master0:9333,master2:9335 -mdir=/data"
#volumes:
# - ./master/0:/data
environment: environment:
WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
WEED_MASTER_VOLUME_GROWTH_COPY_2: 2 WEED_MASTER_VOLUME_GROWTH_COPY_2: 2
@ -30,9 +30,9 @@ services:
ports: ports:
- 9335:9335 - 9335:9335
- 19335:19335 - 19335:19335
command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -ip=master2 -port=9335 -peers=master0:9333,master1:9334 -mdir=/data"
volumes:
- ./master/0:/data
command: "-v=3 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -raftBootstrap=true -ip=master2 -port=9335 -peers=master0:9333,master1:9334 -mdir=/data"
#volumes:
# - ./master/0:/data
environment: environment:
WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
WEED_MASTER_VOLUME_GROWTH_COPY_2: 2 WEED_MASTER_VOLUME_GROWTH_COPY_2: 2

13
weed/command/master.go

@ -1,7 +1,6 @@
package command package command
import ( import (
"context"
"net/http" "net/http"
"os" "os"
"sort" "sort"
@ -166,14 +165,15 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
var raftServer *weed_server.RaftServer var raftServer *weed_server.RaftServer
var err error var err error
if *m.raftHashicorp { if *m.raftHashicorp {
ctx := context.Background()
raftServer, err = weed_server.NewHashicorpRaftServer(ctx, raftServerOption)
if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil {
glog.Fatalf("NewHashicorpRaftServer: %s", err)
}
} else { } else {
raftServer, err = weed_server.NewRaftServer(raftServerOption) raftServer, err = weed_server.NewRaftServer(raftServerOption)
}
if raftServer == nil { if raftServer == nil {
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
} }
}
ms.SetRaftServer(raftServer) ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
if *m.raftHashicorp { if *m.raftHashicorp {
@ -199,14 +199,17 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
} }
go grpcS.Serve(grpcL) go grpcS.Serve(grpcL)
timeSleep := 1500 * time.Millisecond
if !*m.raftHashicorp {
go func() { go func() {
time.Sleep(1500 * time.Millisecond)
time.Sleep(timeSleep)
if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
raftServer.DoJoinCommand() raftServer.DoJoinCommand()
} }
} }
}() }()
}
go ms.MasterClient.KeepConnectedToMaster() go ms.MasterClient.KeepConnectedToMaster()

26
weed/server/master_server.go

@ -160,6 +160,8 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
} }
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
var raftServerName string
if raftServer.raftServer != nil {
ms.Topo.RaftServer = raftServer.raftServer ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
@ -168,11 +170,31 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
} }
}) })
raftServerName = ms.Topo.RaftServer.Name()
} else if raftServer.RaftHashicorp != nil {
ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
leaderCh := raftServer.RaftHashicorp.LeaderCh()
prevLeader := ms.Topo.HashicorpRaft.Leader()
go func() {
for {
select {
case isLeader := <-leaderCh:
leader := ms.Topo.HashicorpRaft.Leader()
glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
prevLeader = leader
}
}
}()
raftServerName = ms.Topo.HashicorpRaft.String()
}
if ms.Topo.IsLeader() { if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
} else { } else {
if ms.Topo.RaftServer.Leader() != "" {
if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
} else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
} }
} }
} }

49
weed/server/raft_hashicorp.go

@ -4,7 +4,6 @@ package weed_server
// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18 // https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18
import ( import (
"context"
"fmt" "fmt"
transport "github.com/Jille/raft-grpc-transport" transport "github.com/Jille/raft-grpc-transport"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -17,7 +16,7 @@ import (
"time" "time"
) )
func NewHashicorpRaftServer(ctx context.Context, option *RaftServerOption) (*RaftServer, error) {
func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{ s := &RaftServer{
peers: option.Peers, peers: option.Peers,
serverAddr: option.ServerAddr, serverAddr: option.ServerAddr,
@ -27,18 +26,20 @@ func NewHashicorpRaftServer(ctx context.Context, option *RaftServerOption) (*Raf
c := raft.DefaultConfig() c := raft.DefaultConfig()
c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change
c.NoSnapshotRestoreOnStart = option.RaftResumeState
c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
c.ElectionTimeout = option.ElectionTimeout c.ElectionTimeout = option.ElectionTimeout
if c.LeaderLeaseTimeout > c.HeartbeatTimeout {
c.LeaderLeaseTimeout = c.HeartbeatTimeout
}
if glog.V(4) { if glog.V(4) {
c.Logger.SetLevel(1)
} else if glog.V(3) {
c.Logger.SetLevel(2)
c.LogLevel = "Debug"
} else if glog.V(2) { } else if glog.V(2) {
c.Logger.SetLevel(3)
c.LogLevel = "Info"
} else if glog.V(1) { } else if glog.V(1) {
c.Logger.SetLevel(4)
c.LogLevel = "Warn"
} else if glog.V(0) { } else if glog.V(0) {
c.Logger.SetLevel(5)
c.LogLevel = "Error"
} }
baseDir := s.dataDir baseDir := s.dataDir
@ -58,41 +59,55 @@ func NewHashicorpRaftServer(ctx context.Context, option *RaftServerOption) (*Raf
return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
} }
// s.GrpcServer = raft.NewGrpcServer(s.raftServer)
s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption}) s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption})
stateMachine := StateMachine{topo: option.Topo} stateMachine := StateMachine{topo: option.Topo}
r, err := raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport())
s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport())
if err != nil { if err != nil {
return nil, fmt.Errorf("raft.NewRaft: %v", err) return nil, fmt.Errorf("raft.NewRaft: %v", err)
} }
if option.RaftBootstrap { if option.RaftBootstrap {
cfg := raft.Configuration{ cfg := raft.Configuration{
Servers: []raft.Server{ Servers: []raft.Server{
{ {
Suffrage: raft.Voter, Suffrage: raft.Voter,
ID: c.LocalID, ID: c.LocalID,
Address: raft.ServerAddress(s.serverAddr),
Address: raft.ServerAddress(s.serverAddr.ToGrpcAddress()),
}, },
}, },
} }
// Add known peers to bootstrap // Add known peers to bootstrap
for _, node := range option.Peers {
if node == option.ServerAddr {
for _, peer := range option.Peers {
if peer == option.ServerAddr {
continue continue
} }
cfg.Servers = append(cfg.Servers, raft.Server{ cfg.Servers = append(cfg.Servers, raft.Server{
Suffrage: raft.Voter, Suffrage: raft.Voter,
ID: raft.ServerID(node),
Address: raft.ServerAddress(node),
ID: raft.ServerID(peer),
Address: raft.ServerAddress(peer.ToGrpcAddress()),
}) })
} }
f := r.BootstrapCluster(cfg)
f := s.RaftHashicorp.BootstrapCluster(cfg)
if err := f.Error(); err != nil { if err := f.Error(); err != nil {
return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
} }
} }
ticker := time.NewTicker(c.HeartbeatTimeout * 10)
if glog.V(4) {
go func() {
for {
select {
case <-ticker.C:
cfuture := s.RaftHashicorp.GetConfiguration()
if err = cfuture.Error(); err != nil {
glog.Fatalf("error getting config: %s", err)
}
configuration := cfuture.Configuration()
glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers)
}
}
}()
}
return s, nil return s, nil
} }

2
weed/server/raft_server.go

@ -36,7 +36,7 @@ type RaftServerOption struct {
type RaftServer struct { type RaftServer struct {
peers map[string]pb.ServerAddress // initial peers to join with peers map[string]pb.ServerAddress // initial peers to join with
raftServer raft.Server raftServer raft.Server
raftHashicorp hashicorpRaft.Raft
RaftHashicorp *hashicorpRaft.Raft
TransportManager *transport.Manager TransportManager *transport.Manager
dataDir string dataDir string
serverAddr pb.ServerAddress serverAddr pb.ServerAddress

1
weed/topology/cluster_commands.go

@ -23,6 +23,7 @@ func (c *MaxVolumeIdCommand) CommandName() string {
return "MaxVolumeId" return "MaxVolumeId"
} }
// deprecatedCommandApply represents the old interface to apply a command to the server.
func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
topo := server.Context().(*Topology) topo := server.Context().(*Topology)
before := topo.GetMaxVolumeId() before := topo.GetMaxVolumeId()

19
weed/topology/topology.go

@ -1,6 +1,7 @@
package topology package topology
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
@ -10,6 +11,7 @@ import (
"time" "time"
"github.com/chrislusf/raft" "github.com/chrislusf/raft"
hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@ -41,6 +43,7 @@ type Topology struct {
Configuration *Configuration Configuration *Configuration
RaftServer raft.Server RaftServer raft.Server
HashicorpRaft *hashicorpRaft.Raft
} }
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@ -76,6 +79,10 @@ func (t *Topology) IsLeader() bool {
return true return true
} }
} }
} else if t.HashicorpRaft != nil {
if t.HashicorpRaft.State() == hashicorpRaft.Leader {
return true
}
} }
return false return false
} }
@ -85,6 +92,8 @@ func (t *Topology) Leader() (pb.ServerAddress, error) {
for count := 0; count < 3; count++ { for count := 0; count < 3; count++ {
if t.RaftServer != nil { if t.RaftServer != nil {
l = pb.ServerAddress(t.RaftServer.Leader()) l = pb.ServerAddress(t.RaftServer.Leader())
} else if t.HashicorpRaft != nil {
l = pb.ServerAddress(t.HashicorpRaft.Leader())
} else { } else {
return "", errors.New("Raft Server not ready yet!") return "", errors.New("Raft Server not ready yet!")
} }
@ -124,9 +133,19 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
func (t *Topology) NextVolumeId() (needle.VolumeId, error) { func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId() vid := t.GetMaxVolumeId()
next := vid.Next() next := vid.Next()
if t.RaftServer != nil {
if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
return 0, err return 0, err
} }
} else if t.HashicorpRaft != nil {
b, err := json.Marshal(NewMaxVolumeIdCommand(next))
if err != nil {
return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err)
}
if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil {
return 0, future.Error()
}
}
return next, nil return next, nil
} }

Loading…
Cancel
Save