Browse Source

log reading adds delay between retries

pull/2174/head
Chris Lu 4 years ago
parent
commit
2420c60fc4
  1. 57
      weed/server/filer_grpc_server_sub_meta.go
  2. 2
      weed/util/log_buffer/log_buffer.go

57
weed/server/filer_grpc_server_sub_meta.go

@ -30,44 +30,49 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
var processedTsNs int64 var processedTsNs int64
var err error
var readPersistedLogErr error
var readInMemoryLogErr error
for { for {
glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
if err != nil {
return fmt.Errorf("reading from persisted logs: %v", err)
processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
if readPersistedLogErr != nil {
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
} }
if processedTsNs != 0 { if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs) lastReadTime = time.Unix(0, processedTsNs)
} else {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
time.Sleep(1127 * time.Millisecond)
continue
}
} }
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock() fs.filer.MetaAggregator.ListenersLock.Unlock()
return true return true
}, eachLogEntryFn) }, eachLogEntryFn)
if err != nil {
if err == log_buffer.ResumeFromDiskError {
time.Sleep(5127 * time.Millisecond)
if readInMemoryLogErr != nil {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
continue continue
} }
glog.Errorf("processed to %v: %v", lastReadTime, err)
if err != log_buffer.ResumeError {
glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
if readInMemoryLogErr != log_buffer.ResumeError {
break break
} }
} }
time.Sleep(5127 * time.Millisecond)
time.Sleep(1127 * time.Millisecond)
} }
return err
return readInMemoryLogErr
} }
@ -87,41 +92,47 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
var processedTsNs int64 var processedTsNs int64
var err error
var readPersistedLogErr error
var readInMemoryLogErr error
for { for {
// println("reading from persisted logs ...") // println("reading from persisted logs ...")
glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
if err != nil {
return fmt.Errorf("reading from persisted logs: %v", err)
processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
if readPersistedLogErr != nil {
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
} }
if processedTsNs != 0 { if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs) lastReadTime = time.Unix(0, processedTsNs)
} else {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
time.Sleep(1127 * time.Millisecond)
continue
}
} }
glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
fs.listenersLock.Lock() fs.listenersLock.Lock()
fs.listenersCond.Wait() fs.listenersCond.Wait()
fs.listenersLock.Unlock() fs.listenersLock.Unlock()
return true return true
}, eachLogEntryFn) }, eachLogEntryFn)
if err != nil {
if err == log_buffer.ResumeFromDiskError {
if readInMemoryLogErr != nil {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
continue continue
} }
glog.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {
glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
time.Sleep(1127 * time.Millisecond)
if readInMemoryLogErr != log_buffer.ResumeError {
break break
} }
} }
} }
return err
return readInMemoryLogErr
} }

2
weed/util/log_buffer/log_buffer.go

@ -170,6 +170,8 @@ func (m *LogBuffer) copyToFlush() *dataToFlush {
m.lastFlushTime = m.stopTime m.lastFlushTime = m.stopTime
} }
m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
m.startTime = time.Unix(0,0)
m.stopTime = time.Unix(0,0)
m.pos = 0 m.pos = 0
m.idx = m.idx[:0] m.idx = m.idx[:0]
return d return d

Loading…
Cancel
Save