Browse Source

avoid possible metadata subscription data loss

Previous implementation append filer logs into one file. So one file is not always sorted, which can lead to miss reading some entries, especially when different filers have different write throughput.
pull/2348/head
Chris Lu 3 years ago
parent
commit
2baed2e1e9
  1. 2
      weed/filer/filer.go
  2. 12
      weed/filer/filer_notify.go
  3. 6
      weed/messaging/broker/broker_grpc_server_subscribe.go
  4. 8
      weed/util/file_util.go

2
weed/filer/filer.go

@ -44,6 +44,7 @@ type Filer struct {
Signature int32 Signature int32
FilerConf *FilerConf FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage RemoteStorage *FilerRemoteStorage
UniqueFileId uint32
} }
func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
@ -54,6 +55,7 @@ func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(), FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(), RemoteStorage: NewFilerRemoteStorage(),
UniqueFileId: uint32(util.RandomInt32()),
} }
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection f.metaLogCollection = collection

12
weed/filer/filer_notify.go

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"math"
"strings" "strings"
"time" "time"
@ -92,8 +93,8 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
startTime, stopTime = startTime.UTC(), stopTime.UTC() startTime, stopTime = startTime.UTC(), stopTime.UTC()
targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.%08x", SystemLogDir,
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFileId,
// startTime.Second(), startTime.Nanosecond(), // startTime.Second(), startTime.Nanosecond(),
) )
@ -111,7 +112,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
startTime = startTime.UTC() startTime = startTime.UTC()
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
sizeBuf := make([]byte, 4) sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano() startTsNs := startTime.UnixNano()
@ -122,14 +123,15 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
} }
for _, dayEntry := range dayEntries { for _, dayEntry := range dayEntries {
// println("checking day", dayEntry.FullPath) // println("checking day", dayEntry.FullPath)
hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "", "")
hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
if listHourMinuteErr != nil { if listHourMinuteErr != nil {
return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
} }
for _, hourMinuteEntry := range hourMinuteEntries { for _, hourMinuteEntry := range hourMinuteEntries {
// println("checking hh-mm", hourMinuteEntry.FullPath) // println("checking hh-mm", hourMinuteEntry.FullPath)
if dayEntry.Name() == startDate { if dayEntry.Name() == startDate {
if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 {
hourMinute := util.FileNameBase(hourMinuteEntry.Name())
if strings.Compare(hourMinute, startHourMinute) < 0 {
continue continue
} }
} }

6
weed/messaging/broker/broker_grpc_server_subscribe.go

@ -2,6 +2,7 @@ package broker
import ( import (
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/log_buffer" "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"io" "io"
"strings" "strings"
@ -141,7 +142,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
startTime = startTime.UTC() startTime = startTime.UTC()
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
sizeBuf := make([]byte, 4) sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano() startTsNs := startTime.UnixNano()
@ -153,7 +154,8 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
if dayEntry.Name == startDate { if dayEntry.Name == startDate {
if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 {
hourMinute := util.FileNameBase(hourMinuteEntry.Name)
if strings.Compare(hourMinute, startHourMinute) < 0 {
return nil return nil
} }
} }

8
weed/util/file_util.go

@ -87,3 +87,11 @@ func ResolvePath(path string) string {
return path return path
} }
func FileNameBase(filename string) string {
lastDotIndex := strings.LastIndex(filename, ".")
if lastDotIndex < 0 {
return filename
}
return filename[:lastDotIndex]
}
Loading…
Cancel
Save