You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

176 lines
5.3 KiB

7 years ago
7 years ago
6 years ago
7 years ago
6 years ago
7 years ago
5 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/util/grace"
  11. "github.com/chrislusf/seaweedfs/weed/operation"
  12. "github.com/chrislusf/seaweedfs/weed/pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  14. "github.com/chrislusf/seaweedfs/weed/stats"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  27. "github.com/chrislusf/seaweedfs/weed/glog"
  28. "github.com/chrislusf/seaweedfs/weed/notification"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  33. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  34. "github.com/chrislusf/seaweedfs/weed/security"
  35. )
  36. type FilerOption struct {
  37. Masters []string
  38. Collection string
  39. DefaultReplication string
  40. DisableDirListing bool
  41. MaxMB int
  42. DirListingLimit int
  43. DataCenter string
  44. DefaultLevelDbDir string
  45. DisableHttp bool
  46. Host string
  47. Port uint32
  48. recursiveDelete bool
  49. Cipher bool
  50. Filers []string
  51. }
  52. type FilerServer struct {
  53. option *FilerOption
  54. secret security.SigningKey
  55. filer *filer.Filer
  56. grpcDialOption grpc.DialOption
  57. // notifying clients
  58. listenersLock sync.Mutex
  59. listenersCond *sync.Cond
  60. brokers map[string]map[string]bool
  61. brokersLock sync.Mutex
  62. }
  63. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  64. fs = &FilerServer{
  65. option: option,
  66. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  67. brokers: make(map[string]map[string]bool),
  68. }
  69. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  70. if len(option.Masters) == 0 {
  71. glog.Fatal("master list is required!")
  72. }
  73. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
  74. fs.listenersCond.Broadcast()
  75. })
  76. fs.filer.Cipher = option.Cipher
  77. maybeStartMetrics(fs, option)
  78. go fs.filer.KeepConnectedToMaster()
  79. v := util.GetViper()
  80. if !util.LoadConfiguration("filer", false) {
  81. v.Set("leveldb2.enabled", true)
  82. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  83. _, err := os.Stat(option.DefaultLevelDbDir)
  84. if os.IsNotExist(err) {
  85. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  86. }
  87. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  88. }
  89. util.LoadConfiguration("notification", false)
  90. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  91. v.SetDefault("filer.options.buckets_folder", "/buckets")
  92. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  93. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  94. fs.filer.LoadConfiguration(v)
  95. notification.LoadConfiguration(v, "notification.")
  96. handleStaticResources(defaultMux)
  97. if !option.DisableHttp {
  98. defaultMux.HandleFunc("/", fs.filerHandler)
  99. }
  100. if defaultMux != readonlyMux {
  101. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  102. }
  103. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  104. fs.filer.LoadBuckets()
  105. grace.OnInterrupt(func() {
  106. fs.filer.Shutdown()
  107. })
  108. return fs, nil
  109. }
  110. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  111. for _, master := range option.Masters {
  112. _, err := pb.ParseFilerGrpcAddress(master)
  113. if err != nil {
  114. glog.Fatalf("invalid master address %s: %v", master, err)
  115. }
  116. }
  117. isConnected := false
  118. var metricsAddress string
  119. var metricsIntervalSec int
  120. var readErr error
  121. for !isConnected {
  122. for _, master := range option.Masters {
  123. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
  124. if readErr == nil {
  125. isConnected = true
  126. } else {
  127. time.Sleep(7 * time.Second)
  128. }
  129. }
  130. }
  131. if metricsAddress == "" && metricsIntervalSec <= 0 {
  132. return
  133. }
  134. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  135. func() (addr string, intervalSeconds int) {
  136. return metricsAddress, metricsIntervalSec
  137. })
  138. }
  139. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  140. err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  141. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  142. if err != nil {
  143. return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
  144. }
  145. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  146. return nil
  147. })
  148. return
  149. }