From 590f02179d8837af4e81d36882395d7f728333eb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 15 Nov 2020 14:06:03 -0800 Subject: [PATCH] filer: load filer conf when starting --- weed/filer/filer.go | 2 ++ weed/filer/filer_conf.go | 60 ++++++++++++++++++++++++++----- weed/filer/filer_conf_test.go | 6 ++-- weed/filer/filer_on_meta_event.go | 31 +++++++++------- weed/server/filer_server.go | 2 ++ 5 files changed, 77 insertions(+), 24 deletions(-) diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 0c4d610c5..105c8e04f 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -41,6 +41,7 @@ type Filer struct { metaLogReplication string MetaAggregator *MetaAggregator Signature int32 + FilerConf *FilerConf } func NewFiler(masters []string, grpcDialOption grpc.DialOption, @@ -49,6 +50,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, + FilerConf: NewFilerConf(), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go index 079794def..737b1aeea 100644 --- a/weed/filer/filer_conf.go +++ b/weed/filer/filer_conf.go @@ -1,48 +1,92 @@ package filer import ( + "context" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" "github.com/viant/ptrie" ) + +const ( + DirectoryEtc = "/etc" + FilerConfName = "filer.conf" +) + type FilerConf struct { rules ptrie.Trie } -func NewFilerConf(data []byte) (fc *FilerConf) { +func NewFilerConf() (fc *FilerConf) { fc = &FilerConf{ rules: ptrie.New(), } + return fc +} + +func (fc *FilerConf) loadFromFiler(filer *Filer) (err error){ + filerConfPath := util.NewFullPath(DirectoryEtc, FilerConfName) + entry, err := filer.FindEntry(context.Background(), filerConfPath) + if err != nil { + if err == filer_pb.ErrNotFound { + return nil + } + glog.Errorf("read filer conf entry %s: %v", filerConfPath, err) + return + } + + return fc.loadFromChunks(filer, entry.Chunks) +} + +func (fc *FilerConf) loadFromChunks(filer *Filer, chunks []*filer_pb.FileChunk) (err error) { + data, err := filer.readEntry(chunks) + if err != nil { + glog.Errorf("read filer conf content: %v", err) + return + } + + return fc.loadFromBytes(data) +} + +func (fc *FilerConf) loadFromBytes(data []byte) (err error) { conf := &filer_pb.FilerConf{} - err := proto.UnmarshalText(string(data), conf) + err = proto.UnmarshalText(string(data), conf) if err != nil { glog.Errorf("unable to parse filer conf: %v", err) - return + // this is not recoverable + return nil } - fc.doLoadConf(conf) - return fc + return fc.doLoadConf(conf) } -func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) { +func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) (err error) { for _, location := range conf.Locations { - err := fc.rules.Put([]byte(location.LocationPrefix), location) + err = fc.rules.Put([]byte(location.LocationPrefix), location) if err != nil { glog.Errorf("put location prefix: %v", err) + // this is not recoverable + return nil } } + return nil } +var ( + EmptyFilerConfPathConf = &filer_pb.FilerConf_PathConf{} +) + func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf){ fc.rules.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool { pathConf = value.(*filer_pb.FilerConf_PathConf) return true }) if pathConf == nil { - return &filer_pb.FilerConf_PathConf{} + return EmptyFilerConfPathConf } return pathConf } diff --git a/weed/filer/filer_conf_test.go b/weed/filer/filer_conf_test.go index 6a7b875d3..1bfe8bcfe 100644 --- a/weed/filer/filer_conf_test.go +++ b/weed/filer/filer_conf_test.go @@ -5,14 +5,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/stretchr/testify/assert" - "github.com/viant/ptrie" ) func TestFilerConf(t *testing.T) { - fc := &FilerConf{ - rules: ptrie.New(), - } + fc := NewFilerConf() + conf := &filer_pb.FilerConf{Locations: []*filer_pb.FilerConf_PathConf{ { LocationPrefix: "/buckets/abc", diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index f0b64f4e2..3de27da6e 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -6,10 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -const ( - DirectoryEtc = "/etc" + "github.com/chrislusf/seaweedfs/weed/util" ) // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers @@ -26,15 +23,15 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) } glog.V(0).Infof("procesing %v", event) - if entry.Name == "filer.conf" { + if entry.Name == FilerConfName { f.reloadFilerConfiguration(entry) } } -func (f *Filer) readEntry(entry *filer_pb.Entry) ([]byte, error){ +func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) { var buf bytes.Buffer - err := StreamContent(f.MasterClient, &buf, entry.Chunks, 0, math.MaxInt64) + err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64) if err != nil { return nil, err } @@ -42,13 +39,23 @@ func (f *Filer) readEntry(entry *filer_pb.Entry) ([]byte, error){ } func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) { - data, err := f.readEntry(entry) + fc := NewFilerConf() + err := fc.loadFromChunks(f, entry.Chunks) if err != nil { - glog.Warningf("read entry %s: %v", entry.Name, err) + glog.Errorf("read filer conf chunks: %v", err) return - } + f.FilerConf = fc +} - println(string(data)) - +func (f *Filer) LoadFilerConf() { + fc := NewFilerConf() + err := util.Retry("loadFilerConf", func() error { + return fc.loadFromFiler(f) + }) + if err != nil { + glog.Errorf("read filer conf: %v", err) + return + } + f.FilerConf = fc } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index dc93ae062..ba2d9989c 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -131,6 +131,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.LoadBuckets() + fs.filer.LoadFilerConf() + grace.OnInterrupt(func() { fs.filer.Shutdown() })