[feat] basic query builder for js; [feat] support chunked responses

Signed-off-by: Anton Nesterov <anton@demiurg.io>
This commit is contained in:
Anton Nesterov 2024-08-15 13:31:56 +02:00
parent aa580b158e
commit 7457d8ac81
No known key found for this signature in database
GPG key ID: 59121E8AE2851FB5
11 changed files with 195 additions and 14 deletions

View file

@ -1,5 +1,5 @@
import type { Request } from "./Protocol";
import { METHODS } from "./Protocol";
import { METHODS, encodeRequest, decodeRowsIterator } from "./Protocol";
type Primitive = string | number | boolean | null;
@ -29,17 +29,23 @@ type JoinFilter = {
$as?: JoinCondition;
};
export type SortOptions = Record<string, 1 | -1 | "asc" | "desc">;
type SortOptions = Record<string, 1 | -1 | "asc" | "desc">;
type Options = {
database: string;
url: string;
};
export default class Builder {
private request: Request;
constructor(database: string) {
private url: string;
constructor(opts: Options) {
this.request = {
id: 0,
db: database,
db: opts.database,
commands: [],
};
this.url = opts.url;
}
private format(): void {
this.request.commands = METHODS.map((method) => {
@ -111,5 +117,27 @@ export default class Builder {
this.request.commands.push({ method: "DoNothing", args: [] });
return this;
}
async *Rows() {
this.format();
const response = await fetch(this.url, {
method: "POST",
body: new Blob([encodeRequest(this.request)]),
headers: {
"Content-Type": "application/x-msgpack",
},
});
if (response.status !== 200) {
throw new Error(await response.text());
}
for await (const row of decodeRowsIterator(response.body!)) {
yield row;
}
this.request = {
id: 0,
db: this.request.db,
commands: [],
};
}
}

View file

@ -1,4 +1,4 @@
import { encode } from '@msgpack/msgpack';
import { encode, decode } from '@msgpack/msgpack';
export interface Method {
method: string;
@ -16,3 +16,54 @@ export const METHODS = "In|Find|Select|Fields|Join|Group|Sort|Limit|Offset|Delet
export function encodeRequest(request: Request): Uint8Array {
return encode(request);
}
export interface Row {
r: unknown[];
}
const ROW_TAG = [0x81, 0xa1, 0x72];
export function decodeRows(input: Uint8Array): Row[] {
const rows = [];
let count = 0;
let buf = [];
while (count < input.length) {
if (input.at(count) != 0x81) {
buf.push(input.at(count));
count++;
continue
}
const [a, b, c] = ROW_TAG;
const [aa, bb, cc] = input.slice(count, count + 4);
if (aa == a && bb == b && cc == c) {
rows.push([...ROW_TAG, ...buf]);
buf = [];
count += 3;
} else {
buf.push(input.at(count));
count++;
}
}
rows.push([...ROW_TAG, ...buf]);
rows.shift();
return rows.map((row) => decode(new Uint8Array(row as number[]))) as Row[];
}
export async function *decodeRowsIterator(stream: ReadableStream<Uint8Array>): AsyncGenerator<Row> {
const reader = stream.getReader();
let buf = new Uint8Array();
for (;;) {
const { value, done } = await reader.read();
if (done) {
console.log("done");
break;
}
buf = new Uint8Array([...buf, ...value]);
// the server flushes after each row
// so we decode "complete" rows
const rows = decodeRows(buf);
for (const row of rows) {
yield row;
}
}
}

View file

@ -0,0 +1,24 @@
import { test, expect } from "bun:test";
import { DAL } from ".."
const options = {
database: "test.sqlite",
url: "http://localhost:8111",
}
test("Rows iter", async () => {
const dal = new DAL(options);
const rows = dal
.In("test t")
.Find({
id: 1,
})
.Rows();
for await (const row of rows) {
// console.log(row);
//@ts-ignore
expect(row.r.length).toBe(3);
}
expect(true).toBe(true);
});

31
dal/__test__/srv/go.mod Normal file
View file

@ -0,0 +1,31 @@
module srv
go 1.22.6
replace l12.xyz/dal/filters v0.0.0 => ../../../pkg/filters
replace l12.xyz/dal/builder v0.0.0 => ../../../pkg/builder
require l12.xyz/dal/adapter v0.0.0
replace l12.xyz/dal/adapter v0.0.0 => ../../../pkg/adapter
replace l12.xyz/dal/utils v0.0.0 => ../../../pkg/utils
require l12.xyz/dal/proto v0.0.0 // indirect
replace l12.xyz/dal/proto v0.0.0 => ../../../pkg/proto
require l12.xyz/dal/server v0.0.0
replace l12.xyz/dal/server v0.0.0 => ../../../pkg/server
require (
github.com/mattn/go-sqlite3 v1.14.22
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/tinylib/msgp v1.2.0 // indirect
l12.xyz/dal/builder v0.0.0 // indirect
l12.xyz/dal/filters v0.0.0 // indirect
l12.xyz/dal/utils v0.0.0 // indirect
)

8
dal/__test__/srv/go.sum Normal file
View file

@ -0,0 +1,8 @@
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 h1:jYi87L8j62qkXzaYHAQAhEapgukhenIMZRBKTNRLHJ4=
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/tinylib/msgp v1.2.0 h1:0uKB/662twsVBpYUPbokj4sTSKhWFKB7LopO2kWK8lY=
github.com/tinylib/msgp v1.2.0/go.mod h1:2vIGs3lcUo8izAATNobrCHevYZC/LMsJtw4JPiYPHro=

32
dal/__test__/srv/main.go Normal file
View file

@ -0,0 +1,32 @@
package main
import (
"fmt"
"net/http"
_ "github.com/mattn/go-sqlite3"
"l12.xyz/dal/adapter"
"l12.xyz/dal/server"
)
func mock(adapter adapter.DBAdapter) {
db, _ := adapter.Open("test.sqlite")
defer db.Close()
db.Exec("CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, name BLOB, data TEXT)")
db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "test", "y")
db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "tost", "x")
db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "foo", "a")
db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "bar", "b")
}
func main() {
db := adapter.DBAdapter{
Type: "sqlite3",
}
mock(db)
queryHandler := server.QueryHandler(db)
mux := http.NewServeMux()
mux.Handle("/", queryHandler)
fmt.Println("Server running on port 8111")
http.ListenAndServe(":8111", mux)
}

Binary file not shown.

View file

@ -0,0 +1 @@
export { default as DAL } from './Builder';

View file

@ -10,5 +10,10 @@
},
"dependencies": {
"@msgpack/msgpack": "^3.0.0-beta2"
},
"scripts": {
"test:client": "bun test:*",
"test:dal" : "bun test dal/__test__",
"test:serve": "cd dal/__test__/srv && go run main.go"
}
}

