You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

189 lines
5.9 KiB

7 years ago
7 years ago
4 years ago
7 years ago
6 years ago
7 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
  27. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  28. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
  29. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  30. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  31. "github.com/chrislusf/seaweedfs/weed/glog"
  32. "github.com/chrislusf/seaweedfs/weed/notification"
  33. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  34. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  35. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  36. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  37. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  38. "github.com/chrislusf/seaweedfs/weed/security"
  39. )
// FilerOption carries the settings used by NewFilerServer to build a
// FilerServer.
type FilerOption struct {
	// Masters lists the master server addresses. NewFilerServer aborts when
	// it is empty, and checkWithMaster validates and contacts each entry.
	Masters []string
	// Collection is passed through to filer.NewFiler.
	Collection string
	// DefaultReplication is passed through to filer.NewFiler. When empty,
	// checkWithMaster fills it in from the master's configuration.
	DefaultReplication string
	// DisableDirListing is not read in this part of the file; presumably it
	// turns off directory listing responses — TODO confirm against handlers.
	DisableDirListing bool
	// MaxMB is not read in this part of the file; presumably a size limit in
	// megabytes — TODO confirm.
	MaxMB int
	// DirListingLimit is not read in this part of the file; presumably caps
	// entries per directory listing — TODO confirm.
	DirListingLimit int
	// DataCenter is passed through to filer.NewFiler.
	DataCenter string
	// Rack is not read in this part of the file.
	Rack string
	// DefaultLevelDbDir is the directory used for the leveldb2 store when no
	// "filer" configuration could be loaded; NewFilerServer creates it if it
	// does not exist.
	DefaultLevelDbDir string
	// DisableHttp, when true, keeps NewFilerServer from registering the main
	// filer handler on the default mux.
	DisableHttp bool
	// Host and Port identify this filer; they are passed to filer.NewFiler
	// and combined as "host:port" when aggregating from peers.
	Host string
	Port uint32
	// recursiveDelete is loaded by NewFilerServer from the
	// "filer.options.recursive_delete" configuration key.
	recursiveDelete bool
	// Cipher is copied onto the underlying filer's Cipher field.
	Cipher bool
	// SaveToFilerLimit is not read in this part of the file.
	SaveToFilerLimit int64
	// Filers lists peer filers handed to AggregateFromPeers.
	Filers []string
	// ConcurrentUploadLimit is not read in this part of the file; presumably
	// it bounds FilerServer.inFlightDataSize — TODO confirm.
	ConcurrentUploadLimit int64
}
// FilerServer bundles the filer store, its options, and the state shared by
// the HTTP handlers registered in NewFilerServer.
type FilerServer struct {
	// option is the configuration supplied to NewFilerServer.
	option *FilerOption
	secret security.SigningKey
	// filer is created by filer.NewFiler in NewFilerServer.
	filer *filer.Filer
	// grpcDialOption is loaded from the "grpc.filer" TLS configuration and is
	// used for gRPC calls to the masters.
	grpcDialOption grpc.DialOption

	// metrics read from the master
	// (both fields are set by checkWithMaster and consumed by the metrics
	// pushing loop started in NewFilerServer)
	metricsAddress     string
	metricsIntervalSec int

	// notifying clients
	// listenersCond is built on listenersLock and is broadcast from the
	// callback given to filer.NewFiler.
	listenersLock sync.Mutex
	listenersCond *sync.Cond

	// brokers is initialised to an empty map in NewFilerServer;
	// presumably guarded by brokersLock — TODO confirm against its users.
	brokers     map[string]map[string]bool
	brokersLock sync.Mutex

	// inFlightDataLimitCond is created with its own mutex in NewFilerServer;
	// presumably it coordinates waiters on inFlightDataSize — TODO confirm.
	inFlightDataSize      int64
	inFlightDataLimitCond *sync.Cond
}
  75. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  76. fs = &FilerServer{
  77. option: option,
  78. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  79. brokers: make(map[string]map[string]bool),
  80. inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
  81. }
  82. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  83. if len(option.Masters) == 0 {
  84. glog.Fatal("master list is required!")
  85. }
  86. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  87. fs.listenersCond.Broadcast()
  88. })
  89. fs.filer.Cipher = option.Cipher
  90. fs.checkWithMaster()
  91. go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
  92. go fs.filer.KeepConnectedToMaster()
  93. v := util.GetViper()
  94. if !util.LoadConfiguration("filer", false) {
  95. v.Set("leveldb2.enabled", true)
  96. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  97. _, err := os.Stat(option.DefaultLevelDbDir)
  98. if os.IsNotExist(err) {
  99. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  100. }
  101. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  102. } else {
  103. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  104. }
  105. util.LoadConfiguration("notification", false)
  106. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  107. v.SetDefault("filer.options.buckets_folder", "/buckets")
  108. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  109. // TODO deprecated, will be be removed after 2020-12-31
  110. // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
  111. // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  112. fs.filer.LoadConfiguration(v)
  113. notification.LoadConfiguration(v, "notification.")
  114. handleStaticResources(defaultMux)
  115. if !option.DisableHttp {
  116. defaultMux.HandleFunc("/", fs.filerHandler)
  117. }
  118. if defaultMux != readonlyMux {
  119. handleStaticResources(readonlyMux)
  120. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  121. }
  122. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  123. fs.filer.LoadBuckets()
  124. fs.filer.LoadFilerConf()
  125. grace.OnInterrupt(func() {
  126. fs.filer.Shutdown()
  127. })
  128. return fs, nil
  129. }
  130. func (fs *FilerServer) checkWithMaster() {
  131. for _, master := range fs.option.Masters {
  132. _, err := pb.ParseServerToGrpcAddress(master)
  133. if err != nil {
  134. glog.Fatalf("invalid master address %s: %v", master, err)
  135. }
  136. }
  137. isConnected := false
  138. for !isConnected {
  139. for _, master := range fs.option.Masters {
  140. readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  141. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  142. if err != nil {
  143. return fmt.Errorf("get master %s configuration: %v", master, err)
  144. }
  145. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  146. if fs.option.DefaultReplication == "" {
  147. fs.option.DefaultReplication = resp.DefaultReplication
  148. }
  149. return nil
  150. })
  151. if readErr == nil {
  152. isConnected = true
  153. } else {
  154. time.Sleep(7 * time.Second)
  155. }
  156. }
  157. }
  158. }