You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

163 lines
4.9 KiB

6 years ago
6 years ago
6 years ago
7 years ago
5 years ago
7 years ago
6 years ago
6 years ago
7 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  13. "github.com/chrislusf/seaweedfs/weed/stats"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/chrislusf/seaweedfs/weed/filer2"
  16. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
  25. "github.com/chrislusf/seaweedfs/weed/glog"
  26. "github.com/chrislusf/seaweedfs/weed/notification"
  27. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  28. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  32. "github.com/chrislusf/seaweedfs/weed/security"
  33. )
  // FilerOption carries the settings a caller hands to NewFilerServer.
  //
  // Field order is significant for unkeyed composite literals and must not be
  // changed.
  type FilerOption struct {
  	// Masters lists the master server addresses. NewFilerServer aborts when
  	// the list is empty and forwards it to filer2.NewFiler.
  	Masters []string

  	// Collection and DefaultReplication are forwarded to filer2.NewFiler.
  	Collection         string
  	DefaultReplication string

  	// The next four fields are not read in this part of the file.
  	// NOTE(review): presumably consumed by the HTTP handlers; confirm there.
  	DisableDirListing bool
  	MaxMB             int
  	DirListingLimit   int
  	DataCenter        string

  	// DefaultLevelDbDir becomes "leveldb2.dir" when no "filer" configuration
  	// could be loaded.
  	DefaultLevelDbDir string

  	// DisableHttp, when true, keeps NewFilerServer from registering the "/"
  	// handler on the default mux.
  	DisableHttp bool

  	// Host and Port are forwarded to filer2.NewFiler; Port is also used to
  	// derive the metrics source name.
  	Host string
  	Port uint32

  	// recursiveDelete is filled in by NewFilerServer from the
  	// "filer.options.recursive_delete" configuration key.
  	recursiveDelete bool

  	// Cipher is copied onto the underlying filer's Cipher field.
  	Cipher bool
  }
  49. type FilerServer struct {
  50. option *FilerOption
  51. secret security.SigningKey
  52. filer *filer2.Filer
  53. grpcDialOption grpc.DialOption
  54. // notifying clients
  55. listenersLock sync.Mutex
  56. listenersCond *sync.Cond
  57. }
  58. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  59. fs = &FilerServer{
  60. option: option,
  61. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  62. }
  63. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  64. if len(option.Masters) == 0 {
  65. glog.Fatal("master list is required!")
  66. }
  67. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, fs.notifyMetaListeners)
  68. fs.filer.Cipher = option.Cipher
  69. maybeStartMetrics(fs, option)
  70. go fs.filer.KeepConnectedToMaster()
  71. v := util.GetViper()
  72. if !util.LoadConfiguration("filer", false) {
  73. v.Set("leveldb2.enabled", true)
  74. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  75. _, err := os.Stat(option.DefaultLevelDbDir)
  76. if os.IsNotExist(err) {
  77. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  78. }
  79. }
  80. util.LoadConfiguration("notification", false)
  81. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  82. v.SetDefault("filer.options.buckets_folder", "/buckets")
  83. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  84. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  85. fs.filer.LoadConfiguration(v)
  86. notification.LoadConfiguration(v, "notification.")
  87. handleStaticResources(defaultMux)
  88. if !option.DisableHttp {
  89. defaultMux.HandleFunc("/", fs.filerHandler)
  90. }
  91. if defaultMux != readonlyMux {
  92. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  93. }
  94. fs.filer.LoadBuckets()
  95. util.OnInterrupt(func() {
  96. fs.filer.Shutdown()
  97. })
  98. return fs, nil
  99. }
  100. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  101. for _, master := range option.Masters {
  102. _, err := pb.ParseFilerGrpcAddress(master)
  103. if err != nil {
  104. glog.Fatalf("invalid master address %s: %v", master, err)
  105. }
  106. }
  107. isConnected := false
  108. var metricsAddress string
  109. var metricsIntervalSec int
  110. var readErr error
  111. for !isConnected {
  112. for _, master := range option.Masters {
  113. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
  114. if readErr == nil {
  115. isConnected = true
  116. } else {
  117. time.Sleep(7 * time.Second)
  118. }
  119. }
  120. }
  121. if metricsAddress == "" && metricsIntervalSec <= 0 {
  122. return
  123. }
  124. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  125. func() (addr string, intervalSeconds int) {
  126. return metricsAddress, metricsIntervalSec
  127. })
  128. }
  129. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  130. err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  131. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  132. if err != nil {
  133. return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
  134. }
  135. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  136. return nil
  137. })
  138. return
  139. }