You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

158 lines
4.4 KiB

11 years ago
10 years ago
10 years ago
10 years ago
  1. package weed_server
  2. import (
  3. "math/rand"
  4. "net/http"
  5. "strconv"
  6. "sync"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/filer"
  9. "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
  10. "github.com/chrislusf/seaweedfs/weed/filer/embedded_filer"
  11. "github.com/chrislusf/seaweedfs/weed/filer/flat_namespace"
  12. "github.com/chrislusf/seaweedfs/weed/filer/redis_store"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/security"
  15. "github.com/chrislusf/seaweedfs/weed/storage"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. )
  18. type FilerServer struct {
  19. port string
  20. master string
  21. mnLock sync.RWMutex
  22. collection string
  23. defaultReplication string
  24. redirectOnRead bool
  25. disableDirListing bool
  26. secret security.Secret
  27. filer filer.Filer
  28. maxMB int
  29. masterNodes *storage.MasterNodes
  30. }
  31. func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string,
  32. replication string, redirectOnRead bool, disableDirListing bool,
  33. maxMB int,
  34. secret string,
  35. cassandra_server string, cassandra_keyspace string,
  36. redis_server string, redis_password string, redis_database int,
  37. ) (fs *FilerServer, err error) {
  38. fs = &FilerServer{
  39. master: master,
  40. collection: collection,
  41. defaultReplication: replication,
  42. redirectOnRead: redirectOnRead,
  43. disableDirListing: disableDirListing,
  44. maxMB: maxMB,
  45. port: ip + ":" + strconv.Itoa(port),
  46. }
  47. if cassandra_server != "" {
  48. cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
  49. if err != nil {
  50. glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)
  51. }
  52. fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store)
  53. } else if redis_server != "" {
  54. redis_store := redis_store.NewRedisStore(redis_server, redis_password, redis_database)
  55. fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store)
  56. } else {
  57. if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
  58. glog.Fatalf("Can not start filer in dir %s : %v", dir, err)
  59. return
  60. }
  61. r.HandleFunc("/admin/mv", fs.moveHandler)
  62. r.HandleFunc("/admin/register", fs.registerHandler)
  63. }
  64. r.HandleFunc("/", fs.filerHandler)
  65. go func() {
  66. connected := true
  67. fs.masterNodes = storage.NewMasterNodes(fs.master)
  68. glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
  69. //force initialize with all available master nodes
  70. for {
  71. _, err := fs.masterNodes.FindMaster()
  72. if err != nil {
  73. glog.Infof("filer server failed to get master cluster info:%s", err.Error())
  74. time.Sleep(3 * time.Second)
  75. } else {
  76. break
  77. }
  78. }
  79. for {
  80. glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode())
  81. master, err := fs.detectHealthyMaster(fs.getMasterNode())
  82. if err == nil {
  83. if !connected {
  84. connected = true
  85. if fs.getMasterNode() != master {
  86. fs.setMasterNode(master)
  87. }
  88. glog.V(0).Infoln("Filer Server Connected with master at", master)
  89. }
  90. } else {
  91. glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err)
  92. if connected {
  93. connected = false
  94. }
  95. }
  96. if connected {
  97. time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond)
  98. } else {
  99. time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
  100. }
  101. }
  102. }()
  103. return fs, nil
  104. }
  105. func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
  106. return security.GenJwt(fs.secret, fileId)
  107. }
  108. func (fs *FilerServer) getMasterNode() string {
  109. fs.mnLock.RLock()
  110. defer fs.mnLock.RUnlock()
  111. return fs.master
  112. }
  113. func (fs *FilerServer) setMasterNode(masterNode string) {
  114. fs.mnLock.Lock()
  115. defer fs.mnLock.Unlock()
  116. fs.master = masterNode
  117. }
  118. func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) {
  119. if e = checkMaster(masterNode); e != nil {
  120. fs.masterNodes.Reset()
  121. for i := 0; i <= 3; i++ {
  122. master, e = fs.masterNodes.FindMaster()
  123. if e != nil {
  124. continue
  125. } else {
  126. if e = checkMaster(master); e == nil {
  127. break
  128. }
  129. }
  130. }
  131. } else {
  132. master = masterNode
  133. }
  134. return
  135. }
  136. func checkMaster(masterNode string) error {
  137. statUrl := "http://" + masterNode + "/stats"
  138. glog.V(4).Infof("Connecting to %s ...", statUrl)
  139. _, e := util.Get(statUrl)
  140. return e
  141. }