You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

129 lines
4.1 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
4 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/pb"
  5. "github.com/golang/protobuf/jsonpb"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. func init() {
  15. cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle
  16. }
  17. var cmdFilerMetaTail = &Command{
  18. UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]",
  19. Short: "see continuous changes on a filer",
  20. Long: `See continuous changes on a filer.
  21. weed filer.meta.tail -timeAgo=30h | grep truncate
  22. weed filer.meta.tail -timeAgo=30h | jq .
  23. weed filer.meta.tail -timeAgo=30h -untilTimeAgo=20h | jq .
  24. weed filer.meta.tail -timeAgo=30h | jq .eventNotification.newEntry.name
  25. weed filer.meta.tail -timeAgo=30h -es=http://<elasticSearchServerHost>:<port> -es.index=seaweedfs
  26. `,
  27. }
  28. var (
  29. tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
  30. tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer")
  31. tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  32. tailStop = cmdFilerMetaTail.Flag.Duration("untilTimeAgo", 0, "read until this time ago. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  33. tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
  34. esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
  35. esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
  36. )
  37. func runFilerMetaTail(cmd *Command, args []string) bool {
  38. util.LoadConfiguration("security", false)
  39. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  40. clientId := util.RandomInt32()
  41. var filterFunc func(dir, fname string) bool
  42. if *tailPattern != "" {
  43. if strings.Contains(*tailPattern, "/") {
  44. println("watch path pattern", *tailPattern)
  45. filterFunc = func(dir, fname string) bool {
  46. matched, err := filepath.Match(*tailPattern, dir+"/"+fname)
  47. if err != nil {
  48. fmt.Printf("error: %v", err)
  49. }
  50. return matched
  51. }
  52. } else {
  53. println("watch file pattern", *tailPattern)
  54. filterFunc = func(dir, fname string) bool {
  55. matched, err := filepath.Match(*tailPattern, fname)
  56. if err != nil {
  57. fmt.Printf("error: %v", err)
  58. }
  59. return matched
  60. }
  61. }
  62. }
  63. shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
  64. if filer_pb.IsEmpty(resp) {
  65. return false
  66. }
  67. if filterFunc == nil {
  68. return true
  69. }
  70. if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
  71. return true
  72. }
  73. if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
  74. return true
  75. }
  76. return false
  77. }
  78. jsonpbMarshaler := jsonpb.Marshaler{
  79. EmitDefaults: false,
  80. }
  81. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  82. jsonpbMarshaler.Marshal(os.Stdout, resp)
  83. fmt.Fprintln(os.Stdout)
  84. return nil
  85. }
  86. if *esServers != "" {
  87. var err error
  88. eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
  89. if err != nil {
  90. fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err)
  91. return false
  92. }
  93. }
  94. var untilTsNs int64
  95. if *tailStop != 0 {
  96. untilTsNs = time.Now().Add(-*tailStop).UnixNano()
  97. }
  98. tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, *tailTarget, nil,
  99. time.Now().Add(-*tailStart).UnixNano(), untilTsNs, 0, func(resp *filer_pb.SubscribeMetadataResponse) error {
  100. if !shouldPrint(resp) {
  101. return nil
  102. }
  103. if err := eachEntryFunc(resp); err != nil {
  104. return err
  105. }
  106. return nil
  107. }, false)
  108. if tailErr != nil {
  109. fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
  110. }
  111. return true
  112. }