diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 83c8a945d..863f5c3e9 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -57,7 +57,7 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { if update.IsAdd { // every filer should subscribe to a new filer if ma.setActive(address, true) { - go ma.subscribeToOneFiler(ma.filer, ma.self, address) + go ma.loopSubscribeToOnefiler(ma.filer, ma.self, address) } } else { ma.setActive(address, false) @@ -89,7 +89,21 @@ func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { return count > 0 && isActive } -func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) { +func (ma *MetaAggregator) loopSubscribeToOnefiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) { + for { + err := ma.doSubscribeToOneFiler(f, self, peer) + if !ma.isActive(peer) { + glog.V(0).Infof("stop subscribing remote %s meta change", peer) + return + } + if err != nil { + glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) + } + time.Sleep(1733 * time.Millisecond) + } +} + +func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) error { /* Each filer reads the "filer.store.id", which is the store's signature when filer starts. @@ -117,6 +131,15 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p lastTsNs = 0 if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil { lastTsNs = prevTsNs + defer func(prevTsNs int64) { + if lastTsNs != prevTsNs && lastTsNs != lastPersistTime.UnixNano() { + if err := ma.updateOffset(f, peer, peerSignature, lastTsNs); err == nil { + glog.V(0).Infof("last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) + } else { + glog.Errorf("failed to save last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) + } + } + }(prevTsNs) } glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) @@ -160,48 +183,39 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p return nil } - for { - glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs)) - err := pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "filer:" + string(self), - PathPrefix: "/", - SinceNs: lastTsNs, - ClientId: int32(ma.filer.UniqueFileId), - }) - if err != nil { - return fmt.Errorf("subscribe: %v", err) - } + glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs)) + err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: "filer:" + string(self), + PathPrefix: "/", + SinceNs: lastTsNs, + ClientId: int32(ma.filer.UniqueFileId), + }) + if err != nil { + return fmt.Errorf("subscribe: %v", err) + } - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } - if err := processEventFn(resp); err != nil { - return fmt.Errorf("process %v: %v", resp, err) - } - lastTsNs = resp.TsNs + if err := processEventFn(resp); err != nil { + return fmt.Errorf("process %v: %v", resp, err) + } + lastTsNs = resp.TsNs - f.onMetadataChangeEvent(resp) + f.onMetadataChangeEvent(resp) - } - }) - if !ma.isActive(peer) { - glog.V(0).Infof("stop subscribing remote %s meta change", peer) - return - } - if err != nil { - glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) - time.Sleep(1733 * time.Millisecond) } - } + }) + return err } func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {