|
@ -76,6 +76,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string |
|
|
|
|
|
|
|
|
glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) |
|
|
glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) |
|
|
var counter int64 |
|
|
var counter int64 |
|
|
|
|
|
var synced bool |
|
|
maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { |
|
|
maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { |
|
|
if err := Replay(f.Store.ActualStore, event); err != nil { |
|
|
if err := Replay(f.Store.ActualStore, event); err != nil { |
|
|
glog.Errorf("failed to reply metadata change from %v: %v", peer, err) |
|
|
glog.Errorf("failed to reply metadata change from %v: %v", peer, err) |
|
@ -86,6 +87,9 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string |
|
|
if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil { |
|
|
if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil { |
|
|
if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() { |
|
|
if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() { |
|
|
glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0) |
|
|
glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0) |
|
|
|
|
|
} else if !synced{ |
|
|
|
|
|
synced = true |
|
|
|
|
|
glog.V(0).Infof("synced with %s", peer) |
|
|
} |
|
|
} |
|
|
lastPersistTime = time.Now() |
|
|
lastPersistTime = time.Now() |
|
|
counter = 0 |
|
|
counter = 0 |
|
|