Browse Source

add uint64

data_query_pushdown
chrislu 2 years ago
parent
commit
5e42b69841
  1. 32
      weed/data/column_uint64.go
  2. 22
      weed/data/datum.go
  3. 69
      weed/data/read_test.go

32
weed/data/column_uint64.go

@ -0,0 +1,32 @@
package data
import (
"encoding/binary"
"fmt"
"io"
)
type ColumnUint64 struct {
}
const SIZE_Uint64 = 8
func (c *ColumnUint64) Read(buf []byte, readerAt io.ReaderAt, offset int64, i int64) uint64 {
if n, err := readerAt.ReadAt(buf, offset+i*SIZE_Uint64); n == SIZE_Uint64 && err == nil {
return binary.BigEndian.Uint64(buf)
}
return 0
}
func WriteUint64s(buf []byte, data []uint64) (err error) {
off := 0
size := len(data)
if len(buf) < size<<3 {
return fmt.Errorf("buf too small")
}
for _, dat := range data {
binary.BigEndian.PutUint64(buf[off:], dat)
off += SIZE_Uint64
}
return nil
}

22
weed/data/datum.go

@ -9,6 +9,7 @@ type Datums []Datum
type DUint16 uint16
type DUint32 uint32
type DUint64 uint64
type dNull struct{}
var (
@ -28,6 +29,9 @@ func NewDUint16(d DUint16) *DUint16 {
func NewDUint32(d DUint32) *DUint32 {
return &d
}
func NewDUint64(d DUint64) *DUint64 {
return &d
}
func (d *DUint16) Compare(other Datum) (int, error) {
if other == DNull {
@ -67,3 +71,21 @@ func (d *DUint32) Compare(other Datum) (int, error) {
}
return 0, nil
}
func (d *DUint64) Compare(other Datum) (int, error) {
if other == DNull {
return 1, nil
}
thisV := *d
var otherV DUint64
switch t := other.(type) {
case *DUint64:
otherV = *t
}
if thisV < otherV {
return -1, nil
}
if thisV > otherV {
return 1, nil
}
return 0, nil
}

69
weed/data/read_test.go

@ -11,27 +11,36 @@ import (
func TestRead(t *testing.T) {
x := make([]uint16, 128)
y := make([]uint32, 128)
z := make([]uint64, 128)
for i := range x {
x[i] = uint16(i)
}
for i := range y {
y[i] = uint32(i * 32)
y[i] = uint32(i * 2)
}
for i := range z {
z[i] = uint64(i * 3)
}
xbuf := make([]byte, len(x)*SIZE_Uint16)
ybuf := make([]byte, len(x)*SIZE_Uint32)
zbuf := make([]byte, len(x)*SIZE_Uint64)
WriteUint16s(xbuf, x)
WriteUint32s(ybuf, y)
WriteUint64s(zbuf, z)
df := &DataFile{
xbuf: xbuf,
ybuf: ybuf,
zbuf: zbuf,
xLen: len(xbuf),
yLen: len(ybuf),
zLen: len(zbuf),
xReaderAt: util.NewBytesReader(xbuf),
yReaderAt: util.NewBytesReader(ybuf),
zReaderAt: util.NewBytesReader(zbuf),
}
dataLayout := make(map[FieldName]DataLayout)
@ -43,6 +52,10 @@ func TestRead(t *testing.T) {
LayoutType: Uint32,
SortType: Unsorted,
}
dataLayout["z"] = DataLayout{
LayoutType: Uint64,
SortType: Unsorted,
}
rows, err := df.ReadRows("x", dataLayout, Equal, NewDUint16(65))
if err != nil {
@ -72,6 +85,7 @@ const (
Uint16 LayoutType = 0
Uint32 = 1
Uint64 = 2
Unsorted SortType = 0
Ascending
@ -81,10 +95,13 @@ const (
type DataFile struct {
xbuf []byte
ybuf []byte
zbuf []byte
xReaderAt io.ReaderAt
xLen int
yReaderAt io.ReaderAt
zReaderAt io.ReaderAt
xLen int
yLen int
zLen int
}
type DataLayout struct {
@ -101,6 +118,13 @@ func (d *DataFile) ReadRows(field FieldName, layout map[FieldName]DataLayout, op
return
}
err = hydrateRows(d.yReaderAt, d.yLen, layout["y"], rows)
if err != nil {
return
}
err = hydrateRows(d.zReaderAt, d.zLen, layout["z"], rows)
if err != nil {
return
}
}
if field == "y" {
rows, err = pushDownReadRows(d.yReaderAt, d.yLen, layout[field], op, operand)
@ -108,6 +132,13 @@ func (d *DataFile) ReadRows(field FieldName, layout map[FieldName]DataLayout, op
return
}
err = hydrateRows(d.xReaderAt, d.xLen, layout["x"], rows)
if err != nil {
return
}
err = hydrateRows(d.zReaderAt, d.zLen, layout["z"], rows)
if err != nil {
return
}
}
return
}
@ -160,6 +191,27 @@ func pushDownReadRows(readerAt io.ReaderAt, dataLen int, layout DataLayout, op O
}
}
}
if layout.LayoutType == Uint64 {
if layout.SortType == Unsorted {
buf := make([]byte, SIZE_Uint64)
for i := 0; i < dataLen; i += SIZE_Uint64 {
if n, err := readerAt.ReadAt(buf, int64(i)); n == SIZE_Uint64 && err == nil {
d := NewDUint64(DUint64(binary.BigEndian.Uint64(buf)))
cmp, err := d.Compare(operand)
if err != nil {
return rows, err
}
if cmp == 0 && op == Equal {
println(2)
rows = append(rows, &Row{
index: i / SIZE_Uint64,
Datums: []Datum{d},
})
}
}
}
}
}
return
}
@ -190,5 +242,18 @@ func hydrateRows(readerAt io.ReaderAt, dataLen int, layout DataLayout, rows []*R
}
}
}
if layout.LayoutType == Uint64 {
if layout.SortType == Unsorted {
buf := make([]byte, SIZE_Uint64)
for _, row := range rows {
if n, err := readerAt.ReadAt(buf, int64(row.index)*SIZE_Uint64); n == SIZE_Uint64 && err == nil {
t := binary.BigEndian.Uint64(buf)
d := NewDUint64(DUint64(t))
println(5, "add", t)
row.Datums = append(row.Datums, d)
}
}
}
}
return
}
Loading…
Cancel
Save