You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

308 lines
9.3 KiB

  1. package elastic
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "fmt"
  6. "math"
  7. "strings"
  8. "github.com/chrislusf/seaweedfs/weed/filer"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  12. jsoniter "github.com/json-iterator/go"
  13. elastic "github.com/olivere/elastic/v7"
  14. )
  15. var (
  16. indexType = "_doc"
  17. indexPrefix = ".seaweedfs_"
  18. )
  19. type ESEntry struct {
  20. ParentId string `json:"ParentId"`
  21. Entry *filer.Entry
  22. }
  23. func init() {
  24. filer.Stores = append(filer.Stores, &ElasticStore{})
  25. }
  26. type ElasticStore struct {
  27. client *elastic.Client
  28. maxPageSize int
  29. }
  30. func (store *ElasticStore) GetName() string {
  31. return "elastic7"
  32. }
  33. func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  34. servers := configuration.GetString(prefix + "servers")
  35. if servers == "" {
  36. return fmt.Errorf("error elastic endpoints.")
  37. }
  38. store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
  39. if store.maxPageSize <= 0 {
  40. return fmt.Errorf("error elastic index.max_result_window.")
  41. }
  42. glog.Infof("filer store elastic endpoints: %s, index.max_result_window:%d", servers, store.maxPageSize)
  43. store.client, err = elastic.NewClient(
  44. elastic.SetSniff(false),
  45. elastic.SetHealthcheck(false),
  46. elastic.SetURL(servers),
  47. )
  48. if err != nil {
  49. return fmt.Errorf("init elastic %s: %v.", servers, err)
  50. }
  51. return nil
  52. }
  53. func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  54. return ctx, nil
  55. }
  56. func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
  57. return nil
  58. }
  59. func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
  60. return nil
  61. }
  62. func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
  63. return filer.ErrKvNotImplemented
  64. }
  65. func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  66. return []byte(""), filer.ErrKvNotImplemented
  67. }
  68. func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  69. return filer.ErrKvNotImplemented
  70. }
  71. func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
  72. return nil, filer.ErrUnsupportedListDirectoryPrefixed
  73. }
  74. func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  75. index := getIndex(entry.FullPath)
  76. dir, _ := entry.FullPath.DirAndName()
  77. id := fmt.Sprintf("%x", md5.Sum([]byte(entry.FullPath)))
  78. esEntry := &ESEntry{
  79. ParentId: fmt.Sprintf("%x", md5.Sum([]byte(dir))),
  80. Entry: entry,
  81. }
  82. value, err := jsoniter.Marshal(esEntry)
  83. if err != nil {
  84. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  85. return fmt.Errorf("insert entry %v.", err)
  86. }
  87. _, err = store.client.Index().
  88. Index(index).
  89. Type(indexType).
  90. Id(id).
  91. BodyJson(string(value)).
  92. Do(context.Background())
  93. if err != nil {
  94. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  95. return fmt.Errorf("insert entry %v.", err)
  96. }
  97. return nil
  98. }
  99. func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  100. return store.InsertEntry(ctx, entry)
  101. }
  102. func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  103. index := getIndex(fullpath)
  104. id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
  105. searchResult, err := store.client.Get().
  106. Index(index).
  107. Type(indexType).
  108. Id(id).
  109. Do(context.Background())
  110. if elastic.IsNotFound(err) {
  111. return nil, filer_pb.ErrNotFound
  112. }
  113. if searchResult != nil && searchResult.Found {
  114. esEntry := &ESEntry{
  115. ParentId: "",
  116. Entry: &filer.Entry{},
  117. }
  118. err := jsoniter.Unmarshal(searchResult.Source, esEntry)
  119. return esEntry.Entry, err
  120. }
  121. glog.Errorf("find entry(%s),%v.", string(fullpath), err)
  122. return nil, filer_pb.ErrNotFound
  123. }
  124. func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  125. index := getIndex(fullpath)
  126. id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
  127. if strings.Count(string(fullpath), "/") == 1 {
  128. return store.deleteIndex(index)
  129. }
  130. return store.deleteEntry(index, id)
  131. }
  132. func (store *ElasticStore) deleteIndex(index string) (err error) {
  133. deleteResult, err := store.client.DeleteIndex(index).Do(context.Background())
  134. if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
  135. return nil
  136. }
  137. glog.Errorf("delete index(%s) %v.", index, err)
  138. return err
  139. }
  140. func (store *ElasticStore) deleteEntry(index, id string) (err error) {
  141. deleteResult, err := store.client.Delete().
  142. Index(index).
  143. Type(indexType).
  144. Id(id).
  145. Do(context.Background())
  146. if err == nil {
  147. if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
  148. return nil
  149. }
  150. }
  151. glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
  152. return fmt.Errorf("delete entry %v.", err)
  153. }
  154. func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  155. if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
  156. for _, entry := range entries {
  157. store.DeleteEntry(ctx, entry.FullPath)
  158. }
  159. }
  160. return nil
  161. }
  162. func (store *ElasticStore) ListDirectoryEntries(
  163. ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
  164. ) (entries []*filer.Entry, err error) {
  165. if string(fullpath) == "/" {
  166. return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit)
  167. }
  168. return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
  169. }
  170. func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
  171. indexResult, err := store.client.CatIndices().Do(context.Background())
  172. if err != nil {
  173. glog.Errorf("list indices %v.", err)
  174. return entries, err
  175. }
  176. for _, index := range indexResult {
  177. if strings.HasPrefix(index.Index, indexPrefix) {
  178. if entry, err := store.FindEntry(ctx,
  179. weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
  180. fileName := getFileName(entry.FullPath)
  181. if fileName == startFileName && !inclusive {
  182. continue
  183. }
  184. limit--
  185. if limit < 0 {
  186. break
  187. }
  188. entries = append(entries, entry)
  189. }
  190. }
  191. }
  192. return entries, nil
  193. }
  194. func (store *ElasticStore) listDirectoryEntries(
  195. ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
  196. ) (entries []*filer.Entry, err error) {
  197. first := true
  198. index := getIndex(fullpath)
  199. nextStart := ""
  200. parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
  201. if _, err := store.client.Refresh(index).Do(context.Background()); err != nil {
  202. if elastic.IsNotFound(err) {
  203. store.client.CreateIndex(index).Do(context.Background())
  204. return entries, nil
  205. }
  206. }
  207. for {
  208. result := &elastic.SearchResult{}
  209. if (startFileName == "" && first) || inclusive {
  210. if result, err = store.search(index, parentId); err != nil {
  211. glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  212. return entries, err
  213. }
  214. } else {
  215. fullPath := string(fullpath) + "/" + startFileName
  216. if !first {
  217. fullPath = nextStart
  218. }
  219. after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath)))
  220. if result, err = store.searchAfter(index, parentId, after); err != nil {
  221. glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  222. return entries, err
  223. }
  224. }
  225. first = false
  226. for _, hit := range result.Hits.Hits {
  227. esEntry := &ESEntry{
  228. ParentId: "",
  229. Entry: &filer.Entry{},
  230. }
  231. if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
  232. limit--
  233. if limit < 0 {
  234. return entries, nil
  235. }
  236. nextStart = string(esEntry.Entry.FullPath)
  237. fileName := getFileName(esEntry.Entry.FullPath)
  238. if fileName == startFileName && !inclusive {
  239. continue
  240. }
  241. entries = append(entries, esEntry.Entry)
  242. }
  243. }
  244. if len(result.Hits.Hits) < store.maxPageSize {
  245. break
  246. }
  247. }
  248. return entries, nil
  249. }
  250. func (store *ElasticStore) search(index, parentId string) (result *elastic.SearchResult, err error) {
  251. if count, err := store.client.Count(index).Do(context.Background()); err == nil && count == 0 {
  252. return &elastic.SearchResult{
  253. Hits: &elastic.SearchHits{
  254. Hits: make([]*elastic.SearchHit, 0)},
  255. }, nil
  256. }
  257. queryResult, err := store.client.Search().
  258. Index(index).
  259. Query(elastic.NewMatchQuery("ParentId", parentId)).
  260. Size(store.maxPageSize).
  261. Sort("_id", false).
  262. Do(context.Background())
  263. return queryResult, err
  264. }
  265. func (store *ElasticStore) searchAfter(index, parentId, after string) (result *elastic.SearchResult, err error) {
  266. queryResult, err := store.client.Search().
  267. Index(index).
  268. Query(elastic.NewMatchQuery("ParentId", parentId)).
  269. SearchAfter(after).
  270. Size(store.maxPageSize).
  271. Sort("_id", false).
  272. Do(context.Background())
  273. return queryResult, err
  274. }
  275. func (store *ElasticStore) Shutdown() {
  276. store.client.Stop()
  277. }
  278. func getIndex(fullpath weed_util.FullPath) string {
  279. path := strings.Split(string(fullpath), "/")
  280. if len(path) > 1 {
  281. return indexPrefix + path[1]
  282. }
  283. return ""
  284. }
  285. func getFileName(fullpath weed_util.FullPath) string {
  286. path := strings.Split(string(fullpath), "/")
  287. if len(path) > 1 {
  288. return path[len(path)-1]
  289. }
  290. return ""
  291. }