You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

139 lines
3.6 KiB

11 years ago
10 years ago
10 years ago
10 years ago
  1. package weed_server
  2. import (
  3. "math/rand"
  4. "net/http"
  5. "strconv"
  6. "sync"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/filer/mysql_store"
  9. "github.com/chrislusf/seaweedfs/weed/filer/postgres_store"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/storage"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "github.com/chrislusf/seaweedfs/weed/filer2"
  15. _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
  16. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  17. )
  18. type filerConf struct {
  19. MysqlConf []mysql_store.MySqlConf `json:"mysql"`
  20. mysql_store.ShardingConf
  21. PostgresConf *postgres_store.PostgresConf `json:"postgres"`
  22. }
  23. type FilerServer struct {
  24. port string
  25. master string
  26. mnLock sync.RWMutex
  27. collection string
  28. defaultReplication string
  29. redirectOnRead bool
  30. disableDirListing bool
  31. secret security.Secret
  32. filer *filer2.Filer
  33. maxMB int
  34. masterNodes *storage.MasterNodes
  35. }
  36. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int, master string, collection string,
  37. replication string, redirectOnRead bool, disableDirListing bool,
  38. maxMB int,
  39. secret string,
  40. ) (fs *FilerServer, err error) {
  41. fs = &FilerServer{
  42. master: master,
  43. collection: collection,
  44. defaultReplication: replication,
  45. redirectOnRead: redirectOnRead,
  46. disableDirListing: disableDirListing,
  47. maxMB: maxMB,
  48. port: ip + ":" + strconv.Itoa(port),
  49. }
  50. fs.filer = filer2.NewFiler(master)
  51. fs.filer.LoadConfiguration()
  52. defaultMux.HandleFunc("/admin/register", fs.registerHandler)
  53. defaultMux.HandleFunc("/", fs.filerHandler)
  54. if defaultMux != readonlyMux {
  55. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  56. }
  57. go func() {
  58. connected := true
  59. fs.masterNodes = storage.NewMasterNodes(fs.master)
  60. glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
  61. for {
  62. glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode())
  63. master, err := fs.detectHealthyMaster(fs.getMasterNode())
  64. if err == nil {
  65. if !connected {
  66. connected = true
  67. if fs.getMasterNode() != master {
  68. fs.setMasterNode(master)
  69. }
  70. glog.V(0).Infoln("Filer Server Connected with master at", master)
  71. }
  72. } else {
  73. glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err)
  74. if connected {
  75. connected = false
  76. }
  77. }
  78. if connected {
  79. time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond)
  80. } else {
  81. time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
  82. }
  83. }
  84. }()
  85. return fs, nil
  86. }
  87. func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
  88. return security.GenJwt(fs.secret, fileId)
  89. }
  90. func (fs *FilerServer) getMasterNode() string {
  91. fs.mnLock.RLock()
  92. defer fs.mnLock.RUnlock()
  93. return fs.master
  94. }
  95. func (fs *FilerServer) setMasterNode(masterNode string) {
  96. fs.mnLock.Lock()
  97. defer fs.mnLock.Unlock()
  98. fs.master = masterNode
  99. }
  100. func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) {
  101. if e = checkMaster(masterNode); e != nil {
  102. fs.masterNodes.Reset()
  103. for i := 0; i <= 3; i++ {
  104. master, e = fs.masterNodes.FindMaster()
  105. if e != nil {
  106. continue
  107. } else {
  108. if e = checkMaster(master); e == nil {
  109. break
  110. }
  111. }
  112. }
  113. } else {
  114. master = masterNode
  115. }
  116. return
  117. }
  118. func checkMaster(masterNode string) error {
  119. statUrl := "http://" + masterNode + "/stats/health"
  120. glog.V(4).Infof("Connecting to %s ...", statUrl)
  121. _, e := util.Get(statUrl)
  122. return e
  123. }