Browse Source

read peer filer from start

pull/1455/head
Chris Lu 4 years ago
parent
commit
c9f8f25ba5
  1. 17
      weed/filer/meta_aggregator.go

17
weed/filer/meta_aggregator.go

@ -60,11 +60,8 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
lastPersistTime := time.Now() lastPersistTime := time.Now()
changesSinceLastPersist := 0
lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano() lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
MaxChangeLimit := 100
isSameFilerStore, err := ma.isSameFilerStore(f, filer) isSameFilerStore, err := ma.isSameFilerStore(f, filer)
for err != nil { for err != nil {
glog.V(0).Infof("connecting to peer filer %s: %v", filer, err) glog.V(0).Infof("connecting to peer filer %s: %v", filer, err)
@ -72,7 +69,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
isSameFilerStore, err = ma.isSameFilerStore(f, filer) isSameFilerStore, err = ma.isSameFilerStore(f, filer)
} }
if !isSameFilerStore{
if !isSameFilerStore {
if prevTsNs, err := ma.readOffset(f, filer); err == nil { if prevTsNs, err := ma.readOffset(f, filer); err == nil {
lastTsNs = prevTsNs lastTsNs = prevTsNs
} }
@ -83,11 +80,12 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
glog.Errorf("failed to reply metadata change from %v: %v", filer, err) glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
return return
} }
changesSinceLastPersist++
if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
if lastPersistTime.Add(time.Minute).Before(time.Now()) {
if err := ma.updateOffset(f, filer, event.TsNs); err == nil { if err := ma.updateOffset(f, filer, event.TsNs); err == nil {
if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
glog.V(0).Infof("sync with %s progressed to: %v", filer, time.Unix(0, event.TsNs).UTC())
}
lastPersistTime = time.Now() lastPersistTime = time.Now()
changesSinceLastPersist = 0
} else { } else {
glog.V(0).Infof("failed to update offset for %v: %v", filer, err) glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
} }
@ -159,6 +157,11 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer string) (lastTsNs int64, err
value, err := f.Store.KvGet(context.Background(), []byte("meta"+peer)) value, err := f.Store.KvGet(context.Background(), []byte("meta"+peer))
if err == ErrKvNotFound {
glog.Warningf("readOffset %s not found", peer)
return 0, nil
}
if err != nil { if err != nil {
return 0, fmt.Errorf("readOffset %s : %v", peer, err) return 0, fmt.Errorf("readOffset %s : %v", peer, err)
} }

Loading…
Cancel
Save