You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

212 lines
6.5 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/golang/protobuf/jsonpb"
  6. jsoniter "github.com/json-iterator/go"
  7. "github.com/olivere/elastic/v7"
  8. "io"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. "github.com/chrislusf/seaweedfs/weed/security"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. )
  18. func init() {
  19. cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle
  20. }
  21. var cmdFilerMetaTail = &Command{
  22. UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]",
  23. Short: "see continuous changes on a filer",
  24. Long: `See continuous changes on a filer.
  25. weed filer.meta.tail -timeAgo=30h | grep truncate
  26. weed filer.meta.tail -timeAgo=30h | jq .
  27. weed filer.meta.tail -timeAgo=30h | jq .eventNotification.newEntry.name
  28. `,
  29. }
  30. var (
  31. tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
  32. tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer")
  33. tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  34. tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
  35. esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
  36. esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
  37. )
  38. func runFilerMetaTail(cmd *Command, args []string) bool {
  39. util.LoadConfiguration("security", false)
  40. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  41. var filterFunc func(dir, fname string) bool
  42. if *tailPattern != "" {
  43. if strings.Contains(*tailPattern, "/") {
  44. println("watch path pattern", *tailPattern)
  45. filterFunc = func(dir, fname string) bool {
  46. matched, err := filepath.Match(*tailPattern, dir+"/"+fname)
  47. if err != nil {
  48. fmt.Printf("error: %v", err)
  49. }
  50. return matched
  51. }
  52. } else {
  53. println("watch file pattern", *tailPattern)
  54. filterFunc = func(dir, fname string) bool {
  55. matched, err := filepath.Match(*tailPattern, fname)
  56. if err != nil {
  57. fmt.Printf("error: %v", err)
  58. }
  59. return matched
  60. }
  61. }
  62. }
  63. shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
  64. if filterFunc == nil {
  65. return true
  66. }
  67. if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
  68. return false
  69. }
  70. if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
  71. return true
  72. }
  73. if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
  74. return true
  75. }
  76. return false
  77. }
  78. jsonpbMarshaler := jsonpb.Marshaler{
  79. EmitDefaults: false,
  80. }
  81. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  82. jsonpbMarshaler.Marshal(os.Stdout, resp)
  83. fmt.Fprintln(os.Stdout)
  84. return nil
  85. }
  86. if *esServers != "" {
  87. var err error
  88. eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
  89. if err != nil {
  90. fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err)
  91. return false
  92. }
  93. }
  94. tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  95. ctx, cancel := context.WithCancel(context.Background())
  96. defer cancel()
  97. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  98. ClientName: "tail",
  99. PathPrefix: *tailTarget,
  100. SinceNs: time.Now().Add(-*tailStart).UnixNano(),
  101. })
  102. if err != nil {
  103. return fmt.Errorf("listen: %v", err)
  104. }
  105. for {
  106. resp, listenErr := stream.Recv()
  107. if listenErr == io.EOF {
  108. return nil
  109. }
  110. if listenErr != nil {
  111. return listenErr
  112. }
  113. if !shouldPrint(resp) {
  114. continue
  115. }
  116. if err = eachEntryFunc(resp); err != nil {
  117. return err
  118. }
  119. }
  120. })
  121. if tailErr != nil {
  122. fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
  123. }
  124. return true
  125. }
  126. type EsDocument struct {
  127. Dir string `json:"dir,omitempty"`
  128. Name string `json:"name,omitempty"`
  129. IsDirectory bool `json:"isDir,omitempty"`
  130. Size uint64 `json:"size,omitempty"`
  131. Uid uint32 `json:"uid,omitempty"`
  132. Gid uint32 `json:"gid,omitempty"`
  133. UserName string `json:"userName,omitempty"`
  134. Collection string `json:"collection,omitempty"`
  135. Crtime int64 `json:"crtime,omitempty"`
  136. Mtime int64 `json:"mtime,omitempty"`
  137. Mime string `json:"mime,omitempty"`
  138. }
  139. func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) {
  140. entry := event.NewEntry
  141. dir, name := event.NewParentPath, entry.Name
  142. id := util.Md5String([]byte(util.NewFullPath(dir, name)))
  143. esEntry := &EsDocument{
  144. Dir: dir,
  145. Name: name,
  146. IsDirectory: entry.IsDirectory,
  147. Size: entry.Attributes.FileSize,
  148. Uid: entry.Attributes.Uid,
  149. Gid: entry.Attributes.Gid,
  150. UserName: entry.Attributes.UserName,
  151. Collection: entry.Attributes.Collection,
  152. Crtime: entry.Attributes.Crtime,
  153. Mtime: entry.Attributes.Mtime,
  154. Mime: entry.Attributes.Mime,
  155. }
  156. return esEntry, id
  157. }
  158. func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) {
  159. options := []elastic.ClientOptionFunc{}
  160. options = append(options, elastic.SetURL(strings.Split(servers, ",")...))
  161. options = append(options, elastic.SetSniff(false))
  162. options = append(options, elastic.SetHealthcheck(false))
  163. client, err := elastic.NewClient(options...)
  164. if err != nil {
  165. return nil, err
  166. }
  167. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  168. event := resp.EventNotification
  169. if event.OldEntry != nil &&
  170. (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) {
  171. // delete or not update the same file
  172. dir, name := resp.Directory, event.OldEntry.Name
  173. id := util.Md5String([]byte(util.NewFullPath(dir, name)))
  174. println("delete", id)
  175. _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background())
  176. return err
  177. }
  178. if event.NewEntry != nil {
  179. // add a new file or update the same file
  180. esEntry, id := toEsEntry(event)
  181. value, err := jsoniter.Marshal(esEntry)
  182. if err != nil {
  183. return err
  184. }
  185. println(string(value))
  186. _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background())
  187. return err
  188. }
  189. return nil
  190. }, nil
  191. }