|
@ -2,6 +2,8 @@ package B2Sink |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
"context" |
|
|
"context" |
|
|
|
|
|
"strings" |
|
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/filer2" |
|
|
"github.com/chrislusf/seaweedfs/weed/filer2" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|
|
"github.com/chrislusf/seaweedfs/weed/replication/sink" |
|
|
"github.com/chrislusf/seaweedfs/weed/replication/sink" |
|
@ -31,8 +33,8 @@ func (g *B2Sink) GetSinkToDirectory() string { |
|
|
|
|
|
|
|
|
func (g *B2Sink) Initialize(configuration util.Configuration) error { |
|
|
func (g *B2Sink) Initialize(configuration util.Configuration) error { |
|
|
return g.initialize( |
|
|
return g.initialize( |
|
|
configuration.GetString("account_id"), |
|
|
|
|
|
configuration.GetString("account_key"), |
|
|
|
|
|
|
|
|
configuration.GetString("b2_account_id"), |
|
|
|
|
|
configuration.GetString("b2_master_application_key"), |
|
|
configuration.GetString("bucket"), |
|
|
configuration.GetString("bucket"), |
|
|
configuration.GetString("directory"), |
|
|
configuration.GetString("directory"), |
|
|
) |
|
|
) |
|
@ -46,7 +48,7 @@ func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { |
|
|
ctx := context.Background() |
|
|
ctx := context.Background() |
|
|
client, err := b2.NewClient(ctx, accountId, accountKey) |
|
|
client, err := b2.NewClient(ctx, accountId, accountKey) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return nil |
|
|
|
|
|
|
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
g.client = client |
|
|
g.client = client |
|
@ -58,6 +60,8 @@ func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { |
|
|
|
|
|
|
|
|
func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { |
|
|
func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { |
|
|
|
|
|
|
|
|
|
|
|
key = cleanKey(key) |
|
|
|
|
|
|
|
|
if isDirectory { |
|
|
if isDirectory { |
|
|
key = key + "/" |
|
|
key = key + "/" |
|
|
} |
|
|
} |
|
@ -77,6 +81,8 @@ func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) |
|
|
|
|
|
|
|
|
func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { |
|
|
func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { |
|
|
|
|
|
|
|
|
|
|
|
key = cleanKey(key) |
|
|
|
|
|
|
|
|
if entry.IsDirectory { |
|
|
if entry.IsDirectory { |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
@ -123,6 +129,16 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (g *B2Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { |
|
|
func (g *B2Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { |
|
|
|
|
|
|
|
|
|
|
|
key = cleanKey(key) |
|
|
|
|
|
|
|
|
// TODO improve efficiency
|
|
|
// TODO improve efficiency
|
|
|
return false, nil |
|
|
return false, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func cleanKey(key string) string { |
|
|
|
|
|
if strings.HasPrefix(key, "/") { |
|
|
|
|
|
key = key[1:] |
|
|
|
|
|
} |
|
|
|
|
|
return key |
|
|
|
|
|
} |