You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

170 lines
5.1 KiB

6 years ago
6 years ago
6 years ago
6 years ago
7 years ago
4 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/stats"
  6. "net/http"
  7. "os"
  8. "sync"
  9. "time"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  27. "github.com/chrislusf/seaweedfs/weed/glog"
  28. "github.com/chrislusf/seaweedfs/weed/notification"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  33. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  34. "github.com/chrislusf/seaweedfs/weed/security"
  35. )
  // FilerOption carries the settings used to construct a FilerServer.
  type FilerOption struct {
  	// Masters lists the master server addresses. NewFilerServer aborts the
  	// process when the list is empty.
  	Masters []string

  	// Collection and DefaultReplication are handed to the underlying filer.
  	// An empty DefaultReplication is filled in from the master's
  	// configuration during startup.
  	Collection         string
  	DefaultReplication string

  	DisableDirListing bool
  	MaxMB             int
  	DirListingLimit   int
  	DataCenter        string

  	// DefaultLevelDbDir is used as the leveldb2 store directory when
  	// loading the "filer" configuration reports failure.
  	DefaultLevelDbDir string

  	// DisableHttp, when true, prevents the read-write handler from being
  	// registered on the default mux.
  	DisableHttp bool

  	// Host and Port identify this filer; they are passed to the underlying
  	// filer and joined as "host:port" when aggregating from peers.
  	Host string
  	Port uint32

  	// recursiveDelete is set by NewFilerServer from the
  	// "filer.options.recursive_delete" configuration key.
  	recursiveDelete bool

  	// Cipher is copied onto the underlying filer's Cipher field.
  	Cipher bool

  	// Filers lists the peer filers handed to AggregateFromPeers.
  	Filers []string
  }
  52. type FilerServer struct {
  53. option *FilerOption
  54. secret security.SigningKey
  55. filer *filer.Filer
  56. grpcDialOption grpc.DialOption
  57. // metrics read from the master
  58. metricsAddress string
  59. metricsIntervalSec int
  60. // notifying clients
  61. listenersLock sync.Mutex
  62. listenersCond *sync.Cond
  63. brokers map[string]map[string]bool
  64. brokersLock sync.Mutex
  65. }
  66. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  67. fs = &FilerServer{
  68. option: option,
  69. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  70. brokers: make(map[string]map[string]bool),
  71. }
  72. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  73. if len(option.Masters) == 0 {
  74. glog.Fatal("master list is required!")
  75. }
  76. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
  77. fs.listenersCond.Broadcast()
  78. })
  79. fs.filer.Cipher = option.Cipher
  80. fs.checkWithMaster()
  81. go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
  82. go fs.filer.KeepConnectedToMaster()
  83. v := util.GetViper()
  84. if !util.LoadConfiguration("filer", false) {
  85. v.Set("leveldb2.enabled", true)
  86. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  87. _, err := os.Stat(option.DefaultLevelDbDir)
  88. if os.IsNotExist(err) {
  89. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  90. }
  91. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  92. }
  93. util.LoadConfiguration("notification", false)
  94. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  95. v.SetDefault("filer.options.buckets_folder", "/buckets")
  96. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  97. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  98. fs.filer.LoadConfiguration(v)
  99. notification.LoadConfiguration(v, "notification.")
  100. handleStaticResources(defaultMux)
  101. if !option.DisableHttp {
  102. defaultMux.HandleFunc("/", fs.filerHandler)
  103. }
  104. if defaultMux != readonlyMux {
  105. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  106. }
  107. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  108. fs.filer.LoadBuckets()
  109. grace.OnInterrupt(func() {
  110. fs.filer.Shutdown()
  111. })
  112. return fs, nil
  113. }
  114. func (fs *FilerServer) checkWithMaster() {
  115. for _, master := range fs.option.Masters {
  116. _, err := pb.ParseFilerGrpcAddress(master)
  117. if err != nil {
  118. glog.Fatalf("invalid master address %s: %v", master, err)
  119. }
  120. }
  121. isConnected := false
  122. for !isConnected {
  123. for _, master := range fs.option.Masters {
  124. readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  125. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  126. if err != nil {
  127. return fmt.Errorf("get master %s configuration: %v", master, err)
  128. }
  129. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  130. if fs.option.DefaultReplication == "" {
  131. fs.option.DefaultReplication = resp.DefaultReplication
  132. }
  133. return nil
  134. })
  135. if readErr == nil {
  136. isConnected = true
  137. } else {
  138. time.Sleep(7 * time.Second)
  139. }
  140. }
  141. }
  142. }