You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

130 lines
3.9 KiB

7 years ago
7 years ago
6 years ago
7 years ago
7 years ago
7 years ago
6 years ago
7 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/stats"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/filer2"
  14. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  15. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  16. _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  20. "github.com/chrislusf/seaweedfs/weed/glog"
  21. "github.com/chrislusf/seaweedfs/weed/notification"
  22. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  23. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  24. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  25. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  26. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  27. "github.com/chrislusf/seaweedfs/weed/security"
  28. "github.com/spf13/viper"
  29. )
  // FilerOption carries the settings handed to NewFilerServer.
  //
  // Fields marked "confirm" are not read in this part of the file; their
  // descriptions are inferred from the field names only.
  type FilerOption struct {
  	// Masters lists the master server addresses. It is passed to
  	// filer2.NewFiler and used to read the master configuration;
  	// NewFilerServer aborts via glog.Fatal when it is empty.
  	Masters []string

  	// Collection is presumably the default collection name — confirm.
  	Collection string
  	// DefaultReplication is presumably the default replication setting — confirm.
  	DefaultReplication string

  	// RedirectOnRead presumably redirects reads instead of proxying — confirm.
  	RedirectOnRead bool
  	// DisableDirListing presumably turns off directory listing — confirm.
  	DisableDirListing bool

  	// MaxMB is presumably a size limit in megabytes — confirm.
  	MaxMB int
  	// DirListingLimit presumably caps directory listing results — confirm.
  	DirListingLimit int

  	// DataCenter is presumably the preferred data center name — confirm.
  	DataCenter string

  	// DefaultLevelDbDir is used as the "leveldb.dir" setting when no
  	// "filer" configuration could be loaded.
  	DefaultLevelDbDir string

  	// DisableHttp, when true, keeps the main filer handler from being
  	// registered on the default mux.
  	DisableHttp bool

  	// Port is passed to stats.SourceName when metrics pushing is started.
  	Port int
  }
  43. type FilerServer struct {
  44. option *FilerOption
  45. secret security.SigningKey
  46. filer *filer2.Filer
  47. grpcDialOption grpc.DialOption
  48. }
  49. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  50. fs = &FilerServer{
  51. option: option,
  52. grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
  53. }
  54. if len(option.Masters) == 0 {
  55. glog.Fatal("master list is required!")
  56. }
  57. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
  58. go fs.filer.KeepConnectedToMaster()
  59. v := viper.GetViper()
  60. if !util.LoadConfiguration("filer", false) {
  61. v.Set("leveldb.enabled", true)
  62. v.Set("leveldb.dir", option.DefaultLevelDbDir)
  63. _, err := os.Stat(option.DefaultLevelDbDir)
  64. if os.IsNotExist(err) {
  65. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  66. }
  67. }
  68. util.LoadConfiguration("notification", false)
  69. fs.filer.LoadConfiguration(v)
  70. notification.LoadConfiguration(v.Sub("notification"))
  71. handleStaticResources(defaultMux)
  72. if !option.DisableHttp {
  73. defaultMux.HandleFunc("/", fs.filerHandler)
  74. }
  75. if defaultMux != readonlyMux {
  76. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  77. }
  78. maybeStartMetrics(fs, option)
  79. return fs, nil
  80. }
  81. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  82. isConnected := false
  83. var metricsAddress string
  84. var metricsIntervalSec int
  85. var readErr error
  86. for !isConnected {
  87. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0])
  88. if readErr == nil {
  89. isConnected = true
  90. } else {
  91. time.Sleep(7 * time.Second)
  92. }
  93. }
  94. if metricsAddress == "" && metricsIntervalSec <= 0 {
  95. return
  96. }
  97. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  98. func() (addr string, intervalSeconds int) {
  99. return metricsAddress, metricsIntervalSec
  100. })
  101. }
  102. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  103. err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  104. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  105. if err != nil {
  106. return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
  107. }
  108. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  109. return nil
  110. })
  111. return
  112. }