You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

195 lines
5.3 KiB

11 years ago
10 years ago
10 years ago
10 years ago
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. "math/rand"
  5. "net/http"
  6. "os"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/filer"
  11. "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
  12. "github.com/chrislusf/seaweedfs/weed/filer/embedded_filer"
  13. "github.com/chrislusf/seaweedfs/weed/filer/flat_namespace"
  14. "github.com/chrislusf/seaweedfs/weed/filer/mysql_store"
  15. "github.com/chrislusf/seaweedfs/weed/filer/redis_store"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/security"
  18. "github.com/chrislusf/seaweedfs/weed/storage"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. )
  21. type filerConf struct {
  22. MysqlConf []mysql_store.MySqlConf `json:"mysql"`
  23. mysql_store.ShardingConf
  24. }
  25. func parseConfFile(confPath string) (*filerConf, error) {
  26. var setting filerConf
  27. configFile, err := os.Open(confPath)
  28. defer configFile.Close()
  29. if err != nil {
  30. return nil, err
  31. }
  32. jsonParser := json.NewDecoder(configFile)
  33. if err = jsonParser.Decode(&setting); err != nil {
  34. return nil, err
  35. }
  36. return &setting, nil
  37. }
  38. type FilerServer struct {
  39. port string
  40. master string
  41. mnLock sync.RWMutex
  42. collection string
  43. defaultReplication string
  44. redirectOnRead bool
  45. disableDirListing bool
  46. secret security.Secret
  47. filer filer.Filer
  48. maxMB int
  49. masterNodes *storage.MasterNodes
  50. }
  51. func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string,
  52. replication string, redirectOnRead bool, disableDirListing bool,
  53. confFile string,
  54. maxMB int,
  55. secret string,
  56. cassandra_server string, cassandra_keyspace string,
  57. redis_server string, redis_password string, redis_database int,
  58. ) (fs *FilerServer, err error) {
  59. fs = &FilerServer{
  60. master: master,
  61. collection: collection,
  62. defaultReplication: replication,
  63. redirectOnRead: redirectOnRead,
  64. disableDirListing: disableDirListing,
  65. maxMB: maxMB,
  66. port: ip + ":" + strconv.Itoa(port),
  67. }
  68. var setting *filerConf
  69. if confFile != "" {
  70. setting, err = parseConfFile(confFile)
  71. if err != nil {
  72. return nil, err
  73. }
  74. } else {
  75. setting = new(filerConf)
  76. }
  77. if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
  78. mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount)
  79. fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
  80. } else if cassandra_server != "" {
  81. cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
  82. if err != nil {
  83. glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)
  84. }
  85. fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store)
  86. } else if redis_server != "" {
  87. redis_store := redis_store.NewRedisStore(redis_server, redis_password, redis_database)
  88. fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store)
  89. } else {
  90. if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
  91. glog.Fatalf("Can not start filer in dir %s : %v", dir, err)
  92. return
  93. }
  94. r.HandleFunc("/admin/mv", fs.moveHandler)
  95. r.HandleFunc("/admin/register", fs.registerHandler)
  96. }
  97. r.HandleFunc("/", fs.filerHandler)
  98. go func() {
  99. connected := true
  100. fs.masterNodes = storage.NewMasterNodes(fs.master)
  101. glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
  102. //force initialize with all available master nodes
  103. for {
  104. _, err := fs.masterNodes.FindMaster()
  105. if err != nil {
  106. glog.Infof("filer server failed to get master cluster info:%s", err.Error())
  107. time.Sleep(3 * time.Second)
  108. } else {
  109. break
  110. }
  111. }
  112. for {
  113. glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode())
  114. master, err := fs.detectHealthyMaster(fs.getMasterNode())
  115. if err == nil {
  116. if !connected {
  117. connected = true
  118. if fs.getMasterNode() != master {
  119. fs.setMasterNode(master)
  120. }
  121. glog.V(0).Infoln("Filer Server Connected with master at", master)
  122. }
  123. } else {
  124. glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err)
  125. if connected {
  126. connected = false
  127. }
  128. }
  129. if connected {
  130. time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond)
  131. } else {
  132. time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
  133. }
  134. }
  135. }()
  136. return fs, nil
  137. }
  138. func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
  139. return security.GenJwt(fs.secret, fileId)
  140. }
  141. func (fs *FilerServer) getMasterNode() string {
  142. fs.mnLock.RLock()
  143. defer fs.mnLock.RUnlock()
  144. return fs.master
  145. }
  146. func (fs *FilerServer) setMasterNode(masterNode string) {
  147. fs.mnLock.Lock()
  148. defer fs.mnLock.Unlock()
  149. fs.master = masterNode
  150. }
  151. func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) {
  152. if e = checkMaster(masterNode); e != nil {
  153. fs.masterNodes.Reset()
  154. for i := 0; i <= 3; i++ {
  155. master, e = fs.masterNodes.FindMaster()
  156. if e != nil {
  157. continue
  158. } else {
  159. if e = checkMaster(master); e == nil {
  160. break
  161. }
  162. }
  163. }
  164. } else {
  165. master = masterNode
  166. }
  167. return
  168. }
  169. func checkMaster(masterNode string) error {
  170. statUrl := "http://" + masterNode + "/stats"
  171. glog.V(4).Infof("Connecting to %s ...", statUrl)
  172. _, e := util.Get(statUrl)
  173. return e
  174. }