[wip] proto: streaming rows with msgpack

Signed-off-by: Anton Nesterov <anton@demiurg.io>
This commit is contained in:
Anton Nesterov 2024-08-15 06:23:36 +02:00
parent 8fd9a369d1
commit 920098e80c
No known key found for this signature in database
GPG key ID: 59121E8AE2851FB5
11 changed files with 393 additions and 672 deletions

View file

@ -20,6 +20,11 @@ func convertFields(fields []Map) (string, error) {
continue continue
} }
asNum, ok := as.(int) asNum, ok := as.(int)
if !ok {
n, k := as.(int64)
asNum = int(n)
ok = k
}
if ok { if ok {
if asNum == 1 { if asNum == 1 {
expressions = append(expressions, field) expressions = append(expressions, field)

View file

@ -1,14 +0,0 @@
package proto
//go:generate msgp
type RequestError struct {
Message string `msg:"msg"`
ErrorCode int `msg:"error_code"`
}
type Response struct {
Id uint32 `msg:"id"`
Result []interface{} `msg:"result"`
Error RequestError `msg:"error"`
}

View file

@ -1,412 +0,0 @@
package proto
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *RequestError) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "msg":
z.Message, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Message")
return
}
case "error_code":
z.ErrorCode, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "ErrorCode")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z RequestError) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "msg"
err = en.Append(0x82, 0xa3, 0x6d, 0x73, 0x67)
if err != nil {
return
}
err = en.WriteString(z.Message)
if err != nil {
err = msgp.WrapError(err, "Message")
return
}
// write "error_code"
err = en.Append(0xaa, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x63, 0x6f, 0x64, 0x65)
if err != nil {
return
}
err = en.WriteInt(z.ErrorCode)
if err != nil {
err = msgp.WrapError(err, "ErrorCode")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z RequestError) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "msg"
o = append(o, 0x82, 0xa3, 0x6d, 0x73, 0x67)
o = msgp.AppendString(o, z.Message)
// string "error_code"
o = append(o, 0xaa, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x63, 0x6f, 0x64, 0x65)
o = msgp.AppendInt(o, z.ErrorCode)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *RequestError) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "msg":
z.Message, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Message")
return
}
case "error_code":
z.ErrorCode, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "ErrorCode")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z RequestError) Msgsize() (s int) {
s = 1 + 4 + msgp.StringPrefixSize + len(z.Message) + 11 + msgp.IntSize
return
}
// DecodeMsg implements msgp.Decodable
func (z *Response) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "id":
z.Id, err = dc.ReadUint32()
if err != nil {
err = msgp.WrapError(err, "Id")
return
}
case "result":
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Result")
return
}
if cap(z.Result) >= int(zb0002) {
z.Result = (z.Result)[:zb0002]
} else {
z.Result = make([]interface{}, zb0002)
}
for za0001 := range z.Result {
z.Result[za0001], err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Result", za0001)
return
}
}
case "error":
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Error")
return
}
for zb0003 > 0 {
zb0003--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, "Error")
return
}
switch msgp.UnsafeString(field) {
case "msg":
z.Error.Message, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Error", "Message")
return
}
case "error_code":
z.Error.ErrorCode, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "Error", "ErrorCode")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, "Error")
return
}
}
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *Response) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 3
// write "id"
err = en.Append(0x83, 0xa2, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteUint32(z.Id)
if err != nil {
err = msgp.WrapError(err, "Id")
return
}
// write "result"
err = en.Append(0xa6, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(len(z.Result)))
if err != nil {
err = msgp.WrapError(err, "Result")
return
}
for za0001 := range z.Result {
err = en.WriteIntf(z.Result[za0001])
if err != nil {
err = msgp.WrapError(err, "Result", za0001)
return
}
}
// write "error"
err = en.Append(0xa5, 0x65, 0x72, 0x72, 0x6f, 0x72)
if err != nil {
return
}
// map header, size 2
// write "msg"
err = en.Append(0x82, 0xa3, 0x6d, 0x73, 0x67)
if err != nil {
return
}
err = en.WriteString(z.Error.Message)
if err != nil {
err = msgp.WrapError(err, "Error", "Message")
return
}
// write "error_code"
err = en.Append(0xaa, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x63, 0x6f, 0x64, 0x65)
if err != nil {
return
}
err = en.WriteInt(z.Error.ErrorCode)
if err != nil {
err = msgp.WrapError(err, "Error", "ErrorCode")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *Response) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 3
// string "id"
o = append(o, 0x83, 0xa2, 0x69, 0x64)
o = msgp.AppendUint32(o, z.Id)
// string "result"
o = append(o, 0xa6, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74)
o = msgp.AppendArrayHeader(o, uint32(len(z.Result)))
for za0001 := range z.Result {
o, err = msgp.AppendIntf(o, z.Result[za0001])
if err != nil {
err = msgp.WrapError(err, "Result", za0001)
return
}
}
// string "error"
o = append(o, 0xa5, 0x65, 0x72, 0x72, 0x6f, 0x72)
// map header, size 2
// string "msg"
o = append(o, 0x82, 0xa3, 0x6d, 0x73, 0x67)
o = msgp.AppendString(o, z.Error.Message)
// string "error_code"
o = append(o, 0xaa, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x63, 0x6f, 0x64, 0x65)
o = msgp.AppendInt(o, z.Error.ErrorCode)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Response) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "id":
z.Id, bts, err = msgp.ReadUint32Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Id")
return
}
case "result":
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Result")
return
}
if cap(z.Result) >= int(zb0002) {
z.Result = (z.Result)[:zb0002]
} else {
z.Result = make([]interface{}, zb0002)
}
for za0001 := range z.Result {
z.Result[za0001], bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Result", za0001)
return
}
}
case "error":
var zb0003 uint32
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Error")
return
}
for zb0003 > 0 {
zb0003--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, "Error")
return
}
switch msgp.UnsafeString(field) {
case "msg":
z.Error.Message, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Error", "Message")
return
}
case "error_code":
z.Error.ErrorCode, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Error", "ErrorCode")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, "Error")
return
}
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *Response) Msgsize() (s int) {
s = 1 + 3 + msgp.Uint32Size + 7 + msgp.ArrayHeaderSize
for za0001 := range z.Result {
s += msgp.GuessSize(z.Result[za0001])
}
s += 6 + 1 + 4 + msgp.StringPrefixSize + len(z.Error.Message) + 11 + msgp.IntSize
return
}

