|
|
package weed_server
import ( "context" "fmt" "net/http" "os" "sync" "time"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer" _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" _ "github.com/chrislusf/seaweedfs/weed/notification/log" "github.com/chrislusf/seaweedfs/weed/security" )
// FilerOption carries the settings a FilerServer is created with.
type FilerOption struct {
	// Masters lists the master server addresses. NewFilerServer requires at
	// least one entry and hands the list to filer.NewFiler.
	Masters []string
	// Collection is passed through to filer.NewFiler.
	Collection string
	// DefaultReplication is passed through to filer.NewFiler.
	DefaultReplication string
	// DisableDirListing is not read in this part of the file.
	// NOTE(review): presumably turns off directory listing responses — confirm in the handlers.
	DisableDirListing bool
	// MaxMB is not read in this part of the file.
	// NOTE(review): presumably a size limit in megabytes — confirm where it is consumed.
	MaxMB int
	// DirListingLimit is not read in this part of the file.
	// NOTE(review): presumably caps entries per directory listing — confirm in the handlers.
	DirListingLimit int
	// DataCenter is not read in this part of the file.
	// NOTE(review): presumably the preferred data center for volume assignment — confirm.
	DataCenter string
	// DefaultLevelDbDir is used as "leveldb2.dir" when no "filer"
	// configuration could be loaded.
	DefaultLevelDbDir string
	// DisableHttp, when true, keeps NewFilerServer from registering the "/"
	// handler on the default mux.
	DisableHttp bool
	// Host is passed to filer.NewFiler and, together with Port, forms the
	// "host:port" address given to AggregateFromPeers.
	Host string
	// Port is passed to filer.NewFiler, used in the peer address and used as
	// the metrics source name.
	Port uint32
	// recursiveDelete is filled in by NewFilerServer from the
	// "filer.options.recursive_delete" configuration key.
	recursiveDelete bool
	// Cipher is copied onto the underlying filer's Cipher field.
	Cipher bool
	// Filers lists the peer filers handed to AggregateFromPeers.
	Filers []string
}
// FilerServer holds the state of one running filer: its options, the
// underlying filer instance and the bookkeeping used to notify listeners and
// track brokers.
type FilerServer struct {
	// option is the FilerOption value given to NewFilerServer.
	option *FilerOption
	// secret is not assigned by NewFilerServer.
	// NOTE(review): presumably a signing key set or read elsewhere — confirm.
	secret security.SigningKey
	// filer is the instance created by filer.NewFiler in NewFilerServer.
	filer *filer.Filer
	// grpcDialOption is loaded from the "grpc.filer" client TLS settings and
	// used for gRPC calls to masters.
	grpcDialOption grpc.DialOption

	// Notifying clients: listenersCond is built on top of listenersLock and is
	// broadcast from the callback handed to filer.NewFiler.
	listenersLock sync.Mutex
	listenersCond *sync.Cond

	// brokers is created empty in NewFilerServer.
	// NOTE(review): brokersLock presumably guards brokers — confirm at the call sites.
	brokers     map[string]map[string]bool
	brokersLock sync.Mutex
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{ option: option, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), brokers: make(map[string]map[string]bool), } fs.listenersCond = sync.NewCond(&fs.listenersLock)
if len(option.Masters) == 0 { glog.Fatal("master list is required!") }
fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() { fs.listenersCond.Broadcast() }) fs.filer.Cipher = option.Cipher
maybeStartMetrics(fs, option)
go fs.filer.KeepConnectedToMaster()
v := util.GetViper() if !util.LoadConfiguration("filer", false) { v.Set("leveldb2.enabled", true) v.Set("leveldb2.dir", option.DefaultLevelDbDir) _, err := os.Stat(option.DefaultLevelDbDir) if os.IsNotExist(err) { os.MkdirAll(option.DefaultLevelDbDir, 0755) } glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir) } util.LoadConfiguration("notification", false)
fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete") v.SetDefault("filer.options.buckets_folder", "/buckets") fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder") fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync") fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.")
handleStaticResources(defaultMux) if !option.DisableHttp { defaultMux.HandleFunc("/", fs.filerHandler) } if defaultMux != readonlyMux { readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) }
fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
fs.filer.LoadBuckets()
grace.OnInterrupt(func() { fs.filer.Shutdown() })
return fs, nil }
func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
for _, master := range option.Masters { _, err := pb.ParseFilerGrpcAddress(master) if err != nil { glog.Fatalf("invalid master address %s: %v", master, err) } }
isConnected := false var metricsAddress string var metricsIntervalSec int var readErr error for !isConnected { for _, master := range option.Masters { metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master) if readErr == nil { isConnected = true } else { time.Sleep(7 * time.Second) } } } if metricsAddress == "" && metricsIntervalSec <= 0 { return } go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather, metricsAddress, metricsIntervalSec) }
func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) { err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", masterAddress, err) } metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) return nil }) return }
|