diff --git a/weed/filer/filer_notify_read.go b/weed/filer/filer_notify_read.go index 115a925e9..ac2c763e6 100644 --- a/weed/filer/filer_notify_read.go +++ b/weed/filer/filer_notify_read.go @@ -4,15 +4,16 @@ import ( "container/heap" "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" - "github.com/seaweedfs/seaweedfs/weed/wdclient" - "google.golang.org/protobuf/proto" "io" "math" "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "github.com/seaweedfs/seaweedfs/weed/wdclient" + "google.golang.org/protobuf/proto" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -39,6 +40,19 @@ func (f *Filer) collectPersistedLogBuffer(startPosition log_buffer.MessagePositi } +func (f *Filer) HasPersistedLogFiles(startPosition log_buffer.MessagePosition) (bool, error) { + startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day()) + dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 1, "", "", "") + + if listDayErr != nil { + return false, fmt.Errorf("fail to list log by day: %v", listDayErr) + } + if len(dayEntries) == 0 { + return false, nil + } + return true, nil +} + // ---------- type LogEntryItem struct { Entry *filer_pb.LogEntry @@ -103,7 +117,7 @@ func (o *OrderedLogVisitor) GetNext() (logEntry *filer_pb.LogEntry, err error) { if nextErr != nil { if nextErr == io.EOF { // do nothing since the filer has no more log entries - }else { + } else { return nil, fmt.Errorf("failed to get next log entry: %v", nextErr) } } else { @@ -230,7 +244,7 @@ func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) { if nextErr != nil { if nextErr == io.EOF { // do nothing since the filer has no more log entries - }else { + } else { return fmt.Errorf("failed to get next log entry for %v: %v", entryName, err) } } else { diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 436d6746a..f4c6bfe9d 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -2,11 +2,12 @@ package weed_server import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/stats" "strings" "sync/atomic" "time" + "github.com/seaweedfs/seaweedfs/weed/stats" + "google.golang.org/protobuf/proto" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -62,8 +63,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return nil } + glog.V(4).Infof("processed to %v: %v", clientName, processedTsNs) if processedTsNs != 0 { lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) + } else { + nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano()) + position := log_buffer.NewMessagePosition(nextDayTs, -2) + found, err := fs.filer.HasPersistedLogFiles(position) + if err != nil { + return fmt.Errorf("checking persisted log files: %v", err) + } + if found { + lastReadTime = position + } } glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -72,10 +84,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() - if !fs.hasClient(req.ClientId, req.ClientEpoch) { - return false - } - return true + return fs.hasClient(req.ClientId, req.ClientEpoch) }, eachLogEntryFn) if readInMemoryLogErr != nil { if readInMemoryLogErr == log_buffer.ResumeFromDiskError { diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index efe42176e..30498f92d 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -2,7 +2,6 @@ package log_buffer import ( "bytes" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "sync" "sync/atomic" "time" @@ -11,11 +10,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" ) -const BufferSize = 4 * 1024 * 1024 -const PreviousBufferCount = 3 +const BufferSize = 8 * 1024 * 1024 +const PreviousBufferCount = 32 type dataToFlush struct { startTime time.Time diff --git a/weed/util/time.go b/weed/util/time.go new file mode 100644 index 000000000..8e237b72d --- /dev/null +++ b/weed/util/time.go @@ -0,0 +1,13 @@ +package util + +import ( + "time" +) + +func GetNextDayTsNano(curTs int64) int64 { + curTime := time.Unix(0, curTs) + nextDay := curTime.AddDate(0, 0, 1).Truncate(24 * time.Hour) + nextDayNano := nextDay.UnixNano() + + return nextDayNano +}