diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 8fa3eec2d..db78b3d3d 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -87,7 +87,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica } -func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { +func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) { if len(buf) == 0 { return @@ -114,7 +114,7 @@ var ( VolumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`) ) -func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, isDone bool, err error) { +func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) { startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day()) startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute()) @@ -177,7 +177,7 @@ func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, return lastTsNs, isDone, nil } -func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { +func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, err error) { for { n, err := r.Read(sizeBuf) if err != nil { @@ -207,7 +207,7 @@ func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, ea return lastTsNs, err } // println("each log: ", logEntry.TsNs) - if err := eachLogEntryFn(logEntry); err != nil { + if _, err := eachLogEntryFn(logEntry); err != nil { return lastTsNs, err } else { lastTsNs = logEntry.TsNs diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index da61ed05d..ade94caf3 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -105,7 +105,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest } return true - }, func(logEntry *filer_pb.LogEntry) error { + }, func(logEntry *filer_pb.LogEntry) (bool, error) { // reset the sleep interval count sleepIntervalCount = 0 @@ -118,10 +118,10 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest }, }}); err != nil { glog.Errorf("Error sending setup response: %v", err) - return err + return false, err } counter++ - return nil + return false, nil }) } diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index 0f53c28c3..4ebb62000 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -19,7 +19,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT) partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop) - return func(startTime, stopTime time.Time, buf []byte) { + return func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) { if len(buf) == 0 { return } @@ -75,7 +75,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p return } - if err = eachLogEntryFn(logEntry); err != nil { + if _, err = eachLogEntryFn(logEntry); err != nil { err = fmt.Errorf("process log entry %v: %v", logEntry, err) return } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 22929879e..8e8b4e5c4 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -178,19 +178,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq } -func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error { - return func(logEntry *filer_pb.LogEntry) error { +func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) log_buffer.EachLogEntryFuncType { + return func(logEntry *filer_pb.LogEntry) (bool, error) { event := &filer_pb.SubscribeMetadataResponse{} if err := proto.Unmarshal(logEntry.Data, event); err != nil { glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) - return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return false, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) } if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil { - return err + return false, err } - return nil + return false, nil } } diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 273df5593..cfd6c94cd 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -22,8 +22,8 @@ type dataToFlush struct { data *bytes.Buffer } -type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) error -type LogFlushFuncType func(startTime, stopTime time.Time, buf []byte) +type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error) +type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte) type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) type LogBuffer struct { @@ -146,7 +146,7 @@ func (logBuffer *LogBuffer) loopFlush() { for d := range logBuffer.flushChan { if d != nil { // glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes())) - logBuffer.flushFn(d.startTime, d.stopTime, d.data.Bytes()) + logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes()) d.releaseMemory() // local logbuffer is different from aggregate logbuffer here logBuffer.lastFlushTime = d.stopTime diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 8a4d2d851..5529a6691 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -30,7 +30,7 @@ func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition { } func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64, - waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadPosition MessagePosition, isDone bool, err error) { + waitForDataFn func() bool, eachLogDataFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) { // loop through all messages var bytesBuf *bytes.Buffer var batchIndex int64 @@ -69,6 +69,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition if waitForDataFn() { continue } else { + isDone = true return } } @@ -101,10 +102,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition } lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex) - if err = eachLogDataFn(logEntry); err != nil { + if isDone, err = eachLogDataFn(logEntry); err != nil { glog.Errorf("LoopProcessLogData: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err) return } + if isDone { + return + } pos += 4 + int(size) batchSize++