You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

180 lines
5.4 KiB

7 years ago
7 years ago
7 years ago
6 years ago
7 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  27. "github.com/chrislusf/seaweedfs/weed/glog"
  28. "github.com/chrislusf/seaweedfs/weed/notification"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  33. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  34. "github.com/chrislusf/seaweedfs/weed/security"
  35. )
  36. type FilerOption struct {
  37. Masters []string
  38. Collection string
  39. DefaultReplication string
  40. DisableDirListing bool
  41. MaxMB int
  42. DirListingLimit int
  43. DataCenter string
  44. Rack string
  45. DefaultLevelDbDir string
  46. DisableHttp bool
  47. Host string
  48. Port uint32
  49. recursiveDelete bool
  50. Cipher bool
  51. CacheToFilerLimit int64
  52. Filers []string
  53. }
  54. type FilerServer struct {
  55. option *FilerOption
  56. secret security.SigningKey
  57. filer *filer.Filer
  58. grpcDialOption grpc.DialOption
  59. // metrics read from the master
  60. metricsAddress string
  61. metricsIntervalSec int
  62. // notifying clients
  63. listenersLock sync.Mutex
  64. listenersCond *sync.Cond
  65. brokers map[string]map[string]bool
  66. brokersLock sync.Mutex
  67. }
  68. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  69. fs = &FilerServer{
  70. option: option,
  71. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  72. brokers: make(map[string]map[string]bool),
  73. }
  74. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  75. if len(option.Masters) == 0 {
  76. glog.Fatal("master list is required!")
  77. }
  78. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  79. fs.listenersCond.Broadcast()
  80. })
  81. fs.filer.Cipher = option.Cipher
  82. fs.checkWithMaster()
  83. go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
  84. go fs.filer.KeepConnectedToMaster()
  85. v := util.GetViper()
  86. if !util.LoadConfiguration("filer", false) {
  87. v.Set("leveldb2.enabled", true)
  88. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  89. _, err := os.Stat(option.DefaultLevelDbDir)
  90. if os.IsNotExist(err) {
  91. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  92. }
  93. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  94. } else {
  95. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  96. }
  97. util.LoadConfiguration("notification", false)
  98. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  99. v.SetDefault("filer.options.buckets_folder", "/buckets")
  100. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  101. // TODO deprecated, will be be removed after 2020-12-31
  102. // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
  103. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  104. fs.filer.LoadConfiguration(v)
  105. notification.LoadConfiguration(v, "notification.")
  106. handleStaticResources(defaultMux)
  107. if !option.DisableHttp {
  108. defaultMux.HandleFunc("/", fs.filerHandler)
  109. }
  110. if defaultMux != readonlyMux {
  111. handleStaticResources(readonlyMux)
  112. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  113. }
  114. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  115. fs.filer.LoadBuckets()
  116. fs.filer.LoadFilerConf()
  117. grace.OnInterrupt(func() {
  118. fs.filer.Shutdown()
  119. })
  120. return fs, nil
  121. }
  122. func (fs *FilerServer) checkWithMaster() {
  123. for _, master := range fs.option.Masters {
  124. _, err := pb.ParseFilerGrpcAddress(master)
  125. if err != nil {
  126. glog.Fatalf("invalid master address %s: %v", master, err)
  127. }
  128. }
  129. isConnected := false
  130. for !isConnected {
  131. for _, master := range fs.option.Masters {
  132. readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  133. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  134. if err != nil {
  135. return fmt.Errorf("get master %s configuration: %v", master, err)
  136. }
  137. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  138. if fs.option.DefaultReplication == "" {
  139. fs.option.DefaultReplication = resp.DefaultReplication
  140. }
  141. return nil
  142. })
  143. if readErr == nil {
  144. isConnected = true
  145. } else {
  146. time.Sleep(7 * time.Second)
  147. }
  148. }
  149. }
  150. }