diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index ce53fae98..fa0262160 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -3,6 +3,8 @@ package command import ( "context" "fmt" + jsoniter "github.com/json-iterator/go" + "github.com/olivere/elastic/v7" "io" "path/filepath" "strings" @@ -31,6 +33,8 @@ var ( tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ") + esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://") + esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name") ) func runFilerMetaTail(cmd *Command, args []string) bool { @@ -80,6 +84,14 @@ func runFilerMetaTail(cmd *Command, args []string) bool { fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification) return nil } + if *esServers != "" { + var err error + eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex) + if err != nil { + fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err) + return false + } + } tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { @@ -118,3 +130,72 @@ func runFilerMetaTail(cmd *Command, args []string) bool { return true } + +type EsDocument struct { + Dir string `json:"dir,omitempty"` + Name string `json:"name,omitempty"` + IsDirectory bool `json:"isDir,omitempty"` + Size uint64 `json:"size,omitempty"` + Uid uint32 `json:"uid,omitempty"` + Gid uint32 `json:"gid,omitempty"` + UserName string `json:"userName,omitempty"` + Collection string `json:"collection,omitempty"` + Crtime int64 `json:"crtime,omitempty"` + Mtime int64 `json:"mtime,omitempty"` + Mime string `json:"mime,omitempty"` +} + +func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) { + entry := event.NewEntry + dir, name := event.NewParentPath, entry.Name + id := util.Md5String([]byte(util.NewFullPath(dir, name))) + esEntry := &EsDocument{ + Dir: dir, + Name: name, + IsDirectory: entry.IsDirectory, + Size: entry.Attributes.FileSize, + Uid: entry.Attributes.Uid, + Gid: entry.Attributes.Gid, + UserName: entry.Attributes.UserName, + Collection: entry.Attributes.Collection, + Crtime: entry.Attributes.Crtime, + Mtime: entry.Attributes.Mtime, + Mime: entry.Attributes.Mime, + } + return esEntry, id +} + +func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) { + options := []elastic.ClientOptionFunc{} + options = append(options, elastic.SetURL(strings.Split(servers, ",")...)) + options = append(options, elastic.SetSniff(false)) + options = append(options, elastic.SetHealthcheck(false)) + client, err := elastic.NewClient(options...) + if err != nil { + return nil, err + } + return func(resp *filer_pb.SubscribeMetadataResponse) error { + event := resp.EventNotification + if event.OldEntry != nil && + (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) { + // delete or not update the same file + dir, name := resp.Directory, event.OldEntry.Name + id := util.Md5String([]byte(util.NewFullPath(dir, name))) + println("delete", id) + _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background()) + return err + } + if event.NewEntry != nil { + // add a new file or update the same file + esEntry, id := toEsEntry(event) + value, err := jsoniter.Marshal(esEntry) + if err != nil { + return err + } + println(string(value)) + _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background()) + return err + } + return nil + }, nil +}