You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

82 lines
2.7 KiB

  1. //go:build elastic
  2. // +build elastic
  3. package command
  4. import (
  5. "context"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. jsoniter "github.com/json-iterator/go"
  9. elastic "github.com/olivere/elastic/v7"
  10. "strings"
  11. )
  12. type EsDocument struct {
  13. Dir string `json:"dir,omitempty"`
  14. Name string `json:"name,omitempty"`
  15. IsDirectory bool `json:"isDir,omitempty"`
  16. Size uint64 `json:"size,omitempty"`
  17. Uid uint32 `json:"uid,omitempty"`
  18. Gid uint32 `json:"gid,omitempty"`
  19. UserName string `json:"userName,omitempty"`
  20. Collection string `json:"collection,omitempty"`
  21. Crtime int64 `json:"crtime,omitempty"`
  22. Mtime int64 `json:"mtime,omitempty"`
  23. Mime string `json:"mime,omitempty"`
  24. }
  25. func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) {
  26. entry := event.NewEntry
  27. dir, name := event.NewParentPath, entry.Name
  28. id := util.Md5String([]byte(util.NewFullPath(dir, name)))
  29. esEntry := &EsDocument{
  30. Dir: dir,
  31. Name: name,
  32. IsDirectory: entry.IsDirectory,
  33. Size: entry.Attributes.FileSize,
  34. Uid: entry.Attributes.Uid,
  35. Gid: entry.Attributes.Gid,
  36. UserName: entry.Attributes.UserName,
  37. Collection: entry.Attributes.Collection,
  38. Crtime: entry.Attributes.Crtime,
  39. Mtime: entry.Attributes.Mtime,
  40. Mime: entry.Attributes.Mime,
  41. }
  42. return esEntry, id
  43. }
  44. func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) {
  45. options := []elastic.ClientOptionFunc{}
  46. options = append(options, elastic.SetURL(strings.Split(servers, ",")...))
  47. options = append(options, elastic.SetSniff(false))
  48. options = append(options, elastic.SetHealthcheck(false))
  49. client, err := elastic.NewClient(options...)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  54. event := resp.EventNotification
  55. if event.OldEntry != nil &&
  56. (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) {
  57. // delete or not update the same file
  58. dir, name := resp.Directory, event.OldEntry.Name
  59. id := util.Md5String([]byte(util.NewFullPath(dir, name)))
  60. println("delete", id)
  61. _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background())
  62. return err
  63. }
  64. if event.NewEntry != nil {
  65. // add a new file or update the same file
  66. esEntry, id := toEsEntry(event)
  67. value, err := jsoniter.Marshal(esEntry)
  68. if err != nil {
  69. return err
  70. }
  71. println(string(value))
  72. _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background())
  73. return err
  74. }
  75. return nil
  76. }, nil
  77. }