You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

183 lines
5.6 KiB

7 years ago
7 years ago
4 years ago
7 years ago
6 years ago
7 years ago
4 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
  27. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  28. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  29. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  30. "github.com/chrislusf/seaweedfs/weed/glog"
  31. "github.com/chrislusf/seaweedfs/weed/notification"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  33. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  34. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  35. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  36. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  37. "github.com/chrislusf/seaweedfs/weed/security"
  38. )
  39. type FilerOption struct {
  40. Masters []string
  41. Collection string
  42. DefaultReplication string
  43. DisableDirListing bool
  44. MaxMB int
  45. DirListingLimit int
  46. DataCenter string
  47. Rack string
  48. DefaultLevelDbDir string
  49. DisableHttp bool
  50. Host string
  51. Port uint32
  52. recursiveDelete bool
  53. Cipher bool
  54. SaveToFilerLimit int
  55. Filers []string
  56. }
  57. type FilerServer struct {
  58. option *FilerOption
  59. secret security.SigningKey
  60. filer *filer.Filer
  61. grpcDialOption grpc.DialOption
  62. // metrics read from the master
  63. metricsAddress string
  64. metricsIntervalSec int
  65. // notifying clients
  66. listenersLock sync.Mutex
  67. listenersCond *sync.Cond
  68. brokers map[string]map[string]bool
  69. brokersLock sync.Mutex
  70. }
  71. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  72. fs = &FilerServer{
  73. option: option,
  74. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  75. brokers: make(map[string]map[string]bool),
  76. }
  77. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  78. if len(option.Masters) == 0 {
  79. glog.Fatal("master list is required!")
  80. }
  81. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  82. fs.listenersCond.Broadcast()
  83. })
  84. fs.filer.Cipher = option.Cipher
  85. fs.checkWithMaster()
  86. go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
  87. go fs.filer.KeepConnectedToMaster()
  88. v := util.GetViper()
  89. if !util.LoadConfiguration("filer", false) {
  90. v.Set("leveldb2.enabled", true)
  91. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  92. _, err := os.Stat(option.DefaultLevelDbDir)
  93. if os.IsNotExist(err) {
  94. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  95. }
  96. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  97. } else {
  98. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  99. }
  100. util.LoadConfiguration("notification", false)
  101. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  102. v.SetDefault("filer.options.buckets_folder", "/buckets")
  103. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  104. // TODO deprecated, will be be removed after 2020-12-31
  105. // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
  106. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  107. fs.filer.LoadConfiguration(v)
  108. notification.LoadConfiguration(v, "notification.")
  109. handleStaticResources(defaultMux)
  110. if !option.DisableHttp {
  111. defaultMux.HandleFunc("/", fs.filerHandler)
  112. }
  113. if defaultMux != readonlyMux {
  114. handleStaticResources(readonlyMux)
  115. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  116. }
  117. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  118. fs.filer.LoadBuckets()
  119. fs.filer.LoadFilerConf()
  120. grace.OnInterrupt(func() {
  121. fs.filer.Shutdown()
  122. })
  123. return fs, nil
  124. }
  125. func (fs *FilerServer) checkWithMaster() {
  126. for _, master := range fs.option.Masters {
  127. _, err := pb.ParseFilerGrpcAddress(master)
  128. if err != nil {
  129. glog.Fatalf("invalid master address %s: %v", master, err)
  130. }
  131. }
  132. isConnected := false
  133. for !isConnected {
  134. for _, master := range fs.option.Masters {
  135. readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  136. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  137. if err != nil {
  138. return fmt.Errorf("get master %s configuration: %v", master, err)
  139. }
  140. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  141. if fs.option.DefaultReplication == "" {
  142. fs.option.DefaultReplication = resp.DefaultReplication
  143. }
  144. return nil
  145. })
  146. if readErr == nil {
  147. isConnected = true
  148. } else {
  149. time.Sleep(7 * time.Second)
  150. }
  151. }
  152. }
  153. }