You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

182 lines
5.5 KiB

7 years ago
7 years ago
4 years ago
7 years ago
6 years ago
7 years ago
4 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  27. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  28. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  29. "github.com/chrislusf/seaweedfs/weed/glog"
  30. "github.com/chrislusf/seaweedfs/weed/notification"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  33. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  34. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  35. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  36. "github.com/chrislusf/seaweedfs/weed/security"
  37. )
  38. type FilerOption struct {
  39. Masters []string
  40. Collection string
  41. DefaultReplication string
  42. DisableDirListing bool
  43. MaxMB int
  44. DirListingLimit int
  45. DataCenter string
  46. Rack string
  47. DefaultLevelDbDir string
  48. DisableHttp bool
  49. Host string
  50. Port uint32
  51. recursiveDelete bool
  52. Cipher bool
  53. SaveToFilerLimit int
  54. Filers []string
  55. }
  56. type FilerServer struct {
  57. option *FilerOption
  58. secret security.SigningKey
  59. filer *filer.Filer
  60. grpcDialOption grpc.DialOption
  61. // metrics read from the master
  62. metricsAddress string
  63. metricsIntervalSec int
  64. // notifying clients
  65. listenersLock sync.Mutex
  66. listenersCond *sync.Cond
  67. brokers map[string]map[string]bool
  68. brokersLock sync.Mutex
  69. }
  70. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  71. fs = &FilerServer{
  72. option: option,
  73. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  74. brokers: make(map[string]map[string]bool),
  75. }
  76. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  77. if len(option.Masters) == 0 {
  78. glog.Fatal("master list is required!")
  79. }
  80. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  81. fs.listenersCond.Broadcast()
  82. })
  83. fs.filer.Cipher = option.Cipher
  84. fs.checkWithMaster()
  85. go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
  86. go fs.filer.KeepConnectedToMaster()
  87. v := util.GetViper()
  88. if !util.LoadConfiguration("filer", false) {
  89. v.Set("leveldb2.enabled", true)
  90. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  91. _, err := os.Stat(option.DefaultLevelDbDir)
  92. if os.IsNotExist(err) {
  93. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  94. }
  95. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  96. } else {
  97. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  98. }
  99. util.LoadConfiguration("notification", false)
  100. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  101. v.SetDefault("filer.options.buckets_folder", "/buckets")
  102. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  103. // TODO deprecated, will be be removed after 2020-12-31
  104. // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
  105. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  106. fs.filer.LoadConfiguration(v)
  107. notification.LoadConfiguration(v, "notification.")
  108. handleStaticResources(defaultMux)
  109. if !option.DisableHttp {
  110. defaultMux.HandleFunc("/", fs.filerHandler)
  111. }
  112. if defaultMux != readonlyMux {
  113. handleStaticResources(readonlyMux)
  114. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  115. }
  116. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  117. fs.filer.LoadBuckets()
  118. fs.filer.LoadFilerConf()
  119. grace.OnInterrupt(func() {
  120. fs.filer.Shutdown()
  121. })
  122. return fs, nil
  123. }
  124. func (fs *FilerServer) checkWithMaster() {
  125. for _, master := range fs.option.Masters {
  126. _, err := pb.ParseFilerGrpcAddress(master)
  127. if err != nil {
  128. glog.Fatalf("invalid master address %s: %v", master, err)
  129. }
  130. }
  131. isConnected := false
  132. for !isConnected {
  133. for _, master := range fs.option.Masters {
  134. readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  135. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  136. if err != nil {
  137. return fmt.Errorf("get master %s configuration: %v", master, err)
  138. }
  139. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  140. if fs.option.DefaultReplication == "" {
  141. fs.option.DefaultReplication = resp.DefaultReplication
  142. }
  143. return nil
  144. })
  145. if readErr == nil {
  146. isConnected = true
  147. } else {
  148. time.Sleep(7 * time.Second)
  149. }
  150. }
  151. }
  152. }