|
@ -22,6 +22,8 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, |
|
|
defer fs.deleteClient(clientName) |
|
|
defer fs.deleteClient(clientName) |
|
|
|
|
|
|
|
|
lastReadTime := time.Unix(0, req.SinceNs) |
|
|
lastReadTime := time.Unix(0, req.SinceNs) |
|
|
|
|
|
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) |
|
|
|
|
|
var processedTsNs int64 |
|
|
|
|
|
|
|
|
eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { |
|
|
eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { |
|
|
|
|
|
|
|
@ -67,6 +69,8 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
processedTsNs = logEntry.TsNs |
|
|
|
|
|
|
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -74,6 +78,10 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, |
|
|
return fmt.Errorf("reading from persisted logs: %v", err) |
|
|
return fmt.Errorf("reading from persisted logs: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if processedTsNs != 0 { |
|
|
|
|
|
lastReadTime = time.Unix(0, processedTsNs) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
_, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { |
|
|
_, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { |
|
|
fs.listenersLock.Lock() |
|
|
fs.listenersLock.Lock() |
|
|
fs.listenersCond.Wait() |
|
|
fs.listenersCond.Wait() |
|
|