You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

185 lines
5.7 KiB

6 years ago
6 years ago
6 years ago
7 years ago
5 years ago
7 years ago
6 years ago
6 years ago
7 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "strings"
  8. "sync"
  9. "time"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/stats"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. "github.com/chrislusf/seaweedfs/weed/filer2"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
  27. "github.com/chrislusf/seaweedfs/weed/glog"
  28. "github.com/chrislusf/seaweedfs/weed/notification"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  33. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  34. "github.com/chrislusf/seaweedfs/weed/security"
  35. )
  // FilerOption carries the settings used to construct a FilerServer.
  type FilerOption struct {
  	// Masters lists the master server addresses. NewFilerServer requires at
  	// least one entry.
  	Masters []string
  	// Collection and DefaultReplication are handed to filer2.NewFiler.
  	Collection         string
  	DefaultReplication string
  	// DisableDirListing presumably turns off directory listing responses;
  	// it is not read in this file section. TODO confirm against the handlers.
  	DisableDirListing bool
  	// MaxMB is presumably a size limit in megabytes; not read in this file
  	// section. TODO confirm.
  	MaxMB int
  	// DirListingLimit is presumably the cap on entries per directory listing;
  	// not read in this file section. TODO confirm.
  	DirListingLimit int
  	// DataCenter is not read in this file section.
  	DataCenter string
  	// DefaultLevelDbDir is the directory used for the leveldb2 store when no
  	// "filer" configuration could be loaded.
  	DefaultLevelDbDir string
  	// DisableHttp, when true, keeps the read/write handler off the default mux.
  	DisableHttp bool
  	// Host and Port identify this filer. They are passed to filer2.NewFiler
  	// and form the "host:port" peer entry used when Filers is empty.
  	Host string
  	Port uint32
  	// recursiveDelete is populated by NewFilerServer from the
  	// "filer.options.recursive_delete" configuration key.
  	recursiveDelete bool
  	// Cipher is copied onto the underlying filer by NewFilerServer.
  	Cipher bool
  	// Filers lists peer filer addresses. NewFilerServer fills in this filer's
  	// own address when the list is empty.
  	Filers []string
  }
  52. type FilerServer struct {
  53. option *FilerOption
  54. secret security.SigningKey
  55. filer *filer2.Filer
  56. metaAggregator *filer2.MetaAggregator
  57. grpcDialOption grpc.DialOption
  58. // notifying clients
  59. listenersLock sync.Mutex
  60. listenersCond *sync.Cond
  61. brokers map[string]map[string]bool
  62. brokersLock sync.Mutex
  63. }
  64. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  65. fs = &FilerServer{
  66. option: option,
  67. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  68. brokers: make(map[string]map[string]bool),
  69. }
  70. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  71. if len(option.Masters) == 0 {
  72. glog.Fatal("master list is required!")
  73. }
  74. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
  75. fs.listenersCond.Broadcast()
  76. })
  77. fs.filer.Cipher = option.Cipher
  78. maybeStartMetrics(fs, option)
  79. go fs.filer.KeepConnectedToMaster()
  80. v := util.GetViper()
  81. if !util.LoadConfiguration("filer", false) {
  82. v.Set("leveldb2.enabled", true)
  83. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  84. _, err := os.Stat(option.DefaultLevelDbDir)
  85. if os.IsNotExist(err) {
  86. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  87. }
  88. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  89. }
  90. util.LoadConfiguration("notification", false)
  91. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  92. v.SetDefault("filer.options.buckets_folder", "/buckets")
  93. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  94. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  95. fs.filer.LoadConfiguration(v)
  96. notification.LoadConfiguration(v, "notification.")
  97. handleStaticResources(defaultMux)
  98. if !option.DisableHttp {
  99. defaultMux.HandleFunc("/", fs.filerHandler)
  100. }
  101. if defaultMux != readonlyMux {
  102. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  103. }
  104. // set peers
  105. if strings.HasPrefix(fs.filer.GetStore().GetName(), "leveldb") && len(option.Filers) > 0 {
  106. glog.Fatalf("filers using separate leveldb stores should not configure %d peers %+v", len(option.Filers), option.Filers)
  107. }
  108. if len(option.Filers) == 0 {
  109. option.Filers = append(option.Filers, fmt.Sprintf("%s:%d", option.Host, option.Port))
  110. }
  111. fs.metaAggregator = filer2.NewMetaAggregator(option.Filers, fs.grpcDialOption)
  112. fs.metaAggregator.StartLoopSubscribe(time.Now().UnixNano())
  113. fs.filer.LoadBuckets()
  114. grace.OnInterrupt(func() {
  115. fs.filer.Shutdown()
  116. })
  117. return fs, nil
  118. }
  119. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  120. for _, master := range option.Masters {
  121. _, err := pb.ParseFilerGrpcAddress(master)
  122. if err != nil {
  123. glog.Fatalf("invalid master address %s: %v", master, err)
  124. }
  125. }
  126. isConnected := false
  127. var metricsAddress string
  128. var metricsIntervalSec int
  129. var readErr error
  130. for !isConnected {
  131. for _, master := range option.Masters {
  132. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
  133. if readErr == nil {
  134. isConnected = true
  135. } else {
  136. time.Sleep(7 * time.Second)
  137. }
  138. }
  139. }
  140. if metricsAddress == "" && metricsIntervalSec <= 0 {
  141. return
  142. }
  143. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  144. func() (addr string, intervalSeconds int) {
  145. return metricsAddress, metricsIntervalSec
  146. })
  147. }
  148. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  149. err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  150. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  151. if err != nil {
  152. return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
  153. }
  154. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  155. return nil
  156. })
  157. return
  158. }