From 55e40b08fc5f796c8d315b100a9c29dcf3a092e6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 5 Jul 2020 15:43:06 -0700 Subject: [PATCH] refactoring --- weed/filer2/filer_notify.go | 30 +++---- .../broker/broker_grpc_server_subscribe.go | 4 +- weed/server/filer_grpc_server_sub_meta.go | 78 ++++++++++--------- 3 files changed, 60 insertions(+), 52 deletions(-) diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index 0bde61035..ceb3677ce 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -83,7 +83,7 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { } } -func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error { +func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) @@ -93,13 +93,13 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366) if listDayErr != nil { - return fmt.Errorf("fail to list log by day: %v", listDayErr) + return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr) } for _, dayEntry := range dayEntries { // println("checking day", dayEntry.FullPath) hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60) if listHourMinuteErr != nil { - return fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) + return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) } for _, hourMinuteEntry := range hourMinuteEntries { // println("checking hh-mm", hourMinuteEntry.FullPath) @@ -110,49 +110,51 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( } // println("processing", hourMinuteEntry.FullPath) chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks) - if err := ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { chunkedFileReader.Close() if err == io.EOF { break } - return fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err) + return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err) } chunkedFileReader.Close() } } - return nil + return lastTsNs, nil } -func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error { +func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { for { n, err := r.Read(sizeBuf) if err != nil { - return err + return lastTsNs, err } if n != 4 { - return fmt.Errorf("size %d bytes, expected 4 bytes", n) + return lastTsNs, fmt.Errorf("size %d bytes, expected 4 bytes", n) } size := util.BytesToUint32(sizeBuf) // println("entry size", size) entryData := make([]byte, size) n, err = r.Read(entryData) if err != nil { - return err + return lastTsNs, err } if n != int(size) { - return fmt.Errorf("entry data %d bytes, expected %d bytes", n, size) + return lastTsNs, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size) } logEntry := &filer_pb.LogEntry{} if err = proto.Unmarshal(entryData, logEntry); err != nil { - return err + return lastTsNs, err } if logEntry.TsNs <= ns { - return nil + return lastTsNs, nil } // println("each log: ", logEntry.TsNs) if err := eachLogEntryFn(logEntry); err != nil { - return err + return lastTsNs, err + } else { + lastTsNs = logEntry.TsNs } } } diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 9538d3063..9a7d653b5 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -100,7 +100,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return nil } - if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { + if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { if err != io.EOF { // println("stopping from persisted logs", err.Error()) return err @@ -148,7 +148,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name) chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks) defer chunkedFileReader.Close() - if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + if _, err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { chunkedFileReader.Close() if err == io.EOF { return err diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 848a1fc3a..d069a45c2 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -23,9 +23,49 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - var processedTsNs int64 - eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName) + + eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + + processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + + err = fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.listenersLock.Lock() + fs.listenersCond.Wait() + fs.listenersLock.Unlock() + return true + }, eachLogEntryFn) + + return err + +} + +func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error { + return func(logEntry *filer_pb.LogEntry) error { + event := &filer_pb.SubscribeMetadataResponse{} + if err := proto.Unmarshal(logEntry.Data, event); err != nil { + glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + } + + if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil { + return err + } + + return nil + } +} + +func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { // get complete path to the file or directory var entryName string @@ -57,40 +97,6 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } return nil } - - eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { - event := &filer_pb.SubscribeMetadataResponse{} - if err := proto.Unmarshal(logEntry.Data, event); err != nil { - glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) - return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) - } - - if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil { - return err - } - - processedTsNs = logEntry.TsNs - - return nil - } - - if err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn); err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) - } - - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } - - err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.listenersLock.Lock() - fs.listenersCond.Wait() - fs.listenersLock.Unlock() - return true - }, eachLogEntryFn) - - return err - } func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) {