Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

https://arrow.apache.org/docs/


Arrow Test Code (Go)

Code Block
import (
	"bytes"
	"fmt"
	"testing"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/ipc"
	"github.com/apache/arrow/go/arrow/memory"
)

const (
	_DIM = 4
)

var pool = memory.NewGoAllocator()

func CreateArrowSchema() *arrow.Schema {
	fieldVector := arrow.Field{
		Name: "field_vector",
		Type: arrow.FixedSizeListOf(_DIM, arrow.PrimitiveTypes.Float32),
	}
	fieldVal := arrow.Field{
		Name: "field_val",
		Type: arrow.PrimitiveTypes.Int64,
	}
	schema := arrow.NewSchema([]arrow.Field{fieldVector, fieldVal}, nil)
	return schema
}

func CreateArrowRecord(schema *arrow.Schema, iValues []int64, vValues []float32) array.Record {
	rb := array.NewRecordBuilder(pool, schema)
	defer rb.Release()
	rb.Reserve(len(iValues))

	rowNum := len(iValues)
	for i, field := range rb.Schema().Fields() {
		switch field.Type.ID() {
		case arrow.INT64:
			vb := rb.Field(i).(*array.Int64Builder)
			vb.AppendValues(iValues, nil)
		case arrow.FIXED_SIZE_LIST:
			lb := rb.Field(i).(*array.FixedSizeListBuilder)
			valid := make([]bool, rowNum)
			for i := 0; i < rowNum; i++ {
				valid[i] = true
			}
			lb.AppendValues(valid)
			vb := lb.ValueBuilder().(*array.Float32Builder)
			vb.AppendValues(vValues, nil)
		}
	}

	rec := rb.NewRecord()

	return rec
}

func WriteArrowRecord(schema *arrow.Schema, rec array.Record) []byte {
	defer rec.Release()
	blob := make([]byte, 0)
	buf := bytes.NewBuffer(blob)

	// internal/arrdata/ioutil.go
	writer := ipc.NewWriter(buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))
	defer writer.Close()

	//ShowArrowRecord(rec)
	if err := writer.Write(rec); err != nil {
		panic("could not write record: %v" + err.Error())
	}

	err := writer.Close()
	if err != nil {
		panic(err.Error())
	}

	return buf.Bytes()
}

func ReadArrowRecords(schema *arrow.Schema, blobs [][]byte) array.Record {
	iValues := make([]int64, 0)
	vValues := make([]float32, 0)
	for _, blob := range blobs {
		buf := bytes.NewReader(blob)

		reader, err := ipc.NewReader(buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))
		if err != nil {
			panic("create reader fail: %v" + err.Error())
		}
		defer reader.Release()

		rec, err := reader.Read()
		if err != nil {
			panic("read record fail: %v" + err.Error())
		}
		defer rec.Release()

		for _, col := range rec.Columns() {
			switch col.DataType().ID() {
			case arrow.INT64:
				arr := col.(*array.Int64)
				iValues = append(iValues, arr.Int64Values()...)
			case arrow.FIXED_SIZE_LIST:
				arr := col.(*array.FixedSizeList).ListValues().(*array.Float32)
				vValues = append(vValues, arr.Float32Values()...)
			}
		}
	}
	ret := CreateArrowRecord(schema, iValues, vValues)
	ShowArrowRecord(ret)

	return ret
}

func ReadArrowRecordsToTable(schema *arrow.Schema, blobs [][]byte) array.Table {
	recs := make([]array.Record, 0)
	for _, blob := range blobs {
		buf := bytes.NewReader(blob)

		reader, err := ipc.NewReader(buf, ipc.WithSchema(schema), ipc.WithAllocator(pool))
		if err != nil {
			panic("create reader fail: %v" + err.Error())
		}
		defer reader.Release()

		rec, err := reader.Read()
		if err != nil {
			panic("read record fail: %v" + err.Error())
		}
		defer rec.Release()

		recs = append(recs, rec)
	}
	table := array.NewTableFromRecords(schema, recs)
	ShowArrowTable(table)

	return table
}

func ShowArrowRecord(rec array.Record) {
	fmt.Printf("\n=============================\n")
	fmt.Printf("Schema: %v\n", rec.Schema())
	fmt.Printf("NumCols: %v\n", rec.NumCols())
	fmt.Printf("NumRows: %v\n", rec.NumRows())
	//rowNum := int(rec.NumRows())
	for i, col := range rec.Columns() {
		fmt.Printf("Column[%d] %q: %v\n", i, rec.ColumnName(i), col)
	}
}

func ShowArrowTable(tbl array.Table) {
	fmt.Printf("\n=============================\n")
	fmt.Printf("Schema: %v\n", tbl.Schema())
	fmt.Printf("NumCols: %v\n", tbl.NumCols())
	fmt.Printf("NumRows: %v\n", tbl.NumRows())
	for i := 0; i < int(tbl.NumCols()); i++ {
		col := tbl.Column(i)
		fmt.Printf("Column[%d] %s: %v\n", i, tbl.Schema().Field(i).Name, col.Data().Chunks())
	}
}

func TestArrowIPC(t *testing.T) {
	schema := CreateArrowSchema()
	rec0 := CreateArrowRecord(schema, []int64{0}, []float32{0,0,0,0})
	rec1 := CreateArrowRecord(schema, []int64{1,2,3}, []float32{1,1,1,1,2,2,2,2,3,3,3,3})
	blob0 := WriteArrowRecord(schema, rec0)
	blob1 := WriteArrowRecord(schema, rec1)
	ReadArrowRecords(schema, [][]byte{blob0, blob1})
	ReadArrowRecordsToTable(schema, [][]byte{blob0, blob1})
}