diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 518212437..d313b7ba3 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -42,11 +42,6 @@ type FilerStore interface { Shutdown() } -type FilerLocalStore interface { - UpdateOffset(filer string, lastTsNs int64) error - ReadOffset(filer string) (lastTsNs int64, err error) -} - type FilerStoreWrapper struct { ActualStore FilerStore } diff --git a/weed/filer/leveldb2/leveldb2_local_store.go b/weed/filer/leveldb2/leveldb2_local_store.go deleted file mode 100644 index faae25c45..000000000 --- a/weed/filer/leveldb2/leveldb2_local_store.go +++ /dev/null @@ -1,43 +0,0 @@ -package leveldb - -import ( - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - _ = filer.FilerLocalStore(&LevelDB2Store{}) -) - -func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error { - - value := make([]byte, 8) - util.Uint64toBytes(value, uint64(lastTsNs)) - - err := store.dbs[0].Put([]byte("meta"+filer), value, nil) - - if err != nil { - return fmt.Errorf("UpdateOffset %s : %v", filer, err) - } - - println("UpdateOffset", filer, "lastTsNs", lastTsNs) - - return nil -} - -func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) { - - value, err := store.dbs[0].Get([]byte("meta"+filer), nil) - - if err != nil { - return 0, fmt.Errorf("ReadOffset %s : %v", filer, err) - } - - lastTsNs = int64(util.BytesToUint64(value)) - - println("ReadOffset", filer, "lastTsNs", lastTsNs) - - return -} diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 18049ee04..7a5329d74 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "io" "sync" "time" @@ -64,31 +65,33 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin MaxChangeLimit := 100 - if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok { - if self != filer { + isSameFilerStore, err := ma.isSameFilerStore(f, filer) + for err != nil { + glog.V(0).Infof("connecting to peer filer %s: %v", filer, err) + time.Sleep(1357 * time.Millisecond) + isSameFilerStore, err = ma.isSameFilerStore(f, filer) + } - if prevTsNs, err := localStore.ReadOffset(filer); err == nil { - lastTsNs = prevTsNs - } + if !isSameFilerStore{ + if prevTsNs, err := ma.readOffset(f, filer); err == nil { + lastTsNs = prevTsNs + } - glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs) - maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { - if err := Replay(f.Store.ActualStore, event); err != nil { - glog.Errorf("failed to reply metadata change from %v: %v", filer, err) - return - } - changesSinceLastPersist++ - if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) { - if err := localStore.UpdateOffset(filer, event.TsNs); err == nil { - lastPersistTime = time.Now() - changesSinceLastPersist = 0 - } else { - glog.V(0).Infof("failed to update offset for %v: %v", filer, err) - } + glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs) + maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { + if err := Replay(f.Store.ActualStore, event); err != nil { + glog.Errorf("failed to reply metadata change from %v: %v", filer, err) + return + } + changesSinceLastPersist++ + if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) { + if err := ma.updateOffset(f, filer, event.TsNs); err == nil { + lastPersistTime = time.Now() + changesSinceLastPersist = 0 + } else { + glog.V(0).Infof("failed to update offset for %v: %v", filer, err) } } - } else { - glog.V(0).Infof("skipping following self: %v", self) } } @@ -151,3 +154,34 @@ func (ma *MetaAggregator) isSameFilerStore(f *Filer, peer string) (isSame bool, }) return } + +func (ma *MetaAggregator) readOffset(f *Filer, peer string) (lastTsNs int64, err error) { + + value, err := f.Store.KvGet(context.Background(), []byte("meta"+peer)) + + if err != nil { + return 0, fmt.Errorf("readOffset %s : %v", peer, err) + } + + lastTsNs = int64(util.BytesToUint64(value)) + + glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs) + + return +} + +func (ma *MetaAggregator) updateOffset(f *Filer, peer string, lastTsNs int64) (err error) { + + value := make([]byte, 8) + util.Uint64toBytes(value, uint64(lastTsNs)) + + err = f.Store.KvPut(context.Background(), []byte("meta"+peer), value) + + if err != nil { + return fmt.Errorf("updateOffset %s : %v", peer, err) + } + + glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs) + + return +} diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 9ba45edfe..72e2b355b 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -63,21 +63,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) - if _, ok := fs.filer.Store.ActualStore.(filer.FilerLocalStore); ok { - // println("reading from persisted logs ...") - processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) - if err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) - } + // println("reading from persisted logs ...") + processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } - glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) } + glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) // println("reading from in memory logs ...") - err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock()