You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

169 lines
5.1 KiB

6 years ago
6 years ago
6 years ago
7 years ago
5 years ago
7 years ago
6 years ago
6 years ago
7 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/util/grace"
  11. "github.com/chrislusf/seaweedfs/weed/operation"
  12. "github.com/chrislusf/seaweedfs/weed/pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  14. "github.com/chrislusf/seaweedfs/weed/stats"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer2"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
  26. "github.com/chrislusf/seaweedfs/weed/glog"
  27. "github.com/chrislusf/seaweedfs/weed/notification"
  28. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  33. "github.com/chrislusf/seaweedfs/weed/security"
  34. )
  35. type FilerOption struct {
  36. Masters []string
  37. Collection string
  38. DefaultReplication string
  39. DisableDirListing bool
  40. MaxMB int
  41. DirListingLimit int
  42. DataCenter string
  43. DefaultLevelDbDir string
  44. DisableHttp bool
  45. Host string
  46. Port uint32
  47. recursiveDelete bool
  48. Cipher bool
  49. }
  50. type FilerServer struct {
  51. option *FilerOption
  52. secret security.SigningKey
  53. filer *filer2.Filer
  54. grpcDialOption grpc.DialOption
  55. // notifying clients
  56. listenersLock sync.Mutex
  57. listenersCond *sync.Cond
  58. brokers map[string]map[string]bool
  59. brokersLock sync.Mutex
  60. }
  61. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  62. fs = &FilerServer{
  63. option: option,
  64. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  65. brokers: make(map[string]map[string]bool),
  66. }
  67. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  68. if len(option.Masters) == 0 {
  69. glog.Fatal("master list is required!")
  70. }
  71. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, fs.notifyMetaListeners)
  72. fs.filer.Cipher = option.Cipher
  73. maybeStartMetrics(fs, option)
  74. go fs.filer.KeepConnectedToMaster()
  75. v := util.GetViper()
  76. if !util.LoadConfiguration("filer", false) {
  77. v.Set("leveldb2.enabled", true)
  78. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  79. _, err := os.Stat(option.DefaultLevelDbDir)
  80. if os.IsNotExist(err) {
  81. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  82. }
  83. }
  84. util.LoadConfiguration("notification", false)
  85. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  86. v.SetDefault("filer.options.buckets_folder", "/buckets")
  87. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  88. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  89. fs.filer.LoadConfiguration(v)
  90. notification.LoadConfiguration(v, "notification.")
  91. handleStaticResources(defaultMux)
  92. if !option.DisableHttp {
  93. defaultMux.HandleFunc("/", fs.filerHandler)
  94. }
  95. if defaultMux != readonlyMux {
  96. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  97. }
  98. fs.filer.LoadBuckets()
  99. grace.OnInterrupt(func() {
  100. fs.filer.Shutdown()
  101. })
  102. return fs, nil
  103. }
  104. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  105. for _, master := range option.Masters {
  106. _, err := pb.ParseFilerGrpcAddress(master)
  107. if err != nil {
  108. glog.Fatalf("invalid master address %s: %v", master, err)
  109. }
  110. }
  111. isConnected := false
  112. var metricsAddress string
  113. var metricsIntervalSec int
  114. var readErr error
  115. for !isConnected {
  116. for _, master := range option.Masters {
  117. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
  118. if readErr == nil {
  119. isConnected = true
  120. } else {
  121. time.Sleep(7 * time.Second)
  122. }
  123. }
  124. }
  125. if metricsAddress == "" && metricsIntervalSec <= 0 {
  126. return
  127. }
  128. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  129. func() (addr string, intervalSeconds int) {
  130. return metricsAddress, metricsIntervalSec
  131. })
  132. }
  133. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  134. err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  135. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  136. if err != nil {
  137. return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
  138. }
  139. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  140. return nil
  141. })
  142. return
  143. }