From 423ce57cde83433103a4a59c04421c8c33b84287 Mon Sep 17 00:00:00 2001 From: elee Date: Thu, 17 Mar 2022 21:12:25 -0500 Subject: [PATCH] prefix search, bucket implemented --- weed/command/scaffold/filer.toml | 10 +- weed/filer/arangodb/arangodb_store.go | 224 +++++++++---------- weed/filer/arangodb/arangodb_store_bucket.go | 30 +++ weed/filer/arangodb/arangodb_store_kv.go | 26 +-- weed/filer/arangodb/helpers.go | 44 ++++ weed/filer/arangodb/readme.md | 29 +++ weed/filer/filer_delete_entry.go | 2 - 7 files changed, 227 insertions(+), 138 deletions(-) create mode 100644 weed/filer/arangodb/arangodb_store_bucket.go create mode 100644 weed/filer/arangodb/helpers.go create mode 100644 weed/filer/arangodb/readme.md diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 19da66dc1..6a7835de3 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -287,9 +287,13 @@ index.max_result_window = 10000 [arangodb] # in development dont use it enabled = false -arango_host=["http://localhost:8529"] -arango_user="" -arango_pass="" +db_name = "seaweedfs" +servers=["http://localhost:8529"] # list of servers to connect to +# only basic auth supported for now +user="" +pass="" +# skip tls cert validation +insecure_skip_verify = true ########################## diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go index e0f036344..a9f55f5bc 100644 --- a/weed/filer/arangodb/arangodb_store.go +++ b/weed/filer/arangodb/arangodb_store.go @@ -2,12 +2,9 @@ package arangodb import ( "context" - "crypto/md5" "crypto/tls" - "encoding/binary" - "encoding/hex" "fmt" - "io" + "strings" "time" "github.com/arangodb/go-driver" @@ -27,13 +24,20 @@ type ArangodbStore struct { client driver.Client database driver.Database collection driver.Collection + + databaseName string } type Model struct { - Key string `json:"_key"` - Directory string `json:"directory"` - Name string `json:"name"` - Meta []uint64 
`json:"meta"` + Key string `json:"_key"` + Directory string `json:"directory,omitempty"` + Name string `json:"name,omitempty"` + Bucket string `json:"bucket,omitempty"` + + //arangodb does not support binary blobs + //we encode byte slice into uint64 slice + //see helpers.go + Meta []uint64 `json:"meta"` } func (store *ArangodbStore) GetName() string { @@ -41,19 +45,21 @@ func (store *ArangodbStore) GetName() string { } func (store *ArangodbStore) Initialize(configuration util.Configuration, prefix string) (err error) { - return store.connection(configuration.GetStringSlice(prefix+"arango_host"), - configuration.GetString(prefix+"arango_user"), - configuration.GetString(prefix+"arango_pass"), + store.databaseName = configuration.GetString(prefix + "db_name") + return store.connection(configuration.GetStringSlice(prefix+"servers"), + configuration.GetString(prefix+"user"), + configuration.GetString(prefix+"pass"), + configuration.GetBool(prefix+"insecure_skip_verify"), ) } -func (store *ArangodbStore) connection(uris []string, user string, pass string) (err error) { +func (store *ArangodbStore) connection(uris []string, user string, pass string, insecure bool) (err error) { ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) store.connect, err = http.NewConnection(http.ConnectionConfig{ Endpoints: uris, TLSConfig: &tls.Config{ - InsecureSkipVerify: true, + InsecureSkipVerify: insecure, }, }) if err != nil { @@ -66,15 +72,14 @@ func (store *ArangodbStore) connection(uris []string, user string, pass string) if err != nil { return err } - db_name := "seaweed-filer" - ok, err := store.client.DatabaseExists(ctx, db_name) + ok, err := store.client.DatabaseExists(ctx, store.databaseName) if err != nil { return err } if ok { - store.database, err = store.client.Database(ctx, db_name) + store.database, err = store.client.Database(ctx, store.databaseName) } else { - store.database, err = store.client.CreateDatabase(ctx, db_name, 
&driver.CreateDatabaseOptions{}) + store.database, err = store.client.CreateDatabase(ctx, store.databaseName, &driver.CreateDatabaseOptions{}) } if err != nil { return err @@ -96,29 +101,29 @@ func (store *ArangodbStore) connection(uris []string, user string, pass string) // ensure indices - if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"directory", "name"}, &driver.EnsurePersistentIndexOptions{ - Name: "directory_name_multi", - Unique: true, - }); err != nil { + if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"directory", "name"}, + &driver.EnsurePersistentIndexOptions{ + Name: "directory_name_multi", + Unique: true, + }); err != nil { return err } if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"directory"}, &driver.EnsurePersistentIndexOptions{Name: "IDX_directory"}); err != nil { return err } - // fulltext index not required since no prefix search - // might change - // if _, _, err = store.collection.EnsureFullTextIndex(ctx, []string{"directory"}, - // &driver.EnsureFullTextIndexOptions{Name: "IDX_FULLTEXT_directory", MinLength: 1}); err != nil { - // return err - // } if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"name"}, &driver.EnsurePersistentIndexOptions{ Name: "IDX_name", }); err != nil { return err } - + if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"bucket"}, &driver.EnsurePersistentIndexOptions{ + Name: "IDX_bucket", + Sparse: true, //sparse index, to locate files of bucket + }); err != nil { + return err + } return err } @@ -175,22 +180,41 @@ func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) if len(entry.Chunks) > 50 { meta = util.MaybeGzipData(meta) } + bucket, _ := extractBucket(entry.FullPath) model := &Model{ Key: hashString(string(entry.FullPath)), Directory: dir, Name: name, Meta: bytesToArray(meta), + Bucket: bucket, } _, err = store.collection.CreateDocument(ctx, model) - + if driver.IsConflict(err) { + 
return store.UpdateEntry(ctx, entry) + } if err != nil { - return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err) + return fmt.Errorf("InsertEntry %s: %v", entry.FullPath, err) } return nil } +func extractBucket(fullpath util.FullPath) (string, string) { + if !strings.HasPrefix(string(fullpath), "/buckets/") { + return "", string(fullpath) + } + bucketAndObjectKey := string(fullpath)[len("/buckets/"):] + t := strings.Index(bucketAndObjectKey, "/") + bucket := bucketAndObjectKey + shortPath := "/" + if t > 0 { + bucket = bucketAndObjectKey[:t] + shortPath = string(util.FullPath(bucketAndObjectKey[t:])) + } + return bucket, shortPath +} + func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -243,20 +267,20 @@ func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPat func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { _, err := store.collection.RemoveDocument(ctx, hashString(string(fullpath))) - if err != nil { + if err != nil && !driver.IsNotFound(err) { glog.Errorf("find %s: %v", fullpath, err) return fmt.Errorf("delete %s : %v", fullpath, err) } - return nil } func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { - dir, _ := fullpath.DirAndName() - cur, err := store.database.Query(ctx, ` -for d in files -filter d.directory == @dir -remove d in files`, map[string]interface{}{"dir": dir}) + query := "" + query = fmt.Sprintf(`for d in files filter starts_with(d.directory, "%s") remove d._key in files`, + strings.Join(strings.Split(string(fullpath), "/"), ","), + string(fullpath), + ) + cur, err := store.database.Query(ctx, query, nil) if err != nil { return fmt.Errorf("delete %s : %v", fullpath, err) } @@ -265,53 +289,53 @@ remove d in files`, map[string]interface{}{"dir": dir}) } func (store *ArangodbStore) 
ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed -} + // if no prefix, then dont use index + if prefix == "" { + return store.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc) + } + eq := "" + if includeStartFile { + eq = "filter d.name >= \"" + startFileName + "\"" + } else { + eq = "filter d.name > \"" + startFileName + "\"" + } + query := fmt.Sprintf(` +for d in files +filter d.directory == @dir +filter starts_with(d.name, @prefix) +%s +sort d.name asc +limit %d +return d`, eq, limit) + cur, err := store.database.Query(ctx, query, map[string]interface{}{"dir": dirPath, "prefix": prefix}) + if err != nil { + return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err) + } + defer cur.Close() + for cur.HasMore() { + var data Model + _, err = cur.ReadDocument(ctx, &data) + if err != nil { + break + } + entry := &filer.Entry{ + FullPath: util.NewFullPath(data.Directory, data.Name), + } + lastFileName = data.Name + converted := arrayToBytes(data.Meta) + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } -//TODO: i must be misunderstanding what this function is supposed to do -//so figure it out is the todo, i guess lol - aaaaa -//func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { -// eq := "" -// if includeStartFile { -// eq = "filter d.name >= \"" + startFileName + "\"" -// } else { -// eq = "filter d.name > \"" + startFileName + "\"" -// 
} -// query := fmt.Sprintf(` -//for d in fulltext(files,"directory","prefix:%s") -//sort d.name desc -//%s -//limit %d -//return d`, string(dirPath), eq, limit) -// cur, err := store.database.Query(ctx, query, nil) -// if err != nil { -// return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err) -// } -// defer cur.Close() -// for cur.HasMore() { -// var data Model -// _, err = cur.ReadDocument(ctx, &data) -// if err != nil { -// break -// } -// entry := &filer.Entry{ -// FullPath: util.NewFullPath(data.Directory, data.Name), -// } -// lastFileName = data.Name -// converted := arrayToBytes(data.Meta) -// if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil { -// err = decodeErr -// glog.V(0).Infof("list %s : %v", entry.FullPath, err) -// break -// } -// -// if !eachEntryFunc(entry) { -// break -// } -// -// } -// return lastFileName, err -//} + if !eachEntryFunc(entry) { + break + } + + } + return lastFileName, err +} func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { eq := "" @@ -323,8 +347,8 @@ func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath ut query := fmt.Sprintf(` for d in files filter d.directory == "%s" -sort d.name desc %s +sort d.name asc limit %d return d`, string(dirPath), eq, limit) cur, err := store.database.Query(ctx, query, nil) @@ -359,35 +383,3 @@ return d`, string(dirPath), eq, limit) func (store *ArangodbStore) Shutdown() { } - -//convert a string into arango-key safe hex bytes hash -func hashString(dir string) string { - h := md5.New() - io.WriteString(h, dir) - b := h.Sum(nil) - return hex.EncodeToString(b) -} - -func bytesToArray(bs []byte) []uint64 { - out := make([]uint64, 0, 2+len(bs)/8) - out = append(out, uint64(len(bs))) - for len(bs)%8 != 0 { - bs = 
append(bs, 0) - } - for i := 0; i < len(bs); i = i + 8 { - out = append(out, binary.BigEndian.Uint64(bs[i:])) - } - return out -} - -func arrayToBytes(xs []uint64) []byte { - if len(xs) < 2 { - return []byte{} - } - first := xs[0] - out := make([]byte, len(xs)*8) - for i := 1; i < len(xs); i = i + 1 { - binary.BigEndian.PutUint64(out[((i-1)*8):], xs[i]) - } - return out[:first] -} diff --git a/weed/filer/arangodb/arangodb_store_bucket.go b/weed/filer/arangodb/arangodb_store_bucket.go new file mode 100644 index 000000000..907328bda --- /dev/null +++ b/weed/filer/arangodb/arangodb_store_bucket.go @@ -0,0 +1,30 @@ +package arangodb + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +var _ filer.BucketAware = (*ArangodbStore)(nil) + +func (store *ArangodbStore) OnBucketCreation(bucket string) { + //nothing needs to be done +} +func (store *ArangodbStore) OnBucketDeletion(bucket string) { + timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + cur, err := store.database.Query(timeout, ` +for d in files +filter d.bucket == @bucket +remove d in files`, map[string]interface{}{"bucket": bucket}) + if err != nil { + glog.V(0).Infof("bucket delete %s : %v", bucket, err) + } + defer cur.Close() +} +func (store *ArangodbStore) CanDropWholeBucket() bool { + return true +} diff --git a/weed/filer/arangodb/arangodb_store_kv.go b/weed/filer/arangodb/arangodb_store_kv.go index 93caa75ed..2978f3dbe 100644 --- a/weed/filer/arangodb/arangodb_store_kv.go +++ b/weed/filer/arangodb/arangodb_store_kv.go @@ -4,16 +4,15 @@ import ( "context" "fmt" + "github.com/arangodb/go-driver" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" ) func (store *ArangodbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - dir, name := genDirAndName(key) model := &Model{ - Key: hashString(string(key)), - Directory: dir, - Name: 
name, + Key: hashString(".kvstore." + string(key)), + Directory: ".kvstore." + string(key), Meta: bytesToArray(value), } @@ -32,31 +31,24 @@ func (store *ArangodbStore) KvPut(ctx context.Context, key []byte, value []byte) return nil } - func (store *ArangodbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { var model Model - _, err = store.collection.ReadDocument(ctx, hashString(string(key)), &model) + _, err = store.collection.ReadDocument(ctx, hashString(".kvstore."+string(key)), &model) + if driver.IsNotFound(err) { + return nil, filer.ErrKvNotFound + } if err != nil { - glog.Errorf("kv get: %v", err) + glog.Errorf("kv get: %s %v", string(key), err) return nil, filer.ErrKvNotFound } return arrayToBytes(model.Meta), nil } func (store *ArangodbStore) KvDelete(ctx context.Context, key []byte) (err error) { - _, err = store.collection.RemoveDocument(ctx, hashString(string(key))) + _, err = store.collection.RemoveDocument(ctx, hashString(".kvstore."+string(key))) if err != nil { glog.Errorf("kv del: %v", err) return filer.ErrKvNotFound } return nil } - -func genDirAndName(key []byte) (dir string, name string) { - for len(key) < 8 { - key = append(key, 0) - } - dir = string(key[:8]) - name = string(key[8:]) - return -} diff --git a/weed/filer/arangodb/helpers.go b/weed/filer/arangodb/helpers.go new file mode 100644 index 000000000..cf59957a6 --- /dev/null +++ b/weed/filer/arangodb/helpers.go @@ -0,0 +1,44 @@ +package arangodb + +import ( + "crypto/md5" + "encoding/binary" + "encoding/hex" + "io" +) + +//convert a string into arango-key safe hex bytes hash +func hashString(dir string) string { + h := md5.New() + io.WriteString(h, dir) + b := h.Sum(nil) + return hex.EncodeToString(b) +} + +// convert slice of bytes into slice of uint64 +// the first uint64 indicates the length in bytes +func bytesToArray(bs []byte) []uint64 { + out := make([]uint64, 0, 2+len(bs)/8) + out = append(out, uint64(len(bs))) + for len(bs)%8 != 0 { + bs = append(bs, 0) + 
} + for i := 0; i < len(bs); i = i + 8 { + out = append(out, binary.BigEndian.Uint64(bs[i:])) + } + return out +} + +// convert from slice of uint64 back to bytes +// if input length is 0 or 1, will return nil +func arrayToBytes(xs []uint64) []byte { + if len(xs) < 2 { + return nil + } + first := xs[0] + out := make([]byte, len(xs)*8) // i think this can actually be len(xs)*8-8, but i dont think an extra 8 bytes hurts... + for i := 1; i < len(xs); i = i + 1 { + binary.BigEndian.PutUint64(out[((i-1)*8):], xs[i]) + } + return out[:first] +} diff --git a/weed/filer/arangodb/readme.md b/weed/filer/arangodb/readme.md new file mode 100644 index 000000000..6cd187305 --- /dev/null +++ b/weed/filer/arangodb/readme.md @@ -0,0 +1,29 @@ +## arangodb + +database: https://github.com/arangodb/arangodb +go driver: https://github.com/arangodb/go-driver + + +options: + +``` +[arangodb] +enabled=true +db_name="seaweedfs" +servers=["http://localhost:8529"] +user="root" +pass="test" + +# whether to enable the fulltext index (allows directory prefix queries) +# NOTE(review): the store's Initialize in this patch does not read this key; confirm it is still honored +fulltext=true + +# tls settings +insecure_skip_verify=true +``` + +buckets are supported through an extra `bucket` field in each document. +the field is tagged omitempty, so documents outside a bucket do not store it. + +tested with +`docker run -p 8529:8529 -e ARANGO_ROOT_PASSWORD=test arangodb/arangodb:3.9.0` diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index c774f5d27..27e68433d 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -25,9 +25,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR if findErr != nil { return findErr } - isDeleteCollection := f.isBucket(entry) - if entry.IsDirectory() { // delete the folder children, not including the folder itself err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures, func(chunks []*filer_pb.FileChunk) error {