From 83080b5e034bdbc0ba58eb410d04fb78bebf08cf Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Fri, 4 Sep 2020 15:40:13 +0800 Subject: [PATCH 1/3] ES backended filer support kv ops. --- go.mod | 1 - weed/command/scaffold.go | 2 +- weed/filer/elastic/v7/elastic_store.go | 63 ++++++++++++++++++++++++-- 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index d404b9d52..98ac2b4e5 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( github.com/go-sql-driver/mysql v1.5.0 github.com/gocql/gocql v0.0.0-20190829130954-e163eff7a8c6 github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48 // indirect - github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/protobuf v1.4.2 github.com/google/btree v1.0.0 github.com/google/uuid v1.1.1 diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 479e0665f..68fe8e982 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -177,7 +177,7 @@ database = "seaweedfs" [elastic7] enabled = false servers = "http://localhost:9200" -# increase the value is recommend, both here and in elastic cluster configuration +# increase the value is recommend, be sure the value in Elastic is greater or equal here index.max_result_window = 10000 ` diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 5c57e352a..d263b5dae 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -18,6 +18,7 @@ import ( var ( indexType = "_doc" indexPrefix = ".seaweedfs_" + indexKV = ".seaweedfs_kv_entries" ) type ESEntry struct { @@ -34,6 +35,11 @@ type ElasticStore struct { maxPageSize int } +type ESKVEntry struct { + Key string `json:Key` + Value string `json:Value` +} + func (store *ElasticStore) GetName() string { return "elastic7" } @@ -66,15 +72,66 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error { func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { return nil } + func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) { - return filer.ErrKvNotImplemented + id := fmt.Sprintf("%x", md5.Sum(key)) + deleteResult, err := store.client.Delete(). + Index(indexKV). + Type(indexType). + Id(id). + Do(context.Background()) + if err == nil { + if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" { + return nil + } + } + glog.Errorf("delete key(id:%s) %v.", string(key), err) + return fmt.Errorf("delete key %v.", err) } + func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { - return []byte(""), filer.ErrKvNotImplemented + id := fmt.Sprintf("%x", md5.Sum(key)) + searchResult, err := store.client.Get(). + Index(indexKV). + Type(indexType). + Id(id). + Do(context.Background()) + if elastic.IsNotFound(err) { + return nil, filer_pb.ErrNotFound + } + if searchResult != nil && searchResult.Found { + esEntry := &ESKVEntry{} + if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil { + return []byte(esEntry.Value), nil + } + } + glog.Errorf("find key(%s),%v.", string(key), err) + return nil, filer_pb.ErrNotFound } + func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - return filer.ErrKvNotImplemented + id := fmt.Sprintf("%x", md5.Sum(key)) + esEntry := &ESKVEntry{ + string(key), + string(value), + } + val, err := jsoniter.Marshal(esEntry) + if err != nil { + glog.Errorf("insert key(%s) %v.", string(key), err) + return fmt.Errorf("insert key %v.", err) + } + _, err = store.client.Index(). + Index(indexKV). + Type(indexType). + Id(id). + BodyJson(string(val)). + Do(context.Background()) + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + return nil } + func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { return nil, filer.ErrUnsupportedListDirectoryPrefixed } From 1384ff9a2f521ddc17ae6ef41d9e4cd8237565f4 Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Fri, 4 Sep 2020 17:34:26 +0800 Subject: [PATCH 2/3] 1.split kv in one file. 2.disable query for kv in es index. --- weed/filer/elastic/v7/elastic_store.go | 122 +++++++--------------- weed/filer/elastic/v7/elastic_store_kv.go | 64 ++++++++++++ 2 files changed, 100 insertions(+), 86 deletions(-) create mode 100644 weed/filer/elastic/v7/elastic_store_kv.go diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index d263b5dae..6f00c8768 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -16,9 +16,14 @@ import ( ) var ( - indexType = "_doc" - indexPrefix = ".seaweedfs_" - indexKV = ".seaweedfs_kv_entries" + indexType = "_doc" + indexPrefix = ".seaweedfs_" + indexKV = ".seaweedfs_kv_entries" + mappingWithoutQuery = ` { + "mappings": { + "enabled": false + } +}` ) type ESEntry struct { @@ -26,6 +31,10 @@ type ESEntry struct { Entry *filer.Entry } +type ESKVEntry struct { + Value string `json:Value` +} + func init() { filer.Stores = append(filer.Stores, &ElasticStore{}) } @@ -35,11 +44,6 @@ type ElasticStore struct { maxPageSize int } -type ESKVEntry struct { - Key string `json:Key` - Value string `json:Value` -} - func (store *ElasticStore) GetName() string { return "elastic7" } @@ -61,6 +65,12 @@ func (store *ElasticStore) Initialize(configuration weed_util.Configuration, pre if err != nil { return fmt.Errorf("init elastic %s: %v.", servers, err) } + if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok { + _, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background()) + if err != nil { + return fmt.Errorf("create index(%s) %v.", indexKV, err) + } + } return nil } func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) { @@ -72,66 +82,6 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error { func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { return nil } - -func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) { - id := fmt.Sprintf("%x", md5.Sum(key)) - deleteResult, err := store.client.Delete(). - Index(indexKV). - Type(indexType). - Id(id). - Do(context.Background()) - if err == nil { - if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" { - return nil - } - } - glog.Errorf("delete key(id:%s) %v.", string(key), err) - return fmt.Errorf("delete key %v.", err) -} - -func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { - id := fmt.Sprintf("%x", md5.Sum(key)) - searchResult, err := store.client.Get(). - Index(indexKV). - Type(indexType). - Id(id). - Do(context.Background()) - if elastic.IsNotFound(err) { - return nil, filer_pb.ErrNotFound - } - if searchResult != nil && searchResult.Found { - esEntry := &ESKVEntry{} - if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil { - return []byte(esEntry.Value), nil - } - } - glog.Errorf("find key(%s),%v.", string(key), err) - return nil, filer_pb.ErrNotFound -} - -func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - id := fmt.Sprintf("%x", md5.Sum(key)) - esEntry := &ESKVEntry{ - string(key), - string(value), - } - val, err := jsoniter.Marshal(esEntry) - if err != nil { - glog.Errorf("insert key(%s) %v.", string(key), err) - return fmt.Errorf("insert key %v.", err) - } - _, err = store.client.Index(). - Index(indexKV). - Type(indexType). - Id(id). - BodyJson(string(val)). - Do(context.Background()) - if err != nil { - return fmt.Errorf("kv put: %v", err) - } - return nil -} - func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { return nil, filer.ErrUnsupportedListDirectoryPrefixed } @@ -154,7 +104,7 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) Type(indexType). Id(id). BodyJson(string(value)). - Do(context.Background()) + Do(ctx) if err != nil { glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err) return fmt.Errorf("insert entry %v.", err) @@ -171,7 +121,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful Index(index). Type(indexType). Id(id). - Do(context.Background()) + Do(ctx) if elastic.IsNotFound(err) { return nil, filer_pb.ErrNotFound } @@ -190,24 +140,24 @@ func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.F index := getIndex(fullpath) id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) if strings.Count(string(fullpath), "/") == 1 { - return store.deleteIndex(index) + return store.deleteIndex(ctx, index) } - return store.deleteEntry(index, id) + return store.deleteEntry(ctx, index, id) } -func (store *ElasticStore) deleteIndex(index string) (err error) { - deleteResult, err := store.client.DeleteIndex(index).Do(context.Background()) +func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) { + deleteResult, err := store.client.DeleteIndex(index).Do(ctx) if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) { return nil } glog.Errorf("delete index(%s) %v.", index, err) return err } -func (store *ElasticStore) deleteEntry(index, id string) (err error) { +func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) { deleteResult, err := store.client.Delete(). Index(index). Type(indexType). Id(id). - Do(context.Background()) + Do(ctx) if err == nil { if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" { return nil @@ -235,7 +185,7 @@ func (store *ElasticStore) ListDirectoryEntries( } func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { - indexResult, err := store.client.CatIndices().Do(context.Background()) + indexResult, err := store.client.CatIndices().Do(ctx) if err != nil { glog.Errorf("list indices %v.", err) return entries, err @@ -266,16 +216,16 @@ func (store *ElasticStore) listDirectoryEntries( index := getIndex(fullpath) nextStart := "" parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) - if _, err := store.client.Refresh(index).Do(context.Background()); err != nil { + if _, err := store.client.Refresh(index).Do(ctx); err != nil { if elastic.IsNotFound(err) { - store.client.CreateIndex(index).Do(context.Background()) + store.client.CreateIndex(index).Do(ctx) return entries, nil } } for { result := &elastic.SearchResult{} if (startFileName == "" && first) || inclusive { - if result, err = store.search(index, parentId); err != nil { + if result, err = store.search(ctx, index, parentId); err != nil { glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) return entries, err } @@ -285,7 +235,7 @@ func (store *ElasticStore) listDirectoryEntries( fullPath = nextStart } after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath))) - if result, err = store.searchAfter(index, parentId, after); err != nil { + if result, err = store.searchAfter(ctx, index, parentId, after); err != nil { glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) return entries, err } @@ -316,8 +266,8 @@ func (store *ElasticStore) listDirectoryEntries( return entries, nil } -func (store *ElasticStore) search(index, parentId string) (result *elastic.SearchResult, err error) { - if count, err := store.client.Count(index).Do(context.Background()); err == nil && count == 0 { +func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) { + if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 { return &elastic.SearchResult{ Hits: &elastic.SearchHits{ Hits: make([]*elastic.SearchHit, 0)}, @@ -328,18 +278,18 @@ func (store *ElasticStore) search(index, parentId string) (result *elastic.Searc Query(elastic.NewMatchQuery("ParentId", parentId)). Size(store.maxPageSize). Sort("_id", false). - Do(context.Background()) + Do(ctx) return queryResult, err } -func (store *ElasticStore) searchAfter(index, parentId, after string) (result *elastic.SearchResult, err error) { +func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) { queryResult, err := store.client.Search(). Index(index). Query(elastic.NewMatchQuery("ParentId", parentId)). SearchAfter(after). Size(store.maxPageSize). Sort("_id", false). - Do(context.Background()) + Do(ctx) return queryResult, err } diff --git a/weed/filer/elastic/v7/elastic_store_kv.go b/weed/filer/elastic/v7/elastic_store_kv.go new file mode 100644 index 000000000..cfb3fdf8c --- /dev/null +++ b/weed/filer/elastic/v7/elastic_store_kv.go @@ -0,0 +1,64 @@ +package elastic + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + jsoniter "github.com/json-iterator/go" + elastic "github.com/olivere/elastic/v7" +) + +func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) { + deleteResult, err := store.client.Delete(). + Index(indexKV). + Type(indexType). + Id(string(key)). + Do(ctx) + if err == nil { + if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" { + return nil + } + } + glog.Errorf("delete key(id:%s) %v.", string(key), err) + return fmt.Errorf("delete key %v.", err) +} + +func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + searchResult, err := store.client.Get(). + Index(indexKV). + Type(indexType). + Id(string(key)). + Do(ctx) + if elastic.IsNotFound(err) { + return nil, filer_pb.ErrNotFound + } + if searchResult != nil && searchResult.Found { + esEntry := &ESKVEntry{} + if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil { + return []byte(esEntry.Value), nil + } + } + glog.Errorf("find key(%s),%v.", string(key), err) + return nil, filer_pb.ErrNotFound +} + +func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + esEntry := &ESKVEntry{string(value)} + val, err := jsoniter.Marshal(esEntry) + if err != nil { + glog.Errorf("insert key(%s) %v.", string(key), err) + return fmt.Errorf("insert key %v.", err) + } + _, err = store.client.Index(). + Index(indexKV). + Type(indexType). + Id(string(key)). + BodyJson(string(val)). + Do(ctx) + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + return nil +} From 450cf075051a2a1099288be43984290ce1721c24 Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Fri, 4 Sep 2020 21:49:03 +0800 Subject: [PATCH 3/3] skip the index that for kv usage. --- weed/filer/elastic/v7/elastic_store.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 6f00c8768..29e9689f4 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -191,6 +191,9 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi return entries, err } for _, index := range indexResult { + if index.Index == indexKV { + continue + } if strings.HasPrefix(index.Index, indexPrefix) { if entry, err := store.FindEntry(ctx, weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {