...
https://arrow.apache.org/docs/
Arrow Test Code (Go)
Code Block |
---|
import (
"bytes"
"fmt"
"testing"
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
)
// _DIM is the number of float32 elements in each "field_vector" list entry.
const _DIM = 4

// pool is the allocator handed to every record builder, IPC writer and IPC
// reader in this file.
var pool = memory.NewGoAllocator()
// CreateArrowSchema returns the two-column schema used throughout this file:
// "field_vector", a fixed-size list of _DIM float32 values, followed by
// "field_val", an int64. No schema metadata is attached.
func CreateArrowSchema() *arrow.Schema {
	return arrow.NewSchema(
		[]arrow.Field{
			{Name: "field_vector", Type: arrow.FixedSizeListOf(_DIM, arrow.PrimitiveTypes.Float32)},
			{Name: "field_val", Type: arrow.PrimitiveTypes.Int64},
		},
		nil,
	)
}
// CreateArrowRecord builds a record for schema from flat column data.
//
// iValues supplies the values of every INT64 column, and its length is the
// row count. vValues supplies the flattened float32 contents used for every
// FIXED_SIZE_LIST column, row after row, so it must hold exactly
// len(iValues)*listSize elements, where listSize is the list length declared
// by that field's type. Every list slot is marked valid. Columns of any other
// type are not populated.
//
// It panics if vValues has the wrong length for a list column. The caller
// owns the returned record and must Release it.
func CreateArrowRecord(schema *arrow.Schema, iValues []int64, vValues []float32) array.Record {
	rb := array.NewRecordBuilder(pool, schema)
	defer rb.Release()

	rowNum := len(iValues)
	rb.Reserve(rowNum)
	for fieldIdx, field := range rb.Schema().Fields() {
		switch field.Type.ID() {
		case arrow.INT64:
			vb := rb.Field(fieldIdx).(*array.Int64Builder)
			vb.AppendValues(iValues, nil)
		case arrow.FIXED_SIZE_LIST:
			// A fixed-size list's child array must hold exactly rows*listSize
			// values; anything else would yield a malformed record.
			listSize := int(field.Type.(*arrow.FixedSizeListType).Len())
			if want := rowNum * listSize; len(vValues) != want {
				panic(fmt.Sprintf("field %q: got %d float32 values, want %d (%d rows x %d)",
					field.Name, len(vValues), want, rowNum, listSize))
			}
			lb := rb.Field(fieldIdx).(*array.FixedSizeListBuilder)
			valid := make([]bool, rowNum)
			for j := range valid {
				valid[j] = true
			}
			lb.AppendValues(valid)
			vb := lb.ValueBuilder().(*array.Float32Builder)
			vb.AppendValues(vValues, nil)
		}
	}
	return rb.NewRecord()
}
// WriteArrowRecord serializes rec with an ipc.Writer (configured with schema
// and the shared pool) into an in-memory buffer and returns the buffer's bytes.
// Exactly one record is written.
//
// It takes ownership of rec: rec is released before the function returns, so
// callers must not use it afterwards. It panics if writing or closing fails.
func WriteArrowRecord(schema *arrow.Schema, rec array.Record) []byte {
	defer rec.Release()

	var buf bytes.Buffer
	writer := ipc.NewWriter(&buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))
	if err := writer.Write(rec); err != nil {
		panic(fmt.Sprintf("could not write record: %v", err))
	}
	// Close explicitly (not deferred) so its error is checked and everything it
	// writes is in buf before the bytes are taken.
	if err := writer.Close(); err != nil {
		panic(err.Error())
	}
	return buf.Bytes()
}
// ReadArrowRecords decodes the first record of each IPC stream in blobs and
// merges them into one new record built by CreateArrowRecord.
//
// The values of INT64 columns are concatenated into one int64 slice, and the
// float32 child values of FIXED_SIZE_LIST columns into one float32 slice, in
// blob order. The merged record is printed with ShowArrowRecord and returned.
// The caller owns it and must Release it.
//
// Only the first record of each blob is read; any further records in the same
// stream are ignored. It panics if a blob cannot be opened or its first record
// cannot be read.
func ReadArrowRecords(schema *arrow.Schema, blobs [][]byte) array.Record {
	iValues := make([]int64, 0)
	vValues := make([]float32, 0)
	for _, blob := range blobs {
		iValues, vValues = appendFirstRecordValues(schema, blob, iValues, vValues)
	}
	ret := CreateArrowRecord(schema, iValues, vValues)
	ShowArrowRecord(ret)
	return ret
}

