You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

127 lines
2.8 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package B2Sink
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
  5. "strings"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  9. "github.com/chrislusf/seaweedfs/weed/replication/source"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/kurin/blazer/b2"
  12. )
  13. type B2Sink struct {
  14. client *b2.Client
  15. bucket string
  16. dir string
  17. filerSource *source.FilerSource
  18. }
  19. func init() {
  20. sink.Sinks = append(sink.Sinks, &B2Sink{})
  21. }
  22. func (g *B2Sink) GetName() string {
  23. return "backblaze"
  24. }
  25. func (g *B2Sink) GetSinkToDirectory() string {
  26. return g.dir
  27. }
  28. func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
  29. return g.initialize(
  30. configuration.GetString(prefix+"b2_account_id"),
  31. configuration.GetString(prefix+"b2_master_application_key"),
  32. configuration.GetString(prefix+"bucket"),
  33. configuration.GetString(prefix+"directory"),
  34. )
  35. }
  36. func (g *B2Sink) SetSourceFiler(s *source.FilerSource) {
  37. g.filerSource = s
  38. }
  39. func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error {
  40. client, err := b2.NewClient(context.Background(), accountId, accountKey)
  41. if err != nil {
  42. return err
  43. }
  44. g.client = client
  45. g.bucket = bucket
  46. g.dir = dir
  47. return nil
  48. }
  49. func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
  50. key = cleanKey(key)
  51. if isDirectory {
  52. key = key + "/"
  53. }
  54. bucket, err := g.client.Bucket(context.Background(), g.bucket)
  55. if err != nil {
  56. return err
  57. }
  58. targetObject := bucket.Object(key)
  59. return targetObject.Delete(context.Background())
  60. }
  61. func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
  62. key = cleanKey(key)
  63. if entry.IsDirectory {
  64. return nil
  65. }
  66. totalSize := filer.FileSize(entry)
  67. chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
  68. bucket, err := g.client.Bucket(context.Background(), g.bucket)
  69. if err != nil {
  70. return err
  71. }
  72. targetObject := bucket.Object(key)
  73. writer := targetObject.NewWriter(context.Background())
  74. writeFunc := func(data []byte) error {
  75. _, writeErr := writer.Write(data)
  76. return writeErr
  77. }
  78. defer writer.Close()
  79. if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
  80. return err
  81. }
  82. return nil
  83. }
  84. func (g *B2Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
  85. key = cleanKey(key)
  86. // TODO improve efficiency
  87. return false, nil
  88. }
  89. func cleanKey(key string) string {
  90. if strings.HasPrefix(key, "/") {
  91. key = key[1:]
  92. }
  93. return key
  94. }