chrislu
2 years ago
7 changed files with 1635 additions and 0 deletions
-
6weed/data/Makefile
-
32weed/data/column_uint16.go
-
32weed/data/column_uint32.go
-
103weed/data/columnar.proto
-
1199weed/data/columnar_pb/columnar.pb.go
-
69weed/data/datum.go
-
194weed/data/read_test.go
@ -0,0 +1,6 @@ |
|||
all: gen |
|||
|
|||
.PHONY : gen |
|||
|
|||
gen: |
|||
protoc columnar.proto --go_out=./columnar_pb --go-grpc_out=./columnar_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative |
@ -0,0 +1,32 @@ |
|||
package data |
|||
|
|||
import ( |
|||
"encoding/binary" |
|||
"fmt" |
|||
"io" |
|||
) |
|||
|
|||
type ColumnUint16 struct { |
|||
} |
|||
|
|||
const SIZE_Uint16 = 2 |
|||
|
|||
func (c *ColumnUint16) Read(buf []byte, readerAt io.ReaderAt, offset int64, i int64) uint16 { |
|||
if n, err := readerAt.ReadAt(buf, offset+i*SIZE_Uint16); n == SIZE_Uint16 && err == nil { |
|||
return binary.BigEndian.Uint16(buf) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func WriteUint16s(buf []byte, data []uint16) (err error) { |
|||
off := 0 |
|||
size := len(data) |
|||
if len(buf) < size<<1 { |
|||
return fmt.Errorf("buf too small") |
|||
} |
|||
for _, dat := range data { |
|||
binary.BigEndian.PutUint16(buf[off:], dat) |
|||
off += SIZE_Uint16 |
|||
} |
|||
return nil |
|||
} |
@ -0,0 +1,32 @@ |
|||
package data |
|||
|
|||
import ( |
|||
"encoding/binary" |
|||
"fmt" |
|||
"io" |
|||
) |
|||
|
|||
type ColumnUint32 struct { |
|||
} |
|||
|
|||
const SIZE_Uint32 = 4 |
|||
|
|||
func (c *ColumnUint32) Read(buf []byte, readerAt io.ReaderAt, offset int64, i int64) uint32 { |
|||
if n, err := readerAt.ReadAt(buf, offset+i*SIZE_Uint32); n == SIZE_Uint32 && err == nil { |
|||
return binary.BigEndian.Uint32(buf) |
|||
} |
|||
return 0 |
|||
} |
|||
|
|||
func WriteUint32s(buf []byte, data []uint32) (err error) { |
|||
off := 0 |
|||
size := len(data) |
|||
if len(buf) < size<<2 { |
|||
return fmt.Errorf("buf too small") |
|||
} |
|||
for _, dat := range data { |
|||
binary.BigEndian.PutUint32(buf[off:], dat) |
|||
off += SIZE_Uint32 |
|||
} |
|||
return nil |
|||
} |
@ -0,0 +1,103 @@ |
|||
syntax = "proto3"; |
|||
|
|||
package columnar_pb; |
|||
|
|||
option go_package = "github.com/seaweedfs/seaweedfs/weed/data/columnar_pb"; |
|||
|
|||
message FileId { |
|||
uint32 volume_id = 1; |
|||
uint64 file_key = 2; |
|||
fixed32 cookie = 3; |
|||
} |
|||
|
|||
enum LogicalType { |
|||
Uint8 = 0; |
|||
Uint16 = 1; |
|||
Float32 = 4; |
|||
} |
|||
|
|||
message ColumnUint16 { |
|||
uint32 base = 1; |
|||
uint32 min = 3; |
|||
uint32 max = 4; |
|||
} |
|||
|
|||
message ColumnUint32 { |
|||
uint32 base = 1; |
|||
uint32 min = 3; |
|||
uint32 max = 4; |
|||
} |
|||
|
|||
message ColumnFloat32 { |
|||
uint32 min = 3; |
|||
uint32 max = 4; |
|||
} |
|||
|
|||
message ColumnSplit { |
|||
// The ids of the fields/columns in this file |
|||
int32 field_id = 1; |
|||
FileId file_id = 2; |
|||
int64 row_offset = 3; |
|||
int32 row_count = 4; |
|||
|
|||
oneof storage_type { |
|||
ColumnUint16 meta_uint16 = 8; |
|||
ColumnUint32 meta_uint32 = 9; |
|||
ColumnFloat32 meta_float32 = 10; |
|||
} |
|||
} |
|||
|
|||
message Snapshot { |
|||
// All fields of the dataset, including the nested fields. |
|||
repeated Field fields = 1; |
|||
|
|||
repeated string data_files = 2; |
|||
|
|||
// Snapshot version number. |
|||
uint64 version = 3; |
|||
|
|||
} |
|||
|
|||
message DataFile { |
|||
repeated int32 field_ids = 1; |
|||
repeated RowGroup row_groups = 2; |
|||
} |
|||
|
|||
message RowGroup { |
|||
int64 row_offset = 1; |
|||
int32 row_count = 2; |
|||
repeated ColumnSplit column_splits = 3; |
|||
} |
|||
|
|||
// Field metadata for a column. |
|||
message Field { |
|||
enum Type { |
|||
PARENT = 0; |
|||
REPEATED = 1; |
|||
LEAF = 2; |
|||
} |
|||
Type type = 1; |
|||
|
|||
// Fully qualified name. |
|||
string name = 2; |
|||
/// Field Id. |
|||
int32 id = 3; |
|||
/// Parent Field ID. If not set, this is a top-level column. |
|||
int32 parent_id = 4; |
|||
|
|||
// Logical types, support parameterized Arrow Type. |
|||
LogicalType logical_type = 5; |
|||
// If this field is nullable. |
|||
bool nullable = 6; |
|||
} |
|||
|
|||
|
|||
message AnyValue { |
|||
oneof value { |
|||
bytes bytes_value = 1; |
|||
bool bool_value = 2; |
|||
uint64 int64_value = 3; |
|||
uint32 int32_value = 4; |
|||
double double_value = 5; |
|||
} |
|||
} |
1199
weed/data/columnar_pb/columnar.pb.go
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1,69 @@ |
|||
package data |
|||
|
|||
import "fmt" |
|||
|
|||
type Datum interface { |
|||
Compare(other Datum) (int, error) |
|||
} |
|||
type Datums []Datum |
|||
|
|||
type DUint16 uint16 |
|||
type DUint32 uint32 |
|||
type dNull struct{} |
|||
|
|||
var ( |
|||
DNull Datum = dNull{} |
|||
) |
|||
|
|||
func (d dNull) Compare(other Datum) (int, error) { |
|||
if other == DNull { |
|||
return 0, nil |
|||
} |
|||
return -1, nil |
|||
} |
|||
|
|||
func NewDUint16(d DUint16) *DUint16 { |
|||
return &d |
|||
} |
|||
func NewDUint32(d DUint32) *DUint32 { |
|||
return &d |
|||
} |
|||
|
|||
func (d *DUint16) Compare(other Datum) (int, error) { |
|||
if other == DNull { |
|||
return 1, nil |
|||
} |
|||
thisV := *d |
|||
var otherV DUint16 |
|||
switch t := other.(type) { |
|||
case *DUint16: |
|||
otherV = *t |
|||
default: |
|||
return 0, fmt.Errorf("unsupported") |
|||
} |
|||
if thisV < otherV { |
|||
return -1, nil |
|||
} |
|||
if thisV > otherV { |
|||
return 1, nil |
|||
} |
|||
return 0, nil |
|||
} |
|||
func (d *DUint32) Compare(other Datum) (int, error) { |
|||
if other == DNull { |
|||
return 1, nil |
|||
} |
|||
thisV := *d |
|||
var otherV DUint32 |
|||
switch t := other.(type) { |
|||
case *DUint32: |
|||
otherV = *t |
|||
} |
|||
if thisV < otherV { |
|||
return -1, nil |
|||
} |
|||
if thisV > otherV { |
|||
return 1, nil |
|||
} |
|||
return 0, nil |
|||
} |
@ -0,0 +1,194 @@ |
|||
package data |
|||
|
|||
import ( |
|||
"encoding/binary" |
|||
"fmt" |
|||
"github.com/seaweedfs/seaweedfs/weed/util" |
|||
"io" |
|||
"testing" |
|||
) |
|||
|
|||
func TestRead(t *testing.T) { |
|||
x := make([]uint16, 128) |
|||
y := make([]uint32, 128) |
|||
|
|||
for i := range x { |
|||
x[i] = uint16(i) |
|||
} |
|||
for i := range y { |
|||
y[i] = uint32(i * 32) |
|||
} |
|||
|
|||
xbuf := make([]byte, len(x)*SIZE_Uint16) |
|||
ybuf := make([]byte, len(x)*SIZE_Uint32) |
|||
|
|||
WriteUint16s(xbuf, x) |
|||
WriteUint32s(ybuf, y) |
|||
|
|||
df := &DataFile{ |
|||
xbuf: xbuf, |
|||
ybuf: ybuf, |
|||
xLen: len(xbuf), |
|||
yLen: len(ybuf), |
|||
xReaderAt: util.NewBytesReader(xbuf), |
|||
yReaderAt: util.NewBytesReader(ybuf), |
|||
} |
|||
|
|||
dataLayout := make(map[FieldName]DataLayout) |
|||
dataLayout["x"] = DataLayout{ |
|||
LayoutType: Uint16, |
|||
SortType: Unsorted, |
|||
} |
|||
dataLayout["y"] = DataLayout{ |
|||
LayoutType: Uint32, |
|||
SortType: Unsorted, |
|||
} |
|||
|
|||
rows, err := df.ReadRows("x", dataLayout, Equal, NewDUint16(65)) |
|||
if err != nil { |
|||
fmt.Printf("err: %v", err) |
|||
return |
|||
} |
|||
for _, row := range rows { |
|||
fmt.Printf("row %d width %d ", row.index, len(row.Datums)) |
|||
for i, d := range row.Datums { |
|||
fmt.Printf("%d: %v ", i, d) |
|||
} |
|||
fmt.Println() |
|||
} |
|||
|
|||
} |
|||
|
|||
type Operator int32 |
|||
type LayoutType int32 |
|||
type SortType int32 |
|||
|
|||
const ( |
|||
Equal Operator = 0 |
|||
GreaterThan |
|||
GreaterOrEqual |
|||
LessThan |
|||
LessOrEqual |
|||
|
|||
Uint16 LayoutType = 0 |
|||
Uint32 = 1 |
|||
|
|||
Unsorted SortType = 0 |
|||
Ascending |
|||
Descending |
|||
) |
|||
|
|||
type DataFile struct { |
|||
xbuf []byte |
|||
ybuf []byte |
|||
xReaderAt io.ReaderAt |
|||
xLen int |
|||
yReaderAt io.ReaderAt |
|||
yLen int |
|||
} |
|||
|
|||
type DataLayout struct { |
|||
LayoutType |
|||
SortType |
|||
} |
|||
|
|||
type FieldName string |
|||
|
|||
func (d *DataFile) ReadRows(field FieldName, layout map[FieldName]DataLayout, op Operator, operand Datum) (rows []*Row, err error) { |
|||
if field == "x" { |
|||
rows, err = pushDownReadRows(d.xReaderAt, d.xLen, layout[field], op, operand) |
|||
if err != nil { |
|||
return |
|||
} |
|||
err = hydrateRows(d.yReaderAt, d.yLen, layout["y"], rows) |
|||
} |
|||
if field == "y" { |
|||
rows, err = pushDownReadRows(d.yReaderAt, d.yLen, layout[field], op, operand) |
|||
if err != nil { |
|||
return |
|||
} |
|||
err = hydrateRows(d.xReaderAt, d.xLen, layout["x"], rows) |
|||
} |
|||
return |
|||
} |
|||
|
|||
type Row struct { |
|||
index int |
|||
Datums |
|||
} |
|||
|
|||
func pushDownReadRows(readerAt io.ReaderAt, dataLen int, layout DataLayout, op Operator, operand Datum) (rows []*Row, err error) { |
|||
if layout.LayoutType == Uint16 { |
|||
if layout.SortType == Unsorted { |
|||
buf := make([]byte, SIZE_Uint16) |
|||
for i := 0; i < dataLen; i += SIZE_Uint16 { |
|||
if n, err := readerAt.ReadAt(buf, int64(i)); n == SIZE_Uint16 && err == nil { |
|||
d := NewDUint16(DUint16(binary.BigEndian.Uint16(buf))) |
|||
cmp, err := d.Compare(operand) |
|||
if err != nil { |
|||
return rows, err |
|||
} |
|||
if cmp == 0 && op == Equal { |
|||
println(1) |
|||
rows = append(rows, &Row{ |
|||
index: i / SIZE_Uint16, |
|||
Datums: []Datum{d}, |
|||
}) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
if layout.LayoutType == Uint32 { |
|||
if layout.SortType == Unsorted { |
|||
buf := make([]byte, SIZE_Uint32) |
|||
for i := 0; i < dataLen; i += SIZE_Uint32 { |
|||
if n, err := readerAt.ReadAt(buf, int64(i)); n == SIZE_Uint32 && err == nil { |
|||
d := NewDUint32(DUint32(binary.BigEndian.Uint32(buf))) |
|||
cmp, err := d.Compare(operand) |
|||
if err != nil { |
|||
return rows, err |
|||
} |
|||
if cmp == 0 && op == Equal { |
|||
println(2) |
|||
rows = append(rows, &Row{ |
|||
index: i / SIZE_Uint32, |
|||
Datums: []Datum{d}, |
|||
}) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
func hydrateRows(readerAt io.ReaderAt, dataLen int, layout DataLayout, rows []*Row) (err error) { |
|||
if layout.LayoutType == Uint16 { |
|||
if layout.SortType == Unsorted { |
|||
buf := make([]byte, SIZE_Uint16) |
|||
for _, row := range rows { |
|||
if n, err := readerAt.ReadAt(buf, int64(row.index)*SIZE_Uint16); n == SIZE_Uint16 && err == nil { |
|||
t := binary.BigEndian.Uint16(buf) |
|||
d := NewDUint16(DUint16(t)) |
|||
println(3, "add", t) |
|||
row.Datums = append(row.Datums, d) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
if layout.LayoutType == Uint32 { |
|||
if layout.SortType == Unsorted { |
|||
buf := make([]byte, SIZE_Uint32) |
|||
for _, row := range rows { |
|||
if n, err := readerAt.ReadAt(buf, int64(row.index)*SIZE_Uint32); n == SIZE_Uint32 && err == nil { |
|||
t := binary.BigEndian.Uint32(buf) |
|||
d := NewDUint32(DUint32(t)) |
|||
println(4, "add", t) |
|||
row.Datums = append(row.Datums, d) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue