You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

120 lines
3.3 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "path/filepath"
  7. "strings"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. func init() {
  15. cmdWatch.Run = runWatch // break init cycle
  16. }
  17. var cmdWatch = &Command{
  18. UsageLine: "watch [-filer=localhost:8888] [-target=/]",
  19. Short: "see recent changes on a filer",
  20. Long: `See recent changes on a filer.
  21. `,
  22. }
  23. var (
  24. watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
  25. watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
  26. watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  27. watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
  28. )
  29. func runWatch(cmd *Command, args []string) bool {
  30. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  31. var filterFunc func(dir, fname string) bool
  32. if *watchPattern != "" {
  33. if strings.Contains(*watchPattern, "/") {
  34. println("watch path pattern", *watchPattern)
  35. filterFunc = func(dir, fname string) bool {
  36. matched, err := filepath.Match(*watchPattern, dir+"/"+fname)
  37. if err != nil {
  38. fmt.Printf("error: %v", err)
  39. }
  40. return matched
  41. }
  42. } else {
  43. println("watch file pattern", *watchPattern)
  44. filterFunc = func(dir, fname string) bool {
  45. matched, err := filepath.Match(*watchPattern, fname)
  46. if err != nil {
  47. fmt.Printf("error: %v", err)
  48. }
  49. return matched
  50. }
  51. }
  52. }
  53. shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
  54. if filterFunc == nil {
  55. return true
  56. }
  57. if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
  58. return false
  59. }
  60. if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
  61. return true
  62. }
  63. if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
  64. return true
  65. }
  66. return false
  67. }
  68. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  69. fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
  70. return nil
  71. }
  72. watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  73. ctx, cancel := context.WithCancel(context.Background())
  74. defer cancel()
  75. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  76. ClientName: "watch",
  77. PathPrefix: *watchTarget,
  78. SinceNs: time.Now().Add(-*watchStart).UnixNano(),
  79. })
  80. if err != nil {
  81. return fmt.Errorf("listen: %v", err)
  82. }
  83. for {
  84. resp, listenErr := stream.Recv()
  85. if listenErr == io.EOF {
  86. return nil
  87. }
  88. if listenErr != nil {
  89. return listenErr
  90. }
  91. if !shouldPrint(resp) {
  92. continue
  93. }
  94. if err = eachEntryFunc(resp); err != nil {
  95. return err
  96. }
  97. }
  98. })
  99. if watchErr != nil {
  100. fmt.Printf("watch %s: %v\n", *watchFiler, watchErr)
  101. }
  102. return true
  103. }