From 2baed2e1e995ad331985a7b8c359e732b223ad3a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 25 Sep 2021 01:18:44 -0700 Subject: [PATCH] avoid possible metadata subscription data loss Previous implementation append filer logs into one file. So one file is not always sorted, which can lead to miss reading some entries, especially when different filers have different write throughput. --- weed/filer/filer.go | 2 ++ weed/filer/filer_notify.go | 12 +++++++----- .../messaging/broker/broker_grpc_server_subscribe.go | 6 ++++-- weed/util/file_util.go | 8 ++++++++ 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/weed/filer/filer.go b/weed/filer/filer.go index f13782031..76d2f3f47 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -44,6 +44,7 @@ type Filer struct { Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage + UniqueFileId uint32 } func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, @@ -54,6 +55,7 @@ func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), RemoteStorage: NewFilerRemoteStorage(), + UniqueFileId: uint32(util.RandomInt32()), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 7ab101102..e44ddfd59 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "strings" "time" @@ -92,8 +93,8 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { startTime, stopTime = startTime.UTC(), stopTime.UTC() - targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir, - startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), + targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.%08x", SystemLogDir, + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFileId, // startTime.Second(), startTime.Nanosecond(), ) @@ -111,7 +112,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( startTime = startTime.UTC() startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) sizeBuf := make([]byte, 4) startTsNs := startTime.UnixNano() @@ -122,14 +123,15 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( } for _, dayEntry := range dayEntries { // println("checking day", dayEntry.FullPath) - hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "", "") + hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "") if listHourMinuteErr != nil { return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) } for _, hourMinuteEntry := range hourMinuteEntries { // println("checking hh-mm", hourMinuteEntry.FullPath) if dayEntry.Name() == startDate { - if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 { + hourMinute := util.FileNameBase(hourMinuteEntry.Name()) + if strings.Compare(hourMinute, startHourMinute) < 0 { continue } } diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index d21fb351f..f07a961db 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -2,6 +2,7 @@ package broker import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "io" "strings" @@ -141,7 +142,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { startTime = startTime.UTC() startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) sizeBuf := make([]byte, 4) startTsNs := startTime.UnixNano() @@ -153,7 +154,8 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { if dayEntry.Name == startDate { - if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 { + hourMinute := util.FileNameBase(hourMinuteEntry.Name) + if strings.Compare(hourMinute, startHourMinute) < 0 { return nil } } diff --git a/weed/util/file_util.go b/weed/util/file_util.go index f83f80265..f9cc4f70b 100644 --- a/weed/util/file_util.go +++ b/weed/util/file_util.go @@ -87,3 +87,11 @@ func ResolvePath(path string) string { return path } + +func FileNameBase(filename string) string { + lastDotIndex := strings.LastIndex(filename, ".") + if lastDotIndex < 0 { + return filename + } + return filename[:lastDotIndex] +}