|
|
@ -34,6 +34,8 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, |
|
|
|
|
|
|
|
for { |
|
|
|
|
|
|
|
glog.V(0).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) |
|
|
|
|
|
|
|
processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("reading from persisted logs: %v", err) |
|
|
@ -43,6 +45,8 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, |
|
|
|
lastReadTime = time.Unix(0, processedTsNs) |
|
|
|
} |
|
|
|
|
|
|
|
glog.V(0).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) |
|
|
|
|
|
|
|
lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { |
|
|
|
fs.filer.MetaAggregator.ListenersLock.Lock() |
|
|
|
fs.filer.MetaAggregator.ListenersCond.Wait() |
|
|
@ -85,6 +89,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq |
|
|
|
|
|
|
|
for { |
|
|
|
// println("reading from persisted logs ...")
|
|
|
|
glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) |
|
|
|
processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("reading from persisted logs: %v", err) |
|
|
@ -93,9 +98,8 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq |
|
|
|
if processedTsNs != 0 { |
|
|
|
lastReadTime = time.Unix(0, processedTsNs) |
|
|
|
} |
|
|
|
// glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
|
|
|
|
|
|
|
// println("reading from in memory logs ...")
|
|
|
|
glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) |
|
|
|
|
|
|
|
lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { |
|
|
|
fs.listenersLock.Lock() |
|
|
|