From 4f6096c7f08d1a2468451684d44675310512524d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 13 Jul 2020 22:55:28 -0700 Subject: [PATCH] add reading from persisted logs for local filer store --- weed/filer2/meta_aggregator.go | 4 ++-- weed/server/filer_grpc_server_sub_meta.go | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/weed/filer2/meta_aggregator.go b/weed/filer2/meta_aggregator.go index a4f0c3007..8ce0d2dab 100644 --- a/weed/filer2/meta_aggregator.go +++ b/weed/filer2/meta_aggregator.go @@ -59,7 +59,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin lastTsNs = prevTsNs } - glog.V(0).Infof("follow filer: %v, lastTsNs=%d", filer, lastTsNs) + glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs) maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { if err := Replay(f.Store.ActualStore, event); err != nil { glog.Errorf("failed to reply metadata change from %v: %v", filer, err) @@ -98,7 +98,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin for { err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ - ClientName: "filer", + ClientName: "filer:"+self, PathPrefix: "/", SinceNs: lastTsNs, }) diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 4fd38abe5..4341f2091 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -63,6 +63,20 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + if _, ok := fs.filer.Store.ActualStore.(filer2.FilerLocalStore); ok { + // println("reading from persisted logs ...") + processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + } + + // println("reading from in memory logs ...") err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() @@ -117,6 +131,7 @@ func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream file EventNotification: eventNotification, TsNs: tsNs, } + // println("sending", dirPath, entryName) if err := stream.Send(message); err != nil { glog.V(0).Infof("=> client %v: %+v", clientName, err) return err