diff --git a/README.md b/README.md index 41305a223..fb3058dbf 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ SeaweedFS can work very well with just the object store. [[Filer]] is added late * [filer server][Filer] provide "normal" directories and files via http. * [mount filer][Mount] to read and write files directly as a local directory via FUSE. * [Amazon S3 compatible API][AmazonS3API] to access files with S3 tooling. -* [Async Backup To Cloud][BackupToCloud] can enjoy extreme fast local access and backup to Amazon S3, Google Cloud Storage, Azure. +* [Async Backup To Cloud][BackupToCloud] can enjoy extreme fast local access and backup to Amazon S3, Google Cloud Storage, Azure, BackBlaze. [Filer]: https://github.com/chrislusf/seaweedfs/wiki/Directories-and-Files [Mount]: https://github.com/chrislusf/seaweedfs/wiki/Mount diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 8d5c9d8cc..1c137bb53 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -6,6 +6,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/replication" "github.com/chrislusf/seaweedfs/weed/replication/sink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 2e7aa0cb6..95ddbd57c 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -195,12 +195,20 @@ bucket = "your_bucket_seaweedfs" # an existing bucket directory = "/" # destination directory [sink.azure] -# experimental +# experimental, let me know if it works enabled = false account_name = "" account_key = "" container = "mycontainer" # an existing container directory = "" # destination directory (do not prefix or suffix with "/") +[sink.backblaze] +# experimental, let me know if it works +enabled = false +account_id = "" +account_key = "" +bucket = "mybucket" # an existing bucket +directory = "" # destination directory (do not prefix or suffix with "/") + ` ) diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go new file mode 100644 index 000000000..ce0e9eb3c --- /dev/null +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -0,0 +1,128 @@ +package B2Sink + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/sink" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/kurin/blazer/b2" +) + +type B2Sink struct { + client *b2.Client + bucket string + dir string + filerSource *source.FilerSource +} + +func init() { + sink.Sinks = append(sink.Sinks, &B2Sink{}) +} + +func (g *B2Sink) GetName() string { + return "backblaze" +} + +func (g *B2Sink) GetSinkToDirectory() string { + return g.dir +} + +func (g *B2Sink) Initialize(configuration util.Configuration) error { + return g.initialize( + configuration.GetString("account_id"), + configuration.GetString("account_key"), + configuration.GetString("bucket"), + configuration.GetString("directory"), + ) +} + +func (g *B2Sink) SetSourceFiler(s *source.FilerSource) { + g.filerSource = s +} + +func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { + ctx := context.Background() + client, err := b2.NewClient(ctx, accountId, accountKey) + if err != nil { + return nil + } + + g.client = client + g.bucket = bucket + g.dir = dir + + return nil +} + +func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { + + if isDirectory { + key = key + "/" + } + + ctx := context.Background() + + bucket, err := g.client.Bucket(ctx, g.bucket) + if err != nil { + return err + } + + targetObject := bucket.Object(key) + + return targetObject.Delete(ctx) + +} + +func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { + + if entry.IsDirectory { + return nil + } + + totalSize := filer2.TotalSize(entry.Chunks) + chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize)) + + ctx := context.Background() + + bucket, err := g.client.Bucket(ctx, g.bucket) + if err != nil { + return err + } + + targetObject := bucket.Object(key) + writer := targetObject.NewWriter(ctx) + + for _, chunk := range chunkViews { + + fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) + if err != nil { + return err + } + + var writeErr error + _, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) { + _, err := writer.Write(data) + if err != nil { + writeErr = err + } + }) + + if readErr != nil { + return readErr + } + if writeErr != nil { + return writeErr + } + + } + + return writer.Close() + +} + +func (g *B2Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { + // TODO improve efficiency + return false, nil +}