You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

132 lines
3.9 KiB

7 years ago
7 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "time"
  8. "github.com/joeslay/seaweedfs/weed/operation"
  9. "github.com/joeslay/seaweedfs/weed/pb/master_pb"
  10. "github.com/joeslay/seaweedfs/weed/stats"
  11. "github.com/joeslay/seaweedfs/weed/util"
  12. "google.golang.org/grpc"
  13. "github.com/joeslay/seaweedfs/weed/filer2"
  14. _ "github.com/joeslay/seaweedfs/weed/filer2/cassandra"
  15. _ "github.com/joeslay/seaweedfs/weed/filer2/etcd"
  16. _ "github.com/joeslay/seaweedfs/weed/filer2/leveldb"
  17. _ "github.com/joeslay/seaweedfs/weed/filer2/leveldb2"
  18. _ "github.com/joeslay/seaweedfs/weed/filer2/memdb"
  19. _ "github.com/joeslay/seaweedfs/weed/filer2/mysql"
  20. _ "github.com/joeslay/seaweedfs/weed/filer2/postgres"
  21. _ "github.com/joeslay/seaweedfs/weed/filer2/redis"
  22. "github.com/joeslay/seaweedfs/weed/glog"
  23. "github.com/joeslay/seaweedfs/weed/notification"
  24. _ "github.com/joeslay/seaweedfs/weed/notification/aws_sqs"
  25. _ "github.com/joeslay/seaweedfs/weed/notification/gocdk_pub_sub"
  26. _ "github.com/joeslay/seaweedfs/weed/notification/google_pub_sub"
  27. _ "github.com/joeslay/seaweedfs/weed/notification/kafka"
  28. _ "github.com/joeslay/seaweedfs/weed/notification/log"
  29. "github.com/joeslay/seaweedfs/weed/security"
  30. "github.com/spf13/viper"
  31. )
  32. type FilerOption struct {
  33. Masters []string
  34. Collection string
  35. DefaultReplication string
  36. RedirectOnRead bool
  37. DisableDirListing bool
  38. MaxMB int
  39. DirListingLimit int
  40. DataCenter string
  41. DefaultLevelDbDir string
  42. DisableHttp bool
  43. Port int
  44. }
  45. type FilerServer struct {
  46. option *FilerOption
  47. secret security.SigningKey
  48. filer *filer2.Filer
  49. grpcDialOption grpc.DialOption
  50. }
  51. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  52. fs = &FilerServer{
  53. option: option,
  54. grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
  55. }
  56. if len(option.Masters) == 0 {
  57. glog.Fatal("master list is required!")
  58. }
  59. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
  60. go fs.filer.KeepConnectedToMaster()
  61. v := viper.GetViper()
  62. if !util.LoadConfiguration("filer", false) {
  63. v.Set("leveldb2.enabled", true)
  64. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  65. _, err := os.Stat(option.DefaultLevelDbDir)
  66. if os.IsNotExist(err) {
  67. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  68. }
  69. }
  70. util.LoadConfiguration("notification", false)
  71. fs.filer.LoadConfiguration(v)
  72. notification.LoadConfiguration(v.Sub("notification"))
  73. handleStaticResources(defaultMux)
  74. if !option.DisableHttp {
  75. defaultMux.HandleFunc("/", fs.filerHandler)
  76. }
  77. if defaultMux != readonlyMux {
  78. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  79. }
  80. maybeStartMetrics(fs, option)
  81. return fs, nil
  82. }
  83. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  84. isConnected := false
  85. var metricsAddress string
  86. var metricsIntervalSec int
  87. var readErr error
  88. for !isConnected {
  89. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0])
  90. if readErr == nil {
  91. isConnected = true
  92. } else {
  93. time.Sleep(7 * time.Second)
  94. }
  95. }
  96. if metricsAddress == "" && metricsIntervalSec <= 0 {
  97. return
  98. }
  99. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  100. func() (addr string, intervalSeconds int) {
  101. return metricsAddress, metricsIntervalSec
  102. })
  103. }
  104. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  105. err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  106. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  107. if err != nil {
  108. return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
  109. }
  110. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  111. return nil
  112. })
  113. return
  114. }