You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

314 lines
9.5 KiB

8 months ago
9 months ago
  1. //go:build elastic
  2. // +build elastic
  3. package elastic
  4. import (
  5. "context"
  6. "fmt"
  7. "math"
  8. "strings"
  9. jsoniter "github.com/json-iterator/go"
  10. elastic "github.com/olivere/elastic/v7"
  11. "github.com/seaweedfs/seaweedfs/weed/filer"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. weed_util "github.com/seaweedfs/seaweedfs/weed/util"
  15. )
var (
	// indexType is the document type passed to the index, get and delete
	// requests issued by this store.
	indexType = "_doc"
	// indexPrefix is prepended to the lower-cased first path segment to form
	// a per-directory index name (see getIndex).
	indexPrefix = ".seaweedfs_"
	// indexKV is the name of the index that initialize creates, using
	// kvMappings as the request body, when it does not exist yet.
	indexKV = ".seaweedfs_kv_entries"
	// kvMappings is the JSON body sent when creating indexKV; it declares a
	// single binary "Value" property.
	kvMappings = ` {
"mappings": {
"enabled": false,
"properties": {
"Value":{
"type": "binary"
}
}
}
}`
)
// ESEntry is the document stored for a filer entry.
type ESEntry struct {
	// ParentId is the MD5 string of the entry's parent directory path; the
	// listing queries match on this field.
	ParentId string `json:"ParentId"`
	// Entry is the filer entry itself, serialized alongside ParentId.
	Entry *filer.Entry
}
// ESKVEntry is a document with a single raw "Value" payload.
// NOTE(review): presumably the document shape for the indexKV index (its
// mapping declares a binary "Value" property); the code using it is not in
// this part of the file.
type ESKVEntry struct {
	Value []byte `json:"Value"`
}
  38. func init() {
  39. filer.Stores = append(filer.Stores, &ElasticStore{})
  40. }
// ElasticStore is a filer store that keeps entries in Elasticsearch.
type ElasticStore struct {
	// client is the Elasticsearch client created by initialize.
	client *elastic.Client
	// maxPageSize is the page size used for search requests; Initialize
	// falls back to 10000 when the configured value is not positive.
	maxPageSize int
}
// GetName returns the identifier of this store, "elastic7".
func (store *ElasticStore) GetName() string {
	return "elastic7"
}
  48. func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  49. options := []elastic.ClientOptionFunc{}
  50. servers := configuration.GetStringSlice(prefix + "servers")
  51. options = append(options, elastic.SetURL(servers...))
  52. username := configuration.GetString(prefix + "username")
  53. password := configuration.GetString(prefix + "password")
  54. if username != "" && password != "" {
  55. options = append(options, elastic.SetBasicAuth(username, password))
  56. }
  57. options = append(options, elastic.SetSniff(configuration.GetBool(prefix+"sniff_enabled")))
  58. options = append(options, elastic.SetHealthcheck(configuration.GetBool(prefix+"healthcheck_enabled")))
  59. store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
  60. if store.maxPageSize <= 0 {
  61. store.maxPageSize = 10000
  62. }
  63. glog.Infof("filer store elastic endpoints: %v.", servers)
  64. return store.initialize(options)
  65. }
  66. func (store *ElasticStore) initialize(options []elastic.ClientOptionFunc) (err error) {
  67. ctx := context.Background()
  68. store.client, err = elastic.NewClient(options...)
  69. if err != nil {
  70. return fmt.Errorf("init elastic %v.", err)
  71. }
  72. if ok, err := store.client.IndexExists(indexKV).Do(ctx); err == nil && !ok {
  73. _, err = store.client.CreateIndex(indexKV).Body(kvMappings).Do(ctx)
  74. if err != nil {
  75. return fmt.Errorf("create index(%s) %v.", indexKV, err)
  76. }
  77. }
  78. return nil
  79. }
