From ac494ff5e809af93cd7819794d665b7ece97891e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 6 Sep 2020 00:29:16 -0700 Subject: [PATCH] Update meta_aggregator.go --- weed/filer/meta_aggregator.go | 48 +++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 367c0d150..ab202694c 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -46,7 +46,7 @@ func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) { } } -func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) { +func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string) { /* Each filer reads the "filer.store.id", which is the store's signature when filer starts. @@ -62,32 +62,32 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin lastPersistTime := time.Now() lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano() - isSameFilerStore, err := ma.isSameFilerStore(f, filer) + peerSignature, err := ma.readFilerStoreSignature(peer) for err != nil { - glog.V(0).Infof("connecting to peer filer %s: %v", filer, err) + glog.V(0).Infof("connecting to peer filer %s: %v", peer, err) time.Sleep(1357 * time.Millisecond) - isSameFilerStore, err = ma.isSameFilerStore(f, filer) + peerSignature, err = ma.readFilerStoreSignature(peer) } - if !isSameFilerStore { - if prevTsNs, err := ma.readOffset(f, filer); err == nil { + if peerSignature != f.Signature { + if prevTsNs, err := ma.readOffset(f, file, peerSignature); err == nil { lastTsNs = prevTsNs } - glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs) + glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { if err := Replay(f.Store.ActualStore, event); err != nil { - glog.Errorf("failed to reply metadata change from %v: %v", filer, err) + glog.Errorf("failed to reply metadata change from %v: %v", peer, err) return } if lastPersistTime.Add(time.Minute).Before(time.Now()) { - if err := ma.updateOffset(f, filer, event.TsNs); err == nil { + if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil { if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() { - glog.V(0).Infof("sync with %s progressed to: %v", filer, time.Unix(0, event.TsNs).UTC()) + glog.V(0).Infof("sync with %s progressed to: %v", peer, time.Unix(0, event.TsNs)) } lastPersistTime = time.Now() } else { - glog.V(0).Infof("failed to update offset for %v: %v", filer, err) + glog.V(0).Infof("failed to update offset for %v: %v", peer, err) } } } @@ -109,7 +109,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin } for { - err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ ClientName: "filer:" + self, PathPrefix: "/", @@ -135,27 +135,34 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin } }) if err != nil { - glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err) + glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) time.Sleep(1733 * time.Millisecond) } } } -func (ma *MetaAggregator) isSameFilerStore(f *Filer, peer string) (isSame bool, err error) { +func (ma *MetaAggregator) readFilerStoreSignature(peer string) (sig int32, err error) { err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err } - isSame = f.Signature == resp.Signature + sig = resp.Signature return nil }) return } -func (ma *MetaAggregator) readOffset(f *Filer, peer string) (lastTsNs int64, err error) { +const( + MetaOffsetPrefix = "Meta" +) + +func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) (lastTsNs int64, err error) { - value, err := f.Store.KvGet(context.Background(), []byte("meta"+peer)) + key := []byte(MetaOffsetPrefix+"xxxx") + util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) + + value, err := f.Store.KvGet(context.Background(), key) if err == ErrKvNotFound { glog.Warningf("readOffset %s not found", peer) @@ -173,12 +180,15 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer string) (lastTsNs int64, err return } -func (ma *MetaAggregator) updateOffset(f *Filer, peer string, lastTsNs int64) (err error) { +func (ma *MetaAggregator) updateOffset(f *Filer, peer string, peerSignature int32, lastTsNs int64) (err error) { + + key := []byte(MetaOffsetPrefix+"xxxx") + util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) value := make([]byte, 8) util.Uint64toBytes(value, uint64(lastTsNs)) - err = f.Store.KvPut(context.Background(), []byte("meta"+peer), value) + err = f.Store.KvPut(context.Background(), key, value) if err != nil { return fmt.Errorf("updateOffset %s : %v", peer, err)