// appendFirstRecordValues reads the first record of one IPC stream blob and
// appends its INT64 values to iValues and its fixed-size-list float32 child
// values to vValues, returning the grown slices.
func appendFirstRecordValues(schema *arrow.Schema, blob []byte, iValues []int64, vValues []float32) ([]int64, []float32) {
	reader, err := ipc.NewReader(bytes.NewReader(blob), ipc.WithSchema(schema), ipc.WithAllocator(pool))
	if err != nil {
		panic(fmt.Sprintf("create reader fail: %v", err))
	}
	// The record returned by Read stays owned by the reader, which releases it
	// in reader.Release. It must not also be released here, which would drop
	// one reference too many. Its values are copied out by append before the
	// reader is released.
	defer reader.Release()

	rec, err := reader.Read()
	if err != nil {
		panic(fmt.Sprintf("read record fail: %v", err))
	}
	for _, col := range rec.Columns() {
		switch col.DataType().ID() {
		case arrow.INT64:
			iValues = append(iValues, col.(*array.Int64).Int64Values()...)
		case arrow.FIXED_SIZE_LIST:
			child := col.(*array.FixedSizeList).ListValues().(*array.Float32)
			vValues = append(vValues, child.Float32Values()...)
		}
	}
	return iValues, vValues
}
// ReadArrowRecordsToTable decodes the first record of each IPC stream in blobs
// and combines those records, in blob order, into one table with
// array.NewTableFromRecords.
//
// The table is printed with ShowArrowTable and returned. The caller owns it
// and must Release it. Only the first record of each blob is read; any further
// records in the same stream are ignored. It panics if a blob cannot be opened
// or its first record cannot be read.
func ReadArrowRecordsToTable(schema *arrow.Schema, blobs [][]byte) array.Table {
	recs := make([]array.Record, 0, len(blobs))
	// Each entry carries a reference taken by readRetainedFirstRecord; drop
	// them once the table holds its own references (also on panic).
	defer func() {
		for _, rec := range recs {
			rec.Release()
		}
	}()
	for _, blob := range blobs {
		recs = append(recs, readRetainedFirstRecord(schema, blob))
	}
	table := array.NewTableFromRecords(schema, recs)
	ShowArrowTable(table)
	return table
}

// readRetainedFirstRecord reads the first record of one IPC stream blob and
// returns it with an extra reference that the caller must Release.
func readRetainedFirstRecord(schema *arrow.Schema, blob []byte) array.Record {
	reader, err := ipc.NewReader(bytes.NewReader(blob), ipc.WithSchema(schema), ipc.WithAllocator(pool))
	if err != nil {
		panic(fmt.Sprintf("create reader fail: %v", err))
	}
	defer reader.Release()

	rec, err := reader.Read()
	if err != nil {
		panic(fmt.Sprintf("read record fail: %v", err))
	}
	// The reader owns rec and releases it in reader.Release; Retain gives the
	// caller its own reference that outlives the reader.
	rec.Retain()
	return rec
}
// ShowArrowRecord writes a summary of rec to standard output. The summary is a
// separator banner, the schema, and the column and row counts, followed by one
// line per column with its index, quoted name and contents.
func ShowArrowRecord(rec array.Record) {
	schema, ncols, nrows := rec.Schema(), rec.NumCols(), rec.NumRows()
	fmt.Printf("\n=============================\n")
	fmt.Printf("Schema: %v\n", schema)
	fmt.Printf("NumCols: %v\n", ncols)
	fmt.Printf("NumRows: %v\n", nrows)

	cols := rec.Columns()
	for idx := 0; idx < len(cols); idx++ {
		fmt.Printf("Column[%d] %q: %v\n", idx, rec.ColumnName(idx), cols[idx])
	}
}
// ShowArrowTable writes a summary of tbl to standard output. The summary is a
// separator banner, the schema, and the column and row counts, followed by one
// line per column with its index, field name and chunk list.
func ShowArrowTable(tbl array.Table) {
	schema := tbl.Schema()
	fmt.Printf("\n=============================\n")
	fmt.Printf("Schema: %v\n", schema)
	fmt.Printf("NumCols: %v\n", tbl.NumCols())
	fmt.Printf("NumRows: %v\n", tbl.NumRows())

	numCols := int(tbl.NumCols())
	for idx := 0; idx < numCols; idx++ {
		name := schema.Field(idx).Name
		chunks := tbl.Column(idx).Data().Chunks()
		fmt.Printf("Column[%d] %s: %v\n", idx, name, chunks)
	}
}
func TestArrowIPC(t *testing.T) {
schema := CreateArrowSchema()
rec0 := CreateArrowRecord(schema, []int64{0}, []float32{0,0,0,0})
rec1 := CreateArrowRecord(schema, []int64{1,2,3}, []float32{1,1,1,1,2,2,2,2,3,3,3,3})
blob0 := WriteArrowRecord(schema, rec0)
blob1 := WriteArrowRecord(schema, rec1)
ReadArrowRecords(schema, [][]byte{blob0, blob1})
ReadArrowRecordsToTable(schema, [][]byte{blob0, blob1})
} |