You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

340 lines
10 KiB

  1. package elastic
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "strings"
  7. "github.com/chrislusf/seaweedfs/weed/filer"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  11. jsoniter "github.com/json-iterator/go"
  12. elastic "github.com/olivere/elastic/v7"
  13. )
  14. var (
  15. indexType = "_doc"
  16. indexPrefix = ".seaweedfs_"
  17. indexKV = ".seaweedfs_kv_entries"
  18. kvMappings = ` {
  19. "mappings": {
  20. "enabled": false,
  21. "properties": {
  22. "Value":{
  23. "type": "binary"
  24. }
  25. }
  26. }
  27. }`
  28. )
  29. type ESEntry struct {
  30. ParentId string `json:"ParentId"`
  31. Entry *filer.Entry
  32. }
  33. type ESKVEntry struct {
  34. Value []byte `json:"Value"`
  35. }
  36. func init() {
  37. filer.Stores = append(filer.Stores, &ElasticStore{})
  38. }
  39. type ElasticStore struct {
  40. client *elastic.Client
  41. maxPageSize int
  42. }
  43. func (store *ElasticStore) GetName() string {
  44. return "elastic7"
  45. }
  46. func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  47. options := []elastic.ClientOptionFunc{}
  48. servers := configuration.GetStringSlice(prefix + "servers")
  49. options = append(options, elastic.SetURL(servers...))
  50. username := configuration.GetString(prefix + "username")
  51. password := configuration.GetString(prefix + "password")
  52. if username != "" && password != "" {
  53. options = append(options, elastic.SetBasicAuth(username, password))
  54. }
  55. options = append(options, elastic.SetSniff(configuration.GetBool(prefix+"sniff_enabled")))
  56. options = append(options, elastic.SetHealthcheck(configuration.GetBool(prefix+"healthcheck_enabled")))
  57. store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
  58. if store.maxPageSize <= 0 {
  59. store.maxPageSize = 10000
  60. }
  61. glog.Infof("filer store elastic endpoints: %v.", servers)
  62. return store.initialize(options)
  63. }
  64. func (store *ElasticStore) initialize(options []elastic.ClientOptionFunc) (err error) {
  65. ctx := context.Background()
  66. store.client, err = elastic.NewClient(options...)
  67. if err != nil {
  68. return fmt.Errorf("init elastic %v.", err)
  69. }
  70. if ok, err := store.client.IndexExists(indexKV).Do(ctx); err == nil && !ok {
  71. _, err = store.client.CreateIndex(indexKV).Body(kvMappings).Do(ctx)
  72. if err != nil {
  73. return fmt.Errorf("create index(%s) %v.", indexKV, err)
  74. }
  75. }
  76. return nil
  77. }
  78. func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  79. return ctx, nil
  80. }
  81. func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
  82. return nil
  83. }
  84. func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
  85. return nil
  86. }
  87. func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  88. return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
  89. }
  90. func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  91. index := getIndex(entry.FullPath)
  92. dir, _ := entry.FullPath.DirAndName()
  93. id := weed_util.Md5String([]byte(entry.FullPath))
  94. esEntry := &ESEntry{
  95. ParentId: weed_util.Md5String([]byte(dir)),
  96. Entry: entry,
  97. }
  98. value, err := jsoniter.Marshal(esEntry)
  99. if err != nil {
  100. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  101. return fmt.Errorf("insert entry %v.", err)
  102. }
  103. _, err = store.client.Index().
  104. Index(index).
  105. Type(indexType).
  106. Id(id).
  107. BodyJson(string(value)).
  108. Do(ctx)
  109. if err != nil {
  110. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  111. return fmt.Errorf("insert entry %v.", err)
  112. }
  113. return nil
  114. }
  115. func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  116. return store.InsertEntry(ctx, entry)
  117. }
  118. func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  119. index := getIndex(fullpath)
  120. id := weed_util.Md5String([]byte(fullpath))
  121. searchResult, err := store.client.Get().
  122. Index(index).
  123. Type(indexType).
  124. Id(id).
  125. Do(ctx)
  126. if elastic.IsNotFound(err) {
  127. return nil, filer_pb.ErrNotFound
  128. }
  129. if searchResult != nil && searchResult.Found {
  130. esEntry := &ESEntry{
  131. ParentId: "",
  132. Entry: &filer.Entry{},
  133. }
  134. err := jsoniter.Unmarshal(searchResult.Source, esEntry)
  135. return esEntry.Entry, err
  136. }
  137. glog.Errorf("find entry(%s),%v.", string(fullpath), err)
  138. return nil, filer_pb.ErrNotFound
  139. }
  140. func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  141. index := getIndex(fullpath)
  142. id := weed_util.Md5String([]byte(fullpath))
  143. if strings.Count(string(fullpath), "/") == 1 {
  144. return store.deleteIndex(ctx, index)
  145. }
  146. return store.deleteEntry(ctx, index, id)
  147. }
  148. func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) {
  149. deleteResult, err := store.client.DeleteIndex(index).Do(ctx)
  150. if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
  151. return nil
  152. }
  153. glog.Errorf("delete index(%s) %v.", index, err)
  154. return err
  155. }
  156. func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) {
  157. deleteResult, err := store.client.Delete().
  158. Index(index).
  159. Type(indexType).
  160. Id(id).
  161. Do(ctx)
  162. if err == nil {
  163. if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
  164. return nil
  165. }
  166. }
  167. glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
  168. return fmt.Errorf("delete entry %v.", err)
  169. }
  170. func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  171. _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
  172. if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
  173. glog.Errorf("elastic delete %s: %v.", entry.FullPath, err)
  174. return false
  175. }
  176. return true
  177. })
  178. return
  179. }
  180. func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  181. if string(dirPath) == "/" {
  182. return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit, eachEntryFunc)
  183. }
  184. return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
  185. }
  186. func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  187. indexResult, err := store.client.CatIndices().Do(ctx)
  188. if err != nil {
  189. glog.Errorf("list indices %v.", err)
  190. return
  191. }
  192. for _, index := range indexResult {
  193. if index.Index == indexKV {
  194. continue
  195. }
  196. if strings.HasPrefix(index.Index, indexPrefix) {
  197. if entry, err := store.FindEntry(ctx,
  198. weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
  199. fileName := getFileName(entry.FullPath)
  200. lastFileName = fileName
  201. if fileName == startFileName && !inclusive {
  202. continue
  203. }
  204. limit--
  205. if limit < 0 {
  206. break
  207. }
  208. if !eachEntryFunc(entry) {
  209. break
  210. }
  211. }
  212. }
  213. }
  214. return
  215. }
  216. func (store *ElasticStore) listDirectoryEntries(
  217. ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  218. first := true
  219. index := getIndex(fullpath)
  220. nextStart := ""
  221. parentId := weed_util.Md5String([]byte(fullpath))
  222. if _, err = store.client.Refresh(index).Do(ctx); err != nil {
  223. if elastic.IsNotFound(err) {
  224. store.client.CreateIndex(index).Do(ctx)
  225. return
  226. }
  227. }
  228. for {
  229. result := &elastic.SearchResult{}
  230. if (startFileName == "" && first) || inclusive {
  231. if result, err = store.search(ctx, index, parentId); err != nil {
  232. glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  233. return
  234. }
  235. } else {
  236. fullPath := string(fullpath) + "/" + startFileName
  237. if !first {
  238. fullPath = nextStart
  239. }
  240. after := weed_util.Md5String([]byte(fullPath))
  241. if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
  242. glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  243. return
  244. }
  245. }
  246. first = false
  247. for _, hit := range result.Hits.Hits {
  248. esEntry := &ESEntry{
  249. ParentId: "",
  250. Entry: &filer.Entry{},
  251. }
  252. if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
  253. limit--
  254. if limit < 0 {
  255. return lastFileName, nil
  256. }
  257. nextStart = string(esEntry.Entry.FullPath)
  258. fileName := getFileName(esEntry.Entry.FullPath)
  259. lastFileName = fileName
  260. if fileName == startFileName && !inclusive {
  261. continue
  262. }
  263. if !eachEntryFunc(esEntry.Entry) {
  264. break
  265. }
  266. }
  267. }
  268. }
  269. return
  270. }
  271. func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
  272. if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 {
  273. return &elastic.SearchResult{
  274. Hits: &elastic.SearchHits{
  275. Hits: make([]*elastic.SearchHit, 0)},
  276. }, nil
  277. }
  278. queryResult, err := store.client.Search().
  279. Index(index).
  280. Query(elastic.NewMatchQuery("ParentId", parentId)).
  281. Size(store.maxPageSize).
  282. Sort("_id", false).
  283. Do(ctx)
  284. return queryResult, err
  285. }
  286. func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) {
  287. queryResult, err := store.client.Search().
  288. Index(index).
  289. Query(elastic.NewMatchQuery("ParentId", parentId)).
  290. SearchAfter(after).
  291. Size(store.maxPageSize).
  292. Sort("_id", false).
  293. Do(ctx)
  294. return queryResult, err
  295. }
  296. func (store *ElasticStore) Shutdown() {
  297. store.client.Stop()
  298. }
  299. func getIndex(fullpath weed_util.FullPath) string {
  300. path := strings.Split(string(fullpath), "/")
  301. if len(path) > 1 {
  302. return indexPrefix + path[1]
  303. }
  304. return ""
  305. }
  306. func getFileName(fullpath weed_util.FullPath) string {
  307. path := strings.Split(string(fullpath), "/")
  308. if len(path) > 1 {
  309. return path[len(path)-1]
  310. }
  311. return ""
  312. }