You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

164 lines
4.9 KiB

7 years ago
7 years ago
6 years ago
7 years ago
5 years ago
7 years ago
7 years ago
6 years ago
7 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/util/grace"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/operation"
  12. "github.com/chrislusf/seaweedfs/weed/pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  14. "github.com/chrislusf/seaweedfs/weed/stats"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer2"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
  26. "github.com/chrislusf/seaweedfs/weed/glog"
  27. "github.com/chrislusf/seaweedfs/weed/notification"
  28. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  33. "github.com/chrislusf/seaweedfs/weed/security"
  34. )
  // FilerOption carries the settings used to construct a FilerServer.
  type FilerOption struct {
  	// Masters lists the master server addresses. NewFilerServer aborts
  	// when it is empty, and every entry must parse as a valid address.
  	Masters []string
  	// Collection and DefaultReplication are handed to the underlying filer
  	// when it is created.
  	Collection         string
  	DefaultReplication string
  	// NOTE(review): DisableDirListing, MaxMB, DirListingLimit and DataCenter
  	// are not read in this part of the file; presumably the request
  	// handlers consume them - confirm there.
  	DisableDirListing bool
  	MaxMB             int
  	DirListingLimit   int
  	DataCenter        string
  	// DefaultLevelDbDir is the leveldb2 directory used when no "filer"
  	// configuration could be loaded.
  	DefaultLevelDbDir string
  	// DisableHttp, when true, keeps the read/write handler from being
  	// registered on the default mux.
  	DisableHttp bool
  	// Host and Port are handed to the underlying filer; Port also names
  	// this process as a metrics source.
  	Host string
  	Port uint32
  	// recursiveDelete is not set by callers: NewFilerServer fills it from
  	// the "filer.options.recursive_delete" configuration key.
  	recursiveDelete bool
  	// Cipher is copied onto the underlying filer's Cipher field.
  	Cipher bool
  }
  50. type FilerServer struct {
  51. option *FilerOption
  52. secret security.SigningKey
  53. filer *filer2.Filer
  54. grpcDialOption grpc.DialOption
  55. // notifying clients
  56. listenersLock sync.Mutex
  57. listenersCond *sync.Cond
  58. }
  59. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  60. fs = &FilerServer{
  61. option: option,
  62. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  63. }
  64. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  65. if len(option.Masters) == 0 {
  66. glog.Fatal("master list is required!")
  67. }
  68. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, fs.notifyMetaListeners)
  69. fs.filer.Cipher = option.Cipher
  70. maybeStartMetrics(fs, option)
  71. go fs.filer.KeepConnectedToMaster()
  72. v := util.GetViper()
  73. if !util.LoadConfiguration("filer", false) {
  74. v.Set("leveldb2.enabled", true)
  75. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  76. _, err := os.Stat(option.DefaultLevelDbDir)
  77. if os.IsNotExist(err) {
  78. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  79. }
  80. }
  81. util.LoadConfiguration("notification", false)
  82. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  83. v.SetDefault("filer.options.buckets_folder", "/buckets")
  84. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  85. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  86. fs.filer.LoadConfiguration(v)
  87. notification.LoadConfiguration(v, "notification.")
  88. handleStaticResources(defaultMux)
  89. if !option.DisableHttp {
  90. defaultMux.HandleFunc("/", fs.filerHandler)
  91. }
  92. if defaultMux != readonlyMux {
  93. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  94. }
  95. fs.filer.LoadBuckets()
  96. grace.OnInterrupt(func() {
  97. fs.filer.Shutdown()
  98. })
  99. return fs, nil
  100. }
  101. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  102. for _, master := range option.Masters {
  103. _, err := pb.ParseFilerGrpcAddress(master)
  104. if err != nil {
  105. glog.Fatalf("invalid master address %s: %v", master, err)
  106. }
  107. }
  108. isConnected := false
  109. var metricsAddress string
  110. var metricsIntervalSec int
  111. var readErr error
  112. for !isConnected {
  113. for _, master := range option.Masters {
  114. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
  115. if readErr == nil {
  116. isConnected = true
  117. } else {
  118. time.Sleep(7 * time.Second)
  119. }
  120. }
  121. }
  122. if metricsAddress == "" && metricsIntervalSec <= 0 {
  123. return
  124. }
  125. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  126. func() (addr string, intervalSeconds int) {
  127. return metricsAddress, metricsIntervalSec
  128. })
  129. }
  130. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  131. err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  132. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  133. if err != nil {
  134. return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
  135. }
  136. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  137. return nil
  138. })
  139. return
  140. }