You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

162 lines
4.9 KiB

7 years ago
7 years ago
6 years ago
7 years ago
7 years ago
7 years ago
6 years ago
7 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  13. "github.com/chrislusf/seaweedfs/weed/stats"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/chrislusf/seaweedfs/weed/filer2"
  16. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
  25. "github.com/chrislusf/seaweedfs/weed/glog"
  26. "github.com/chrislusf/seaweedfs/weed/notification"
  27. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  28. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  32. "github.com/chrislusf/seaweedfs/weed/security"
  33. )
  34. type FilerOption struct {
  35. Masters []string
  36. Collection string
  37. DefaultReplication string
  38. DisableDirListing bool
  39. MaxMB int
  40. DirListingLimit int
  41. DataCenter string
  42. DefaultLevelDbDir string
  43. DisableHttp bool
  44. Port uint32
  45. recursiveDelete bool
  46. Cipher bool
  47. }
  48. type FilerServer struct {
  49. option *FilerOption
  50. secret security.SigningKey
  51. filer *filer2.Filer
  52. grpcDialOption grpc.DialOption
  53. // notifying clients
  54. listenersLock sync.Mutex
  55. listenersCond *sync.Cond
  56. }
  57. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  58. fs = &FilerServer{
  59. option: option,
  60. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  61. }
  62. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  63. if len(option.Masters) == 0 {
  64. glog.Fatal("master list is required!")
  65. }
  66. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000, option.Collection, option.DefaultReplication, fs.notifyMetaListeners)
  67. fs.filer.Cipher = option.Cipher
  68. maybeStartMetrics(fs, option)
  69. go fs.filer.KeepConnectedToMaster()
  70. v := util.GetViper()
  71. if !util.LoadConfiguration("filer", false) {
  72. v.Set("leveldb2.enabled", true)
  73. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  74. _, err := os.Stat(option.DefaultLevelDbDir)
  75. if os.IsNotExist(err) {
  76. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  77. }
  78. }
  79. util.LoadConfiguration("notification", false)
  80. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  81. v.SetDefault("filer.options.buckets_folder", "/buckets")
  82. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  83. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  84. fs.filer.LoadConfiguration(v)
  85. notification.LoadConfiguration(v, "notification.")
  86. handleStaticResources(defaultMux)
  87. if !option.DisableHttp {
  88. defaultMux.HandleFunc("/", fs.filerHandler)
  89. }
  90. if defaultMux != readonlyMux {
  91. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  92. }
  93. fs.filer.LoadBuckets()
  94. util.OnInterrupt(func() {
  95. fs.filer.Shutdown()
  96. })
  97. return fs, nil
  98. }
  99. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  100. for _, master := range option.Masters {
  101. _, err := pb.ParseFilerGrpcAddress(master)
  102. if err != nil {
  103. glog.Fatalf("invalid master address %s: %v", master, err)
  104. }
  105. }
  106. isConnected := false
  107. var metricsAddress string
  108. var metricsIntervalSec int
  109. var readErr error
  110. for !isConnected {
  111. for _, master := range option.Masters {
  112. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
  113. if readErr == nil {
  114. isConnected = true
  115. } else {
  116. time.Sleep(7 * time.Second)
  117. }
  118. }
  119. }
  120. if metricsAddress == "" && metricsIntervalSec <= 0 {
  121. return
  122. }
  123. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  124. func() (addr string, intervalSeconds int) {
  125. return metricsAddress, metricsIntervalSec
  126. })
  127. }
  128. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  129. err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  130. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  131. if err != nil {
  132. return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
  133. }
  134. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  135. return nil
  136. })
  137. return
  138. }