Browse Source

bootstrap filer from one peer

pull/5724/head
chrislu 7 months ago
parent
commit
c030cb3ce9
  1. 12
      other/java/client/src/main/proto/filer.proto
  2. 24
      weed/filer/filer.go
  3. 12
      weed/pb/filer.proto
  4. 1052
      weed/pb/filer_pb/filer.pb.go
  5. 36
      weed/pb/filer_pb/filer_client_bfs.go
  6. 68
      weed/pb/filer_pb/filer_grpc.pb.go
  7. 84
      weed/server/filer_grpc_server_traverse_meta.go
  8. 31
      weed/server/filer_grpc_server_traverse_meta_test.go
  9. 2
      weed/server/filer_server.go
  10. 30
      weed/util/queue.go

12
other/java/client/src/main/proto/filer.proto

@ -54,6 +54,9 @@ service SeaweedFiler {
rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) {
}
rpc TraverseBfsMetadata (TraverseBfsMetadataRequest) returns (stream TraverseBfsMetadataResponse) {
}
rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
}
@ -360,6 +363,15 @@ message SubscribeMetadataResponse {
int64 ts_ns = 3;
}
message TraverseBfsMetadataRequest {
string directory = 1;
repeated string excluded_prefixes = 2;
}
message TraverseBfsMetadataResponse {
string directory = 1;
Entry entry = 2;
}
message LogEntry {
int64 ts_ns = 1;
int32 partition_key_hash = 2;

24
weed/filer/filer.go

@ -78,7 +78,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
return f
}
func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error) {
func (f *Filer) MaybeBootstrapFromOnePeer(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error) {
if len(existingNodes) == 0 {
return
}
@ -91,25 +91,13 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*
}
glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFilerId)
f.UniqueFilerEpoch++
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: "bootstrap",
ClientId: f.UniqueFilerId,
ClientEpoch: f.UniqueFilerEpoch,
SelfSignature: f.Signature,
PathPrefix: "/",
AdditionalPathPrefixes: nil,
DirectoriesToWatch: nil,
StartTsNs: 0,
StopTsNs: snapshotTime.UnixNano(),
EventErrorType: pb.FatalOnError,
}
err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, metadataFollowOption, func(resp *filer_pb.SubscribeMetadataResponse) error {
return Replay(f.Store, resp)
return pb.WithFilerClient(false, f.UniqueFilerId, pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return filer_pb.StreamBfs(client, "/", snapshotTime.UnixNano(), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
return f.Store.InsertEntry(context.Background(), FromPbEntry(string(parentPath), entry))
})
})
return
}
func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) {

12
weed/pb/filer.proto

@ -54,6 +54,9 @@ service SeaweedFiler {
rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) {
}
rpc TraverseBfsMetadata (TraverseBfsMetadataRequest) returns (stream TraverseBfsMetadataResponse) {
}
rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
}
@ -360,6 +363,15 @@ message SubscribeMetadataResponse {
int64 ts_ns = 3;
}
message TraverseBfsMetadataRequest {
string directory = 1;
repeated string excluded_prefixes = 2;
}
message TraverseBfsMetadataResponse {
string directory = 1;
Entry entry = 2;
}
message LogEntry {
int64 ts_ns = 1;
int32 partition_key_hash = 2;

1052
weed/pb/filer_pb/filer.pb.go
File diff suppressed because it is too large
View File

36
weed/pb/filer_pb/filer_client_bfs.go

@ -1,7 +1,10 @@
package filer_pb
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"io"
"sync"
"time"
@ -12,7 +15,7 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare
K := 5
var jobQueueWg sync.WaitGroup
queue := util.NewQueue()
queue := util.NewQueue[util.FullPath]()
jobQueueWg.Add(1)
queue.Enqueue(parentPath)
terminates := make([]chan bool, K)
@ -26,11 +29,11 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare
return
default:
t := queue.Dequeue()
if t == nil {
if t == "" {
time.Sleep(329 * time.Millisecond)
continue
}
dir := t.(util.FullPath)
dir := t
processErr := processOneDirectory(filerClient, dir, queue, &jobQueueWg, fn)
if processErr != nil {
err = processErr
@ -47,7 +50,7 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare
return
}
func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue[util.FullPath], jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
@ -65,3 +68,28 @@ func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queu
})
}
func StreamBfs(client SeaweedFilerClient, dir util.FullPath, olderThanTsNs int64, fn func(parentPath util.FullPath, entry *Entry)error) (err error) {
glog.V(0).Infof("TraverseBfsMetadata %v if before %v", dir, time.Unix(0, olderThanTsNs))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.TraverseBfsMetadata(ctx, &TraverseBfsMetadataRequest{
Directory: string(dir),
})
if err != nil {
return fmt.Errorf("traverse bfs metadata: %v", err)
}
for {
resp, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("traverse bfs metadata: %v", err)
}
if err := fn(util.FullPath(resp.Directory), resp.Entry); err != nil {
return err
}
}
return nil
}

