[ref] ts client; [fix] free mem Bun ffi

This commit is contained in:
Anton Nesterov 2024-08-30 08:53:32 +02:00
parent 849dbff420
commit ddc2e1ba0e
No known key found for this signature in database
GPG key ID: 59121E8AE2851FB5
18 changed files with 125 additions and 153 deletions

View file

@ -187,7 +187,7 @@ export default class Builder<I extends abstract new (...args: any) => any> {
}
async Query<T = InstanceType<I>>(): Promise<T[]> {
const rows = this.Rows();
const result = [];
const result: T[] = [];
for await (const row of rows) {
result.push(row);
}

View file

@ -2,9 +2,10 @@
* This file is responsible for binding the C library to the Bun runtime.
*/
import { dlopen, FFIType, suffix, ptr, toBuffer } from "bun:ffi";
import { join } from "path";
const libname = `../clib/clib.${suffix}`;
const libpath = libname;
const libname = `clib.${suffix}`;
const libpath = join("clib", libname);
const {
symbols: { InitSQLite, CreateRowIterator, NextRow, GetLen, Free, Cleanup },
@ -46,8 +47,8 @@ function rowIterator(buf: Buffer) {
if (pointer === null) {
return null;
}
const buf = toBuffer(pointer, 0, GetLen(iter));
//Free(pointer) //should be resolved by GC;
const buf = Buffer.from(toBuffer(pointer, 0, GetLen(iter)));
Free(pointer);
return buf;
};

21
client/Library.ts Normal file
View file

@ -0,0 +1,21 @@
import { createRequire } from "node:module";
const require = createRequire(import.meta.url);
type RowIterator = {
next: () => Buffer;
cleanup: () => void;
};
type SQLite = {
initSQLite: (pragmas: Buffer) => void;
rowIterator: (input: Buffer) => RowIterator;
};
let Library: SQLite;
if (process.isBun) {
Library = require("./Bunding") as SQLite;
} else {
Library = require("./Binding") as SQLite;
}
export default Library;

View file

@ -0,0 +1,90 @@
import fs from "fs";
import dal from "../Bunding";
const Mb = (num) => Math.round(num / 1024 / 1024);
class Stats {
avg_rss;
avg_heapTotal;
avg_heapUsed;
avg_external;
calls;
constructor() {
this.calls = 0;
this.avg_rss = 0;
this.avg_heapTotal = 0;
this.avg_heapUsed = 0;
this.avg_external = 0;
}
add(mem) {
this.calls++;
this.avg_rss += mem.rss;
this.avg_heapTotal += mem.heapTotal;
this.avg_heapUsed += mem.heapUsed;
this.avg_external += mem.external;
}
avg() {
const n = this.calls;
this.avg_rss /= n;
this.avg_heapTotal /= n;
this.avg_heapUsed /= n;
this.avg_external /= n;
}
print() {
console.log(`
AVERAGE:
rss: ${Mb(this.avg_rss)} Mb
external: ${Mb(this.avg_external)} Mb
buffers: ${Mb(this.avg_heapUsed)} Mb
total: ${Mb(this.avg_heapTotal)} Mb`);
}
}
const stats = new Stats();
let prevMem = process.memoryUsage();
stats.add(prevMem);
function MEM(when = "") {
const mem = process.memoryUsage();
stats.add(mem);
console.log(`
${when}
rss: ${Mb(mem.rss)} Mb [delta> ${mem.rss - prevMem.rss}]
external: ${Mb(mem.external)} Mb [delta> ${mem.external - prevMem.external}]
buffers: ${Mb(mem.heapUsed)} Mb [delta> ${mem.heapUsed - prevMem.heapUsed}]
total: ${Mb(mem.heapTotal)} Mb [delta> ${mem.heapTotal - prevMem.heapTotal}]`);
}
console.time("Time to end");
MEM("START");
const buf = fs.readFileSync("./pkg/__test__/proto_test.msgpack");
const Iterator = dal.rowIterator(buf);
MEM("AFTER INIT");
let dataTransferedBytes = 0;
for (let i = 0; i < 100000000; i++) {
const b = Iterator.next()!;
if (b.length === 0) {
break;
}
dataTransferedBytes += b.length;
if (i % 1000000 === 0) {
MEM(`ITERATION ${i}`);
}
}
MEM("AFTER ITERATION");
Iterator.cleanup();
MEM("AFTER CLEANUP");
console.log("\nData transfered: ", Mb(dataTransferedBytes), "Mb");
console.timeEnd("Time to end");
setTimeout(() => {
MEM("AFTER SOME TIME");
stats.avg();
stats.print();
}, 30000);

View file

@ -55,7 +55,7 @@ MEM("START");
const buf = fs.readFileSync("./pkg/__test__/proto_test.msgpack");
const Iterator = dal.RowIterator(buf);
const Iterator = dal.rowIterator(buf);
MEM("AFTER INIT");
let dataTransferedBytes = 0;
@ -72,7 +72,7 @@ for (let i = 0; i < 100000000; i++) {
MEM("AFTER ITERATION");
Iterator.free();
Iterator.cleanup();
MEM("AFTER CLEANUP");
console.log("\nData transfered: ", Mb(dataTransferedBytes), "Mb");

View file

@ -1,5 +1,5 @@
import Builder from "./Builder";
import Binding from "./Binding";
import Binding from "./Library";
import { encodeRequest, decodeRows, decodeResponse } from "./Protocol";
import type { ExecResult } from "./Protocol";

View file

@ -1,65 +0,0 @@
import Builder from "./Builder";
import Bunding from "./Bunding";
import { encodeRequest, decodeRows, decodeResponse } from "./Protocol";
import type { ExecResult } from "./Protocol";
type Options = {
database: string;
};
/**
* Allows to use SQLite databases in BunJS
*/
export default class CBuilder<
I extends abstract new (...args: any) => any,
> extends Builder<I> {
constructor(opts: Options) {
super({ database: opts.database, url: "" });
}
/**
* TODO: handle responses
*/
async *Rows<T = InstanceType<I>>(): AsyncGenerator<T> {
this.formatRequest();
const req = Buffer.from(encodeRequest(this.request));
const iter = Bunding.rowIterator(req);
for (;;) {
const response = iter.next();
if (response === null) {
iter.cleanup();
break;
}
const rows = decodeRows(response);
if (rows.length === 0) {
iter.cleanup();
break;
}
for (const row of rows) {
if (this.headerRow === null) {
this.headerRow = row.r;
continue;
}
yield this.formatRow(row.r);
}
}
}
async Query<T = InstanceType<I>>(): Promise<T[]> {
const rows = this.Rows();
const result: T[] = [];
for await (const row of rows) {
result.push(row);
}
return result;
}
async Exec(): Promise<ExecResult> {
this.formatRequest();
const req = Buffer.from(encodeRequest(this.request));
const iter = Bunding.rowIterator(req);
const response = iter.next();
if (response === null) {
iter.cleanup();
throw new Error("No response");
}
return decodeResponse(response);
}
}

View file

@ -21,9 +21,10 @@
},
"scripts": {
"test:client": "bun test:*",
"test:dal": "bun test dal/__test__",
"test:serve": "cd dal/__test__/srv && go run main.go",
"bench:node": "node ./dal/__test__/bench.node.cjs",
"test:dal": "bun test client/__test__",
"test:serve": "cd client/__test__/srv && go run main.go",
"bench:node": "node ./client/__test__/bench.node.cjs",
"bench:bun": "bun ./client/__test__/bench.bun.ts",
"fmt": "prettier --write .",
"build": "tsc",
"prepublish": "tsc",

View file

@ -99,79 +99,3 @@ func (r *RowsIter) Next() []byte {
cols, _ := proto.MarshalRow(data)
return cols
}
func HandleQuery(input *[]byte, output *[]byte) int {
InitSQLite([]string{})
req := proto.Request{}
_, err := req.UnmarshalMsg(*input)
if err != nil {
res := proto.Response{
Id: 0,
RowsAffected: -1,
LastInsertId: -1,
Msg: "failed to unmarshal request",
}
*output, _ = res.MarshalMsg(nil)
return 0
}
query, err := req.Parse(adapter.GetDialect(db.Type))
if err != nil {
res := proto.Response{
Id: 0,
RowsAffected: -1,
LastInsertId: -1,
Msg: err.Error(),
}
*output, _ = res.MarshalMsg(nil)
return 0
}
if query.Exec {
result, err := db.Exec(query)
if err != nil {
res := proto.Response{
Id: 0,
RowsAffected: -1,
LastInsertId: -1,
Msg: err.Error(),
}
*output, _ = res.MarshalMsg(nil)
return 0
}
ra, _ := result.RowsAffected()
la, _ := result.LastInsertId()
res := proto.Response{
Id: 0,
RowsAffected: ra,
LastInsertId: la,
}
*output, _ = res.MarshalMsg(nil)
return 0
}
rows, err := db.Query(query)
if err != nil {
res := proto.Response{
Id: 0,
RowsAffected: -1,
LastInsertId: -1,
Msg: err.Error(),
}
*output, _ = res.MarshalMsg(nil)
return 0
}
defer rows.Close()
columns, _ := rows.Columns()
types, _ := rows.ColumnTypes()
cols, _ := proto.MarshalRow(columns)
*output = append(*output, cols...)
for rows.Next() {
data := make([]interface{}, len(columns))
for i := range data {
typ := reflect.New(types[i].ScanType()).Interface()
data[i] = &typ
}
rows.Scan(data...)
cols, _ := proto.MarshalRow(data)
*output = append(*output, cols...)
}
return 0
}

View file

@ -1,5 +1,5 @@
{
"files": ["dal/index.ts", "dal/Binding.ts", "dal/node.napi.ts"],
"files": ["client/index.ts", "client/Binding.ts", "client/libdal.ts"],
"compilerOptions": {
"target": "ESNext",
"module": "ESNext",