You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

155 lines
4.3 KiB

11 years ago
10 years ago
10 years ago
10 years ago
  1. package weed_server
  2. import (
  3. "math/rand"
  4. "net/http"
  5. "strconv"
  6. "sync"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/filer"
  9. "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
  10. "github.com/chrislusf/seaweedfs/weed/filer/embedded_filer"
  11. "github.com/chrislusf/seaweedfs/weed/filer/flat_namespace"
  12. "github.com/chrislusf/seaweedfs/weed/filer/redis_store"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/security"
  15. "github.com/chrislusf/seaweedfs/weed/storage"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. )
  18. type FilerServer struct {
  19. port string
  20. master string
  21. mnLock sync.RWMutex
  22. collection string
  23. defaultReplication string
  24. redirectOnRead bool
  25. disableDirListing bool
  26. secret security.Secret
  27. filer filer.Filer
  28. masterNodes *storage.MasterNodes
  29. }
  30. func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string,
  31. replication string, redirectOnRead bool, disableDirListing bool,
  32. secret string,
  33. cassandra_server string, cassandra_keyspace string,
  34. redis_server string, redis_password string, redis_database int,
  35. ) (fs *FilerServer, err error) {
  36. fs = &FilerServer{
  37. master: master,
  38. collection: collection,
  39. defaultReplication: replication,
  40. redirectOnRead: redirectOnRead,
  41. disableDirListing: disableDirListing,
  42. port: ip + ":" + strconv.Itoa(port),
  43. }
  44. if cassandra_server != "" {
  45. cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
  46. if err != nil {
  47. glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)
  48. }
  49. fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store)
  50. } else if redis_server != "" {
  51. redis_store := redis_store.NewRedisStore(redis_server, redis_password, redis_database)
  52. fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store)
  53. } else {
  54. if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
  55. glog.Fatalf("Can not start filer in dir %s : %v", dir, err)
  56. return
  57. }
  58. r.HandleFunc("/admin/mv", fs.moveHandler)
  59. r.HandleFunc("/admin/register", fs.registerHandler)
  60. }
  61. r.HandleFunc("/", fs.filerHandler)
  62. go func() {
  63. connected := true
  64. fs.masterNodes = storage.NewMasterNodes(fs.master)
  65. glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
  66. //force initialize with all available master nodes
  67. for {
  68. _, err := fs.masterNodes.FindMaster()
  69. if err != nil {
  70. glog.Infof("filer server failed to get master cluster info:%s", err.Error())
  71. time.Sleep(3 * time.Second)
  72. } else {
  73. break
  74. }
  75. }
  76. for {
  77. glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode())
  78. master, err := fs.detectHealthyMaster(fs.getMasterNode())
  79. if err == nil {
  80. if !connected {
  81. connected = true
  82. if fs.getMasterNode() != master {
  83. fs.setMasterNode(master)
  84. }
  85. glog.V(0).Infoln("Filer Server Connected with master at", master)
  86. }
  87. } else {
  88. glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err)
  89. if connected {
  90. connected = false
  91. }
  92. }
  93. if connected {
  94. time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond)
  95. } else {
  96. time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
  97. }
  98. }
  99. }()
  100. return fs, nil
  101. }
  102. func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
  103. return security.GenJwt(fs.secret, fileId)
  104. }
  105. func (fs *FilerServer) getMasterNode() string {
  106. fs.mnLock.RLock()
  107. defer fs.mnLock.RUnlock()
  108. return fs.master
  109. }
  110. func (fs *FilerServer) setMasterNode(masterNode string) {
  111. fs.mnLock.Lock()
  112. defer fs.mnLock.Unlock()
  113. fs.master = masterNode
  114. }
  115. func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) {
  116. if e = checkMaster(masterNode); e != nil {
  117. fs.masterNodes.Reset()
  118. for i := 0; i <= 3; i++ {
  119. master, e = fs.masterNodes.FindMaster()
  120. if e != nil {
  121. continue
  122. } else {
  123. if e = checkMaster(master); e == nil {
  124. break
  125. }
  126. }
  127. }
  128. } else {
  129. master = masterNode
  130. }
  131. return
  132. }
  133. func checkMaster(masterNode string) error {
  134. statUrl := "http://" + masterNode + "/stats"
  135. glog.V(4).Infof("Connecting to %s ...", statUrl)
  136. _, e := util.Get(statUrl)
  137. return e
  138. }