From a93d27d1e81efbe498cc8a34648dd5314e933255 Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Thu, 3 Sep 2020 16:34:58 +0800 Subject: [PATCH 1/2] new filer option to es v7. --- go.mod | 2 + weed/command/scaffold.go | 6 + weed/filer/elastic/v7/elastic_store.go | 295 +++++++++++++++++++++++++ weed/server/filer_server.go | 1 + 4 files changed, 304 insertions(+) create mode 100644 weed/filer/elastic/v7/elastic_store.go diff --git a/go.mod b/go.mod index cdb951f9c..d2dad60cd 100644 --- a/go.mod +++ b/go.mod @@ -87,6 +87,8 @@ require ( gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect + github.com/json-iterator/go v1.1.10 + github.com/olivere/elastic/v7 v7.0.19 ) replace go.etcd.io/etcd => go.etcd.io/etcd v0.5.0-alpha.5.0.20200425165423-262c93980547 diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index b199f2d2d..c07751786 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -173,6 +173,12 @@ enabled = false uri = "mongodb://localhost:27017" option_pool_size = 0 database = "seaweedfs" + +[elastic7] +enabled = false +servers = "http://localhost:9200" +# increasing this value is recommended, on both the filer and the elastic cluster +index.max_result_window = 10000 ` NOTIFICATION_TOML_EXAMPLE = ` diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go new file mode 100644 index 000000000..190ec4897 --- /dev/null +++ b/weed/filer/elastic/v7/elastic_store.go @@ -0,0 +1,295 @@ +package elastic + +import ( + "context" + "crypto/md5" + "fmt" + "math" + "strings" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + weed_util "github.com/chrislusf/seaweedfs/weed/util" + jsoniter "github.com/json-iterator/go" + elastic "github.com/olivere/elastic/v7" +) + +var ( + indexType = "_doc" + indexPrefix = ".seaweedfs_" +) + +type ESEntry 
struct { + ParentId string `json:"ParentId"` + Entry *filer2.Entry +} + +func init() { + filer2.Stores = append(filer2.Stores, &ElasticStore{}) +} + +type ElasticStore struct { + client *elastic.Client + maxPageSize int +} + +func (store *ElasticStore) GetName() string { + return "elastic7" +} +func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + servers := configuration.GetString(prefix + "servers") + if servers == "" { + return fmt.Errorf("error elastic endpoints.") + } + store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window") + if store.maxPageSize <= 0 { + return fmt.Errorf("error elastic index.max_result_window.") + } + glog.Infof("filer store elastic endpoints: %s, index.max_result_window:%d", servers, store.maxPageSize) + store.client, err = elastic.NewClient( + elastic.SetSniff(false), + elastic.SetHealthcheck(false), + elastic.SetURL(servers), + ) + if err != nil { + return fmt.Errorf("init elastic %s: %v.", servers, err) + } + return nil +} +func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *ElasticStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { + return nil +} +func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { + index := getIndex(entry.FullPath) + dir, _ := entry.FullPath.DirAndName() + id := fmt.Sprintf("%x", md5.Sum([]byte(entry.FullPath))) + esEntry := &ESEntry{ + ParentId: fmt.Sprintf("%x", md5.Sum([]byte(dir))), + Entry: entry, + } + value, err := jsoniter.Marshal(esEntry) + if err != nil { + glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err) + return fmt.Errorf("insert entry %v.", err) + } + _, err = store.client.Index(). + Index(index). + Type(indexType). + Id(id). + BodyJson(string(value)). 
+ Do(context.Background()) + if err != nil { + glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err) + return fmt.Errorf("insert entry %v.", err) + } + return nil +} +func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { + return store.InsertEntry(ctx, entry) +} +func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { + index := getIndex(fullpath) + id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) + searchResult, err := store.client.Get(). + Index(index). + Type(indexType). + Id(id). + Do(context.Background()) + if elastic.IsNotFound(err) { + return nil, filer_pb.ErrNotFound + } + if searchResult != nil && searchResult.Found { + esEntry := &ESEntry{ + ParentId: "", + Entry: &filer2.Entry{}, + } + err := jsoniter.Unmarshal(searchResult.Source, esEntry) + return esEntry.Entry, err + } + glog.Errorf("find entry(%s),%v.", string(fullpath), err) + return nil, filer_pb.ErrNotFound +} +func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { + index := getIndex(fullpath) + id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) + if strings.Count(string(fullpath), "/") == 1 { + return store.deleteIndex(index) + } + return store.deleteEntry(index, id) +} +func (store *ElasticStore) deleteIndex(index string) (err error) { + deleteResult, err := store.client.DeleteIndex(index).Do(context.Background()) + if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) { + return nil + } + glog.Errorf("delete index(%s) %v.", index, err) + return err +} +func (store *ElasticStore) deleteEntry(index, id string) (err error) { + deleteResult, err := store.client.Delete(). + Index(index). + Type(indexType). + Id(id). 
+ Do(context.Background()) + if err == nil { + if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" { + return nil + } + } + glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err) + return fmt.Errorf("delete entry %v.", err) +} +func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { + if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil { + for _, entry := range entries { + store.DeleteEntry(ctx, entry.FullPath) + } + } + return nil +} + +func (store *ElasticStore) ListDirectoryEntries( + ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, +) (entries []*filer2.Entry, err error) { + if string(fullpath) == "/" { + return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit) + } + return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit) +} + +func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { + indexResult, err := store.client.CatIndices().Do(context.Background()) + if err != nil { + glog.Errorf("list indices %v.", err) + return entries, err + } + for _, index := range indexResult { + if strings.HasPrefix(index.Index, indexPrefix) { + if entry, err := store.FindEntry(ctx, + weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil { + fileName := getFileName(entry.FullPath) + if fileName == startFileName && !inclusive { + continue + } + limit-- + if limit < 0 { + break + } + entries = append(entries, entry) + } + } + } + return entries, nil +} + +func (store *ElasticStore) listDirectoryEntries( + ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, +) (entries []*filer2.Entry, err error) { + first := true + index := getIndex(fullpath) + nextStart := "" + parentId := 
fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) + if _, err := store.client.Refresh(index).Do(context.Background()); err != nil { + if elastic.IsNotFound(err) { + store.client.CreateIndex(index).Do(context.Background()) + return entries, nil + } + } + for { + result := &elastic.SearchResult{} + if (startFileName == "" && first) || inclusive { + if result, err = store.search(index, parentId); err != nil { + glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) + return entries, err + } + } else { + fullPath := string(fullpath) + "/" + startFileName + if !first { + fullPath = nextStart + } + after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath))) + if result, err = store.searchAfter(index, parentId, after); err != nil { + glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) + return entries, err + } + } + first = false + for _, hit := range result.Hits.Hits { + esEntry := &ESEntry{ + ParentId: "", + Entry: &filer2.Entry{}, + } + if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil { + limit-- + if limit < 0 { + return entries, nil + } + nextStart = string(esEntry.Entry.FullPath) + fileName := getFileName(esEntry.Entry.FullPath) + if fileName == startFileName && !inclusive { + continue + } + entries = append(entries, esEntry.Entry) + } + } + if len(result.Hits.Hits) < store.maxPageSize { + break + } + } + return entries, nil +} + +func (store *ElasticStore) search(index, parentId string) (result *elastic.SearchResult, err error) { + if count, err := store.client.Count(index).Do(context.Background()); err == nil && count == 0 { + return &elastic.SearchResult{ + Hits: &elastic.SearchHits{ + Hits: make([]*elastic.SearchHit, 0)}, + }, nil + } + queryResult, err := store.client.Search(). + Index(index). + Query(elastic.NewMatchQuery("ParentId", parentId)). + Size(store.maxPageSize). + Sort("_id", false). 
+ Do(context.Background()) + return queryResult, err +} + +func (store *ElasticStore) searchAfter(index, parentId, after string) (result *elastic.SearchResult, err error) { + queryResult, err := store.client.Search(). + Index(index). + Query(elastic.NewMatchQuery("ParentId", parentId)). + SearchAfter(after). + Size(store.maxPageSize). + Sort("_id", false). + Do(context.Background()) + return queryResult, err + +} + +func (store *ElasticStore) Shutdown() { + store.client.Stop() +} + +func getIndex(fullpath weed_util.FullPath) string { + path := strings.Split(string(fullpath), "/") + if len(path) > 1 { + return indexPrefix + path[1] + } + return "" +} + +func getFileName(fullpath weed_util.FullPath) string { + path := strings.Split(string(fullpath), "/") + if len(path) > 1 { + return path[len(path)-1] + } + return "" +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 160ea5a6d..167a822b2 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -28,6 +28,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + _ "github.com/chrislusf/seaweedfs/weed/filer2/elastic/v7" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" From 798280e98003fac50faf057057b2568268a9d566 Mon Sep 17 00:00:00 2001 From: "ruitao.liu" Date: Thu, 3 Sep 2020 17:05:26 +0800 Subject: [PATCH 2/2] change filer2 to filer. 
--- go.mod | 4 +-- weed/filer/elastic/v7/elastic_store.go | 35 ++++++++++++++++++-------- weed/server/filer_server.go | 2 +- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index d2dad60cd..69092fd88 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.11.0 // indirect github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/jcmturner/gofork v1.0.0 // indirect + github.com/json-iterator/go v1.1.10 github.com/karlseguin/ccache v2.0.3+incompatible github.com/karlseguin/expect v1.0.1 // indirect github.com/klauspost/compress v1.10.9 @@ -48,6 +49,7 @@ require ( github.com/mattn/go-ieproxy v0.0.0-20190805055040-f9202b1cfdeb // indirect github.com/mattn/go-runewidth v0.0.4 // indirect github.com/nats-io/nats-server/v2 v2.0.4 // indirect + github.com/olivere/elastic/v7 v7.0.19 github.com/onsi/ginkgo v1.10.1 // indirect github.com/onsi/gomega v1.7.0 // indirect github.com/peterh/liner v1.1.0 @@ -87,8 +89,6 @@ require ( gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect - github.com/json-iterator/go v1.1.10 - github.com/olivere/elastic/v7 v7.0.19 ) replace go.etcd.io/etcd => go.etcd.io/etcd v0.5.0-alpha.5.0.20200425165423-262c93980547 diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 190ec4897..e75f55239 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -7,7 +7,7 @@ import ( "math" "strings" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" weed_util "github.com/chrislusf/seaweedfs/weed/util" @@ -22,11 +22,11 @@ var ( type ESEntry struct { ParentId string `json:"ParentId"` - Entry *filer2.Entry + Entry *filer.Entry } func init() { - filer2.Stores = 
append(filer2.Stores, &ElasticStore{}) + filer.Stores = append(filer.Stores, &ElasticStore{}) } type ElasticStore struct { @@ -66,7 +66,20 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error { func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { return nil } -func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { +func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) { + return filer.ErrKvNotImplemented +} +func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + return []byte(""), filer.ErrKvNotImplemented +} +func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + return filer.ErrKvNotImplemented +} +func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { + return nil, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { index := getIndex(entry.FullPath) dir, _ := entry.FullPath.DirAndName() id := fmt.Sprintf("%x", md5.Sum([]byte(entry.FullPath))) @@ -91,10 +104,10 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer2.Entry) } return nil } -func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { +func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { return store.InsertEntry(ctx, entry) } -func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { +func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { index := getIndex(fullpath) id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) searchResult, err := 
store.client.Get(). @@ -108,7 +121,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful if searchResult != nil && searchResult.Found { esEntry := &ESEntry{ ParentId: "", - Entry: &filer2.Entry{}, + Entry: &filer.Entry{}, } err := jsoniter.Unmarshal(searchResult.Source, esEntry) return esEntry.Entry, err @@ -157,14 +170,14 @@ func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath we func (store *ElasticStore) ListDirectoryEntries( ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, -) (entries []*filer2.Entry, err error) { +) (entries []*filer.Entry, err error) { if string(fullpath) == "/" { return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit) } return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit) } -func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { +func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { indexResult, err := store.client.CatIndices().Do(context.Background()) if err != nil { glog.Errorf("list indices %v.", err) @@ -191,7 +204,7 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi func (store *ElasticStore) listDirectoryEntries( ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, -) (entries []*filer2.Entry, err error) { +) (entries []*filer.Entry, err error) { first := true index := getIndex(fullpath) nextStart := "" @@ -224,7 +237,7 @@ func (store *ElasticStore) listDirectoryEntries( for _, hit := range result.Hits.Hits { esEntry := &ESEntry{ ParentId: "", - Entry: &filer2.Entry{}, + Entry: &filer.Entry{}, } if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil { limit-- diff --git 
a/weed/server/filer_server.go b/weed/server/filer_server.go index 167a822b2..9661d8759 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -20,6 +20,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" @@ -28,7 +29,6 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" - _ "github.com/chrislusf/seaweedfs/weed/filer2/elastic/v7" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"