Browse Source

filer.remote.sync can work now

pull/2241/head
Chris Lu 3 years ago
parent
commit
13e45e1605
  1. 4
      weed/Makefile
  2. 1
      weed/command/command.go
  3. 1
      weed/command/filer.go
  4. 219
      weed/command/filer_remote_sync.go
  5. 6
      weed/command/filer_replication.go
  6. 31
      weed/command/imports.go
  7. 1
      weed/filer/filer_remote_storage.go
  8. 32
      weed/filer/stream.go
  9. 6
      weed/remote_storage/remote_storage.go
  10. 86
      weed/remote_storage/s3/s3_storage_client.go
  11. 11
      weed/shell/command_remote_configure.go
  12. 5
      weed/shell/command_remote_mount.go

4
weed/Makefile

@ -37,3 +37,7 @@ debug_s3:
debug_filer_copy:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h
debug_filer_remote_sync:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.remote.sync -filer="localhost:8888" -dir=/buckets/b2 -timeAgo=10000h

1
weed/command/command.go

@ -21,6 +21,7 @@ var Commands = []*Command{
cmdFilerCopy,
cmdFilerMetaBackup,
cmdFilerMetaTail,
cmdFilerRemoteSynchronize,
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,

1
weed/command/filer.go

@ -3,7 +3,6 @@ package command
import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"strings"

219
weed/command/filer_remote_sync.go

@ -0,0 +1,219 @@
package command
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"strings"
"time"
)
type RemoteSyncOptions struct {
filerAddress *string
grpcDialOption grpc.DialOption
readChunkFromFiler *bool
debug *bool
timeAgo *time.Duration
dir *string
}
const (
RemoteSyncKeyPrefix = "remote.sync."
)
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
var (
remoteSyncOptions RemoteSyncOptions
)
func init() {
cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
}
var cmdFilerRemoteSynchronize = &Command{
UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
Short: "resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage",
Long: `resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage
filer.remote.sync listens on filer update events.
If any mounted remote file is updated, it will fetch the updated content,
and write to the remote storage.
`,
}
func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
remoteSyncOptions.grpcDialOption = grpcDialOption
// read filer remote storage mount mappings
mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
if readErr != nil {
fmt.Printf("read mount mapping: %v", readErr)
return false
}
filerSource := &source.FilerSource{}
filerSource.DoInitialize(
*remoteSyncOptions.filerAddress,
pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
"/", // does not matter
*remoteSyncOptions.readChunkFromFiler,
)
var found bool
for dir, remoteStorageMountLocation := range mappings.Mappings {
if *remoteSyncOptions.dir == dir {
found = true
storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
if readErr != nil {
fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
continue
}
fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
if err := util.Retry("filer.remote.sync "+dir, func() error {
return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
}); err != nil {
fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
}
break
}
}
if !found {
fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
return false
}
return true
}
func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
dirHash := util.HashStringToLong(mountedDir)
// 1. specified by timeAgo
// 2. last offset timestamp for this directory
// 3. directory creation time
var lastOffsetTs time.Time
if *option.timeAgo == 0 {
mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
if err != nil {
return fmt.Errorf("lookup %s: %v", mountedDir, err)
}
lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
lastOffsetTs = time.Unix(0, lastOffsetTsNs)
glog.V(0).Infof("resume from %v", lastOffsetTs)
} else {
lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
}
} else {
lastOffsetTs = time.Now().Add(-*option.timeAgo)
}
client, err := remote_storage.GetRemoteStorage(remoteStorage)
if err != nil {
return err
}
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
if message.OldEntry == nil && message.NewEntry == nil {
return nil
}
if message.OldEntry == nil && message.NewEntry != nil {
if len(message.NewEntry.Chunks) == 0 {
return nil
}
fmt.Printf("create: %+v\n", resp)
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
return client.WriteFile(dest, message.NewEntry, reader)
}
if message.OldEntry != nil && message.NewEntry == nil {
fmt.Printf("delete: %+v\n", resp)
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
return client.DeleteFile(dest)
}
if message.OldEntry != nil && message.NewEntry != nil {
oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
fmt.Printf("update meta: %+v\n", resp)
return client.UpdateFileMetadata(dest, message.NewEntry)
}
}
fmt.Printf("update: %+v\n", resp)
if err := client.DeleteFile(oldDest); err != nil {
return err
}
reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
return client.WriteFile(dest, message.NewEntry, reader)
}
return nil
}
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
lastTime := time.Unix(0, lastTsNs)
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
})
return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
"filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
var dest string
source := string(sourcePath[len(mountDir):])
if strings.HasSuffix(remoteMountLocation.Path, "/") {
dest = remoteMountLocation.Path + source[1:]
} else {
dest = remoteMountLocation.Path + source
}
return &filer_pb.RemoteStorageLocation{
Name: remoteMountLocation.Name,
Bucket: remoteMountLocation.Bucket,
Path: dest,
}
}
func isSameChunks(a, b []*filer_pb.FileChunk) bool {
if len(a) != len(b) {
return false
}
for i := 0; i < len(a); i++ {
x, y := a[i], b[i]
if !proto.Equal(x, y) {
return false
}
}
return true
}

6
weed/command/filer_replication.go

@ -7,12 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/replication"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
)

31
weed/command/imports.go

@ -0,0 +1,31 @@
package command
import (
_ "net/http/pprof"
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
)

1
weed/filer/filer_remote_storage.go

@ -5,7 +5,6 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"

32
weed/filer/stream.go

