69 lines
1.7 KiB

package weed_server
import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/json"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/tidwall/gjson"
)
// Query streams the records selected from the needles named by
// req.FromFileIds.
//
// For every file id the needle is read through vs.store.ReadVolumeNeedle and
// its cookie is compared with the one encoded in the file id. When JSON input
// serialization is requested, the needle data is walked line by line with
// gjson; each line that passes req.Filter has its req.Selections values
// appended to a QueriedStripe via json.ToJson, and one stripe is sent on
// stream per file id. CSV input is accepted but not implemented, so nothing is
// sent for it.
//
// The first failure (malformed file id, read error, cookie mismatch, or send
// error) aborts the query and is returned to the caller.
func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_server_pb.VolumeServer_QueryServer) error {
	// Generated protobuf getters are nil-safe, so a request that omits
	// input_serialization or filter does not panic the handler.
	jsonRequested := req.GetInputSerialization().GetJsonInput() != nil

	// The filter does not depend on the file being read; build it once.
	filter := json.Query{
		Field: req.GetFilter().GetField(),
		Op:    req.GetFilter().GetOperand(),
		Value: req.GetFilter().GetValue(),
	}

	for _, fid := range req.FromFileIds {
		vid, idCookie, err := operation.ParseFileId(fid)
		if err != nil {
			glog.V(0).Infof("volume query failed to parse fid %s: %v", fid, err)
			return err
		}

		volumeId, err := needle.NewVolumeId(vid)
		if err != nil {
			glog.V(0).Infof("volume query failed to parse volume id of fid %s: %v", fid, err)
			return err
		}

		n := new(needle.Needle)
		if err := n.ParsePath(idCookie); err != nil {
			glog.V(0).Infof("volume query failed to parse needle id of fid %s: %v", fid, err)
			return err
		}

		// Keep the cookie supplied in the file id so it can be compared with
		// the cookie on n after the read.
		cookie := n.Cookie
		if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
			glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
			return err
		}

		if n.Cookie != cookie {
			// There is no underlying error here, so build one; returning the
			// (nil) outer err would report success to the client.
			err := fmt.Errorf("volume query: cookie mismatch for fid %s", fid)
			glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
			return err
		}

		// CSV input is not implemented yet: nothing is produced for it.

		if !jsonRequested {
			continue
		}

		stripe := &volume_server_pb.QueriedStripe{}
		gjson.ForEachLine(string(n.Data), func(line gjson.Result) bool {
			passedFilter, values := json.QueryJson(line.Raw, req.Selections, filter)
			if passedFilter {
				stripe.Records = json.ToJson(stripe.Records, req.Selections, values)
			}
			// Always continue to the next line.
			return true
		})

		if err := stream.Send(stripe); err != nil {
			return err
		}
	}

	return nil
}