Browse Source

filer.sync: fix checkpoint not being saved properly (#7719)

* filer.sync: fix race condition on first checkpoint save

Initialize lastWriteTime to time.Now() instead of zero time to prevent
the first checkpoint save from being triggered immediately when the
first event arrives. This gives async jobs time to complete and update
the watermark before the checkpoint is saved.

Previously, the zero time caused lastWriteTime.Add(3s).Before(now) to
be true on the first event, triggering an immediate checkpoint save
attempt. But since jobs are processed asynchronously, the watermark
was still 0 (initial value), causing the save to be skipped due to
the 'if offsetTsNs == 0 { return nil }' check.

Fixes #7717

* filer.sync: save checkpoint on graceful shutdown

Add graceful shutdown handling to save the final checkpoint when
filer.sync is terminated. Previously, any sync progress within the
last 3-second checkpoint interval would be lost on shutdown.

Changes:
- Add syncState struct to track current processor and offset save info
- Add atomic pointers syncStateA2B and syncStateB2A for both directions
- Register grace.OnInterrupt hook to save checkpoints on shutdown
- Modify doSubscribeFilerMetaChanges to update sync state atomically

This ensures that when filer.sync is restarted, it resumes from the
correct position instead of potentially replaying old events.

Fixes #7717
pull/7726/head
Chris Lu 4 weeks ago
committed by GitHub
parent
commit
84b8a8e010
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 52
      weed/command/filer_sync.go
  2. 2
      weed/pb/filer_pb_tail.go

52
weed/command/filer_sync.go

@ -60,10 +60,22 @@ const (
DefaultConcurrencyLimit = 32
)
// syncState tracks the current sync state for graceful shutdown checkpoint saving
type syncState struct {
processor *MetadataProcessor
grpcDialOption grpc.DialOption
targetFiler pb.ServerAddress
sourcePath string
sourceFilerSignature int32
}
var (
syncOptions SyncOptions
syncCpuProfile *string
syncMemProfile *string
// atomic pointers to current sync states for graceful shutdown
syncStateA2B atomic.Pointer[syncState]
syncStateB2A atomic.Pointer[syncState]
)
func init() {
@ -143,6 +155,27 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
// register graceful shutdown hook to save checkpoints
grace.OnInterrupt(func() {
saveCheckpoint := func(name string, state *syncState) {
if state == nil || state.processor == nil {
return
}
offsetTsNs := state.processor.processedTsWatermark.Load()
if offsetTsNs == 0 {
return
}
if err := setOffset(state.grpcDialOption, state.targetFiler, getSignaturePrefixByPath(state.sourcePath), state.sourceFilerSignature, offsetTsNs); err != nil {
glog.Errorf("failed to save checkpoint for %s on shutdown: %v", name, err)
} else {
glog.V(0).Infof("saved checkpoint for %s on shutdown: %v", name, time.Unix(0, offsetTsNs))
}
}
saveCheckpoint("A->B", syncStateA2B.Load())
saveCheckpoint("B->A", syncStateB2A.Load())
})
go func() {
// a->b
// set synchronization start timestamp to offset
@ -172,7 +205,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.concurrency,
*syncOptions.bDoDeleteFiles,
aFilerSignature,
bFilerSignature)
bFilerSignature,
&syncStateA2B)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@ -210,7 +244,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.concurrency,
*syncOptions.aDoDeleteFiles,
bFilerSignature,
aFilerSignature)
aFilerSignature,
&syncStateB2A)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@ -241,7 +276,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
}
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error {
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error {
// if first time, start from now
// if has previously synced, resume from that point of time
@ -278,6 +313,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
}
processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs)
// update sync state for graceful shutdown checkpoint saving
if statePtr != nil {
statePtr.Store(&syncState{
processor: processor,
grpcDialOption: grpcDialOption,
targetFiler: targetFiler,
sourcePath: sourcePath,
sourceFilerSignature: sourceFilerSignature,
})
}
var lastLogTsNs = time.Now().UnixNano()
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {

2
weed/pb/filer_pb_tail.go

@ -110,7 +110,7 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc
func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
var counter int64
var lastWriteTime time.Time
var lastWriteTime = time.Now()
return func(resp *filer_pb.SubscribeMetadataResponse) error {
if err := processEventFn(resp); err != nil {
return err

Loading…
Cancel
Save