@ -91,6 +91,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
type ChunkStreamReader struct {
chunkViews []*ChunkView
totalSize int64
logicOffset int64
buffer []byte
bufferOffset int64
bufferPos int
@ -137,8 +138,7 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F
}
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
_, err = c.Seek(off, io.SeekStart)
if err != nil {
if err = c.prepareBufferFor(c.logicOffset); err != nil {
return
}
return c.Read(p)
@ -151,12 +151,15 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
return n, io.EOF
}
chunkView := c.chunkViews[c.nextChunkViewIndex]
c.fetchChunkToBuffer(chunkView)
if err = c.fetchChunkToBuffer(chunkView); err != nil {
return
}
c.nextChunkViewIndex++
}
t := copy(p[n:], c.buffer[c.bufferPos:])
c.bufferPos += t
n += t
c.logicOffset += int64(t)
}
return
}
@ -171,19 +174,26 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
case io.SeekCurrent:
offset += c.bufferOffset + int64(c.bufferPos)
offset += c.logicOffset
case io.SeekEnd:
offset = c.totalSize + offset
}
if offset > c.totalSize {
err = io.ErrUnexpectedEOF
} else {
c.logicOffset = offset
}
return offset, err
}
func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
// stay in the same chunk
if !c.isBufferEmpty() {
if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
c.bufferPos = int(offset - c.bufferOffset)
return offset, nil
return nil
}
}
@ -192,23 +202,21 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
return c.chunkViews[i].LogicOffset <= offset
})
if currentChunkIndex == len(c.chunkViews) {
return 0, io.EOF
return io.EOF
}
// positioning within the new chunk
chunk := c.chunkViews[currentChunkIndex]
if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
c.fetchChunkToBuffer(chunk)
if err = c.fetchChunkToBuffer(chunk); err != nil {
return
}
c.nextChunkViewIndex = currentChunkIndex + 1
}
c.bufferPos = int(offset - c.bufferOffset)
} else {
return 0, io.ErrUnexpectedEOF
}
return offset, err
return
}
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {

6
weed/remote_storage/remote_storage.go

@ -3,6 +3,7 @@ package remote_storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"strings"
"sync"
)
@ -30,7 +31,10 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file
type RemoteStorageClient interface {
Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error
ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error)
ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error)
UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error)
}
type RemoteStorageClientMaker interface {

86
weed/remote_storage/s3/s3_storage_client.go

@ -8,9 +8,11 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
)
func init() {
@ -45,7 +47,9 @@ type s3RemoteStorageClient struct {
conn s3iface.S3API
}
func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
var _ = remote_storage.RemoteStorageClient(&s3RemoteStorageClient{})
func (s *s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
pathKey := remote.Path[1:]
@ -91,7 +95,7 @@ func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation,
}
return
}
func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error) {
func (s *s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
u.PartSize = int64(4 * 1024 * 1024)
u.Concurrency = 1
@ -111,3 +115,81 @@ func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, off
return writerAt.Bytes(), nil
}
func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) {
fileSize := int64(filer.FileSize(entry))
partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB
for partSize*1000 < fileSize {
partSize *= 4
}
// Create an uploader with the session and custom options
uploader := s3manager.NewUploaderWithClient(s.conn, func(u *s3manager.Uploader) {
u.PartSize = partSize
u.Concurrency = 5
})
// process tagging
tags := ""
for k, v := range entry.Extended {
if len(tags) > 0 {
tags = tags + "&"
}
tags = tags + k + "=" + string(v)
}
// Upload the file to S3.
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(loc.Bucket),
Key: aws.String(loc.Path[1:]),
Body: reader,
ACL: aws.String("private"),
ServerSideEncryption: aws.String("AES256"),
StorageClass: aws.String("STANDARD_IA"),
Tagging: aws.String(tags),
})
//in case it fails to upload
if err != nil {
return fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
}
return nil
}
func toTagging(attributes map[string][]byte) *s3.Tagging {
tagging := &s3.Tagging{}
for k, v := range attributes {
tagging.TagSet = append(tagging.TagSet, &s3.Tag{
Key: aws.String(k),
Value: aws.String(string(v)),
})
}
return tagging
}
func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
tagging := toTagging(entry.Extended)
if len(tagging.TagSet) > 0 {
_, err = s.conn.PutObjectTagging(&s3.PutObjectTaggingInput{
Bucket: aws.String(loc.Bucket),
Key: aws.String(loc.Path[1:]),
Tagging: toTagging(entry.Extended),
})
} else {
_, err = s.conn.DeleteObjectTagging(&s3.DeleteObjectTaggingInput{
Bucket: aws.String(loc.Bucket),
Key: aws.String(loc.Path[1:]),
})
}
return
}
func (s *s3RemoteStorageClient) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) {
_, err = s.conn.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(loc.Bucket),
Key: aws.String(loc.Path[1:]),
})
return
}

11
weed/shell/command_remote_configure.go

@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"io"
"regexp"
@ -96,9 +97,15 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE
conf.S3SecretKey = ""
fmt.Fprintf(writer, "%+v\n", conf)
m := jsonpb.Marshaler{
EmitDefaults: false,
Indent: " ",
}
return nil
err := m.Marshal(writer, conf)
fmt.Fprintln(writer)
return err
})
}

5
weed/shell/command_remote_mount.go

@ -88,7 +88,10 @@ func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *Command
Indent: " ",
}
return m.Marshal(writer, mappings)
err = m.Marshal(writer, mappings)
fmt.Fprintln(writer)
return
}

Loading…
Cancel
Save