You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

337 lines
9.8 KiB

  1. package elastic
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "strings"
  7. "github.com/chrislusf/seaweedfs/weed/filer"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  11. jsoniter "github.com/json-iterator/go"
  12. elastic "github.com/olivere/elastic/v7"
  13. )
  14. var (
  15. indexType = "_doc"
  16. indexPrefix = ".seaweedfs_"
  17. indexKV = ".seaweedfs_kv_entries"
  18. mappingWithoutQuery = ` {
  19. "mappings": {
  20. "enabled": false,
  21. "properties": {
  22. "Value":{
  23. "type": "binary"
  24. }
  25. }
  26. }
  27. }`
  28. )
  29. type ESEntry struct {
  30. ParentId string `json:"ParentId"`
  31. Entry *filer.Entry
  32. }
  33. type ESKVEntry struct {
  34. Value []byte `json:"Value"`
  35. }
  36. func init() {
  37. filer.Stores = append(filer.Stores, &ElasticStore{})
  38. }
  39. type ElasticStore struct {
  40. client *elastic.Client
  41. maxPageSize int
  42. }
  43. func (store *ElasticStore) GetName() string {
  44. return "elastic7"
  45. }
  46. func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  47. options := store.initialize(configuration, prefix)
  48. store.client, err = elastic.NewClient(options...)
  49. if err != nil {
  50. return fmt.Errorf("init elastic %v.", err)
  51. }
  52. if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok {
  53. _, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background())
  54. if err != nil {
  55. return fmt.Errorf("create index(%s) %v.", indexKV, err)
  56. }
  57. }
  58. return nil
  59. }
  60. func (store *ElasticStore) initialize(configuration weed_util.Configuration, prefix string) (options []elastic.ClientOptionFunc) {
  61. servers := configuration.GetStringSlice(prefix + "servers")
  62. options = append(options, elastic.SetURL(servers...))
  63. username := configuration.GetString(prefix + "username")
  64. password := configuration.GetString(prefix + "password")
  65. if username != "" && password != "" {
  66. options = append(options, elastic.SetBasicAuth(username, password))
  67. }
  68. options = append(options, elastic.SetSniff(configuration.GetBool(prefix+"sniff_enabled")))
  69. options = append(options, elastic.SetHealthcheck(configuration.GetBool(prefix+"healthcheck_enabled")))
  70. store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
  71. if store.maxPageSize <= 0 {
  72. store.maxPageSize = 10000
  73. }
  74. glog.Infof("filer store elastic endpoints: %v.", servers)
  75. return options
  76. }
  77. func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  78. return ctx, nil
  79. }
  80. func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
  81. return nil
  82. }
  83. func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
  84. return nil
  85. }
  86. func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
  87. return nil, filer.ErrUnsupportedListDirectoryPrefixed
  88. }
  89. func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  90. index := getIndex(entry.FullPath)
  91. dir, _ := entry.FullPath.DirAndName()
  92. id := weed_util.Md5String([]byte(entry.FullPath))
  93. esEntry := &ESEntry{
  94. ParentId: weed_util.Md5String([]byte(dir)),
  95. Entry: entry,
  96. }
  97. value, err := jsoniter.Marshal(esEntry)
  98. if err != nil {
  99. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  100. return fmt.Errorf("insert entry %v.", err)
  101. }
  102. _, err = store.client.Index().
  103. Index(index).
  104. Type(indexType).
  105. Id(id).
  106. BodyJson(string(value)).
  107. Do(ctx)
  108. if err != nil {
  109. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  110. return fmt.Errorf("insert entry %v.", err)
  111. }
  112. return nil
  113. }
  114. func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  115. return store.InsertEntry(ctx, entry)
  116. }
  117. func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  118. index := getIndex(fullpath)
  119. id := weed_util.Md5String([]byte(fullpath))
  120. searchResult, err := store.client.Get().
  121. Index(index).
  122. Type(indexType).
  123. Id(id).
  124. Do(ctx)
  125. if elastic.IsNotFound(err) {
  126. return nil, filer_pb.ErrNotFound
  127. }
  128. if searchResult != nil && searchResult.Found {
  129. esEntry := &ESEntry{
  130. ParentId: "",
  131. Entry: &filer.Entry{},
  132. }
  133. err := jsoniter.Unmarshal(searchResult.Source, esEntry)
  134. return esEntry.Entry, err
  135. }
  136. glog.Errorf("find entry(%s),%v.", string(fullpath), err)
  137. return nil, filer_pb.ErrNotFound
  138. }
  139. func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  140. index := getIndex(fullpath)
  141. id := weed_util.Md5String([]byte(fullpath))
  142. if strings.Count(string(fullpath), "/") == 1 {
  143. return store.deleteIndex(ctx, index)
  144. }
  145. return store.deleteEntry(ctx, index, id)
  146. }
  147. func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) {
  148. deleteResult, err := store.client.DeleteIndex(index).Do(ctx)
  149. if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
  150. return nil
  151. }
  152. glog.Errorf("delete index(%s) %v.", index, err)
  153. return err
  154. }
  155. func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) {
  156. deleteResult, err := store.client.Delete().
  157. Index(index).
  158. Type(indexType).
  159. Id(id).
  160. Do(ctx)
  161. if err == nil {
  162. if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
  163. return nil
  164. }
  165. }
  166. glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
  167. return fmt.Errorf("delete entry %v.", err)
  168. }
  169. func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  170. if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
  171. for _, entry := range entries {
  172. store.DeleteEntry(ctx, entry.FullPath)
  173. }
  174. }
  175. return nil
  176. }
  177. func (store *ElasticStore) ListDirectoryEntries(
  178. ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
  179. ) (entries []*filer.Entry, err error) {
  180. if string(fullpath) == "/" {
  181. return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit)
  182. }
  183. return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
  184. }
  185. func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
  186. indexResult, err := store.client.CatIndices().Do(ctx)
  187. if err != nil {
  188. glog.Errorf("list indices %v.", err)
  189. return entries, err
  190. }
  191. for _, index := range indexResult {
  192. if index.Index == indexKV {
  193. continue
  194. }
  195. if strings.HasPrefix(index.Index, indexPrefix) {
  196. if entry, err := store.FindEntry(ctx,
  197. weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
  198. fileName := getFileName(entry.FullPath)
  199. if fileName == startFileName && !inclusive {
  200. continue
  201. }
  202. limit--
  203. if limit < 0 {
  204. break
  205. }
  206. entries = append(entries, entry)
  207. }
  208. }
  209. }
  210. return entries, nil
  211. }
  212. func (store *ElasticStore) listDirectoryEntries(
  213. ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
  214. ) (entries []*filer.Entry, err error) {
  215. first := true
  216. index := getIndex(fullpath)
  217. nextStart := ""
  218. parentId := weed_util.Md5String([]byte(fullpath))
  219. if _, err := store.client.Refresh(index).Do(ctx); err != nil {
  220. if elastic.IsNotFound(err) {
  221. store.client.CreateIndex(index).Do(ctx)
  222. return entries, nil
  223. }
  224. }
  225. for {
  226. result := &elastic.SearchResult{}
  227. if (startFileName == "" && first) || inclusive {
  228. if result, err = store.search(ctx, index, parentId); err != nil {
  229. glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  230. return entries, err
  231. }
  232. } else {
  233. fullPath := string(fullpath) + "/" + startFileName
  234. if !first {
  235. fullPath = nextStart
  236. }
  237. after := weed_util.Md5String([]byte(fullPath))
  238. if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
  239. glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  240. return entries, err
  241. }
  242. }
  243. first = false
  244. for _, hit := range result.Hits.Hits {
  245. esEntry := &ESEntry{
  246. ParentId: "",
  247. Entry: &filer.Entry{},
  248. }
  249. if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
  250. limit--
  251. if limit < 0 {
  252. return entries, nil
  253. }
  254. nextStart = string(esEntry.Entry.FullPath)
  255. fileName := getFileName(esEntry.Entry.FullPath)
  256. if fileName == startFileName && !inclusive {
  257. continue
  258. }
  259. entries = append(entries, esEntry.Entry)
  260. }
  261. }
  262. if len(result.Hits.Hits) < store.maxPageSize {
  263. break
  264. }
  265. }
  266. return entries, nil
  267. }
  268. func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
  269. if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 {
  270. return &elastic.SearchResult{
  271. Hits: &elastic.SearchHits{
  272. Hits: make([]*elastic.SearchHit, 0)},
  273. }, nil
  274. }
  275. queryResult, err := store.client.Search().
  276. Index(index).
  277. Query(elastic.NewMatchQuery("ParentId", parentId)).
  278. Size(store.maxPageSize).
  279. Sort("_id", false).
  280. Do(ctx)
  281. return queryResult, err
  282. }
  283. func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) {
  284. queryResult, err := store.client.Search().
  285. Index(index).
  286. Query(elastic.NewMatchQuery("ParentId", parentId)).
  287. SearchAfter(after).
  288. Size(store.maxPageSize).
  289. Sort("_id", false).
  290. Do(ctx)
  291. return queryResult, err
  292. }
  293. func (store *ElasticStore) Shutdown() {
  294. store.client.Stop()
  295. }
  296. func getIndex(fullpath weed_util.FullPath) string {
  297. path := strings.Split(string(fullpath), "/")
  298. if len(path) > 1 {
  299. return indexPrefix + path[1]
  300. }
  301. return ""
  302. }
  303. func getFileName(fullpath weed_util.FullPath) string {
  304. path := strings.Split(string(fullpath), "/")
  305. if len(path) > 1 {
  306. return path[len(path)-1]
  307. }
  308. return ""
  309. }