From 7457d8ac81d764be8b9791ea86ef216cc7ad711d Mon Sep 17 00:00:00 2001 From: Anton Nesterov Date: Thu, 15 Aug 2024 13:31:56 +0200 Subject: [PATCH] [feat] basic query builder for js; [feat] support chunked responses Signed-off-by: Anton Nesterov --- dal/Builder.ts | 36 ++++++++++++++++++--- dal/Protocol.ts | 53 ++++++++++++++++++++++++++++++- dal/__test__/protocol.test.ts | 24 ++++++++++++++ dal/__test__/srv/go.mod | 31 ++++++++++++++++++ dal/__test__/srv/go.sum | 8 +++++ dal/__test__/srv/main.go | 32 +++++++++++++++++++ dal/__test__/srv/test.sqlite | Bin 0 -> 8192 bytes dal/index.ts | 1 + package.json | 5 +++ pkg/server/query_handler.go | 17 +++++----- pkg/server/query_handler_test.go | 2 +- 11 files changed, 195 insertions(+), 14 deletions(-) create mode 100644 dal/__test__/protocol.test.ts create mode 100644 dal/__test__/srv/go.mod create mode 100644 dal/__test__/srv/go.sum create mode 100644 dal/__test__/srv/main.go create mode 100644 dal/__test__/srv/test.sqlite diff --git a/dal/Builder.ts b/dal/Builder.ts index c005800..98ed72a 100644 --- a/dal/Builder.ts +++ b/dal/Builder.ts @@ -1,5 +1,5 @@ import type { Request } from "./Protocol"; -import { METHODS } from "./Protocol"; +import { METHODS, encodeRequest, decodeRowsIterator } from "./Protocol"; type Primitive = string | number | boolean | null; @@ -29,17 +29,23 @@ type JoinFilter = { $as?: JoinCondition; }; -export type SortOptions = Record; +type SortOptions = Record; +type Options = { + database: string; + url: string; +}; export default class Builder { private request: Request; - constructor(database: string) { + private url: string; + constructor(opts: Options) { this.request = { id: 0, - db: database, + db: opts.database, commands: [], }; + this.url = opts.url; } private format(): void { this.request.commands = METHODS.map((method) => { @@ -111,5 +117,27 @@ export default class Builder { this.request.commands.push({ method: "DoNothing", args: [] }); return this; } + async *Rows() { + this.format(); + const response = await fetch(this.url, { + method: "POST", + body: new Blob([encodeRequest(this.request)]), + headers: { + "Content-Type": "application/x-msgpack", + }, + }); + if (response.status !== 200) { + throw new Error(await response.text()); + } + + for await (const row of decodeRowsIterator(response.body!)) { + yield row; + } + this.request = { + id: 0, + db: this.request.db, + commands: [], + }; + } } \ No newline at end of file diff --git a/dal/Protocol.ts b/dal/Protocol.ts index b16aa8d..96741bb 100644 --- a/dal/Protocol.ts +++ b/dal/Protocol.ts @@ -1,4 +1,4 @@ -import { encode } from '@msgpack/msgpack'; +import { encode, decode } from '@msgpack/msgpack'; export interface Method { method: string; @@ -15,4 +15,55 @@ export const METHODS = "In|Find|Select|Fields|Join|Group|Sort|Limit|Offset|Delet export function encodeRequest(request: Request): Uint8Array { return encode(request); +} + +export interface Row { + r: unknown[]; +} + +const ROW_TAG = [0x81, 0xa1, 0x72]; + +export function decodeRows(input: Uint8Array): Row[] { + const rows = []; + let count = 0; + let buf = []; + while (count < input.length) { + if (input.at(count) != 0x81) { + buf.push(input.at(count)); + count++; + continue + } + const [a, b, c] = ROW_TAG; + const [aa, bb, cc] = input.slice(count, count + 4); + if (aa == a && bb == b && cc == c) { + rows.push([...ROW_TAG, ...buf]); + buf = []; + count += 3; + } else { + buf.push(input.at(count)); + count++; + } + } + rows.push([...ROW_TAG, ...buf]); + rows.shift(); + return rows.map((row) => decode(new Uint8Array(row as number[]))) as Row[]; +} + +export async function *decodeRowsIterator(stream: ReadableStream): AsyncGenerator { + const reader = stream.getReader(); + let buf = new Uint8Array(); + for (;;) { + const { value, done } = await reader.read(); + if (done) { + console.log("done"); + break; + } + buf = new Uint8Array([...buf, ...value]); + // the server flushes after each row + // so we decode "complete" rows + const rows = decodeRows(buf); + for (const row of rows) { + yield row; + } + } } \ No newline at end of file diff --git a/dal/__test__/protocol.test.ts b/dal/__test__/protocol.test.ts new file mode 100644 index 0000000..b56e316 --- /dev/null +++ b/dal/__test__/protocol.test.ts @@ -0,0 +1,24 @@ +import { test, expect } from "bun:test"; +import { DAL } from ".." + +const options = { + database: "test.sqlite", + url: "http://localhost:8111", +} + + +test("Rows iter", async () => { + const dal = new DAL(options); + const rows = dal + .In("test t") + .Find({ + id: 1, + }) + .Rows(); + for await (const row of rows) { + // console.log(row); + //@ts-ignore + expect(row.r.length).toBe(3); + } + expect(true).toBe(true); +}); diff --git a/dal/__test__/srv/go.mod b/dal/__test__/srv/go.mod new file mode 100644 index 0000000..449cd56 --- /dev/null +++ b/dal/__test__/srv/go.mod @@ -0,0 +1,31 @@ +module srv + +go 1.22.6 + +replace l12.xyz/dal/filters v0.0.0 => ../../../pkg/filters + +replace l12.xyz/dal/builder v0.0.0 => ../../../pkg/builder + +require l12.xyz/dal/adapter v0.0.0 + +replace l12.xyz/dal/adapter v0.0.0 => ../../../pkg/adapter + +replace l12.xyz/dal/utils v0.0.0 => ../../../pkg/utils + +require l12.xyz/dal/proto v0.0.0 // indirect + +replace l12.xyz/dal/proto v0.0.0 => ../../../pkg/proto + +require l12.xyz/dal/server v0.0.0 + +replace l12.xyz/dal/server v0.0.0 => ../../../pkg/server + +require ( + github.com/mattn/go-sqlite3 v1.14.22 + github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/tinylib/msgp v1.2.0 // indirect + l12.xyz/dal/builder v0.0.0 // indirect + l12.xyz/dal/filters v0.0.0 // indirect + l12.xyz/dal/utils v0.0.0 // indirect +) diff --git a/dal/__test__/srv/go.sum b/dal/__test__/srv/go.sum new file mode 100644 index 0000000..5cf32ca --- /dev/null +++ b/dal/__test__/srv/go.sum @@ -0,0 +1,8 @@ +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 h1:jYi87L8j62qkXzaYHAQAhEapgukhenIMZRBKTNRLHJ4= +github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/tinylib/msgp v1.2.0 h1:0uKB/662twsVBpYUPbokj4sTSKhWFKB7LopO2kWK8lY= +github.com/tinylib/msgp v1.2.0/go.mod h1:2vIGs3lcUo8izAATNobrCHevYZC/LMsJtw4JPiYPHro= diff --git a/dal/__test__/srv/main.go b/dal/__test__/srv/main.go new file mode 100644 index 0000000..2e8e2fd --- /dev/null +++ b/dal/__test__/srv/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "fmt" + "net/http" + + _ "github.com/mattn/go-sqlite3" + "l12.xyz/dal/adapter" + "l12.xyz/dal/server" +) + +func mock(adapter adapter.DBAdapter) { + db, _ := adapter.Open("test.sqlite") + defer db.Close() + db.Exec("CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, name BLOB, data TEXT)") + db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "test", "y") + db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "tost", "x") + db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "foo", "a") + db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "bar", "b") +} + +func main() { + db := adapter.DBAdapter{ + Type: "sqlite3", + } + mock(db) + queryHandler := server.QueryHandler(db) + mux := http.NewServeMux() + mux.Handle("/", queryHandler) + fmt.Println("Server running on port 8111") + http.ListenAndServe(":8111", mux) +} diff --git a/dal/__test__/srv/test.sqlite b/dal/__test__/srv/test.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..dae1d3e3c14d29ba9851f4c98bd5cbbe793f1d05 GIT binary patch literal 8192 zcmeI#ze~eF6bJCT8f~HtPmtjHMup`r)NG~nK0uJ%;M$64