You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

144 lines
2.7 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package B2Sink
  2. import (
  3. "context"
  4. "strings"
  5. "github.com/chrislusf/seaweedfs/weed/filer2"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  8. "github.com/chrislusf/seaweedfs/weed/replication/source"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/kurin/blazer/b2"
  11. )
  12. type B2Sink struct {
  13. client *b2.Client
  14. bucket string
  15. dir string
  16. filerSource *source.FilerSource
  17. }
  18. func init() {
  19. sink.Sinks = append(sink.Sinks, &B2Sink{})
  20. }
  21. func (g *B2Sink) GetName() string {
  22. return "backblaze"
  23. }
  24. func (g *B2Sink) GetSinkToDirectory() string {
  25. return g.dir
  26. }
  27. func (g *B2Sink) Initialize(configuration util.Configuration) error {
  28. return g.initialize(
  29. configuration.GetString("b2_account_id"),
  30. configuration.GetString("b2_master_application_key"),
  31. configuration.GetString("bucket"),
  32. configuration.GetString("directory"),
  33. )
  34. }
  35. func (g *B2Sink) SetSourceFiler(s *source.FilerSource) {
  36. g.filerSource = s
  37. }
  38. func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error {
  39. ctx := context.Background()
  40. client, err := b2.NewClient(ctx, accountId, accountKey)
  41. if err != nil {
  42. return err
  43. }
  44. g.client = client
  45. g.bucket = bucket
  46. g.dir = dir
  47. return nil
  48. }
  49. func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
  50. key = cleanKey(key)
  51. if isDirectory {
  52. key = key + "/"
  53. }
  54. ctx := context.Background()
  55. bucket, err := g.client.Bucket(ctx, g.bucket)
  56. if err != nil {
  57. return err
  58. }
  59. targetObject := bucket.Object(key)
  60. return targetObject.Delete(ctx)
  61. }
  62. func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
  63. key = cleanKey(key)
  64. if entry.IsDirectory {
  65. return nil
  66. }
  67. totalSize := filer2.TotalSize(entry.Chunks)
  68. chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
  69. ctx := context.Background()
  70. bucket, err := g.client.Bucket(ctx, g.bucket)
  71. if err != nil {
  72. return err
  73. }
  74. targetObject := bucket.Object(key)
  75. writer := targetObject.NewWriter(ctx)
  76. for _, chunk := range chunkViews {
  77. fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
  78. if err != nil {
  79. return err
  80. }
  81. var writeErr error
  82. _, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
  83. _, err := writer.Write(data)
  84. if err != nil {
  85. writeErr = err
  86. }
  87. })
  88. if readErr != nil {
  89. return readErr
  90. }
  91. if writeErr != nil {
  92. return writeErr
  93. }
  94. }
  95. return writer.Close()
  96. }
  97. func (g *B2Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
  98. key = cleanKey(key)
  99. // TODO improve efficiency
  100. return false, nil
  101. }
  // cleanKey returns key with a single leading "/" removed, if present.
  // Only one slash is stripped: "//a" becomes "/a". Keys without a leading
  // slash are returned unchanged.
  func cleanKey(key string) string {
  	return strings.TrimPrefix(key, "/")
  }