You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

131 lines
3.9 KiB

6 years ago
6 years ago
6 years ago
7 years ago
7 years ago
6 years ago
6 years ago
7 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/stats"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/filer2"
  14. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  15. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  16. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  21. "github.com/chrislusf/seaweedfs/weed/glog"
  22. "github.com/chrislusf/seaweedfs/weed/notification"
  23. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  24. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  25. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  26. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  27. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  28. "github.com/chrislusf/seaweedfs/weed/security"
  29. "github.com/spf13/viper"
  30. )
  31. type FilerOption struct {
  32. Masters []string
  33. Collection string
  34. DefaultReplication string
  35. RedirectOnRead bool
  36. DisableDirListing bool
  37. MaxMB int
  38. DirListingLimit int
  39. DataCenter string
  40. DefaultLevelDbDir string
  41. DisableHttp bool
  42. Port int
  43. }
  44. type FilerServer struct {
  45. option *FilerOption
  46. secret security.SigningKey
  47. filer *filer2.Filer
  48. grpcDialOption grpc.DialOption
  49. }
  50. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  51. fs = &FilerServer{
  52. option: option,
  53. grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
  54. }
  55. if len(option.Masters) == 0 {
  56. glog.Fatal("master list is required!")
  57. }
  58. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
  59. go fs.filer.KeepConnectedToMaster()
  60. v := viper.GetViper()
  61. if !util.LoadConfiguration("filer", false) {
  62. v.Set("leveldb2.enabled", true)
  63. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  64. _, err := os.Stat(option.DefaultLevelDbDir)
  65. if os.IsNotExist(err) {
  66. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  67. }
  68. }
  69. util.LoadConfiguration("notification", false)
  70. fs.filer.LoadConfiguration(v)
  71. notification.LoadConfiguration(v.Sub("notification"))
  72. handleStaticResources(defaultMux)
  73. if !option.DisableHttp {
  74. defaultMux.HandleFunc("/", fs.filerHandler)
  75. }
  76. if defaultMux != readonlyMux {
  77. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  78. }
  79. maybeStartMetrics(fs, option)
  80. return fs, nil
  81. }
  82. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  83. isConnected := false
  84. var metricsAddress string
  85. var metricsIntervalSec int
  86. var readErr error
  87. for !isConnected {
  88. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0])
  89. if readErr == nil {
  90. isConnected = true
  91. } else {
  92. time.Sleep(7 * time.Second)
  93. }
  94. }
  95. if metricsAddress == "" && metricsIntervalSec <= 0 {
  96. return
  97. }
  98. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  99. func() (addr string, intervalSeconds int) {
  100. return metricsAddress, metricsIntervalSec
  101. })
  102. }
  103. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  104. err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  105. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  106. if err != nil {
  107. return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
  108. }
  109. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  110. return nil
  111. })
  112. return
  113. }