Browse Source

fier store: fix elastic search regression

fix https://github.com/chrislusf/seaweedfs/issues/1774
pull/1784/head
Chris Lu 4 years ago
parent
commit
c3af72d950
  1. 65
      weed/filer/elastic/v7/elastic_store.go

65
weed/filer/elastic/v7/elastic_store.go

@ -101,7 +101,7 @@ func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
} }
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
index := getIndex(entry.FullPath)
index := getIndex(entry.FullPath, false)
dir, _ := entry.FullPath.DirAndName() dir, _ := entry.FullPath.DirAndName()
id := weed_util.Md5String([]byte(entry.FullPath)) id := weed_util.Md5String([]byte(entry.FullPath))
esEntry := &ESEntry{ esEntry := &ESEntry{
@ -131,7 +131,7 @@ func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
} }
func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
index := getIndex(fullpath)
index := getIndex(fullpath, false)
id := weed_util.Md5String([]byte(fullpath)) id := weed_util.Md5String([]byte(fullpath))
searchResult, err := store.client.Get(). searchResult, err := store.client.Get().
Index(index). Index(index).
@ -154,7 +154,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
} }
func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
index := getIndex(fullpath)
index := getIndex(fullpath, false)
id := weed_util.Md5String([]byte(fullpath)) id := weed_util.Md5String([]byte(fullpath))
if strings.Count(string(fullpath), "/") == 1 { if strings.Count(string(fullpath), "/") == 1 {
return store.deleteIndex(ctx, index) return store.deleteIndex(ctx, index)
@ -198,47 +198,13 @@ func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath we
} }
func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
if string(dirPath) == "/" {
return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit, eachEntryFunc)
}
return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc) return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
} }
func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
indexResult, err := store.client.CatIndices().Do(ctx)
if err != nil {
glog.Errorf("list indices %v.", err)
return
}
for _, index := range indexResult {
if index.Index == indexKV {
continue
}
if strings.HasPrefix(index.Index, indexPrefix) {
if entry, err := store.FindEntry(ctx,
weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
fileName := getFileName(entry.FullPath)
if fileName == startFileName && !inclusive {
continue
}
limit--
if limit < 0 {
break
}
if !eachEntryFunc(entry) {
break
}
lastFileName = fileName
}
}
}
return
}
func (store *ElasticStore) listDirectoryEntries( func (store *ElasticStore) listDirectoryEntries(
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
first := true first := true
index := getIndex(fullpath)
index := getIndex(fullpath, true)
nextStart := "" nextStart := ""
parentId := weed_util.Md5String([]byte(fullpath)) parentId := weed_util.Md5String([]byte(fullpath))
if _, err = store.client.Refresh(index).Do(ctx); err != nil { if _, err = store.client.Refresh(index).Do(ctx); err != nil {
@ -277,7 +243,7 @@ func (store *ElasticStore) listDirectoryEntries(
return lastFileName, nil return lastFileName, nil
} }
nextStart = string(esEntry.Entry.FullPath) nextStart = string(esEntry.Entry.FullPath)
fileName := getFileName(esEntry.Entry.FullPath)
fileName := esEntry.Entry.FullPath.Name()
if fileName == startFileName && !inclusive { if fileName == startFileName && !inclusive {
continue continue
} }
@ -287,6 +253,9 @@ func (store *ElasticStore) listDirectoryEntries(
lastFileName = fileName lastFileName = fileName
} }
} }
if len(result.Hits.Hits) < store.maxPageSize {
break
}
} }
return return
} }
@ -323,18 +292,16 @@ func (store *ElasticStore) Shutdown() {
store.client.Stop() store.client.Stop()
} }
func getIndex(fullpath weed_util.FullPath) string {
func getIndex(fullpath weed_util.FullPath, isDirectory bool) string {
path := strings.Split(string(fullpath), "/") path := strings.Split(string(fullpath), "/")
if len(path) > 1 {
return indexPrefix + path[1]
if isDirectory && len(path) >= 2 {
return indexPrefix + strings.ToLower(path[1])
} }
return ""
}
func getFileName(fullpath weed_util.FullPath) string {
path := strings.Split(string(fullpath), "/")
if len(path) > 1 {
return path[len(path)-1]
if len(path) > 2 {
return indexPrefix + strings.ToLower(path[1])
}
if len(path) == 2 {
return indexPrefix
} }
return "" return ""
} }
Loading…
Cancel
Save