124 lines
4.7 KiB

package command
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"os"
"time"
)
type RemoteGatewayOptions struct {
filerAddress *string
grpcDialOption grpc.DialOption
readChunkFromFiler *bool
timeAgo *time.Duration
createBucketAt *string
createBucketRandomSuffix *bool
include *string
exclude *string
mappings *remote_pb.RemoteStorageMapping
remoteConfs map[string]*remote_pb.RemoteConf
bucketsDir string
clientId int32
clientEpoch int32
}
var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})
func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(streamingMode, option.clientId, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
func (option *RemoteGatewayOptions) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func (option *RemoteGatewayOptions) GetDataCenter() string {
return ""
}
var (
remoteGatewayOptions RemoteGatewayOptions
)
func init() {
cmdFilerRemoteGateway.Run = runFilerRemoteGateway // break init cycle
remoteGatewayOptions.filerAddress = cmdFilerRemoteGateway.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
remoteGatewayOptions.createBucketAt = cmdFilerRemoteGateway.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
remoteGatewayOptions.createBucketRandomSuffix = cmdFilerRemoteGateway.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts")
remoteGatewayOptions.readChunkFromFiler = cmdFilerRemoteGateway.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
remoteGatewayOptions.include = cmdFilerRemoteGateway.Flag.String("include", "", "pattens of new bucket names, e.g., s3*")
remoteGatewayOptions.exclude = cmdFilerRemoteGateway.Flag.String("exclude", "", "pattens of new bucket names, e.g., local*")
remoteGatewayOptions.clientId = util.RandomInt32()
}
var cmdFilerRemoteGateway = &Command{
UsageLine: "filer.remote.gateway",
Short: "resumable continuously write back bucket creation, deletion, and other local updates to remote object store",
Long: `resumable continuously write back bucket creation, deletion, and other local updates to remote object store
filer.remote.gateway listens on filer local buckets update events.
If any bucket is created, deleted, or updated, it will mirror the changes to remote object store.
weed filer.remote.sync -createBucketAt=cloud1
`,
}
func runFilerRemoteGateway(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
remoteGatewayOptions.grpcDialOption = grpcDialOption
filerAddress := pb.ServerAddress(*remoteGatewayOptions.filerAddress)
filerSource := &source.FilerSource{}
filerSource.DoInitialize(
filerAddress.ToHttpAddress(),
filerAddress.ToGrpcAddress(),
"/", // does not matter
*remoteGatewayOptions.readChunkFromFiler,
)
remoteGatewayOptions.bucketsDir = "/buckets"
// check buckets again
remoteGatewayOptions.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
}
remoteGatewayOptions.bucketsDir = resp.DirBuckets
return nil
})
// read filer remote storage mount mappings
if detectErr := remoteGatewayOptions.collectRemoteStorageConf(); detectErr != nil {
fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr)
return true
}
// synchronize /buckets folder
fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir)
util.RetryForever("filer.remote.sync buckets", func() error {
return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource)
}, func(err error) bool {
if err != nil {
glog.Errorf("synchronize %s: %v", remoteGatewayOptions.bucketsDir, err)
}
return true
})
return true
}