|
@ -1,12 +1,9 @@ |
|
|
package command |
|
|
package command |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
"context" |
|
|
|
|
|
"fmt" |
|
|
"fmt" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb" |
|
|
"github.com/chrislusf/seaweedfs/weed/pb" |
|
|
"github.com/golang/protobuf/jsonpb" |
|
|
"github.com/golang/protobuf/jsonpb" |
|
|
jsoniter "github.com/json-iterator/go" |
|
|
|
|
|
elastic "github.com/olivere/elastic/v7" |
|
|
|
|
|
"os" |
|
|
"os" |
|
|
"path/filepath" |
|
|
"path/filepath" |
|
|
"strings" |
|
|
"strings" |
|
@ -124,72 +121,3 @@ func runFilerMetaTail(cmd *Command, args []string) bool { |
|
|
|
|
|
|
|
|
return true |
|
|
return true |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
type EsDocument struct { |
|
|
|
|
|
Dir string `json:"dir,omitempty"` |
|
|
|
|
|
Name string `json:"name,omitempty"` |
|
|
|
|
|
IsDirectory bool `json:"isDir,omitempty"` |
|
|
|
|
|
Size uint64 `json:"size,omitempty"` |
|
|
|
|
|
Uid uint32 `json:"uid,omitempty"` |
|
|
|
|
|
Gid uint32 `json:"gid,omitempty"` |
|
|
|
|
|
UserName string `json:"userName,omitempty"` |
|
|
|
|
|
Collection string `json:"collection,omitempty"` |
|
|
|
|
|
Crtime int64 `json:"crtime,omitempty"` |
|
|
|
|
|
Mtime int64 `json:"mtime,omitempty"` |
|
|
|
|
|
Mime string `json:"mime,omitempty"` |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) { |
|
|
|
|
|
entry := event.NewEntry |
|
|
|
|
|
dir, name := event.NewParentPath, entry.Name |
|
|
|
|
|
id := util.Md5String([]byte(util.NewFullPath(dir, name))) |
|
|
|
|
|
esEntry := &EsDocument{ |
|
|
|
|
|
Dir: dir, |
|
|
|
|
|
Name: name, |
|
|
|
|
|
IsDirectory: entry.IsDirectory, |
|
|
|
|
|
Size: entry.Attributes.FileSize, |
|
|
|
|
|
Uid: entry.Attributes.Uid, |
|
|
|
|
|
Gid: entry.Attributes.Gid, |
|
|
|
|
|
UserName: entry.Attributes.UserName, |
|
|
|
|
|
Collection: entry.Attributes.Collection, |
|
|
|
|
|
Crtime: entry.Attributes.Crtime, |
|
|
|
|
|
Mtime: entry.Attributes.Mtime, |
|
|
|
|
|
Mime: entry.Attributes.Mime, |
|
|
|
|
|
} |
|
|
|
|
|
return esEntry, id |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) { |
|
|
|
|
|
options := []elastic.ClientOptionFunc{} |
|
|
|
|
|
options = append(options, elastic.SetURL(strings.Split(servers, ",")...)) |
|
|
|
|
|
options = append(options, elastic.SetSniff(false)) |
|
|
|
|
|
options = append(options, elastic.SetHealthcheck(false)) |
|
|
|
|
|
client, err := elastic.NewClient(options...) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return nil, err |
|
|
|
|
|
} |
|
|
|
|
|
return func(resp *filer_pb.SubscribeMetadataResponse) error { |
|
|
|
|
|
event := resp.EventNotification |
|
|
|
|
|
if event.OldEntry != nil && |
|
|
|
|
|
(event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) { |
|
|
|
|
|
// delete or not update the same file
|
|
|
|
|
|
dir, name := resp.Directory, event.OldEntry.Name |
|
|
|
|
|
id := util.Md5String([]byte(util.NewFullPath(dir, name))) |
|
|
|
|
|
println("delete", id) |
|
|
|
|
|
_, err := client.Delete().Index(esIndex).Id(id).Do(context.Background()) |
|
|
|
|
|
return err |
|
|
|
|
|
} |
|
|
|
|
|
if event.NewEntry != nil { |
|
|
|
|
|
// add a new file or update the same file
|
|
|
|
|
|
esEntry, id := toEsEntry(event) |
|
|
|
|
|
value, err := jsoniter.Marshal(esEntry) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return err |
|
|
|
|
|
} |
|
|
|
|
|
println(string(value)) |
|
|
|
|
|
_, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background()) |
|
|
|
|
|
return err |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
|
|
|
}, nil |
|
|
|
|
|
} |
|
|
|