You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

318 lines
9.3 KiB

  1. package elastic
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "fmt"
  6. "math"
  7. "strings"
  8. "github.com/chrislusf/seaweedfs/weed/filer"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  12. jsoniter "github.com/json-iterator/go"
  13. elastic "github.com/olivere/elastic/v7"
  14. )
  15. var (
  16. indexType = "_doc"
  17. indexPrefix = ".seaweedfs_"
  18. indexKV = ".seaweedfs_kv_entries"
  19. mappingWithoutQuery = ` {
  20. "mappings": {
  21. "enabled": false
  22. }
  23. }`
  24. )
  25. type ESEntry struct {
  26. ParentId string `json:"ParentId"`
  27. Entry *filer.Entry
  28. }
  29. type ESKVEntry struct {
  30. Value string `json:Value`
  31. }
  32. func init() {
  33. filer.Stores = append(filer.Stores, &ElasticStore{})
  34. }
  35. type ElasticStore struct {
  36. client *elastic.Client
  37. maxPageSize int
  38. }
  39. func (store *ElasticStore) GetName() string {
  40. return "elastic7"
  41. }
  42. func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  43. servers := configuration.GetString(prefix + "servers")
  44. if servers == "" {
  45. return fmt.Errorf("error elastic endpoints.")
  46. }
  47. store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
  48. if store.maxPageSize <= 0 {
  49. store.maxPageSize = 10000
  50. }
  51. glog.Infof("filer store elastic endpoints: %s, index.max_result_window:%d", servers, store.maxPageSize)
  52. store.client, err = elastic.NewClient(
  53. elastic.SetSniff(false),
  54. elastic.SetHealthcheck(false),
  55. elastic.SetURL(servers),
  56. )
  57. if err != nil {
  58. return fmt.Errorf("init elastic %s: %v.", servers, err)
  59. }
  60. if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok {
  61. _, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background())
  62. if err != nil {
  63. return fmt.Errorf("create index(%s) %v.", indexKV, err)
  64. }
  65. }
  66. return nil
  67. }
  68. func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  69. return ctx, nil
  70. }
  71. func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
  72. return nil
  73. }
  74. func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
  75. return nil
  76. }
  77. func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
  78. return nil, filer.ErrUnsupportedListDirectoryPrefixed
  79. }
  80. func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  81. index := getIndex(entry.FullPath)
  82. dir, _ := entry.FullPath.DirAndName()
  83. id := fmt.Sprintf("%x", md5.Sum([]byte(entry.FullPath)))
  84. esEntry := &ESEntry{
  85. ParentId: fmt.Sprintf("%x", md5.Sum([]byte(dir))),
  86. Entry: entry,
  87. }
  88. value, err := jsoniter.Marshal(esEntry)
  89. if err != nil {
  90. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  91. return fmt.Errorf("insert entry %v.", err)
  92. }
  93. _, err = store.client.Index().
  94. Index(index).
  95. Type(indexType).
  96. Id(id).
  97. BodyJson(string(value)).
  98. Do(ctx)
  99. if err != nil {
  100. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  101. return fmt.Errorf("insert entry %v.", err)
  102. }
  103. return nil
  104. }
  105. func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  106. return store.InsertEntry(ctx, entry)
  107. }
  108. func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  109. index := getIndex(fullpath)
  110. id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
  111. searchResult, err := store.client.Get().
  112. Index(index).
  113. Type(indexType).
  114. Id(id).
  115. Do(ctx)
  116. if elastic.IsNotFound(err) {
  117. return nil, filer_pb.ErrNotFound
  118. }
  119. if searchResult != nil && searchResult.Found {
  120. esEntry := &ESEntry{
  121. ParentId: "",
  122. Entry: &filer.Entry{},
  123. }
  124. err := jsoniter.Unmarshal(searchResult.Source, esEntry)
  125. return esEntry.Entry, err
  126. }
  127. glog.Errorf("find entry(%s),%v.", string(fullpath), err)
  128. return nil, filer_pb.ErrNotFound
  129. }
  130. func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  131. index := getIndex(fullpath)
  132. id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
  133. if strings.Count(string(fullpath), "/") == 1 {
  134. return store.deleteIndex(ctx, index)
  135. }
  136. return store.deleteEntry(ctx, index, id)
  137. }
  138. func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) {
  139. deleteResult, err := store.client.DeleteIndex(index).Do(ctx)
  140. if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
  141. return nil
  142. }
  143. glog.Errorf("delete index(%s) %v.", index, err)
  144. return err
  145. }
  146. func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) {
  147. deleteResult, err := store.client.Delete().
  148. Index(index).
  149. Type(indexType).
  150. Id(id).
  151. Do(ctx)
  152. if err == nil {
  153. if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
  154. return nil
  155. }
  156. }
  157. glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
  158. return fmt.Errorf("delete entry %v.", err)
  159. }
  160. func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  161. if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
  162. for _, entry := range entries {
  163. store.DeleteEntry(ctx, entry.FullPath)
  164. }
  165. }
  166. return nil
  167. }
  168. func (store *ElasticStore) ListDirectoryEntries(
  169. ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
  170. ) (entries []*filer.Entry, err error) {
  171. if string(fullpath) == "/" {
  172. return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit)
  173. }
  174. return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
  175. }
  176. func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
  177. indexResult, err := store.client.CatIndices().Do(ctx)
  178. if err != nil {
  179. glog.Errorf("list indices %v.", err)
  180. return entries, err
  181. }
  182. for _, index := range indexResult {
  183. if index.Index == indexKV {
  184. continue
  185. }
  186. if strings.HasPrefix(index.Index, indexPrefix) {
  187. if entry, err := store.FindEntry(ctx,
  188. weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
  189. fileName := getFileName(entry.FullPath)
  190. if fileName == startFileName && !inclusive {
  191. continue
  192. }
  193. limit--
  194. if limit < 0 {
  195. break
  196. }
  197. entries = append(entries, entry)
  198. }
  199. }
  200. }
  201. return entries, nil
  202. }
  203. func (store *ElasticStore) listDirectoryEntries(
  204. ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
  205. ) (entries []*filer.Entry, err error) {
  206. first := true
  207. index := getIndex(fullpath)
  208. nextStart := ""
  209. parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
  210. if _, err := store.client.Refresh(index).Do(ctx); err != nil {
  211. if elastic.IsNotFound(err) {
  212. store.client.CreateIndex(index).Do(ctx)
  213. return entries, nil
  214. }
  215. }
  216. for {
  217. result := &elastic.SearchResult{}
  218. if (startFileName == "" && first) || inclusive {
  219. if result, err = store.search(ctx, index, parentId); err != nil {
  220. glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  221. return entries, err
  222. }
  223. } else {
  224. fullPath := string(fullpath) + "/" + startFileName
  225. if !first {
  226. fullPath = nextStart
  227. }
  228. after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath)))
  229. if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
  230. glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  231. return entries, err
  232. }
  233. }
  234. first = false
  235. for _, hit := range result.Hits.Hits {
  236. esEntry := &ESEntry{
  237. ParentId: "",
  238. Entry: &filer.Entry{},
  239. }
  240. if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
  241. limit--
  242. if limit < 0 {
  243. return entries, nil
  244. }
  245. nextStart = string(esEntry.Entry.FullPath)
  246. fileName := getFileName(esEntry.Entry.FullPath)
  247. if fileName == startFileName && !inclusive {
  248. continue
  249. }
  250. entries = append(entries, esEntry.Entry)
  251. }
  252. }
  253. if len(result.Hits.Hits) < store.maxPageSize {
  254. break
  255. }
  256. }
  257. return entries, nil
  258. }
  259. func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
  260. if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 {
  261. return &elastic.SearchResult{
  262. Hits: &elastic.SearchHits{
  263. Hits: make([]*elastic.SearchHit, 0)},
  264. }, nil
  265. }
  266. queryResult, err := store.client.Search().
  267. Index(index).
  268. Query(elastic.NewMatchQuery("ParentId", parentId)).
  269. Size(store.maxPageSize).
  270. Sort("_id", false).
  271. Do(ctx)
  272. return queryResult, err
  273. }
  274. func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) {
  275. queryResult, err := store.client.Search().
  276. Index(index).
  277. Query(elastic.NewMatchQuery("ParentId", parentId)).
  278. SearchAfter(after).
  279. Size(store.maxPageSize).
  280. Sort("_id", false).
  281. Do(ctx)
  282. return queryResult, err
  283. }
  284. func (store *ElasticStore) Shutdown() {
  285. store.client.Stop()
  286. }
  287. func getIndex(fullpath weed_util.FullPath) string {
  288. path := strings.Split(string(fullpath), "/")
  289. if len(path) > 1 {
  290. return indexPrefix + path[1]
  291. }
  292. return ""
  293. }
  294. func getFileName(fullpath weed_util.FullPath) string {
  295. path := strings.Split(string(fullpath), "/")
  296. if len(path) > 1 {
  297. return path[len(path)-1]
  298. }
  299. return ""
  300. }