|
|
@ -75,17 +75,20 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string |
|
|
|
} |
|
|
|
|
|
|
|
glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) |
|
|
|
var counter int64 |
|
|
|
maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { |
|
|
|
if err := Replay(f.Store.ActualStore, event); err != nil { |
|
|
|
glog.Errorf("failed to reply metadata change from %v: %v", peer, err) |
|
|
|
return |
|
|
|
} |
|
|
|
counter++ |
|
|
|
if lastPersistTime.Add(time.Minute).Before(time.Now()) { |
|
|
|
if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil { |
|
|
|
if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() { |
|
|
|
glog.V(0).Infof("sync with %s progressed to: %v", peer, time.Unix(0, event.TsNs)) |
|
|
|
glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0) |
|
|
|
} |
|
|
|
lastPersistTime = time.Now() |
|
|
|
counter = 0 |
|
|
|
} else { |
|
|
|
glog.V(0).Infof("failed to update offset for %v: %v", peer, err) |
|
|
|
} |
|
|
|