|
@ -37,25 +37,29 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg |
|
|
return t |
|
|
return t |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string, lastTsNs int64) { |
|
|
|
|
|
|
|
|
func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) { |
|
|
for _, filer := range ma.filers { |
|
|
for _, filer := range ma.filers { |
|
|
go ma.subscribeToOneFiler(f, self, filer, lastTsNs) |
|
|
|
|
|
|
|
|
go ma.subscribeToOneFiler(f, self, filer) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, filer string, self string, lastTsNs int64) { |
|
|
|
|
|
|
|
|
func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) { |
|
|
|
|
|
|
|
|
var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) |
|
|
var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) |
|
|
lastPersistTime := time.Now() |
|
|
lastPersistTime := time.Now() |
|
|
changesSinceLastPersist := 0 |
|
|
changesSinceLastPersist := 0 |
|
|
|
|
|
lastTsNs := int64(0) |
|
|
|
|
|
|
|
|
MaxChangeLimit := 100 |
|
|
MaxChangeLimit := 100 |
|
|
|
|
|
|
|
|
if localStore, ok := f.store.actualStore.(FilerLocalStore); ok { |
|
|
if localStore, ok := f.store.actualStore.(FilerLocalStore); ok { |
|
|
if prevTsNs, err := localStore.ReadOffset(filer); err == nil { |
|
|
|
|
|
lastTsNs = prevTsNs |
|
|
|
|
|
} |
|
|
|
|
|
if self != filer { |
|
|
if self != filer { |
|
|
|
|
|
|
|
|
|
|
|
if prevTsNs, err := localStore.ReadOffset(filer); err == nil { |
|
|
|
|
|
lastTsNs = prevTsNs |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
glog.V(0).Infof("follow filer: %v, lastTsNs=%d", filer, lastTsNs) |
|
|
maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { |
|
|
maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { |
|
|
if err := Replay(f.store.actualStore, event); err != nil { |
|
|
if err := Replay(f.store.actualStore, event); err != nil { |
|
|
glog.Errorf("failed to reply metadata change from %v: %v", filer, err) |
|
|
glog.Errorf("failed to reply metadata change from %v: %v", filer, err) |
|
@ -71,6 +75,8 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, filer string, self strin |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
} else { |
|
|
|
|
|
glog.V(0).Infof("skipping following self: %v", self) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|