You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

421 lines
13 KiB

3 years ago
7 years ago
6 years ago
5 years ago
5 years ago
6 years ago
6 years ago
6 years ago
3 years ago
5 years ago
3 years ago
6 years ago
6 years ago
4 years ago
3 years ago
6 years ago
6 years ago
6 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/stats"
  6. "net/http"
  7. "net/http/httputil"
  8. "net/url"
  9. "os"
  10. "regexp"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. "github.com/chrislusf/seaweedfs/weed/cluster"
  16. "github.com/chrislusf/seaweedfs/weed/pb"
  17. "github.com/chrislusf/raft"
  18. "github.com/gorilla/mux"
  19. hashicorpRaft "github.com/hashicorp/raft"
  20. "google.golang.org/grpc"
  21. "github.com/chrislusf/seaweedfs/weed/glog"
  22. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  23. "github.com/chrislusf/seaweedfs/weed/security"
  24. "github.com/chrislusf/seaweedfs/weed/sequence"
  25. "github.com/chrislusf/seaweedfs/weed/shell"
  26. "github.com/chrislusf/seaweedfs/weed/topology"
  27. "github.com/chrislusf/seaweedfs/weed/util"
  28. "github.com/chrislusf/seaweedfs/weed/wdclient"
  29. )
  // Viper configuration keys read by createSequencer to pick and configure the
// file-id sequencer.
const (
	// SequencerType selects the sequencer implementation; "snowflake"
	// (case-insensitive) selects the snowflake sequencer, anything else the
	// in-memory one.
	SequencerType = "master.sequencer.type"
	// SequencerSnowflakeId is the integer id handed to the snowflake sequencer.
	SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
)

// RaftServerRemovalTime is how long the raft leader waits, after a master peer
// disappears, before asking the cluster to remove it from the raft
// configuration (see OnPeerUpdate).
const RaftServerRemovalTime = 72 * time.Minute
  35. type MasterOption struct {
  36. Master pb.ServerAddress
  37. MetaFolder string
  38. VolumeSizeLimitMB uint32
  39. VolumePreallocate bool
  40. // PulseSeconds int
  41. DefaultReplicaPlacement string
  42. GarbageThreshold float64
  43. WhiteList []string
  44. DisableHttp bool
  45. MetricsAddress string
  46. MetricsIntervalSec int
  47. IsFollower bool
  48. }
  49. type MasterServer struct {
  50. master_pb.UnimplementedSeaweedServer
  51. option *MasterOption
  52. guard *security.Guard
  53. preallocateSize int64
  54. Topo *topology.Topology
  55. vg *topology.VolumeGrowth
  56. vgCh chan *topology.VolumeGrowRequest
  57. boundedLeaderChan chan int
  58. onPeerUpdateDoneCn chan string
  59. onPeerUpdateGoroutineCount uint32
  60. // notifying clients
  61. clientChansLock sync.RWMutex
  62. clientChans map[string]chan *master_pb.KeepConnectedResponse
  63. grpcDialOption grpc.DialOption
  64. MasterClient *wdclient.MasterClient
  65. adminLocks *AdminLocks
  66. Cluster *cluster.Cluster
  67. }
  68. func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
  69. v := util.GetViper()
  70. signingKey := v.GetString("jwt.signing.key")
  71. v.SetDefault("jwt.signing.expires_after_seconds", 10)
  72. expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  73. readSigningKey := v.GetString("jwt.signing.read.key")
  74. v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
  75. readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
  76. v.SetDefault("master.replication.treat_replication_as_minimums", false)
  77. replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
  78. v.SetDefault("master.volume_growth.copy_1", 7)
  79. v.SetDefault("master.volume_growth.copy_2", 6)
  80. v.SetDefault("master.volume_growth.copy_3", 3)
  81. v.SetDefault("master.volume_growth.copy_other", 1)
  82. v.SetDefault("master.volume_growth.threshold", 0.9)
  83. var preallocateSize int64
  84. if option.VolumePreallocate {
  85. preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
  86. }
  87. grpcDialOption := security.LoadClientTLS(v, "grpc.master")
  88. ms := &MasterServer{
  89. option: option,
  90. preallocateSize: preallocateSize,
  91. vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
  92. clientChans: make(map[string]chan *master_pb.KeepConnectedResponse),
  93. grpcDialOption: grpcDialOption,
  94. MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", peers),
  95. adminLocks: NewAdminLocks(),
  96. Cluster: cluster.NewCluster(),
  97. }
  98. ms.boundedLeaderChan = make(chan int, 16)
  99. ms.onPeerUpdateDoneCn = make(chan string)
  100. ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
  101. seq := ms.createSequencer(option)
  102. if nil == seq {
  103. glog.Fatalf("create sequencer failed.")
  104. }
  105. ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, 5, replicationAsMin)
  106. ms.vg = topology.NewDefaultVolumeGrowth()
  107. glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
  108. ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  109. handleStaticResources2(r)
  110. r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
  111. r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
  112. if !ms.option.DisableHttp {
  113. r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
  114. r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler))
  115. r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
  116. r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
  117. r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
  118. r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
  119. r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
  120. r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
  121. /*
  122. r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
  123. r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
  124. r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
  125. */
  126. r.HandleFunc("/{fileId}", ms.redirectHandler)
  127. }
  128. ms.Topo.StartRefreshWritableVolumes(
  129. ms.grpcDialOption,
  130. ms.option.GarbageThreshold,
  131. v.GetFloat64("master.volume_growth.threshold"),
  132. ms.preallocateSize,
  133. )
  134. ms.ProcessGrowRequest()
  135. if !option.IsFollower {
  136. ms.startAdminScripts()
  137. }
  138. return ms
  139. }
  140. func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
  141. var raftServerName string
  142. if raftServer.raftServer != nil {
  143. ms.Topo.RaftServer = raftServer.raftServer
  144. ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
  145. glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
  146. stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
  147. if ms.Topo.RaftServer.Leader() != "" {
  148. glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
  149. }
  150. })
  151. raftServerName = ms.Topo.RaftServer.Name()
  152. } else if raftServer.RaftHashicorp != nil {
  153. ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
  154. leaderCh := raftServer.RaftHashicorp.LeaderCh()
  155. prevLeader := ms.Topo.HashicorpRaft.Leader()
  156. go func() {
  157. for {
  158. select {
  159. case isLeader := <-leaderCh:
  160. leader := ms.Topo.HashicorpRaft.Leader()
  161. glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
  162. stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
  163. prevLeader = leader
  164. }
  165. }
  166. }()
  167. raftServerName = ms.Topo.HashicorpRaft.String()
  168. }
  169. if ms.Topo.IsLeader() {
  170. glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
  171. } else {
  172. if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
  173. glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
  174. } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
  175. glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
  176. }
  177. }
  178. }
  179. func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
  180. return func(w http.ResponseWriter, r *http.Request) {
  181. if ms.Topo.IsLeader() {
  182. f(w, r)
  183. return
  184. }
  185. var raftServerLeader string
  186. if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
  187. raftServerLeader = ms.Topo.RaftServer.Leader()
  188. } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
  189. raftServerLeader = string(ms.Topo.HashicorpRaft.Leader())
  190. }
  191. if raftServerLeader == "" {
  192. f(w, r)
  193. return
  194. }
  195. ms.boundedLeaderChan <- 1
  196. defer func() { <-ms.boundedLeaderChan }()
  197. targetUrl, err := url.Parse("http://" + raftServerLeader)
  198. if err != nil {
  199. writeJsonError(w, r, http.StatusInternalServerError,
  200. fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err))
  201. return
  202. }
  203. glog.V(4).Infoln("proxying to leader", raftServerLeader)
  204. proxy := httputil.NewSingleHostReverseProxy(targetUrl)
  205. director := proxy.Director
  206. proxy.Director = func(req *http.Request) {
  207. actualHost, err := security.GetActualRemoteHost(req)
  208. if err == nil {
  209. req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost)
  210. }
  211. director(req)
  212. }
  213. proxy.Transport = util.Transport
  214. proxy.ServeHTTP(w, r)
  215. }
  216. }
  217. func (ms *MasterServer) startAdminScripts() {
  218. v := util.GetViper()
  219. adminScripts := v.GetString("master.maintenance.scripts")
  220. if adminScripts == "" {
  221. return
  222. }
  223. glog.V(0).Infof("adminScripts: %v", adminScripts)
  224. v.SetDefault("master.maintenance.sleep_minutes", 17)
  225. sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
  226. scriptLines := strings.Split(adminScripts, "\n")
  227. if !strings.Contains(adminScripts, "lock") {
  228. scriptLines = append(append([]string{}, "lock"), scriptLines...)
  229. scriptLines = append(scriptLines, "unlock")
  230. }
  231. masterAddress := string(ms.option.Master)
  232. var shellOptions shell.ShellOptions
  233. shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
  234. shellOptions.Masters = &masterAddress
  235. shellOptions.Directory = "/"
  236. emptyFilerGroup := ""
  237. shellOptions.FilerGroup = &emptyFilerGroup
  238. commandEnv := shell.NewCommandEnv(&shellOptions)
  239. reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
  240. go commandEnv.MasterClient.KeepConnectedToMaster()
  241. go func() {
  242. commandEnv.MasterClient.WaitUntilConnected()
  243. for {
  244. time.Sleep(time.Duration(sleepMinutes) * time.Minute)
  245. if ms.Topo.IsLeader() {
  246. shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroup(*shellOptions.FilerGroup))
  247. if shellOptions.FilerAddress == "" {
  248. continue
  249. }
  250. for _, line := range scriptLines {
  251. for _, c := range strings.Split(line, ";") {
  252. processEachCmd(reg, c, commandEnv)
  253. }
  254. }
  255. }
  256. }
  257. }()
  258. }
  259. func processEachCmd(reg *regexp.Regexp, line string, commandEnv *shell.CommandEnv) {
  260. cmds := reg.FindAllString(line, -1)
  261. if len(cmds) == 0 {
  262. return
  263. }
  264. args := make([]string, len(cmds[1:]))
  265. for i := range args {
  266. args[i] = strings.Trim(string(cmds[1+i]), "\"'")
  267. }
  268. cmd := strings.ToLower(cmds[0])
  269. for _, c := range shell.Commands {
  270. if c.Name() == cmd {
  271. glog.V(0).Infof("executing: %s %v", cmd, args)
  272. if err := c.Do(args, commandEnv, os.Stdout); err != nil {
  273. glog.V(0).Infof("error: %v", err)
  274. }
  275. }
  276. }
  277. }
  278. func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
  279. var seq sequence.Sequencer
  280. v := util.GetViper()
  281. seqType := strings.ToLower(v.GetString(SequencerType))
  282. glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
  283. switch strings.ToLower(seqType) {
  284. case "snowflake":
  285. var err error
  286. snowflakeId := v.GetInt(SequencerSnowflakeId)
  287. seq, err = sequence.NewSnowflakeSequencer(string(option.Master), snowflakeId)
  288. if err != nil {
  289. glog.Error(err)
  290. seq = nil
  291. }
  292. default:
  293. seq = sequence.NewMemorySequencer()
  294. }
  295. return seq
  296. }
  297. func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
  298. if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
  299. return
  300. }
  301. glog.V(4).Infof("OnPeerUpdate: %+v", update)
  302. peerAddress := pb.ServerAddress(update.Address)
  303. peerName := string(peerAddress)
  304. isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
  305. if update.IsAdd {
  306. if isLeader {
  307. raftServerFound := false
  308. for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
  309. if string(server.ID) == peerName {
  310. raftServerFound = true
  311. }
  312. }
  313. if !raftServerFound {
  314. glog.V(0).Infof("adding new raft server: %s", peerName)
  315. ms.Topo.HashicorpRaft.AddVoter(
  316. hashicorpRaft.ServerID(peerName),
  317. hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
  318. }
  319. }
  320. if atomic.LoadUint32(&ms.onPeerUpdateGoroutineCount) > 0 {
  321. ms.onPeerUpdateDoneCn <- peerName
  322. }
  323. } else if isLeader {
  324. go func(peerName string) {
  325. raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime)
  326. raftServerPingTicker := time.NewTicker(5 * time.Minute)
  327. atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, 1)
  328. defer func() {
  329. atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, -1)
  330. }()
  331. for {
  332. select {
  333. case <-raftServerPingTicker.C:
  334. err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  335. _, err := client.Ping(context.Background(), &master_pb.PingRequest{
  336. Target: peerName,
  337. TargetType: cluster.MasterType,
  338. })
  339. return err
  340. })
  341. if err != nil {
  342. glog.Warningf("raft server %s ping failed %+v", peerName, err)
  343. } else {
  344. glog.V(0).Infof("raft server %s remove canceled on ping success", peerName)
  345. return
  346. }
  347. case <-raftServerRemovalTimeAfter:
  348. err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  349. _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
  350. Id: peerName,
  351. Force: false,
  352. })
  353. return err
  354. })
  355. if err != nil {
  356. glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
  357. return
  358. }
  359. glog.V(0).Infof("old raft server %s removed", peerName)
  360. return
  361. case peerDone := <-ms.onPeerUpdateDoneCn:
  362. if peerName == peerDone {
  363. glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName)
  364. return
  365. }
  366. }
  367. }
  368. }(peerName)
  369. glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName)
  370. }
  371. }