diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 68fe8e982..7ced118ca 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -176,7 +176,11 @@ database = "seaweedfs" [elastic7] enabled = false -servers = "http://localhost:9200" +servers = "http://localhost1:9200,http://localhost2:9200,http://localhost3:9200" +username = "" +password = "" +sniff_enabled = false +healthcheck_enabled = false # increase the value is recommend, be sure the value in Elastic is greater or equal here index.max_result_window = 10000 ` diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index 29e9689f4..f720fdea0 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -47,23 +47,12 @@ type ElasticStore struct { func (store *ElasticStore) GetName() string { return "elastic7" } + func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { - servers := configuration.GetString(prefix + "servers") - if servers == "" { - return fmt.Errorf("error elastic endpoints.") - } - store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window") - if store.maxPageSize <= 0 { - store.maxPageSize = 10000 - } - glog.Infof("filer store elastic endpoints: %s, index.max_result_window:%d", servers, store.maxPageSize) - store.client, err = elastic.NewClient( - elastic.SetSniff(false), - elastic.SetHealthcheck(false), - elastic.SetURL(servers), - ) + options := store.initialize(configuration, prefix) + store.client, err = elastic.NewClient(options...) if err != nil { - return fmt.Errorf("init elastic %s: %v.", servers, err) + return fmt.Errorf("init elastic %v.", err) } if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok { _, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background()) @@ -73,6 +62,30 @@ func (store *ElasticStore) Initialize(configuration weed_util.Configuration, pre } return nil } + +func (store *ElasticStore) initialize(configuration weed_util.Configuration, prefix string) (options []elastic.ClientOptionFunc) { + configuration.SetDefault(prefix+"servers", "http://localhost:9200") + servers := configuration.GetString(prefix + "servers") + url := make([]string, 0) + for _, v := range strings.Split(servers, ",") { + url = append(url, v) + } + options = append(options, elastic.SetURL(url...)) + username := configuration.GetString(prefix + "username") + password := configuration.GetString(prefix + "password") + if username != "" && password != "" { + options = append(options, elastic.SetBasicAuth(username, password)) + } + options = append(options, elastic.SetSniff(configuration.GetBool(prefix+"sniff_enabled"))) + options = append(options, elastic.SetHealthcheck(configuration.GetBool(prefix+"healthcheck_enabled"))) + store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window") + if store.maxPageSize <= 0 { + store.maxPageSize = 10000 + } + glog.Infof("filer store elastic endpoints: %s.", servers) + return options +} + func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) { return ctx, nil } @@ -82,6 +95,7 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error { func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { return nil } + func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { return nil, filer.ErrUnsupportedListDirectoryPrefixed } @@ -111,9 +125,11 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) } return nil } + func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { return store.InsertEntry(ctx, entry) } + func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { index := getIndex(fullpath) id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) @@ -136,6 +152,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful glog.Errorf("find entry(%s),%v.", string(fullpath), err) return nil, filer_pb.ErrNotFound } + func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { index := getIndex(fullpath) id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath))) @@ -144,6 +161,7 @@ func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.F } return store.deleteEntry(ctx, index, id) } + func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) { deleteResult, err := store.client.DeleteIndex(index).Do(ctx) if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) { @@ -152,6 +170,7 @@ func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err e glog.Errorf("delete index(%s) %v.", index, err) return err } + func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) { deleteResult, err := store.client.Delete(). Index(index). @@ -166,6 +185,7 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err) return fmt.Errorf("delete entry %v.", err) } + func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil { for _, entry := range entries {