View file

@ -22,13 +22,7 @@ func QueryHandler(db adapter.DBAdapter) http.Handler {
dialect := adapter.GetDialect(db.Type)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
bodyReader, err := r.GetBody()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
body, err := io.ReadAll(bodyReader)
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -50,12 +44,18 @@ func QueryHandler(db adapter.DBAdapter) http.Handler {
}
defer rows.Close()
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("Content-Type", "application/x-msgpack")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "expected http.ResponseWriter to be an http.Flusher", http.StatusInternalServerError)
return
}
columns, _ := rows.Columns()
types, _ := rows.ColumnTypes()
cols, _ := proto.MarshalRow(columns)
w.Write(cols)
flusher.Flush()
for rows.Next() {
data := make([]interface{}, len(columns))
@ -66,6 +66,7 @@ func QueryHandler(db adapter.DBAdapter) http.Handler {
rows.Scan(data...)
cols, _ := proto.MarshalRow(data)
w.Write(cols)
flusher.Flush()
}
})
}

View file

@ -33,7 +33,7 @@ func TestQueryHandler(t *testing.T) {
data := proto.Request{
Id: 0,
Db: "file::memory:?cache=shared",
Commands: []proto.BuildCmd{
Commands: []proto.BuilderMethod{
{Method: "In", Args: []interface{}{"test t"}},
{Method: "Find", Args: []interface{}{
map[string]interface{}{"id": 1},