// BeginTransaction is a no-op for this store: it returns ctx unchanged and a
// nil error.
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
	return ctx, nil
}
// CommitTransaction is a no-op for this store and always returns nil.
func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
	return nil
}
// RollbackTransaction is a no-op for this store and always returns nil.
func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
	return nil
}
  89. func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  90. return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
  91. }
  92. func (store *ElasticStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  93. return lastFileName, filer.ErrUnsupportedRecursivePrefixed
  94. }
  95. func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  96. index := getIndex(entry.FullPath, false)
  97. dir, _ := entry.FullPath.DirAndName()
  98. id := weed_util.Md5String([]byte(entry.FullPath))
  99. esEntry := &ESEntry{
  100. ParentId: weed_util.Md5String([]byte(dir)),
  101. Entry: entry,
  102. }
  103. value, err := jsoniter.Marshal(esEntry)
  104. if err != nil {
  105. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  106. return fmt.Errorf("insert entry %v.", err)
  107. }
  108. _, err = store.client.Index().
  109. Index(index).
  110. Type(indexType).
  111. Id(id).
  112. BodyJson(string(value)).
  113. Do(ctx)
  114. if err != nil {
  115. glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
  116. return fmt.Errorf("insert entry %v.", err)
  117. }
  118. return nil
  119. }
  120. func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  121. return store.InsertEntry(ctx, entry)
  122. }
  123. func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
  124. index := getIndex(fullpath, false)
  125. id := weed_util.Md5String([]byte(fullpath))
  126. searchResult, err := store.client.Get().
  127. Index(index).
  128. Type(indexType).
  129. Id(id).
  130. Do(ctx)
  131. if elastic.IsNotFound(err) {
  132. return nil, filer_pb.ErrNotFound
  133. }
  134. if searchResult != nil && searchResult.Found {
  135. esEntry := &ESEntry{
  136. ParentId: "",
  137. Entry: &filer.Entry{},
  138. }
  139. err := jsoniter.Unmarshal(searchResult.Source, esEntry)
  140. return esEntry.Entry, err
  141. }
  142. glog.Errorf("find entry(%s),%v.", string(fullpath), err)
  143. return nil, filer_pb.ErrNotFound
  144. }
  145. func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  146. index := getIndex(fullpath, false)
  147. id := weed_util.Md5String([]byte(fullpath))
  148. if strings.Count(string(fullpath), "/") == 1 {
  149. return store.deleteIndex(ctx, index)
  150. }
  151. return store.deleteEntry(ctx, index, id)
  152. }
  153. func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) {
  154. deleteResult, err := store.client.DeleteIndex(index).Do(ctx)
  155. if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
  156. return nil
  157. }
  158. glog.Errorf("delete index(%s) %v.", index, err)
  159. return err
  160. }
  161. func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) {
  162. deleteResult, err := store.client.Delete().
  163. Index(index).
  164. Type(indexType).
  165. Id(id).
  166. Do(ctx)
  167. if err == nil {
  168. if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
  169. return nil
  170. }
  171. }
  172. glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
  173. return fmt.Errorf("delete entry %v.", err)
  174. }
  175. func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
  176. _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
  177. if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
  178. glog.Errorf("elastic delete %s: %v.", entry.FullPath, err)
  179. return false
  180. }
  181. return true
  182. })
  183. return
  184. }
  185. func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  186. return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
  187. }
  188. func (store *ElasticStore) listDirectoryEntries(
  189. ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  190. first := true
  191. index := getIndex(fullpath, true)
  192. nextStart := ""
  193. parentId := weed_util.Md5String([]byte(fullpath))
  194. if _, err = store.client.Refresh(index).Do(ctx); err != nil {
  195. if elastic.IsNotFound(err) {
  196. store.client.CreateIndex(index).Do(ctx)
  197. return
  198. }
  199. }
  200. for {
  201. result := &elastic.SearchResult{}
  202. if (startFileName == "" && first) || inclusive {
  203. if result, err = store.search(ctx, index, parentId); err != nil {
  204. glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  205. return
  206. }
  207. } else {
  208. fullPath := string(fullpath) + "/" + startFileName
  209. if !first {
  210. fullPath = nextStart
  211. }
  212. after := weed_util.Md5String([]byte(fullPath))
  213. if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
  214. glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
  215. return
  216. }
  217. }
  218. first = false
  219. for _, hit := range result.Hits.Hits {
  220. esEntry := &ESEntry{
  221. ParentId: "",
  222. Entry: &filer.Entry{},
  223. }
  224. if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
  225. limit--
  226. if limit < 0 {
  227. return lastFileName, nil
  228. }
  229. nextStart = string(esEntry.Entry.FullPath)
  230. fileName := esEntry.Entry.FullPath.Name()
  231. if fileName == startFileName && !inclusive {
  232. continue
  233. }
  234. if !eachEntryFunc(esEntry.Entry) {
  235. break
  236. }
  237. lastFileName = fileName
  238. }
  239. }
  240. if len(result.Hits.Hits) < store.maxPageSize {
  241. break
  242. }
  243. }
  244. return
  245. }
  246. func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
  247. if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 {
  248. return &elastic.SearchResult{
  249. Hits: &elastic.SearchHits{
  250. Hits: make([]*elastic.SearchHit, 0)},
  251. }, nil
  252. }
  253. queryResult, err := store.client.Search().
  254. Index(index).
  255. Query(elastic.NewMatchQuery("ParentId", parentId)).
  256. Size(store.maxPageSize).
  257. Sort("_id", false).
  258. Do(ctx)
  259. return queryResult, err
  260. }
  261. func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) {
  262. queryResult, err := store.client.Search().
  263. Index(index).
  264. Query(elastic.NewMatchQuery("ParentId", parentId)).
  265. SearchAfter(after).
  266. Size(store.maxPageSize).
  267. Sort("_id", false).
  268. Do(ctx)
  269. return queryResult, err
  270. }
  271. func (store *ElasticStore) Shutdown() {
  272. store.client.Stop()
  273. }
  274. func getIndex(fullpath weed_util.FullPath, isDirectory bool) string {
  275. path := strings.Split(string(fullpath), "/")
  276. if isDirectory && len(path) >= 2 {
  277. return indexPrefix + strings.ToLower(path[1])
  278. }
  279. if len(path) > 2 {
  280. return indexPrefix + strings.ToLower(path[1])
  281. }
  282. if len(path) == 2 {
  283. return indexPrefix
  284. }
  285. return ""
  286. }