diff --git a/dal/Builder.ts b/dal/Builder.ts index c005800..98ed72a 100644 --- a/dal/Builder.ts +++ b/dal/Builder.ts @@ -1,5 +1,5 @@ import type { Request } from "./Protocol"; -import { METHODS } from "./Protocol"; +import { METHODS, encodeRequest, decodeRowsIterator } from "./Protocol"; type Primitive = string | number | boolean | null; @@ -29,17 +29,23 @@ type JoinFilter = { $as?: JoinCondition; }; -export type SortOptions = Record; +type SortOptions = Record; +type Options = { + database: string; + url: string; +}; export default class Builder { private request: Request; - constructor(database: string) { + private url: string; + constructor(opts: Options) { this.request = { id: 0, - db: database, + db: opts.database, commands: [], }; + this.url = opts.url; } private format(): void { this.request.commands = METHODS.map((method) => { @@ -111,5 +117,27 @@ export default class Builder { this.request.commands.push({ method: "DoNothing", args: [] }); return this; } + async *Rows() { + this.format(); + const response = await fetch(this.url, { + method: "POST", + body: new Blob([encodeRequest(this.request)]), + headers: { + "Content-Type": "application/x-msgpack", + }, + }); + if (response.status !== 200) { + throw new Error(await response.text()); + } + + for await (const row of decodeRowsIterator(response.body!)) { + yield row; + } + this.request = { + id: 0, + db: this.request.db, + commands: [], + }; + } } \ No newline at end of file diff --git a/dal/Protocol.ts b/dal/Protocol.ts index b16aa8d..96741bb 100644 --- a/dal/Protocol.ts +++ b/dal/Protocol.ts @@ -1,4 +1,4 @@ -import { encode } from '@msgpack/msgpack'; +import { encode, decode } from '@msgpack/msgpack'; export interface Method { method: string; @@ -15,4 +15,55 @@ export const METHODS = "In|Find|Select|Fields|Join|Group|Sort|Limit|Offset|Delet export function encodeRequest(request: Request): Uint8Array { return encode(request); +} + +export interface Row { + r: unknown[]; +} + +const ROW_TAG = [0x81, 0xa1, 0x72]; + +export function decodeRows(input: Uint8Array): Row[] { + const rows = []; + let count = 0; + let buf = []; + while (count < input.length) { + if (input.at(count) != 0x81) { + buf.push(input.at(count)); + count++; + continue + } + const [a, b, c] = ROW_TAG; + const [aa, bb, cc] = input.slice(count, count + 4); + if (aa == a && bb == b && cc == c) { + rows.push([...ROW_TAG, ...buf]); + buf = []; + count += 3; + } else { + buf.push(input.at(count)); + count++; + } + } + rows.push([...ROW_TAG, ...buf]); + rows.shift(); + return rows.map((row) => decode(new Uint8Array(row as number[]))) as Row[]; +} + +export async function *decodeRowsIterator(stream: ReadableStream): AsyncGenerator { + const reader = stream.getReader(); + let buf = new Uint8Array(); + for (;;) { + const { value, done } = await reader.read(); + if (done) { + console.log("done"); + break; + } + buf = new Uint8Array([...buf, ...value]); + // the server flushes after each row + // so we decode "complete" rows + const rows = decodeRows(buf); + for (const row of rows) { + yield row; + } + } } \ No newline at end of file diff --git a/dal/__test__/protocol.test.ts b/dal/__test__/protocol.test.ts new file mode 100644 index 0000000..b56e316 --- /dev/null +++ b/dal/__test__/protocol.test.ts @@ -0,0 +1,24 @@ +import { test, expect } from "bun:test"; +import { DAL } from ".." + +const options = { + database: "test.sqlite", + url: "http://localhost:8111", +} + + +test("Rows iter", async () => { + const dal = new DAL(options); + const rows = dal + .In("test t") + .Find({ + id: 1, + }) + .Rows(); + for await (const row of rows) { + // console.log(row); + //@ts-ignore + expect(row.r.length).toBe(3); + } + expect(true).toBe(true); +}); diff --git a/dal/__test__/srv/go.mod b/dal/__test__/srv/go.mod new file mode 100644 index 0000000..449cd56 --- /dev/null +++ b/dal/__test__/srv/go.mod @@ -0,0 +1,31 @@ +module srv + +go 1.22.6 + +replace l12.xyz/dal/filters v0.0.0 => ../../../pkg/filters + +replace l12.xyz/dal/builder v0.0.0 => ../../../pkg/builder + +require l12.xyz/dal/adapter v0.0.0 + +replace l12.xyz/dal/adapter v0.0.0 => ../../../pkg/adapter + +replace l12.xyz/dal/utils v0.0.0 => ../../../pkg/utils + +require l12.xyz/dal/proto v0.0.0 // indirect + +replace l12.xyz/dal/proto v0.0.0 => ../../../pkg/proto + +require l12.xyz/dal/server v0.0.0 + +replace l12.xyz/dal/server v0.0.0 => ../../../pkg/server + +require ( + github.com/mattn/go-sqlite3 v1.14.22 + github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/tinylib/msgp v1.2.0 // indirect + l12.xyz/dal/builder v0.0.0 // indirect + l12.xyz/dal/filters v0.0.0 // indirect + l12.xyz/dal/utils v0.0.0 // indirect +) diff --git a/dal/__test__/srv/go.sum b/dal/__test__/srv/go.sum new file mode 100644 index 0000000..5cf32ca --- /dev/null +++ b/dal/__test__/srv/go.sum @@ -0,0 +1,8 @@ +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 h1:jYi87L8j62qkXzaYHAQAhEapgukhenIMZRBKTNRLHJ4= +github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/tinylib/msgp v1.2.0 h1:0uKB/662twsVBpYUPbokj4sTSKhWFKB7LopO2kWK8lY= +github.com/tinylib/msgp v1.2.0/go.mod h1:2vIGs3lcUo8izAATNobrCHevYZC/LMsJtw4JPiYPHro= diff --git a/dal/__test__/srv/main.go b/dal/__test__/srv/main.go new file mode 100644 index 0000000..2e8e2fd --- /dev/null +++ b/dal/__test__/srv/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "fmt" + "net/http" + + _ "github.com/mattn/go-sqlite3" + "l12.xyz/dal/adapter" + "l12.xyz/dal/server" +) + +func mock(adapter adapter.DBAdapter) { + db, _ := adapter.Open("test.sqlite") + defer db.Close() + db.Exec("CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, name BLOB, data TEXT)") + db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "test", "y") + db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "tost", "x") + db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "foo", "a") + db.Exec("INSERT INTO test (name, data) VALUES (?,?)", "bar", "b") +} + +func main() { + db := adapter.DBAdapter{ + Type: "sqlite3", + } + mock(db) + queryHandler := server.QueryHandler(db) + mux := http.NewServeMux() + mux.Handle("/", queryHandler) + fmt.Println("Server running on port 8111") + http.ListenAndServe(":8111", mux) +} diff --git a/dal/__test__/srv/test.sqlite b/dal/__test__/srv/test.sqlite new file mode 100644 index 0000000..dae1d3e Binary files /dev/null and b/dal/__test__/srv/test.sqlite differ diff --git a/dal/index.ts b/dal/index.ts index e69de29..6486bff 100644 --- a/dal/index.ts +++ b/dal/index.ts @@ -0,0 +1 @@ +export { default as DAL } from './Builder'; \ No newline at end of file diff --git a/package.json b/package.json index 206dca6..86e7953 100644 --- a/package.json +++ b/package.json @@ -10,5 +10,10 @@ }, "dependencies": { "@msgpack/msgpack": "^3.0.0-beta2" + }, + "scripts": { + "test:client": "bun test:*", + "test:dal" : "bun test dal/__test__", + "test:serve": "cd dal/__test__/srv && go run main.go" } } \ No newline at end of file diff --git a/pkg/server/query_handler.go b/pkg/server/query_handler.go index 5f83e2c..a1738ff 100644 --- a/pkg/server/query_handler.go +++ b/pkg/server/query_handler.go @@ -22,13 +22,7 @@ func QueryHandler(db adapter.DBAdapter) http.Handler { dialect := adapter.GetDialect(db.Type) return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - bodyReader, err := r.GetBody() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - body, err := io.ReadAll(bodyReader) + body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -50,12 +44,18 @@ func QueryHandler(db adapter.DBAdapter) http.Handler { } defer rows.Close() + w.Header().Set("X-Content-Type-Options", "nosniff") w.Header().Set("Content-Type", "application/x-msgpack") - + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "expected http.ResponseWriter to be an http.Flusher", http.StatusInternalServerError) + return + } columns, _ := rows.Columns() types, _ := rows.ColumnTypes() cols, _ := proto.MarshalRow(columns) w.Write(cols) + flusher.Flush() for rows.Next() { data := make([]interface{}, len(columns)) @@ -66,6 +66,7 @@ func QueryHandler(db adapter.DBAdapter) http.Handler { rows.Scan(data...) cols, _ := proto.MarshalRow(data) w.Write(cols) + flusher.Flush() } }) } diff --git a/pkg/server/query_handler_test.go b/pkg/server/query_handler_test.go index 8f6f624..ee34ef2 100644 --- a/pkg/server/query_handler_test.go +++ b/pkg/server/query_handler_test.go @@ -33,7 +33,7 @@ func TestQueryHandler(t *testing.T) { data := proto.Request{ Id: 0, Db: "file::memory:?cache=shared", - Commands: []proto.BuildCmd{ + Commands: []proto.BuilderMethod{ {Method: "In", Args: []interface{}{"test t"}}, {Method: "Find", Args: []interface{}{ map[string]interface{}{"id": 1},