From 411c0df3fec3344c3e5538cf56b54c1567162386 Mon Sep 17 00:00:00 2001 From: elee Date: Fri, 18 Mar 2022 21:51:16 -0500 Subject: [PATCH] switch to multi collection, change readme --- weed/filer/arangodb/arangodb_store.go | 213 +++++++------------ weed/filer/arangodb/arangodb_store_bucket.go | 26 ++- weed/filer/arangodb/arangodb_store_kv.go | 10 +- weed/filer/arangodb/helpers.go | 92 ++++++++ weed/filer/arangodb/readme.md | 48 +++-- 5 files changed, 218 insertions(+), 171 deletions(-) diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go index 23aefe40f..d27799b0e 100644 --- a/weed/filer/arangodb/arangodb_store.go +++ b/weed/filer/arangodb/arangodb_store.go @@ -4,7 +4,9 @@ import ( "context" "crypto/tls" "fmt" + "strconv" "strings" + "sync" "time" "github.com/arangodb/go-driver" @@ -19,11 +21,20 @@ func init() { filer.Stores = append(filer.Stores, &ArangodbStore{}) } +var ( + BUCKET_PREFIX = "/buckets" + DEFAULT_COLLECTION = "seaweed_no_bucket" + KVMETA_COLLECTION = "seaweed_kvmeta" +) + type ArangodbStore struct { - connect driver.Connection - client driver.Client - database driver.Database - collection driver.Collection + connect driver.Connection + client driver.Client + database driver.Database + kvCollection driver.Collection + + buckets map[string]driver.Collection + mu sync.RWMutex databaseName string } @@ -32,7 +43,6 @@ type Model struct { Key string `json:"_key"` Directory string `json:"directory,omitempty"` Name string `json:"name,omitempty"` - Bucket string `json:"bucket,omitempty"` Ttl string `json:"ttl,omitempty"` //arangodb does not support binary blobs @@ -46,6 +56,7 @@ func (store *ArangodbStore) GetName() string { } func (store *ArangodbStore) Initialize(configuration util.Configuration, prefix string) (err error) { + store.buckets = make(map[string]driver.Collection, 3) store.databaseName = configuration.GetString(prefix + "db_name") return store.connection(configuration.GetStringSlice(prefix+"servers"), configuration.GetString(prefix+"user"), @@ -85,49 +96,7 @@ func (store *ArangodbStore) connection(uris []string, user string, pass string, if err != nil { return err } - - coll_name := "files" - ok, err = store.database.CollectionExists(ctx, coll_name) - if err != nil { - return err - } - if ok { - store.collection, err = store.database.Collection(ctx, coll_name) - } else { - store.collection, err = store.database.CreateCollection(ctx, coll_name, &driver.CreateCollectionOptions{}) - } - if err != nil { - return err - } - - // ensure indices - - if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"directory", "name"}, - &driver.EnsurePersistentIndexOptions{ - Name: "directory_name_multi", - Unique: true, - }); err != nil { - return err - } - if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"directory"}, - &driver.EnsurePersistentIndexOptions{Name: "IDX_directory"}); err != nil { - return err - } - - if _, _, err = store.collection.EnsureTTLIndex(ctx, "ttl", 1, - &driver.EnsureTTLIndexOptions{Name: "IDX_TTL"}); err != nil { - return err - } - - if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"name"}, &driver.EnsurePersistentIndexOptions{ - Name: "IDX_name", - }); err != nil { - return err - } - if _, _, err = store.collection.EnsurePersistentIndex(ctx, []string{"bucket"}, &driver.EnsurePersistentIndexOptions{ - Name: "IDX_bucket", - Sparse: true, //sparse index, to locate files of bucket - }); err != nil { + if store.kvCollection, err = store.ensureCollection(ctx, KVMETA_COLLECTION); err != nil { return err } return err @@ -140,8 +109,13 @@ const ( ) func (store *ArangodbStore) BeginTransaction(ctx context.Context) (context.Context, error) { + keys := make([]string, 0, len(store.buckets)+1) + for k := range store.buckets { + keys = append(keys, k) + } + keys = append(keys, store.kvCollection.Name()) txn, err := store.database.BeginTransaction(ctx, driver.TransactionCollections{ - Exclusive: []string{"files"}, + Exclusive: keys, }, &driver.BeginTransactionOptions{}) if err != nil { return nil, err @@ -186,23 +160,27 @@ func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) if len(entry.Chunks) > 50 { meta = util.MaybeGzipData(meta) } - bucket, _ := extractBucket(entry.FullPath) model := &Model{ Key: hashString(string(entry.FullPath)), Directory: dir, Name: name, Meta: bytesToArray(meta), - Bucket: bucket, } if entry.TtlSec > 0 { model.Ttl = time.Now().Add(time.Second * time.Duration(entry.TtlSec)).Format(time.RFC3339) } else { model.Ttl = "" } - _, err = store.collection.CreateDocument(ctx, model) + + targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath) + if err != nil { + return err + } + _, err = targetCollection.CreateDocument(ctx, model) if driver.IsConflict(err) { return store.UpdateEntry(ctx, entry) } + if err != nil { return fmt.Errorf("InsertEntry %s: %v", entry.FullPath, err) } @@ -211,21 +189,6 @@ func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) } -func extractBucket(fullpath util.FullPath) (string, string) { - if !strings.HasPrefix(string(fullpath), "/buckets/") { - return "", string(fullpath) - } - bucketAndObjectKey := string(fullpath)[len("/buckets/"):] - t := strings.Index(bucketAndObjectKey, "/") - bucket := bucketAndObjectKey - shortPath := "/" - if t > 0 { - bucket = bucketAndObjectKey[:t] - shortPath = string(util.FullPath(bucketAndObjectKey[t:])) - } - return bucket, shortPath -} - func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -247,9 +210,11 @@ func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) } else { model.Ttl = "none" } - - _, err = store.collection.UpdateDocument(ctx, model.Key, model) - + targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath) + if err != nil { + return err + } + _, err = targetCollection.UpdateDocument(ctx, model.Key, model) if err != nil { return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err) } @@ -259,11 +224,15 @@ func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { var data Model - _, err = store.collection.ReadDocument(ctx, hashString(string(fullpath)), &data) - if driver.IsNotFound(err) { - return nil, filer_pb.ErrNotFound + targetCollection, err := store.extractBucketCollection(ctx, fullpath) + if err != nil { + return nil, err } + _, err = targetCollection.ReadDocument(ctx, hashString(string(fullpath)), &data) if err != nil { + if driver.IsNotFound(err) { + return nil, filer_pb.ErrNotFound + } glog.Errorf("find %s: %v", fullpath, err) return nil, filer_pb.ErrNotFound } @@ -281,8 +250,12 @@ func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPat return entry, nil } -func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { - _, err := store.collection.RemoveDocument(ctx, hashString(string(fullpath))) +func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + targetCollection, err := store.extractBucketCollection(ctx, fullpath) + if err != nil { + return err + } + _, err = targetCollection.RemoveDocument(ctx, hashString(string(fullpath))) if err != nil && !driver.IsNotFound(err) { glog.Errorf("find %s: %v", fullpath, err) return fmt.Errorf("delete %s : %v", fullpath, err) @@ -290,14 +263,21 @@ func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullP return nil } -func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { +// this runs in log time +func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { var query string + targetCollection, err := store.extractBucketCollection(ctx, fullpath) + if err != nil { + return err + } query = query + fmt.Sprintf(` - for d in files + for d in %s filter starts_with(d.directory, "%s/") || d.directory == "%s" - remove d._key in files`, + remove d._key in %s`, + targetCollection.Name(), strings.Join(strings.Split(string(fullpath), "/"), ","), string(fullpath), + targetCollection.Name(), ) cur, err := store.database.Query(ctx, query, nil) if err != nil { @@ -307,70 +287,33 @@ func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath u return nil } +func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) +} + func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - // if no prefix, then dont use index - if prefix == "" { - return store.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc) + targetCollection, err := store.extractBucketCollection(ctx, dirPath) + if err != nil { + return lastFileName, err } - eq := "" + query := "for d in " + targetCollection.Name() if includeStartFile { - eq = "filter d.name >= \"" + startFileName + "\"" + query = query + " filter d.name >= \"" + startFileName + "\" " } else { - eq = "filter d.name > \"" + startFileName + "\"" + query = query + " filter d.name > \"" + startFileName + "\" " } - query := fmt.Sprintf(` -for d in files + if prefix != "" { + query = query + fmt.Sprintf(`&& starts_with(d.name, "%s")`, prefix) + } + query = query + ` filter d.directory == @dir -filter starts_with(d.name, @prefix) -%s sort d.name asc -limit %d -return d`, eq, limit) - cur, err := store.database.Query(ctx, query, map[string]interface{}{"dir": dirPath, "prefix": prefix}) - if err != nil { - return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err) +` + if limit > 0 { + query = query + "limit " + strconv.Itoa(int(limit)) } - defer cur.Close() - for cur.HasMore() { - var data Model - _, err = cur.ReadDocument(ctx, &data) - if err != nil { - break - } - entry := &filer.Entry{ - FullPath: util.NewFullPath(data.Directory, data.Name), - } - lastFileName = data.Name - converted := arrayToBytes(data.Meta) - if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil { - err = decodeErr - glog.V(0).Infof("list %s : %v", entry.FullPath, err) - break - } - - if !eachEntryFunc(entry) { - break - } - - } - return lastFileName, err -} - -func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - eq := "" - if includeStartFile { - eq = "filter d.name >= \"" + startFileName + "\"" - } else { - eq = "filter d.name > \"" + startFileName + "\"" - } - query := fmt.Sprintf(` -for d in files -filter d.directory == "%s" -%s -sort d.name asc -limit %d -return d`, string(dirPath), eq, limit) - cur, err := store.database.Query(ctx, query, nil) + query = query + "\n return d" + cur, err := store.database.Query(ctx, query, map[string]interface{}{"dir": dirPath}) if err != nil { return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err) } @@ -382,7 +325,7 @@ return d`, string(dirPath), eq, limit) break } entry := &filer.Entry{ - FullPath: util.NewFullPath(string(dirPath), data.Name), + FullPath: util.NewFullPath(data.Directory, data.Name), } lastFileName = data.Name converted := arrayToBytes(data.Meta) diff --git a/weed/filer/arangodb/arangodb_store_bucket.go b/weed/filer/arangodb/arangodb_store_bucket.go index 907328bda..63d407309 100644 --- a/weed/filer/arangodb/arangodb_store_bucket.go +++ b/weed/filer/arangodb/arangodb_store_bucket.go @@ -2,28 +2,38 @@ package arangodb import ( "context" - "github.com/chrislusf/seaweedfs/weed/filer" "time" + "github.com/arangodb/go-driver" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" ) var _ filer.BucketAware = (*ArangodbStore)(nil) func (store *ArangodbStore) OnBucketCreation(bucket string) { - //nothing needs to be done + timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // create the collection && add to cache + _, err := store.ensureBucket(timeout, bucket) + if err != nil { + glog.V(0).Infof("bucket create %s : %w", bucket, err) + } } func (store *ArangodbStore) OnBucketDeletion(bucket string) { timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - cur, err := store.database.Query(timeout, ` -for d in files -filter d.bucket == @bucket -remove d in files`, map[string]interface{}{"bucket": bucket}) + collection, err := store.ensureBucket(timeout, bucket) if err != nil { - glog.V(0).Infof("bucket delete %s : %v", bucket, err) + glog.V(0).Infof("bucket delete %s : %w", bucket, err) + return + } + err = collection.Remove(timeout) + if err != nil && !driver.IsNotFound(err) { + glog.V(0).Infof("bucket delete %s : %w", bucket, err) + return } - defer cur.Close() } func (store *ArangodbStore) CanDropWholeBucket() bool { return true diff --git a/weed/filer/arangodb/arangodb_store_kv.go b/weed/filer/arangodb/arangodb_store_kv.go index 2978f3dbe..c1307e78d 100644 --- a/weed/filer/arangodb/arangodb_store_kv.go +++ b/weed/filer/arangodb/arangodb_store_kv.go @@ -16,14 +16,14 @@ func (store *ArangodbStore) KvPut(ctx context.Context, key []byte, value []byte) Meta: bytesToArray(value), } - exists, err := store.collection.DocumentExists(ctx, model.Key) + exists, err := store.kvCollection.DocumentExists(ctx, model.Key) if err != nil { return fmt.Errorf("kv put: %v", err) } if exists { - _, err = store.collection.UpdateDocument(ctx, model.Key, model) + _, err = store.kvCollection.UpdateDocument(ctx, model.Key, model) } else { - _, err = store.collection.CreateDocument(ctx, model) + _, err = store.kvCollection.CreateDocument(ctx, model) } if err != nil { return fmt.Errorf("kv put: %v", err) @@ -33,7 +33,7 @@ func (store *ArangodbStore) KvPut(ctx context.Context, key []byte, value []byte) } func (store *ArangodbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { var model Model - _, err = store.collection.ReadDocument(ctx, hashString(".kvstore."+string(key)), &model) + _, err = store.kvCollection.ReadDocument(ctx, hashString(".kvstore."+string(key)), &model) if driver.IsNotFound(err) { return nil, filer.ErrKvNotFound } @@ -45,7 +45,7 @@ func (store *ArangodbStore) KvGet(ctx context.Context, key []byte) (value []byte } func (store *ArangodbStore) KvDelete(ctx context.Context, key []byte) (err error) { - _, err = store.collection.RemoveDocument(ctx, hashString(".kvstore."+string(key))) + _, err = store.kvCollection.RemoveDocument(ctx, hashString(".kvstore."+string(key))) if err != nil { glog.Errorf("kv del: %v", err) return filer.ErrKvNotFound diff --git a/weed/filer/arangodb/helpers.go b/weed/filer/arangodb/helpers.go index cf59957a6..c91ef2be5 100644 --- a/weed/filer/arangodb/helpers.go +++ b/weed/filer/arangodb/helpers.go @@ -1,10 +1,15 @@ package arangodb import ( + "context" "crypto/md5" "encoding/binary" "encoding/hex" "io" + "strings" + + "github.com/arangodb/go-driver" + "github.com/chrislusf/seaweedfs/weed/util" ) //convert a string into arango-key safe hex bytes hash @@ -42,3 +47,90 @@ func arrayToBytes(xs []uint64) []byte { } return out[:first] } + +// gets the bucket name out of filepath +func extractBucket(fullpath util.FullPath) (string, string) { + if !strings.HasPrefix(string(fullpath), BUCKET_PREFIX+"/") { + return "", string(fullpath) + } + if strings.Count(string(fullpath), "/") < 2 { + return "", string(fullpath) + } + bucketAndObjectKey := string(fullpath)[len("/buckets/"):] + t := strings.Index(bucketAndObjectKey, "/") + bucket := bucketAndObjectKey + shortPath := "/" + if t > 0 { + bucket = bucketAndObjectKey[:t] + shortPath = string(util.FullPath(bucketAndObjectKey[t:])) + } + return bucket, shortPath +} + +// gets the collection the bucket points to from filepath +func (store *ArangodbStore) extractBucketCollection(ctx context.Context, fullpath util.FullPath) (c driver.Collection, err error) { + bucket, _ := extractBucket(fullpath) + if bucket == "" { + bucket = DEFAULT_COLLECTION + } + c, err = store.ensureBucket(ctx, bucket) + if err != nil { + return nil, err + } + return c, err +} + +// get bucket collection from cache. if not exist, creates the buckets collection and grab it +func (store *ArangodbStore) ensureBucket(ctx context.Context, bucket string) (bc driver.Collection, err error) { + var ok bool + store.mu.RLock() + bc, ok = store.buckets[bucket] + store.mu.RUnlock() + if ok { + return bc, nil + } + store.mu.Lock() + defer store.mu.Unlock() + store.buckets[bucket], err = store.ensureCollection(ctx, bucket) + if err != nil { + return nil, err + } + return store.buckets[bucket], nil +} + +// creates collection if not exist, ensures indices if not exist +func (store *ArangodbStore) ensureCollection(ctx context.Context, name string) (c driver.Collection, err error) { + ok, err := store.database.CollectionExists(ctx, name) + if err != nil { + return + } + if ok { + c, err = store.database.Collection(ctx, name) + } else { + c, err = store.database.CreateCollection(ctx, name, &driver.CreateCollectionOptions{}) + } + if err != nil { + return + } + // ensure indices + if _, _, err = c.EnsurePersistentIndex(ctx, []string{"directory", "name"}, + &driver.EnsurePersistentIndexOptions{ + Name: "directory_name_multi", Unique: true, + }); err != nil { + return + } + if _, _, err = c.EnsurePersistentIndex(ctx, []string{"directory"}, + &driver.EnsurePersistentIndexOptions{Name: "IDX_directory"}); err != nil { + return + } + if _, _, err = c.EnsureTTLIndex(ctx, "ttl", 1, + &driver.EnsureTTLIndexOptions{Name: "IDX_TTL"}); err != nil { + return + } + if _, _, err = c.EnsurePersistentIndex(ctx, []string{"name"}, &driver.EnsurePersistentIndexOptions{ + Name: "IDX_name", + }); err != nil { + return + } + return c, nil +} diff --git a/weed/filer/arangodb/readme.md b/weed/filer/arangodb/readme.md index e56012d8c..e189811fb 100644 --- a/weed/filer/arangodb/readme.md +++ b/weed/filer/arangodb/readme.md @@ -3,7 +3,6 @@ database: https://github.com/arangodb/arangodb go driver: https://github.com/arangodb/go-driver - options: ``` @@ -11,40 +10,43 @@ options: enabled=true db_name="seaweedfs" servers=["http://localhost:8529"] +#basic auth user="root" pass="test" -# whether to enable fulltext index -# this allows for directory prefix query -fulltext=true - # tls settings insecure_skip_verify=true ``` -supports buckets with an extra field in document. -omitempty means extra space is not used. - -i test with +i test using this dev database: `docker run -p 8529:8529 -e ARANGO_ROOT_PASSWORD=test arangodb/arangodb:3.9.0` -## todo - -performance test - +## features i don't personally need but are missing + [ ] provide tls cert to arango + [ ] authentication that is not basic auth + [ ] synchronise endpoint interval config + [ ] automatic creation of custom index + [ ] configure default arangodb collection sharding rules + [ ] configure default arangodb collection replication rules -## thoughts -should there be one collection per bucket? this would make deleting a bucket O(1) instead of O(n) - - -## comparison +## complexity ok, so if https://www.arangodb.com/docs/stable/indexing-index-basics.html#persistent-index is correct -it should be log time to the number of files in the directory - -and constant time if you have full directory + file - -deleting a folder should be log time to number of folders + files that need to be deleted +O(1) +- InsertEntry +- UpdateEntry +- FindEntry +- DeleteEntry +- KvPut +- KvGet +- KvDelete + +O(log(BUCKET_SIZE)) +- DeleteFolderChildren + +O(log(DIRECTORY_SIZE)) +- ListDirectoryEntries +- ListDirectoryPrefixedEntries