You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

181 lines
5.5 KiB

7 years ago
7 years ago
4 years ago
7 years ago
6 years ago
7 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  27. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  28. "github.com/chrislusf/seaweedfs/weed/glog"
  29. "github.com/chrislusf/seaweedfs/weed/notification"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  33. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  34. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  35. "github.com/chrislusf/seaweedfs/weed/security"
  36. )
  // FilerOption carries the caller-supplied settings consumed by NewFilerServer.
  //
  // The field order is kept unchanged so positional composite literals elsewhere
  // keep compiling.
  type FilerOption struct {
  	// Masters lists the master server addresses. NewFilerServer aborts when the
  	// list is empty, and every entry is validated in checkWithMaster.
  	Masters []string

  	// Collection and DefaultReplication are passed to filer.NewFiler. An empty
  	// DefaultReplication is filled in from the master's configuration by
  	// checkWithMaster.
  	Collection         string
  	DefaultReplication string

  	// Not read in this part of the file; presumably consumed by the HTTP
  	// handlers — confirm at the use sites.
  	DisableDirListing bool
  	MaxMB             int
  	DirListingLimit   int

  	// DataCenter is passed to filer.NewFiler. Rack is not read in this part of
  	// the file.
  	DataCenter string
  	Rack       string

  	// DefaultLevelDbDir becomes the "leveldb2.dir" setting when
  	// util.LoadConfiguration("filer", false) reports false.
  	DefaultLevelDbDir string

  	// DisableHttp, when true, keeps fs.filerHandler from being registered on the
  	// default mux.
  	DisableHttp bool

  	// Host and Port are passed to filer.NewFiler and are formatted as "host:port"
  	// for AggregateFromPeers; Port is also used as the metrics source name.
  	Host string
  	Port uint32

  	// recursiveDelete is overwritten by NewFilerServer from the
  	// "filer.options.recursive_delete" configuration key.
  	recursiveDelete bool

  	// Cipher is copied onto the filer's Cipher field.
  	Cipher bool

  	// SaveToFilerLimit is not read in this part of the file.
  	SaveToFilerLimit int

  	// Filers is the peer list passed to AggregateFromPeers.
  	Filers []string
  }
  55. type FilerServer struct {
  56. option *FilerOption
  57. secret security.SigningKey
  58. filer *filer.Filer
  59. grpcDialOption grpc.DialOption
  60. // metrics read from the master
  61. metricsAddress string
  62. metricsIntervalSec int
  63. // notifying clients
  64. listenersLock sync.Mutex
  65. listenersCond *sync.Cond
  66. brokers map[string]map[string]bool
  67. brokersLock sync.Mutex
  68. }
  69. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  70. fs = &FilerServer{
  71. option: option,
  72. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  73. brokers: make(map[string]map[string]bool),
  74. }
  75. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  76. if len(option.Masters) == 0 {
  77. glog.Fatal("master list is required!")
  78. }
  79. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  80. fs.listenersCond.Broadcast()
  81. })
  82. fs.filer.Cipher = option.Cipher
  83. fs.checkWithMaster()
  84. go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
  85. go fs.filer.KeepConnectedToMaster()
  86. v := util.GetViper()
  87. if !util.LoadConfiguration("filer", false) {
  88. v.Set("leveldb2.enabled", true)
  89. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  90. _, err := os.Stat(option.DefaultLevelDbDir)
  91. if os.IsNotExist(err) {
  92. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  93. }
  94. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  95. } else {
  96. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  97. }
  98. util.LoadConfiguration("notification", false)
  99. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  100. v.SetDefault("filer.options.buckets_folder", "/buckets")
  101. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  102. // TODO deprecated, will be be removed after 2020-12-31
  103. // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
  104. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  105. fs.filer.LoadConfiguration(v)
  106. notification.LoadConfiguration(v, "notification.")
  107. handleStaticResources(defaultMux)
  108. if !option.DisableHttp {
  109. defaultMux.HandleFunc("/", fs.filerHandler)
  110. }
  111. if defaultMux != readonlyMux {
  112. handleStaticResources(readonlyMux)
  113. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  114. }
  115. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  116. fs.filer.LoadBuckets()
  117. fs.filer.LoadFilerConf()
  118. grace.OnInterrupt(func() {
  119. fs.filer.Shutdown()
  120. })
  121. return fs, nil
  122. }
  123. func (fs *FilerServer) checkWithMaster() {
  124. for _, master := range fs.option.Masters {
  125. _, err := pb.ParseFilerGrpcAddress(master)
  126. if err != nil {
  127. glog.Fatalf("invalid master address %s: %v", master, err)
  128. }
  129. }
  130. isConnected := false
  131. for !isConnected {
  132. for _, master := range fs.option.Masters {
  133. readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  134. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  135. if err != nil {
  136. return fmt.Errorf("get master %s configuration: %v", master, err)
  137. }
  138. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  139. if fs.option.DefaultReplication == "" {
  140. fs.option.DefaultReplication = resp.DefaultReplication
  141. }
  142. return nil
  143. })
  144. if readErr == nil {
  145. isConnected = true
  146. } else {
  147. time.Sleep(7 * time.Second)
  148. }
  149. }
  150. }
  151. }