View file

@ -1,236 +0,0 @@
package proto
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalRequestError(t *testing.T) {
v := RequestError{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgRequestError(b *testing.B) {
v := RequestError{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgRequestError(b *testing.B) {
v := RequestError{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalRequestError(b *testing.B) {
v := RequestError{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeRequestError(t *testing.T) {
v := RequestError{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeRequestError Msgsize() is inaccurate")
}
vn := RequestError{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeRequestError(b *testing.B) {
v := RequestError{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeRequestError(b *testing.B) {
v := RequestError{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalResponse(t *testing.T) {
v := Response{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgResponse(b *testing.B) {
v := Response{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgResponse(b *testing.B) {
v := Response{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalResponse(b *testing.B) {
v := Response{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeResponse(t *testing.T) {
v := Response{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeResponse Msgsize() is inaccurate")
}
vn := Response{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeResponse(b *testing.B) {
v := Response{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeResponse(b *testing.B) {
v := Response{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

51
pkg/proto/row.go Normal file
View file

@ -0,0 +1,51 @@
package proto
import (
"bytes"
)
//go:generate msgp
/**
* In most cases we need streaming responses for the table data
**/
type Row struct {
Data []interface{} `msg:"r"`
}
func MarshalRow[T interface{}](columns []T) ([]byte, error) {
s := make([]interface{}, len(columns))
for i, v := range columns {
s[i] = v
}
row := Row{Data: s}
return row.MarshalMsg(nil)
}
func UnmarshalRows(input []byte) []Row {
tag := []byte{0x81, 0xa1, 0x72}
result := [][]byte{}
buf := []byte{}
count := 0
for count < len(input) {
if input[count] != 0x81 {
buf = append(buf, input[count])
count += 1
continue
}
seq := input[count : len(tag)+count]
if bytes.Equal(seq, tag) {
result = append(result, append(tag, buf...))
buf = []byte{}
} else {
buf = append(buf, seq...)
}
count += len(tag)
}
result = append(result, append(tag, buf...))
rows := make([]Row, len(result))
for i, r := range result {
rows[i].UnmarshalMsg(r)
}
return rows[1:]
}

153
pkg/proto/row_gen.go Normal file
View file

@ -0,0 +1,153 @@
package proto
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *Row) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "r":
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Data")
return
}
if cap(z.Data) >= int(zb0002) {
z.Data = (z.Data)[:zb0002]
} else {
z.Data = make([]interface{}, zb0002)
}
for za0001 := range z.Data {
z.Data[za0001], err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Data", za0001)
return
}
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *Row) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "r"
err = en.Append(0x81, 0xa1, 0x72)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(len(z.Data)))
if err != nil {
err = msgp.WrapError(err, "Data")
return
}
for za0001 := range z.Data {
err = en.WriteIntf(z.Data[za0001])
if err != nil {
err = msgp.WrapError(err, "Data", za0001)
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *Row) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "r"
o = append(o, 0x81, 0xa1, 0x72)
o = msgp.AppendArrayHeader(o, uint32(len(z.Data)))
for za0001 := range z.Data {
o, err = msgp.AppendIntf(o, z.Data[za0001])
if err != nil {
err = msgp.WrapError(err, "Data", za0001)
return
}
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Row) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "r":
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Data")
return
}
if cap(z.Data) >= int(zb0002) {
z.Data = (z.Data)[:zb0002]
} else {
z.Data = make([]interface{}, zb0002)
}
for za0001 := range z.Data {
z.Data[za0001], bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Data", za0001)
return
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *Row) Msgsize() (s int) {
s = 1 + 2 + msgp.ArrayHeaderSize
for za0001 := range z.Data {
s += msgp.GuessSize(z.Data[za0001])
}
return
}

123
pkg/proto/row_gen_test.go Normal file
View file

@ -0,0 +1,123 @@
package proto
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalRow(t *testing.T) {
v := Row{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgRow(b *testing.B) {
v := Row{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgRow(b *testing.B) {
v := Row{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalRow(b *testing.B) {
v := Row{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeRow(t *testing.T) {
v := Row{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeRow Msgsize() is inaccurate")
}
vn := Row{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeRow(b *testing.B) {
v := Row{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeRow(b *testing.B) {
v := Row{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View file

@ -6,6 +6,8 @@ replace l12.xyz/dal/filters v0.0.0 => ../filters
replace l12.xyz/dal/builder v0.0.0 => ../builder replace l12.xyz/dal/builder v0.0.0 => ../builder
require l12.xyz/dal/adapter v0.0.0
replace l12.xyz/dal/adapter v0.0.0 => ../adapter replace l12.xyz/dal/adapter v0.0.0 => ../adapter
replace l12.xyz/dal/utils v0.0.0 => ../utils replace l12.xyz/dal/utils v0.0.0 => ../utils
@ -15,10 +17,11 @@ require l12.xyz/dal/proto v0.0.0
replace l12.xyz/dal/proto v0.0.0 => ../proto replace l12.xyz/dal/proto v0.0.0 => ../proto
require ( require (
github.com/mattn/go-sqlite3 v1.14.22
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 // indirect github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/tinylib/msgp v1.2.0 // indirect github.com/tinylib/msgp v1.2.0 // indirect
l12.xyz/dal/adapter v0.0.0 // indirect
l12.xyz/dal/builder v0.0.0 // indirect l12.xyz/dal/builder v0.0.0 // indirect
l12.xyz/dal/filters v0.0.0 // indirect l12.xyz/dal/filters v0.0.0 // indirect
l12.xyz/dal/utils v0.0.0 // indirect l12.xyz/dal/utils v0.0.0 // indirect

View file

@ -1,3 +1,5 @@
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 h1:jYi87L8j62qkXzaYHAQAhEapgukhenIMZRBKTNRLHJ4= github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 h1:jYi87L8j62qkXzaYHAQAhEapgukhenIMZRBKTNRLHJ4=
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"reflect"
"l12.xyz/dal/adapter" "l12.xyz/dal/adapter"
"l12.xyz/dal/proto" "l12.xyz/dal/proto"
@ -37,19 +38,29 @@ func QueryHandler(db adapter.DBAdapter) http.Handler {
return return
} }
fmt.Println(query, "QueryHandler")
rows, err := db.Query(query) rows, err := db.Query(query)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
defer rows.Close()
w.Header().Set("Content-Type", "application/x-msgpack") w.Header().Set("Content-Type", "application/x-msgpack")
defer rows.Close()
columns, _ := rows.Columns()
types, _ := rows.ColumnTypes()
cols, _ := proto.MarshalRow(columns)
w.Write(cols)
for rows.Next() { for rows.Next() {
row := []byte{} data := make([]interface{}, len(columns))
rows.Scan(row) for i := range data {
w.Write(row) typ := reflect.New(types[i].ScanType()).Interface()
data[i] = &typ
}
rows.Scan(data...)
cols, _ := proto.MarshalRow(data)
w.Write(cols)
} }
}) })
} }

View file

@ -3,17 +3,50 @@ package server
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
_ "github.com/mattn/go-sqlite3"
"l12.xyz/dal/adapter" "l12.xyz/dal/adapter"
"l12.xyz/dal/proto"
) )
func TestQueryHandler(t *testing.T) { func TestQueryHandler(t *testing.T) {
body := []byte(`{ a := adapter.DBAdapter{Type: "sqlite3"}
"something": "wrong", db, err := a.Open("file::memory:?cache=shared")
}`) if err != nil {
t.Fatalf("failed to open db: %v", err)
}
defer db.Close()
_, err = db.Exec("CREATE TABLE test (id INTEGER PRIMARY KEY, name BLOB, data TEXT)")
if err != nil {
t.Fatalf("failed to create table: %v", err)
}
_, err = db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "test", "y")
if err != nil {
t.Fatalf("failed to insert data: %v", err)
}
data := proto.Request{
Id: 0,
Db: "file::memory:?cache=shared",
Commands: []proto.BuildCmd{
{Method: "In", Args: []interface{}{"test t"}},
{Method: "Find", Args: []interface{}{
map[string]interface{}{"id": 1},
}},
{Method: "Fields", Args: []interface{}{
map[string]interface{}{
"id": 1,
"name": "Name",
"data": 1,
},
}},
},
}
body, _ := data.MarshalMsg(nil)
req, err := http.NewRequest("POST", "/", bytes.NewBuffer(body)) req, err := http.NewRequest("POST", "/", bytes.NewBuffer(body))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -23,5 +56,7 @@ func TestQueryHandler(t *testing.T) {
Type: "sqlite3", Type: "sqlite3",
}) })
handler.ServeHTTP(rr, req) handler.ServeHTTP(rr, req)
fmt.Println(rr.Code == 400) res, _ := io.ReadAll(rr.Result().Body)
result := proto.UnmarshalRows(res)
fmt.Println(result)
} }