Browse Source
filer store adds kv support
filer store adds kv support
can compile now, need to implement those unimplementedpull/1446/head
Chris Lu
4 years ago
10 changed files with 299 additions and 2 deletions
-
18weed/filer/abstract_sql/abstract_sql_store_kv.go
-
18weed/filer/cassandra/cassandra_store_kv.go
-
44weed/filer/etcd/etcd_store_kv.go
-
2weed/filer/filer.go
-
21weed/filer/filerstore.go
-
39weed/filer/leveldb/leveldb_store_kv.go
-
56weed/filer/leveldb2/leveldb2_store_kv.go
-
19weed/filer/mongodb/mongodb_store_kv.go
-
42weed/filer/redis/universal_redis_store_kv.go
-
42weed/filer/redis2/universal_redis_store_kv.go
@ -0,0 +1,18 @@ |
|||
package abstract_sql |
|||
|
|||
import ( |
|||
"context" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
) |
|||
|
|||
func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
return filer.ErrKvNotImplemented |
|||
} |
|||
|
|||
func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
return nil, filer.ErrKvNotImplemented |
|||
} |
|||
|
|||
func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
return filer.ErrKvNotImplemented |
|||
} |
@ -0,0 +1,18 @@ |
|||
package cassandra |
|||
|
|||
import ( |
|||
"context" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
) |
|||
|
|||
func (store *CassandraStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
return filer.ErrKvNotImplemented |
|||
} |
|||
|
|||
func (store *CassandraStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
return nil, filer.ErrKvNotImplemented |
|||
} |
|||
|
|||
func (store *CassandraStore) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
return filer.ErrKvNotImplemented |
|||
} |
@ -0,0 +1,44 @@ |
|||
package etcd |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
) |
|||
|
|||
func (store *EtcdStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
|
|||
_, err = store.client.Put(ctx, string(key), string(value)) |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv put: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (store *EtcdStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
|
|||
resp, err := store.client.Get(ctx, string(key), nil) |
|||
|
|||
if err != nil { |
|||
return nil, fmt.Errorf("kv get: %v", err) |
|||
} |
|||
|
|||
if len(resp.Kvs) == 0 { |
|||
return nil, filer.ErrKvNotFound |
|||
} |
|||
|
|||
return resp.Kvs[0].Value, nil |
|||
} |
|||
|
|||
func (store *EtcdStore) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
|
|||
_, err = store.client.Delete(ctx, string(key)) |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv delete: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
@ -0,0 +1,39 @@ |
|||
package leveldb |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
) |
|||
|
|||
func (store *LevelDBStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
|
|||
err = store.db.Put(key, value, nil) |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv put: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (store *LevelDBStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
|
|||
value, err = store.db.Get(key, nil) |
|||
|
|||
if err != nil { |
|||
return nil, fmt.Errorf("kv get: %v", err) |
|||
} |
|||
|
|||
return |
|||
} |
|||
|
|||
func (store *LevelDBStore) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
|
|||
err = store.db.Delete(key, nil) |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv delete: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
@ -0,0 +1,56 @@ |
|||
package leveldb |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/syndtr/goleveldb/leveldb" |
|||
) |
|||
|
|||
func (store *LevelDB2Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
|
|||
partitionId := bucketKvKey(key, store.dbCount) |
|||
|
|||
err = store.dbs[partitionId].Put(key, value, nil) |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv bucket %d put: %v", partitionId, err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (store *LevelDB2Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
|
|||
partitionId := bucketKvKey(key, store.dbCount) |
|||
|
|||
value, err = store.dbs[partitionId].Get(key, nil) |
|||
|
|||
if err == leveldb.ErrNotFound { |
|||
return nil, filer.ErrKvNotFound |
|||
} |
|||
|
|||
if err != nil { |
|||
return nil, fmt.Errorf("kv bucket %d get: %v", partitionId, err) |
|||
} |
|||
|
|||
return |
|||
} |
|||
|
|||
func (store *LevelDB2Store) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
|
|||
partitionId := bucketKvKey(key, store.dbCount) |
|||
|
|||
err = store.dbs[partitionId].Delete(key, nil) |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv bucket %d delete: %v", partitionId, err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func bucketKvKey(key []byte, dbCount int) (partitionId int) { |
|||
return int(key[len(key)-1]) % dbCount |
|||
} |
@ -0,0 +1,19 @@ |
|||
package mongodb |
|||
|
|||
import ( |
|||
"context" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
) |
|||
|
|||
|
|||
func (store *MongodbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
return filer.ErrKvNotImplemented |
|||
} |
|||
|
|||
func (store *MongodbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
return nil, filer.ErrKvNotImplemented |
|||
} |
|||
|
|||
func (store *MongodbStore) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
return filer.ErrKvNotImplemented |
|||
} |
@ -0,0 +1,42 @@ |
|||
package redis |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/go-redis/redis" |
|||
) |
|||
|
|||
func (store *UniversalRedisStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
|
|||
_, err = store.Client.Set(string(key), value, 0).Result() |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv put: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (store *UniversalRedisStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
|
|||
data, err := store.Client.Get(string(key)).Result() |
|||
|
|||
if err == redis.Nil { |
|||
return nil, filer.ErrKvNotFound |
|||
} |
|||
|
|||
return []byte(data), err |
|||
} |
|||
|
|||
func (store *UniversalRedisStore) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
|
|||
_, err = store.Client.Del(string(key)).Result() |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv delete: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
@ -0,0 +1,42 @@ |
|||
package redis2 |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/go-redis/redis" |
|||
) |
|||
|
|||
func (store *UniversalRedis2Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
|
|||
_, err = store.Client.Set(string(key), value, 0).Result() |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv put: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (store *UniversalRedis2Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
|
|||
data, err := store.Client.Get(string(key)).Result() |
|||
|
|||
if err == redis.Nil { |
|||
return nil, filer.ErrKvNotFound |
|||
} |
|||
|
|||
return []byte(data), err |
|||
} |
|||
|
|||
func (store *UniversalRedis2Store) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
|
|||
_, err = store.Client.Del(string(key)).Result() |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("kv delete: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue