[fix] handle errors

Signed-off-by: Anton Nesterov <anton@demiurg.io>
Anton Nesterov 2024-09-03 01:11:17 +02:00
parent 059fe4ca76
commit e62a155e04
GPG key ID: 59121E8AE2851FB5
15 changed files with 107 additions and 84 deletions

View file

@@ -1,4 +1,4 @@
-import type { Request, ExecResult } from "./Protocol";
+import type { Request, ExecResult, IError } from "./Protocol";
 import {
   METHODS,
   encodeRequest,
@@ -163,7 +163,7 @@ export default class Builder<I extends abstract new (...args: any) => any> {
     this.dtoTemplate = template;
     return this;
   }
-  async *Rows<T = InstanceType<I>>(): AsyncGenerator<T> {
+  async *Rows<T = InstanceType<I>>(): AsyncGenerator<[T, IError]> {
     this.formatRequest();
     const response = await fetch(this.url, {
       method: "POST",
@@ -174,26 +174,38 @@ export default class Builder<I extends abstract new (...args: any) => any> {
       },
     });
     if (response.status !== 200) {
-      throw new Error(await response.text());
+      return [[], await response.text()];
     }
     const iterator = decodeRowsIterator(response.body!);
-    for await (const row of iterator) {
+    for await (const result of iterator) {
+      const [row, err] = result;
+      if (err) {
+        yield [{} as T, err];
+        return;
+      }
       if (this.headerRow === null) {
         this.headerRow = row.r;
         continue;
       }
-      yield this.formatRow(row.r);
+      yield [this.formatRow(row.r), null];
     }
   }
   async Query<T = InstanceType<I>>(): Promise<T[]> {
     const rows = this.Rows();
     const result: T[] = [];
-    for await (const row of rows) {
+    for await (const res of rows) {
+      const [row, error] = res;
+      if (error) {
+        if (String(error).includes("RangeError")) {
+          break;
+        }
+        throw new Error(error);
+      }
       result.push(row);
     }
     return result;
   }
-  async Exec(): Promise<ExecResult> {
+  async Exec(): Promise<[ExecResult, IError]> {
     this.formatRequest();
     const response = await fetch(this.url, {
       method: "POST",
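After this change `Rows()` yields `[row, error]` tuples instead of throwing, `Query()` unwraps them internally (treating the `RangeError` raised at the end of the native stream as end-of-data), and `Exec()` resolves to `[ExecResult, IError]`. A minimal consumer sketch, borrowing the `DAL` import path, database fixture, and table name from the repo's tests; everything else is illustrative:

```typescript
import path from "path";
import DAL from "@nesterow/dal/client/native"; // same import the repo's tests use

// Database path and table come from the test fixtures (chinook.db).
const DATABASE_PATH = path.join(import.meta.dir, "..", "data", "chinook.db");
const db = new DAL({ database: DATABASE_PATH });

// Rows() now yields [row, error] tuples instead of throwing.
const rows = db
  .In("artists")
  .Find({ name: { $glob: "A*" } })
  .Limit(10)
  .Rows();

for await (const [artist, error] of rows) {
  if (error) {
    console.error("query failed:", error); // the error is yielded once, then the generator stops
    break;
  }
  console.log(artist);
}

// Query() still resolves to a plain array; it unwraps the tuples internally.
const artists = await db
  .In("artists")
  .Find({ name: { $glob: "A*" } })
  .Limit(10)
  .Query();
console.log(artists.length);
```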

View file

@@ -15,13 +15,15 @@ export interface ExecResult {
   Id: number;
   RowsAffected: number;
   LastInsertId: number;
-  Msg?: string;
+  Error?: string;
 }
 export interface Row {
   r: unknown[];
 }
+export type IError = any;
 export const METHODS =
   "Raw|In|Find|Select|Fields|Join|Group|Sort|Limit|Offset|Delete|Insert|Set|Update|OnConflict|DoUpdate|DoNothing|Tx".split(
     "|",
@@ -31,28 +33,29 @@ export function encodeRequest(request: Request): Uint8Array {
   return encode(request);
 }
-export function decodeResponse(input: Uint8Array): ExecResult | null {
+export function decodeResponse(input: Uint8Array): [ExecResult, IError] {
   try {
     const res = decode(input) as {
       i: number;
       ra: number;
       li: number;
-      m?: string;
+      e?: string;
     };
-    return {
+    const result = {
       Id: res.i,
       RowsAffected: res.ra,
       LastInsertId: res.li,
-      Msg: res.m,
+      Error: res.e,
     };
+    return [result, result.Error];
   } catch (e) {
-    return null;
+    return [{} as ExecResult, e];
   }
 }
 const ROW_TAG = [0x81, 0xa1, 0x72];
-export function decodeRows(input: Uint8Array): Row[] | null {
+export function decodeRows(input: Uint8Array): [Row[], IError] {
   try {
     const rows = [];
     let count = 0;
@@ -76,24 +79,31 @@ export function decodeRows(input: Uint8Array): Row[] | null {
     }
     rows.push([...ROW_TAG, ...buf]);
     rows.shift();
-    return rows.map((row) => decode(new Uint8Array(row as number[]))) as Row[];
+    return [
+      rows.map((row) => decode(new Uint8Array(row as number[]))) as Row[],
+      null,
+    ];
   } catch (e) {
-    return null;
+    return [[], e];
   }
 }
 export async function* decodeRowsIterator(
   stream: ReadableStream<Uint8Array>,
-): AsyncGenerator<Row> {
+): AsyncGenerator<[Row, IError]> {
   const reader = stream.getReader();
   for (;;) {
     const { value, done } = await reader.read();
     if (done) {
       break;
     }
-    const rows = decodeRows(value);
-    for (const row of rows!) {
-      yield row;
+    const [rows, err] = decodeRows(value);
+    if (err) {
+      yield [{} as Row, err];
+      break;
+    }
+    for (const row of rows) {
+      yield [row, null];
     }
   }
 }
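The protocol decoders follow the same convention as the builder: `decodeResponse()` and `decodeRows()` return `[value, error]` tuples instead of `null` on failure, and `decodeRowsIterator()` forwards a decode error as a single `[Row, IError]` item before stopping. A minimal sketch of the new contract; the helper functions are illustrative, only the tuple shapes come from this commit:

```typescript
import { decodeResponse, decodeRows } from "./Protocol"; // paths as in this package
import type { ExecResult } from "./Protocol";

// Illustrative helper: unwrap an exec frame or surface its error.
function readExecFrame(payload: Uint8Array): ExecResult {
  const [result, error] = decodeResponse(payload);
  if (error) {
    // Either ExecResult.Error reported by the server, or the decode exception.
    throw new Error(String(error));
  }
  return result;
}

// Illustrative helper: unwrap a rows frame into raw row arrays.
function readRowFrames(payload: Uint8Array): unknown[][] {
  const [rows, error] = decodeRows(payload);
  if (error) {
    throw new Error(String(error));
  }
  return rows.map((row) => row.r);
}
```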

View file

@@ -1,5 +1,5 @@
 import fs from "fs";
-import dal from "../Bunding";
+import dal from "../BunFFI";
 const Mb = (num) => Math.round(num / 1024 / 1024);

View file

@@ -21,8 +21,8 @@ test("Rows iter, no format", async () => {
       id: 1,
     })
     .Rows<any[]>();
-  for await (const row of rows) {
-    //console.log(row);
+  for await (const result of rows) {
+    const [row, error] = result;
     expect(row.length).toBe(3);
   }
   expect(true).toBe(true);

Binary file not shown.

View file

@@ -13,7 +13,7 @@ type SQLite = {
 let Library: SQLite;
 if (process.isBun) {
-  Library = require("./Bunding") as SQLite;
+  Library = require("./BunFFI") as SQLite;
 } else {
   Library = require("./Binding") as SQLite;
 }

View file

@@ -1,12 +1,10 @@
 import Builder from "./Builder";
-import Bunding from "./Library";
+import Napi from "./napi";
 import { encodeRequest, decodeRows, decodeResponse } from "./Protocol";
-import type { ExecResult } from "./Protocol";
+import type { ExecResult, IError } from "./Protocol";
 //@ts-ignore
-const Binding = Bunding.default ?? Bunding;
-Binding.initSQLite(Buffer.from(" "));
+const Binding = Napi.default ?? Napi;
 type Options = {
   database: string;
@@ -15,27 +13,26 @@ type Options = {
 /**
  * Allows to use SQLite databases in a NodeJS process.
  */
-export default class CBuilder<
+export default class C <
   I extends abstract new (...args: any) => any,
 > extends Builder<I> {
   constructor(opts: Options) {
     super({ database: opts.database, url: "" });
   }
-  /**
-   * TODO: handle responses
-   */
-  async *Rows<T = InstanceType<I>>(): AsyncGenerator<T> {
+  async *Rows<T = InstanceType<I>>(): AsyncGenerator<[T, IError]> {
     this.formatRequest();
     const req = Buffer.from(encodeRequest(this.request));
     const iter = Binding.rowIterator(req);
     for (;;) {
-      const response = iter.next() as Buffer;
-      const error = decodeResponse(response);
-      if (error?.Msg) {
-        throw new Error(error.Msg);
+      const data = iter.next() as Buffer;
+      const [_, error] = decodeResponse(data);
+      if (error) {
+        yield [{} as T, error];
+        iter.cleanup();
+        return;
       }
-      const rows = decodeRows(response);
-      if (!rows || rows.length === 0) {
+      const [rows, err] = decodeRows(data);
+      if (err || !rows || rows.length === 0) {
         iter.cleanup();
         return;
       }
@@ -44,19 +41,11 @@ export default class CBuilder<
          this.headerRow = row.r;
          continue;
        }
-        yield this.formatRow(row.r);
+        yield [this.formatRow(row.r), null];
      }
    }
  }
-  async Query<T = InstanceType<I>>(): Promise<T[]> {
-    const rows = this.Rows();
-    const result: T[] = [];
-    for await (const row of rows) {
-      result.push(row);
-    }
-    return result;
-  }
-  async Exec(): Promise<ExecResult> {
+  async Exec(): Promise<[ExecResult, IError]> {
     this.formatRequest();
     const req = Buffer.from(encodeRequest(this.request));
     const iter = Binding.rowIterator(req);
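The native builder now mirrors the tuple convention: `Rows()` yields `[row, error]` and cleans up the FFI iterator when an error frame is decoded, the `Query()` override is removed (the base `Builder.Query()` is used instead), and `Exec()` resolves to `[ExecResult, IError]`. A hedged usage sketch; the insert chain is illustrative only, and only the return shape is taken from this diff:

```typescript
import path from "path";
import DAL from "@nesterow/dal/client/native";

const db = new DAL({
  database: path.join(import.meta.dir, "..", "data", "chinook.db"),
});

// Illustrative write: the exact builder chain (In/Insert/Set/...) depends on
// the query API; only the [result, error] return shape comes from this commit.
const [result, error] = await db
  .In("artists")
  .Insert({ Name: "Example Artist" })
  .Exec();

if (error) {
  console.error("exec failed:", error);
} else {
  console.log("last insert id:", result.LastInsertId);
}
```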

View file

@@ -9,7 +9,7 @@ bun install
 To run:
 ```bash
-bun run index.ts
+bun test
 ```
 This project was created using `bun init` in bun v1.1.25. [Bun](https://bun.sh) is a fast all-in-one JavaScript runtime.

View file

@@ -1,6 +1,6 @@
 import { describe, expect, test } from "bun:test";
 import path from "path";
-import DAL from "@nesterow/dal/client/libdal";
+import DAL from "@nesterow/dal/client/native";
 // in this case we need to use absolute path
 const DATABASE_PATH = path.join(import.meta.dir, "..", "data", "chinook.db");
@@ -10,7 +10,7 @@ const db = new DAL({
 });
 describe("Query Interface", () => {
-  test(".Find", async () => {
+  test(".Find [find 10 artists whose names start with 'A']", async () => {
     const items = db
       .In("artists")
       .Find({
@@ -19,14 +19,15 @@ describe("Query Interface", () => {
       .Limit(10)
       .Rows();
-    for await (const item of items) {
+    for await (const result of items) {
+      const [item, error] = result;
       console.log(item);
     }
     expect(true).toBe(true);
   });
-  test(".Find.As", async () => {
+  test(".Find.As [find 5 artists whose names start with 'B'; Represent each row as an Artist object]", async () => {
     class Artist {
       ArtistId = 0;
       Name = "";
@@ -38,13 +39,23 @@ describe("Query Interface", () => {
         name: { $glob: "B*" },
       })
       .As(Artist)
-      .Limit(1)
+      .Limit(5)
       .Rows();
-    for await (const item of items) {
-      console.log(item);
+    for await (const result of items) {
+      const [item, error] = result;
+      console.log(123, item);
     }
+    console.log("done");
+    const all_rows = await db
+      .In("artists")
+      .Find({
+        name: { $glob: "B*" },
+      })
+      .As(Artist)
+      .Limit(5)
+      .Query();
     expect(true).toBe(true);
   });
 });

View file

@@ -1,6 +1,6 @@
 import { describe, expect, test } from "bun:test";
 import path from "path";
-import DAL from "@nesterow/dal/client/libdal";
+import DAL from "@nesterow/dal/client/native";
 // in this case we need to use absolute path
 const DATABASE_PATH = path.join(import.meta.dir, "..", "data", "chinook.db");
@@ -37,16 +37,17 @@ describe("Query Interface", () => {
         "ar.Name": { $glob: "A*" },
       })
       .Fields({
-        "tr.TrackId" : "TrackId",
-        "tr.Name" : "TrackName",
-        "ar.Name" : "ArtistName",
-        "al.Title" : "AlbumTitle",
+        "tr.TrackId": "TrackId",
+        "tr.Name": "TrackName",
+        "ar.Name": "ArtistName",
+        "al.Title": "AlbumTitle",
       })
       .Limit(10)
       .As(Album)
       .Rows();
-    for await (const item of items) {
+    for await (const result of items) {
+      const [item, error] = result;
       console.log(item);
     }

View file

@@ -39,7 +39,7 @@ func (r *RowsIter) Exec(input []byte) {
     query, err := req.Parse(adapter.GetDialect(db.Type))
     if err != nil || e != nil {
         res := proto.Response{
-            Msg: "failed to unmarshal request",
+            Error: "failed to unmarshal request",
         }
         r.Result, _ = res.MarshalMsg(nil)
         return
@@ -48,7 +48,7 @@ func (r *RowsIter) Exec(input []byte) {
     result, err := db.Exec(query)
     if err != nil {
         res := proto.Response{
-            Msg: err.Error(),
+            Error: err.Error(),
         }
         r.Result, _ = res.MarshalMsg(nil)
         return
@@ -66,7 +66,7 @@ func (r *RowsIter) Exec(input []byte) {
     rows, err := db.Query(query)
     if err != nil {
         res := proto.Response{
-            Msg: err.Error(),
+            Error: err.Error(),
         }
         r.Result, _ = res.MarshalMsg(nil)
         return

View file

@@ -6,5 +6,5 @@ type Response struct {
     Id           uint32 `msg:"i"`
     RowsAffected int64  `msg:"ra"`
     LastInsertId int64  `msg:"li"`
-    Msg          string `msg:"m"`
+    Error        string `msg:"e"`
 }
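On the wire, the key carrying the error message changes from "m" to "e" (fixstr 0xa1 0x6d becomes 0xa1 0x65 in the generated codec below), which is what the client's `e?: string` field in `decodeResponse()` now reads. A small sketch of the resulting frame, assuming a MessagePack codec such as @msgpack/msgpack (the codec actually imported by Protocol.ts may differ):

```typescript
// Sketch of the response frame after the key rename; codec choice is an assumption.
import { encode, decode } from "@msgpack/msgpack";

// What the Go server now marshals: keys i/ra/li/e ("e" was "m" before this commit).
const wire = encode({ i: 1, ra: 0, li: 0, e: "no such table: artistz" });

// Mirrors the shape decodeResponse() expects; Error is read from "e" and
// returned as the second element of the [ExecResult, IError] tuple.
const res = decode(wire) as { i: number; ra: number; li: number; e?: string };
console.log(res.e); // "no such table: artistz"
```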

View file

@@ -42,10 +42,10 @@ func (z *Response) DecodeMsg(dc *msgp.Reader) (err error) {
                 err = msgp.WrapError(err, "LastInsertId")
                 return
             }
-        case "m":
-            z.Msg, err = dc.ReadString()
+        case "e":
+            z.Error, err = dc.ReadString()
             if err != nil {
-                err = msgp.WrapError(err, "Msg")
+                err = msgp.WrapError(err, "Error")
                 return
             }
         default:
@@ -92,14 +92,14 @@ func (z *Response) EncodeMsg(en *msgp.Writer) (err error) {
         err = msgp.WrapError(err, "LastInsertId")
         return
     }
-    // write "m"
-    err = en.Append(0xa1, 0x6d)
+    // write "e"
+    err = en.Append(0xa1, 0x65)
     if err != nil {
         return
     }
-    err = en.WriteString(z.Msg)
+    err = en.WriteString(z.Error)
     if err != nil {
-        err = msgp.WrapError(err, "Msg")
+        err = msgp.WrapError(err, "Error")
         return
     }
     return
@@ -118,9 +118,9 @@ func (z *Response) MarshalMsg(b []byte) (o []byte, err error) {
     // string "li"
     o = append(o, 0xa2, 0x6c, 0x69)
     o = msgp.AppendInt64(o, z.LastInsertId)
-    // string "m"
-    o = append(o, 0xa1, 0x6d)
-    o = msgp.AppendString(o, z.Msg)
+    // string "e"
+    o = append(o, 0xa1, 0x65)
+    o = msgp.AppendString(o, z.Error)
     return
 }
@@ -160,10 +160,10 @@ func (z *Response) UnmarshalMsg(bts []byte) (o []byte, err error) {
                 err = msgp.WrapError(err, "LastInsertId")
                 return
             }
-        case "m":
-            z.Msg, bts, err = msgp.ReadStringBytes(bts)
+        case "e":
+            z.Error, bts, err = msgp.ReadStringBytes(bts)
             if err != nil {
-                err = msgp.WrapError(err, "Msg")
+                err = msgp.WrapError(err, "Error")
                 return
             }
         default:
@@ -180,6 +180,6 @@ func (z *Response) UnmarshalMsg(bts []byte) (o []byte, err error) {
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
 func (z *Response) Msgsize() (s int) {
-    s = 1 + 2 + msgp.Uint32Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 2 + msgp.StringPrefixSize + len(z.Msg)
+    s = 1 + 2 + msgp.Uint32Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 2 + msgp.StringPrefixSize + len(z.Error)
     return
 }

View file

@@ -1,5 +1,5 @@
 {
-  "files": ["client/index.ts", "client/Binding.ts", "client/libdal.ts"],
+  "files": ["client/index.ts", "client/Binding.ts", "client/native.ts"],
   "compilerOptions": {
     "target": "ESNext",
     "module": "ESNext",