You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

186 lines
5.8 KiB

7 years ago
7 years ago
4 years ago
3 years ago
4 years ago
7 years ago
6 years ago
7 years ago
4 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "net/http"
  7. "os"
  8. "sync"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/stats"
  11. "google.golang.org/grpc"
  12. "github.com/chrislusf/seaweedfs/weed/util/grace"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. "github.com/chrislusf/seaweedfs/weed/filer"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  27. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
  28. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  29. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
  30. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  31. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  32. _ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
  33. _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
  34. "github.com/chrislusf/seaweedfs/weed/glog"
  35. "github.com/chrislusf/seaweedfs/weed/notification"
  36. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  37. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  38. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  39. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  40. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  41. "github.com/chrislusf/seaweedfs/weed/security"
  42. )
  43. type FilerOption struct {
  44. Masters []pb.ServerAddress
  45. Collection string
  46. DefaultReplication string
  47. DisableDirListing bool
  48. MaxMB int
  49. DirListingLimit int
  50. DataCenter string
  51. Rack string
  52. DefaultLevelDbDir string
  53. DisableHttp bool
  54. Host pb.ServerAddress
  55. recursiveDelete bool
  56. Cipher bool
  57. SaveToFilerLimit int64
  58. ConcurrentUploadLimit int64
  59. }
  60. type FilerServer struct {
  61. filer_pb.UnimplementedSeaweedFilerServer
  62. option *FilerOption
  63. secret security.SigningKey
  64. filer *filer.Filer
  65. grpcDialOption grpc.DialOption
  66. // metrics read from the master
  67. metricsAddress string
  68. metricsIntervalSec int
  69. // notifying clients
  70. listenersLock sync.Mutex
  71. listenersCond *sync.Cond
  72. brokers map[string]map[string]bool
  73. brokersLock sync.Mutex
  74. inFlightDataSize int64
  75. inFlightDataLimitCond *sync.Cond
  76. }
  77. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  78. fs = &FilerServer{
  79. option: option,
  80. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  81. brokers: make(map[string]map[string]bool),
  82. inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
  83. }
  84. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  85. if len(option.Masters) == 0 {
  86. glog.Fatal("master list is required!")
  87. }
  88. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  89. fs.listenersCond.Broadcast()
  90. })
  91. fs.filer.Cipher = option.Cipher
  92. fs.checkWithMaster()
  93. go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
  94. go fs.filer.KeepMasterClientConnected()
  95. v := util.GetViper()
  96. if !util.LoadConfiguration("filer", false) {
  97. v.Set("leveldb2.enabled", true)
  98. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  99. _, err := os.Stat(option.DefaultLevelDbDir)
  100. if os.IsNotExist(err) {
  101. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  102. }
  103. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  104. } else {
  105. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  106. }
  107. util.LoadConfiguration("notification", false)
  108. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  109. v.SetDefault("filer.options.buckets_folder", "/buckets")
  110. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  111. // TODO deprecated, will be be removed after 2020-12-31
  112. // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
  113. // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  114. fs.filer.LoadConfiguration(v)
  115. notification.LoadConfiguration(v, "notification.")
  116. handleStaticResources(defaultMux)
  117. if !option.DisableHttp {
  118. defaultMux.HandleFunc("/", fs.filerHandler)
  119. }
  120. if defaultMux != readonlyMux {
  121. handleStaticResources(readonlyMux)
  122. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  123. }
  124. fs.filer.AggregateFromPeers(option.Host)
  125. fs.filer.LoadBuckets()
  126. fs.filer.LoadFilerConf()
  127. fs.filer.LoadRemoteStorageConfAndMapping()
  128. grace.OnInterrupt(func() {
  129. fs.filer.Shutdown()
  130. })
  131. return fs, nil
  132. }
  133. func (fs *FilerServer) checkWithMaster() {
  134. isConnected := false
  135. for !isConnected {
  136. for _, master := range fs.option.Masters {
  137. readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  138. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  139. if err != nil {
  140. return fmt.Errorf("get master %s configuration: %v", master, err)
  141. }
  142. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  143. if fs.option.DefaultReplication == "" {
  144. fs.option.DefaultReplication = resp.DefaultReplication
  145. }
  146. return nil
  147. })
  148. if readErr == nil {
  149. isConnected = true
  150. } else {
  151. time.Sleep(7 * time.Second)
  152. }
  153. }
  154. }
  155. }