68
weed/pb/filer_pb/filer_grpc.pb.go

@ -34,6 +34,7 @@ const (
SeaweedFiler_Statistics_FullMethodName = "/filer_pb.SeaweedFiler/Statistics"
SeaweedFiler_Ping_FullMethodName = "/filer_pb.SeaweedFiler/Ping"
SeaweedFiler_GetFilerConfiguration_FullMethodName = "/filer_pb.SeaweedFiler/GetFilerConfiguration"
SeaweedFiler_TraverseBfsMetadata_FullMethodName = "/filer_pb.SeaweedFiler/TraverseBfsMetadata"
SeaweedFiler_SubscribeMetadata_FullMethodName = "/filer_pb.SeaweedFiler/SubscribeMetadata"
SeaweedFiler_SubscribeLocalMetadata_FullMethodName = "/filer_pb.SeaweedFiler/SubscribeLocalMetadata"
SeaweedFiler_KvGet_FullMethodName = "/filer_pb.SeaweedFiler/KvGet"
@ -64,6 +65,7 @@ type SeaweedFilerClient interface {
Statistics(ctx context.Context, in *StatisticsRequest, opts ...grpc.CallOption) (*StatisticsResponse, error)
Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
GetFilerConfiguration(ctx context.Context, in *GetFilerConfigurationRequest, opts ...grpc.CallOption) (*GetFilerConfigurationResponse, error)
TraverseBfsMetadata(ctx context.Context, in *TraverseBfsMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_TraverseBfsMetadataClient, error)
SubscribeMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeMetadataClient, error)
SubscribeLocalMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeLocalMetadataClient, error)
KvGet(ctx context.Context, in *KvGetRequest, opts ...grpc.CallOption) (*KvGetResponse, error)
@ -265,8 +267,40 @@ func (c *seaweedFilerClient) GetFilerConfiguration(ctx context.Context, in *GetF
return out, nil
}
func (c *seaweedFilerClient) TraverseBfsMetadata(ctx context.Context, in *TraverseBfsMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_TraverseBfsMetadataClient, error) {
stream, err := c.cc.NewStream(ctx, &SeaweedFiler_ServiceDesc.Streams[2], SeaweedFiler_TraverseBfsMetadata_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &seaweedFilerTraverseBfsMetadataClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type SeaweedFiler_TraverseBfsMetadataClient interface {
Recv() (*TraverseBfsMetadataResponse, error)
grpc.ClientStream
}
type seaweedFilerTraverseBfsMetadataClient struct {
grpc.ClientStream
}
func (x *seaweedFilerTraverseBfsMetadataClient) Recv() (*TraverseBfsMetadataResponse, error) {
m := new(TraverseBfsMetadataResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *seaweedFilerClient) SubscribeMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeMetadataClient, error) {
stream, err := c.cc.NewStream(ctx, &SeaweedFiler_ServiceDesc.Streams[2], SeaweedFiler_SubscribeMetadata_FullMethodName, opts...)
stream, err := c.cc.NewStream(ctx, &SeaweedFiler_ServiceDesc.Streams[3], SeaweedFiler_SubscribeMetadata_FullMethodName, opts...)
if err != nil {
return nil, err
}
@ -298,7 +332,7 @@ func (x *seaweedFilerSubscribeMetadataClient) Recv() (*SubscribeMetadataResponse
}
func (c *seaweedFilerClient) SubscribeLocalMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeLocalMetadataClient, error) {
stream, err := c.cc.NewStream(ctx, &SeaweedFiler_ServiceDesc.Streams[3], SeaweedFiler_SubscribeLocalMetadata_FullMethodName, opts...)
stream, err := c.cc.NewStream(ctx, &SeaweedFiler_ServiceDesc.Streams[4], SeaweedFiler_SubscribeLocalMetadata_FullMethodName, opts...)
if err != nil {
return nil, err
}
@ -411,6 +445,7 @@ type SeaweedFilerServer interface {
Statistics(context.Context, *StatisticsRequest) (*StatisticsResponse, error)
Ping(context.Context, *PingRequest) (*PingResponse, error)
GetFilerConfiguration(context.Context, *GetFilerConfigurationRequest) (*GetFilerConfigurationResponse, error)
TraverseBfsMetadata(*TraverseBfsMetadataRequest, SeaweedFiler_TraverseBfsMetadataServer) error
SubscribeMetadata(*SubscribeMetadataRequest, SeaweedFiler_SubscribeMetadataServer) error
SubscribeLocalMetadata(*SubscribeMetadataRequest, SeaweedFiler_SubscribeLocalMetadataServer) error
KvGet(context.Context, *KvGetRequest) (*KvGetResponse, error)
@ -473,6 +508,9 @@ func (UnimplementedSeaweedFilerServer) Ping(context.Context, *PingRequest) (*Pin
func (UnimplementedSeaweedFilerServer) GetFilerConfiguration(context.Context, *GetFilerConfigurationRequest) (*GetFilerConfigurationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetFilerConfiguration not implemented")
}
func (UnimplementedSeaweedFilerServer) TraverseBfsMetadata(*TraverseBfsMetadataRequest, SeaweedFiler_TraverseBfsMetadataServer) error {
return status.Errorf(codes.Unimplemented, "method TraverseBfsMetadata not implemented")
}
func (UnimplementedSeaweedFilerServer) SubscribeMetadata(*SubscribeMetadataRequest, SeaweedFiler_SubscribeMetadataServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeMetadata not implemented")
}
@ -789,6 +827,27 @@ func _SeaweedFiler_GetFilerConfiguration_Handler(srv interface{}, ctx context.Co
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_TraverseBfsMetadata_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(TraverseBfsMetadataRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(SeaweedFilerServer).TraverseBfsMetadata(m, &seaweedFilerTraverseBfsMetadataServer{stream})
}
type SeaweedFiler_TraverseBfsMetadataServer interface {
Send(*TraverseBfsMetadataResponse) error
grpc.ServerStream
}
type seaweedFilerTraverseBfsMetadataServer struct {
grpc.ServerStream
}
func (x *seaweedFilerTraverseBfsMetadataServer) Send(m *TraverseBfsMetadataResponse) error {
return x.ServerStream.SendMsg(m)
}
func _SeaweedFiler_SubscribeMetadata_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SubscribeMetadataRequest)
if err := stream.RecvMsg(m); err != nil {
@ -1056,6 +1115,11 @@ var SeaweedFiler_ServiceDesc = grpc.ServiceDesc{
Handler: _SeaweedFiler_StreamRenameEntry_Handler,
ServerStreams: true,
},
{
StreamName: "TraverseBfsMetadata",
Handler: _SeaweedFiler_TraverseBfsMetadata_Handler,
ServerStreams: true,
},
{
StreamName: "SubscribeMetadata",
Handler: _SeaweedFiler_SubscribeMetadata_Handler,

84
weed/server/filer_grpc_server_traverse_meta.go

@ -0,0 +1,84 @@
package weed_server
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/viant/ptrie"
)
func (fs *FilerServer) TraverseBfsMetadata(req *filer_pb.TraverseBfsMetadataRequest, stream filer_pb.SeaweedFiler_TraverseBfsMetadataServer) error {
glog.V(0).Infof("TraverseBfsMetadata %v", req)
excludedTrie := ptrie.New[bool]()
for _, excluded := range req.ExcludedPrefixes {
excludedTrie.Put([]byte(excluded), true)
}
ctx := stream.Context()
queue := util.NewQueue[*filer.Entry]()
dirEntry, err := fs.filer.FindEntry(ctx, util.FullPath(req.Directory))
if err != nil {
return fmt.Errorf("find dir %s: %v", req.Directory, err)
}
queue.Enqueue(dirEntry)
for item := queue.Dequeue(); item != nil; item = queue.Dequeue() {
if excludedTrie.MatchPrefix([]byte(item.FullPath), func(key []byte, value bool) bool {
return true
}) {
// println("excluded", item.FullPath)
continue
}
parent, _ := item.FullPath.DirAndName()
if err := stream.Send(&filer_pb.TraverseBfsMetadataResponse{
Directory: parent,
Entry: item.ToProtoEntry(),
}); err != nil {
return fmt.Errorf("send traverse bfs metadata response: %v", err)
}
if !item.IsDirectory() {
continue
}
if err := fs.iterateDirectory(ctx, item.FullPath, func(entry *filer.Entry) error {
queue.Enqueue(entry)
return nil
}); err != nil {
return err
}
}
return nil
}
func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPath, fn func(entry *filer.Entry) error) (err error) {
var lastFileName string
var listErr error
for {
var hasEntries bool
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool {
hasEntries = true
if fnErr := fn(entry); fnErr != nil {
err = fnErr
return false
}
return true
})
if listErr != nil {
return listErr
}
if err != nil {
return err
}
if !hasEntries {
return nil
}
}
}

31
weed/server/filer_grpc_server_traverse_meta_test.go

@ -0,0 +1,31 @@
package weed_server
import (
"github.com/stretchr/testify/assert"
"github.com/viant/ptrie"
"testing"
)
func TestPtrie(t *testing.T) {
b := []byte("/topics/abc/dev")
excludedTrie := ptrie.New[bool]()
excludedTrie.Put([]byte("/topics/abc/d"), true)
excludedTrie.Put([]byte("/topics/abc"), true)
assert.True(t, excludedTrie.MatchPrefix(b, func(key []byte, value bool) bool {
println("matched1", string(key))
return true
}))
assert.True(t, excludedTrie.MatchAll(b, func(key []byte, value bool) bool {
println("matched2", string(key))
return true
}))
assert.False(t, excludedTrie.MatchAll([]byte("/topics/ab"), func(key []byte, value bool) bool {
println("matched3", string(key))
return true
}))
assert.False(t, excludedTrie.Has(b))
}

2
weed/server/filer_server.go

@ -203,7 +203,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
}
if isFresh {
glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil {
if err := fs.filer.MaybeBootstrapFromOnePeer(option.Host, existingNodes, startFromTime); err != nil {
glog.Fatalf("%s bootstrap from %+v: %v", option.Host, existingNodes, err)
}
}

30
weed/util/queue.go

@ -1,35 +1,37 @@
package util
import "sync"
import (
"sync"
)
type node struct {
data interface{}
next *node
type node[T any]struct {
data T
next *node[T]
}
type Queue struct {
head *node
tail *node
type Queue[T any] struct {
head *node[T]
tail *node[T]
count int
sync.RWMutex
}
func NewQueue() *Queue {
q := &Queue{}
func NewQueue[T any]() *Queue[T] {
q := &Queue[T]{}
return q
}
func (q *Queue) Len() int {
func (q *Queue[T]) Len() int {
q.RLock()
defer q.RUnlock()
return q.count
}
func (q *Queue) Enqueue(item interface{}) {
func (q *Queue[T]) Enqueue(item T) {
q.Lock()
defer q.Unlock()
n := &node{data: item}
n := &node[T]{data: item}
if q.tail == nil {
q.tail = n
@ -41,12 +43,12 @@ func (q *Queue) Enqueue(item interface{}) {
q.count++
}
func (q *Queue) Dequeue() interface{} {
func (q *Queue[T]) Dequeue() (result T) {
q.Lock()
defer q.Unlock()
if q.head == nil {
return nil
return
}
n := q.head

Loading…
Cancel
Save