You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

161 lines
4.8 KiB

6 years ago
6 years ago
6 years ago
7 years ago
7 years ago
6 years ago
6 years ago
7 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  14. "github.com/chrislusf/seaweedfs/weed/stats"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer2"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  24. "github.com/chrislusf/seaweedfs/weed/glog"
  25. "github.com/chrislusf/seaweedfs/weed/notification"
  26. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  27. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  28. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  31. "github.com/chrislusf/seaweedfs/weed/security"
  32. )
  33. type FilerOption struct {
  34. Masters []string
  35. Collection string
  36. DefaultReplication string
  37. DisableDirListing bool
  38. MaxMB int
  39. DirListingLimit int
  40. DataCenter string
  41. DefaultLevelDbDir string
  42. DisableHttp bool
  43. Port uint32
  44. recursiveDelete bool
  45. Cipher bool
  46. }
  47. type FilerServer struct {
  48. option *FilerOption
  49. secret security.SigningKey
  50. filer *filer2.Filer
  51. grpcDialOption grpc.DialOption
  52. // notifying clients
  53. clientChansLock sync.RWMutex
  54. clientChans map[string]chan *filer_pb.FullEventNotification
  55. }
  56. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  57. fs = &FilerServer{
  58. option: option,
  59. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  60. }
  61. if len(option.Masters) == 0 {
  62. glog.Fatal("master list is required!")
  63. }
  64. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000)
  65. fs.filer.Cipher = option.Cipher
  66. maybeStartMetrics(fs, option)
  67. go fs.filer.KeepConnectedToMaster()
  68. v := util.GetViper()
  69. if !util.LoadConfiguration("filer", false) {
  70. v.Set("leveldb2.enabled", true)
  71. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  72. _, err := os.Stat(option.DefaultLevelDbDir)
  73. if os.IsNotExist(err) {
  74. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  75. }
  76. }
  77. util.LoadConfiguration("notification", false)
  78. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  79. v.Set("filer.option.buckets_folder", "/buckets")
  80. v.Set("filer.option.queues_folder", "/queues")
  81. fs.filer.DirBucketsPath = v.GetString("filer.option.buckets_folder")
  82. fs.filer.DirQueuesPath = v.GetString("filer.option.queues_folder")
  83. fs.filer.LoadConfiguration(v)
  84. notification.LoadConfiguration(v, "notification.")
  85. handleStaticResources(defaultMux)
  86. if !option.DisableHttp {
  87. defaultMux.HandleFunc("/", fs.filerHandler)
  88. }
  89. if defaultMux != readonlyMux {
  90. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  91. }
  92. fs.filer.LoadBuckets(fs.filer.DirBucketsPath)
  93. util.OnInterrupt(func() {
  94. fs.filer.Shutdown()
  95. })
  96. return fs, nil
  97. }
  98. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  99. for _, master := range option.Masters {
  100. _, err := pb.ParseFilerGrpcAddress(master)
  101. if err != nil {
  102. glog.Fatalf("invalid master address %s: %v", master, err)
  103. }
  104. }
  105. isConnected := false
  106. var metricsAddress string
  107. var metricsIntervalSec int
  108. var readErr error
  109. for !isConnected {
  110. for _, master := range option.Masters {
  111. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
  112. if readErr == nil {
  113. isConnected = true
  114. } else {
  115. time.Sleep(7 * time.Second)
  116. }
  117. }
  118. }
  119. if metricsAddress == "" && metricsIntervalSec <= 0 {
  120. return
  121. }
  122. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  123. func() (addr string, intervalSeconds int) {
  124. return metricsAddress, metricsIntervalSec
  125. })
  126. }
  127. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  128. err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  129. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  130. if err != nil {
  131. return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
  132. }
  133. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  134. return nil
  135. })
  136. return
  137. }