You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

365 lines
11 KiB

  1. package elastic
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "fmt"
  6. "math"
  7. "strings"
  8. "github.com/chrislusf/seaweedfs/weed/filer"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  12. jsoniter "github.com/json-iterator/go"
  13. elastic "github.com/olivere/elastic/v7"
  14. )
  15. var (
  16. indexType = "_doc"
  17. indexPrefix = ".seaweedfs_"
  18. indexKV = ".seaweedfs_kv_entries"
  19. )
  20. type ESEntry struct {
  21. ParentId string `json:"ParentId"`
  22. Entry *filer.Entry
  23. }
  24. func init() {
  25. filer.Stores = append(filer.Stores, &ElasticStore{})
  26. }
  27. type ElasticStore struct {
  28. client *elastic.Client
  29. maxPageSize int
  30. }
  31. type ESKVEntry struct {
  32. Key string `json:Key`
  33. Value string `json:Value`
  34. }
  35. func (store *ElasticStore) GetName() string {
  36. return "elastic7"
  37. }
  38. func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  39. servers := configuration.GetString(prefix + "servers")
  40. if servers == "" {
  41. return fmt.Errorf("error elastic endpoints.")
  42. }
  43. store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
  44. if store.maxPageSize <= 0 {
  45. store.maxPageSize = 10000
  46. }
  47. glog.Infof("filer store elastic endpoints: %s, index.max_result_window:%d", servers, store.maxPageSize)
  48. store.client, err = elastic.NewClient(
  49. elastic.SetSniff(false),
  50. elastic.SetHealthcheck(false),
  51. elastic.SetURL(servers),
  52. )
  53. if err != nil {
  54. return fmt.Errorf("init elastic %s: %v.", servers, err)
  55. }
  56. return nil
  57. }
  58. func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  59. return ctx, nil
  60. }
  61. func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
  62. return nil
  63. }
  64. func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
  65. return nil
  66. }
  67. func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
  68. id := fmt.Sprintf("%x", md5.Sum(key))
  69. deleteResult, err := store.client.Delete().
  70. Index(indexKV).
  71. Type(indexType).
  72. Id(id).
  73. Do(context.Background())
  74. if err == nil {
  75. if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
  76. return nil
  77. }
  78. }
  79. glog.Errorf("delete key(id:%s) %v.", string(key), err)
  80. return fmt.Errorf("delete key %v.", err)
  81. }
  82. func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  83. id := fmt.Sprintf("%x", md5.Sum(key))
  84. searchResult, err := store.client.Get().
  85. Index(indexKV).
  86. Type(indexType).
  87. Id(id).
  88. Do(context.Background())
  89. if elastic.IsNotFound(err) {
  90. return nil, filer_pb.ErrNotFound
  91. }
  92. if searchResult != nil && searchResult.Found {
  93. esEntry := &ESKVEntry{}
  94. if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil {
  95. return []byte(esEntry.Value), nil
  96. }
  97. }
  98. glog.Errorf("find key(%s),%v.", string(key), err)
  99. return nil, filer_pb.ErrNotFound
  100. }
  101. func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  102. id := fmt.Sprintf("%x", md5.Sum(key))
  103. esEntry := &ESKVEntry{
  104. string(key),
  105. string(value),
  106. }
  107. val, err := jsoniter.Marshal(esEntry)
  108. if err != nil {
  109. glog.Errorf("insert key(%s) %v.", string(key), err)
  110. return fmt.Errorf("insert key %v.", err)
  111. }
  112. _, err = store.client.Index().
  113. Index(indexKV).
  114. Type(indexType).
  115. Id(id).
  116. BodyJson(string(val)).
  117. Do(context.Background())
  118. if err != nil {
  119. return fmt.Errorf("kv put: %v", err)
  120. }
  121. return nil
  122. }
  123. func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
  124. return nil, filer.ErrUnsupportedListDirectoryPrefixed
  125. }
  126. func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  127. index := getIndex(entry.FullPath)
  128. dir, _ := entry.FullPath.DirAndName()
  129. id := fmt.Sprintf("%x", md5.Sum([]byte(entry.FullPath)))
  130. esEntry := &ESEntry{
  131. ParentId: fmt.Sprintf("%x", md5.Sum([]byte(dir))),
  132. Entry: entry,
  133. }
  134. value, err := jsoniter.Marshal(esEntry)
  135. if err != nil {
  136. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  137. return fmt.Errorf("insert entry %v.", err)
  138. }
  139. _, err = store.client.Index().
  140. Index(index).
  141. Type(indexType).
  142. Id(id).
  143. BodyJson(string(value)).
  144. Do(context.Background())
  145. if err != nil {
  146. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  147. return fmt.Errorf("insert entry %v.", err)
  148. }
  149. return nil
  150. }
  151. func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  152. return store.InsertEntry(ctx, entry)
  153. }
  154. func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  155. index := getIndex(fullpath)
  156. id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
  157. searchResult, err := store.client.Get().
  158. Index(index).
  159. Type(indexType).
  160. Id(id).
  161. Do(context.Background())
  162. if elastic.IsNotFound(err) {
  163. return nil, filer_pb.ErrNotFound
  164. }
  165. if searchResult != nil && searchResult.Found {
  166. esEntry := &ESEntry{
  167. ParentId: "",
  168. Entry: &filer.Entry{},
  169. }
  170. err := jsoniter.Unmarshal(searchResult.Source, esEntry)
  171. return esEntry.Entry, err
  172. }
  173. glog.Errorf("find entry(%s),%v.", string(fullpath), err)
  174. return nil, filer_pb.ErrNotFound
  175. }
  176. func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  177. index := getIndex(fullpath)
  178. id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
  179. if strings.Count(string(fullpath), "/") == 1 {
  180. return store.deleteIndex(index)
  181. }
  182. return store.deleteEntry(index, id)
  183. }
  184. func (store *ElasticStore) deleteIndex(index string) (err error) {
  185. deleteResult, err := store.client.DeleteIndex(index).Do(context.Background())
  186. if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
  187. return nil
  188. }
  189. glog.Errorf("delete index(%s) %v.", index, err)
  190. return err
  191. }
  192. func (store *ElasticStore) deleteEntry(index, id string) (err error) {
  193. deleteResult, err := store.client.Delete().
  194. Index(index).
  195. Type(indexType).
  196. Id(id).
  197. Do(context.Background())
  198. if err == nil {
  199. if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
  200. return nil
  201. }
  202. }
  203. glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
  204. return fmt.Errorf("delete entry %v.", err)
  205. }
  206. func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  207. if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
  208. for _, entry := range entries {
  209. store.DeleteEntry(ctx, entry.FullPath)
  210. }
  211. }
  212. return nil
  213. }
  214. func (store *ElasticStore) ListDirectoryEntries(
  215. ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
  216. ) (entries []*filer.Entry, err error) {
  217. if string(fullpath) == "/" {
  218. return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit)
  219. }
  220. return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
  221. }
  222. func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
  223. indexResult, err := store.client.CatIndices().Do(context.Background())
  224. if err != nil {
  225. glog.Errorf("list indices %v.", err)
  226. return entries, err
  227. }
  228. for _, index := range indexResult {
  229. if strings.HasPrefix(index.Index, indexPrefix) {
  230. if entry, err := store.FindEntry(ctx,
  231. weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
  232. fileName := getFileName(entry.FullPath)
  233. if fileName == startFileName && !inclusive {
  234. continue
  235. }
  236. limit--
  237. if limit < 0 {
  238. break
  239. }
  240. entries = append(entries, entry)
  241. }
  242. }
  243. }
  244. return entries, nil
  245. }
  246. func (store *ElasticStore) listDirectoryEntries(
  247. ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
  248. ) (entries []*filer.Entry, err error) {
  249. first := true
  250. index := getIndex(fullpath)
  251. nextStart := ""
  252. parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
  253. if _, err := store.client.Refresh(index).Do(context.Background()); err != nil {
  254. if elastic.IsNotFound(err) {
  255. store.client.CreateIndex(index).Do(context.Background())
  256. return entries, nil
  257. }
  258. }
  259. for {
  260. result := &elastic.SearchResult{}
  261. if (startFileName == "" && first) || inclusive {
  262. if result, err = store.search(index, parentId); err != nil {
  263. glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  264. return entries, err
  265. }
  266. } else {
  267. fullPath := string(fullpath) + "/" + startFileName
  268. if !first {
  269. fullPath = nextStart
  270. }
  271. after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath)))
  272. if result, err = store.searchAfter(index, parentId, after); err != nil {
  273. glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  274. return entries, err
  275. }
  276. }
  277. first = false
  278. for _, hit := range result.Hits.Hits {
  279. esEntry := &ESEntry{
  280. ParentId: "",
  281. Entry: &filer.Entry{},
  282. }
  283. if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
  284. limit--
  285. if limit < 0 {
  286. return entries, nil
  287. }
  288. nextStart = string(esEntry.Entry.FullPath)
  289. fileName := getFileName(esEntry.Entry.FullPath)
  290. if fileName == startFileName && !inclusive {
  291. continue
  292. }
  293. entries = append(entries, esEntry.Entry)
  294. }
  295. }
  296. if len(result.Hits.Hits) < store.maxPageSize {
  297. break
  298. }
  299. }
  300. return entries, nil
  301. }
  302. func (store *ElasticStore) search(index, parentId string) (result *elastic.SearchResult, err error) {
  303. if count, err := store.client.Count(index).Do(context.Background()); err == nil && count == 0 {
  304. return &elastic.SearchResult{
  305. Hits: &elastic.SearchHits{
  306. Hits: make([]*elastic.SearchHit, 0)},
  307. }, nil
  308. }
  309. queryResult, err := store.client.Search().
  310. Index(index).
  311. Query(elastic.NewMatchQuery("ParentId", parentId)).
  312. Size(store.maxPageSize).
  313. Sort("_id", false).
  314. Do(context.Background())
  315. return queryResult, err
  316. }
  317. func (store *ElasticStore) searchAfter(index, parentId, after string) (result *elastic.SearchResult, err error) {
  318. queryResult, err := store.client.Search().
  319. Index(index).
  320. Query(elastic.NewMatchQuery("ParentId", parentId)).
  321. SearchAfter(after).
  322. Size(store.maxPageSize).
  323. Sort("_id", false).
  324. Do(context.Background())
  325. return queryResult, err
  326. }
  327. func (store *ElasticStore) Shutdown() {
  328. store.client.Stop()
  329. }
  330. func getIndex(fullpath weed_util.FullPath) string {
  331. path := strings.Split(string(fullpath), "/")
  332. if len(path) > 1 {
  333. return indexPrefix + path[1]
  334. }
  335. return ""
  336. }
  337. func getFileName(fullpath weed_util.FullPath) string {
  338. path := strings.Split(string(fullpath), "/")
  339. if len(path) > 1 {
  340. return path[len(path)-1]
  341. }
  342. return ""
  343. }