Browse Source

leveldb2: support subscribing from peers

pull/1389/head
Chris Lu 5 years ago
parent
commit
5ba894bb72
  1. 2
      weed/filer2/filer.go
  2. 5
      weed/filer2/filerstore.go
  3. 47
      weed/filer2/leveldb2/leveldb2_local_store.go
  4. 38
      weed/filer2/meta_aggregator.go
  5. 37
      weed/filer2/meta_replay.go

2
weed/filer2/filer.go

@ -68,7 +68,7 @@ func (f *Filer) AggregateFromPeers(self string, filers []string) {
filers = append(filers, self) filers = append(filers, self)
} }
f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
f.MetaAggregator.StartLoopSubscribe(time.Now().UnixNano())
f.MetaAggregator.StartLoopSubscribe(f, self, time.Now().UnixNano())
} }

5
weed/filer2/filerstore.go

@ -29,6 +29,11 @@ type FilerStore interface {
Shutdown() Shutdown()
} }
type FilerLocalStore interface {
UpdateOffset(filer string, lastTsNs int64) error
ReadOffset(filer string) (lastTsNs int64, err error)
}
type FilerStoreWrapper struct { type FilerStoreWrapper struct {
actualStore FilerStore actualStore FilerStore
} }

47
weed/filer2/leveldb2/leveldb2_local_store.go

@ -0,0 +1,47 @@
package leveldb
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
filer2.Stores = append(filer2.Stores, &LevelDB2Store{})
}
var (
_ = filer2.FilerLocalStore(&LevelDB2Store{})
)
func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error {
value := make([]byte, 8)
util.Uint64toBytes(value, uint64(lastTsNs))
err := store.dbs[0].Put([]byte("meta"+filer), value, nil)
if err != nil {
return fmt.Errorf("UpdateOffset %s : %v", filer, err)
}
println("UpdateOffset", filer, "lastTsNs", lastTsNs)
return nil
}
func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) {
value, err := store.dbs[0].Get([]byte("meta"+filer), nil)
if err != nil {
return 0, fmt.Errorf("ReadOffset %s : %v", filer, err)
}
lastTsNs = int64(util.BytesToUint64(value))
println("ReadOffset", filer, "lastTsNs", lastTsNs)
return
}

38
weed/filer2/meta_aggregator.go

@ -37,13 +37,42 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg
return t return t
} }
func (ma *MetaAggregator) StartLoopSubscribe(lastTsNs int64) {
func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string, lastTsNs int64) {
for _, filer := range ma.filers { for _, filer := range ma.filers {
go ma.subscribeToOneFiler(filer, lastTsNs)
go ma.subscribeToOneFiler(f, self, filer, lastTsNs)
} }
} }
func (ma *MetaAggregator) subscribeToOneFiler(filer string, lastTsNs int64) {
func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, filer string, self string, lastTsNs int64) {
var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
lastPersistTime := time.Now()
changesSinceLastPersist := 0
MaxChangeLimit := 100
if localStore, ok := f.store.actualStore.(FilerLocalStore); ok {
if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
lastTsNs = prevTsNs
}
if self != filer {
maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
if err := Replay(f.store.actualStore, event); err != nil {
glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
return
}
changesSinceLastPersist++
if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
lastPersistTime = time.Now()
changesSinceLastPersist = 0
} else {
glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
}
}
}
}
}
processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error { processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
data, err := proto.Marshal(event) data, err := proto.Marshal(event)
@ -54,6 +83,9 @@ func (ma *MetaAggregator) subscribeToOneFiler(filer string, lastTsNs int64) {
dir := event.Directory dir := event.Directory
// println("received meta change", dir, "size", len(data)) // println("received meta change", dir, "size", len(data))
ma.MetaLogBuffer.AddToBuffer([]byte(dir), data) ma.MetaLogBuffer.AddToBuffer([]byte(dir), data)
if maybeReplicateMetadataChange != nil {
maybeReplicateMetadataChange(event)
}
return nil return nil
} }

37
weed/filer2/meta_replay.go

@ -0,0 +1,37 @@
package filer2
import (
"context"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
var oldPath util.FullPath
var newEntry *Entry
if message.OldEntry != nil {
oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
glog.V(4).Infof("deleting %v", oldPath)
if err := filerStore.DeleteEntry(context.Background(), oldPath); err != nil {
return err
}
}
if message.NewEntry != nil {
dir := resp.Directory
if message.NewParentPath != "" {
dir = message.NewParentPath
}
key := util.NewFullPath(dir, message.NewEntry.Name)
glog.V(4).Infof("creating %v", key)
newEntry = FromPbEntry(dir, message.NewEntry)
if err := filerStore.InsertEntry(context.Background(), newEntry); err != nil {
return err
}
}
return nil
}
Loading…
Cancel
Save