From 7597831cacb020a24a1598f553be8fdf27ddb4ec Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 13 Jul 2020 00:05:20 -0700 Subject: [PATCH] filer: leveldb2 supports peers also --- weed/filer2/filer.go | 5 +---- weed/filer2/leveldb2/leveldb2_local_store.go | 4 ---- weed/filer2/meta_aggregator.go | 18 ++++++++++++------ 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 8c2b1b33a..485571913 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -61,14 +61,11 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, func (f *Filer) AggregateFromPeers(self string, filers []string) { // set peers - if strings.HasPrefix(f.GetStore().GetName(), "leveldb") && len(filers) > 0 { - glog.Fatalf("filers using separate leveldb stores should not configure %d peers %+v", len(filers), filers) - } if len(filers) == 0 { filers = append(filers, self) } f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) - f.MetaAggregator.StartLoopSubscribe(f, self, time.Now().UnixNano()) + f.MetaAggregator.StartLoopSubscribe(f, self) } diff --git a/weed/filer2/leveldb2/leveldb2_local_store.go b/weed/filer2/leveldb2/leveldb2_local_store.go index 86aa54471..3625abf9e 100644 --- a/weed/filer2/leveldb2/leveldb2_local_store.go +++ b/weed/filer2/leveldb2/leveldb2_local_store.go @@ -7,10 +7,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func init() { - filer2.Stores = append(filer2.Stores, &LevelDB2Store{}) -} - var ( _ = filer2.FilerLocalStore(&LevelDB2Store{}) ) diff --git a/weed/filer2/meta_aggregator.go b/weed/filer2/meta_aggregator.go index 478337ae7..d33538d60 100644 --- a/weed/filer2/meta_aggregator.go +++ b/weed/filer2/meta_aggregator.go @@ -37,25 +37,29 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg return t } -func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string, lastTsNs int64) { +func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) { for _, filer := range ma.filers { - go ma.subscribeToOneFiler(f, self, filer, lastTsNs) + go ma.subscribeToOneFiler(f, self, filer) } } -func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, filer string, self string, lastTsNs int64) { +func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) { var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) lastPersistTime := time.Now() changesSinceLastPersist := 0 + lastTsNs := int64(0) MaxChangeLimit := 100 if localStore, ok := f.store.actualStore.(FilerLocalStore); ok { - if prevTsNs, err := localStore.ReadOffset(filer); err == nil { - lastTsNs = prevTsNs - } if self != filer { + + if prevTsNs, err := localStore.ReadOffset(filer); err == nil { + lastTsNs = prevTsNs + } + + glog.V(0).Infof("follow filer: %v, lastTsNs=%d", filer, lastTsNs) maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { if err := Replay(f.store.actualStore, event); err != nil { glog.Errorf("failed to reply metadata change from %v: %v", filer, err) @@ -71,6 +75,8 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, filer string, self strin } } } + } else { + glog.V(0).Infof("skipping following self: %v", self) } }