You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

172 lines
5.1 KiB

7 years ago
7 years ago
7 years ago
6 years ago
7 years ago
4 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  27. "github.com/chrislusf/seaweedfs/weed/glog"
  28. "github.com/chrislusf/seaweedfs/weed/notification"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  33. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  34. "github.com/chrislusf/seaweedfs/weed/security"
  35. )
  36. type FilerOption struct {
  37. Masters []string
  38. Collection string
  39. DefaultReplication string
  40. DisableDirListing bool
  41. MaxMB int
  42. DirListingLimit int
  43. DataCenter string
  44. Rack string
  45. DefaultLevelDbDir string
  46. DisableHttp bool
  47. Host string
  48. Port uint32
  49. recursiveDelete bool
  50. Cipher bool
  51. Filers []string
  52. }
  53. type FilerServer struct {
  54. option *FilerOption
  55. secret security.SigningKey
  56. filer *filer.Filer
  57. grpcDialOption grpc.DialOption
  58. // metrics read from the master
  59. metricsAddress string
  60. metricsIntervalSec int
  61. // notifying clients
  62. listenersLock sync.Mutex
  63. listenersCond *sync.Cond
  64. brokers map[string]map[string]bool
  65. brokersLock sync.Mutex
  66. }
  67. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  68. fs = &FilerServer{
  69. option: option,
  70. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  71. brokers: make(map[string]map[string]bool),
  72. }
  73. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  74. if len(option.Masters) == 0 {
  75. glog.Fatal("master list is required!")
  76. }
  77. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
  78. fs.listenersCond.Broadcast()
  79. })
  80. fs.filer.Cipher = option.Cipher
  81. fs.checkWithMaster()
  82. go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
  83. go fs.filer.KeepConnectedToMaster()
  84. v := util.GetViper()
  85. if !util.LoadConfiguration("filer", false) {
  86. v.Set("leveldb2.enabled", true)
  87. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  88. _, err := os.Stat(option.DefaultLevelDbDir)
  89. if os.IsNotExist(err) {
  90. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  91. }
  92. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  93. }
  94. util.LoadConfiguration("notification", false)
  95. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  96. v.SetDefault("filer.options.buckets_folder", "/buckets")
  97. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  98. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  99. fs.filer.LoadConfiguration(v)
  100. notification.LoadConfiguration(v, "notification.")
  101. handleStaticResources(defaultMux)
  102. if !option.DisableHttp {
  103. defaultMux.HandleFunc("/", fs.filerHandler)
  104. }
  105. if defaultMux != readonlyMux {
  106. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  107. }
  108. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  109. fs.filer.LoadBuckets()
  110. grace.OnInterrupt(func() {
  111. fs.filer.Shutdown()
  112. })
  113. return fs, nil
  114. }
  115. func (fs *FilerServer) checkWithMaster() {
  116. for _, master := range fs.option.Masters {
  117. _, err := pb.ParseFilerGrpcAddress(master)
  118. if err != nil {
  119. glog.Fatalf("invalid master address %s: %v", master, err)
  120. }
  121. }
  122. isConnected := false
  123. for !isConnected {
  124. for _, master := range fs.option.Masters {
  125. readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  126. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  127. if err != nil {
  128. return fmt.Errorf("get master %s configuration: %v", master, err)
  129. }
  130. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  131. if fs.option.DefaultReplication == "" {
  132. fs.option.DefaultReplication = resp.DefaultReplication
  133. }
  134. return nil
  135. })
  136. if readErr == nil {
  137. isConnected = true
  138. } else {
  139. time.Sleep(7 * time.Second)
  140. }
  141. }
  142. }
  143. }