[wip] adapter
Signed-off-by: Anton Nesterov <anton@demiurg.io>
This commit is contained in:
parent
05f155ecc0
commit
25f3e18f4a
44
pkg/__test__/adapter_test.go
Normal file
44
pkg/__test__/adapter_test.go
Normal file
|
@ -0,0 +1,44 @@
|
|||
package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"l12.xyz/dal/adapter"
|
||||
)
|
||||
|
||||
func TestAdapterBasic(t *testing.T) {
|
||||
a := adapter.DBAdapter{Type: "sqlite3"}
|
||||
db, err := a.Open("file::memory:?cache=shared")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open db: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
_, err = db.Exec("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create table: %v", err)
|
||||
}
|
||||
_, err = db.Exec("INSERT INTO test (name) VALUES (?)", "test")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to insert data: %v", err)
|
||||
}
|
||||
rows, err := a.Query(adapter.Query{
|
||||
Db: "file::memory:?cache=shared",
|
||||
Expression: "SELECT * FROM test",
|
||||
Data: []interface{}{},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to query data: %v", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var id int
|
||||
var name string
|
||||
err = rows.Scan(&id, &name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to scan row: %v", err)
|
||||
}
|
||||
fmt.Printf("id: %d, name: %s\n", id, name)
|
||||
}
|
||||
}
|
17
pkg/__test__/go.mod
Normal file
17
pkg/__test__/go.mod
Normal file
|
@ -0,0 +1,17 @@
|
|||
module l12.xyz/dal/tests
|
||||
|
||||
go 1.22.6
|
||||
|
||||
replace l12.xyz/dal/builder v0.0.0 => ../builder
|
||||
|
||||
replace l12.xyz/dal/utils v0.0.0 => ../utils
|
||||
|
||||
require l12.xyz/dal/adapter v0.0.0
|
||||
|
||||
replace l12.xyz/dal/adapter v0.0.0 => ../adapter
|
||||
|
||||
require (
|
||||
github.com/mattn/go-sqlite3 v1.14.22
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
l12.xyz/dal/utils v0.0.0 // indirect
|
||||
)
|
4
pkg/__test__/go.sum
Normal file
4
pkg/__test__/go.sum
Normal file
|
@ -0,0 +1,4 @@
|
|||
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
|
||||
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
92
pkg/adapter/DBAdapter.go
Normal file
92
pkg/adapter/DBAdapter.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package adapter
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
/**
|
||||
* DBAdapter
|
||||
* Automatically creates connections for each database URL.
|
||||
* Executes queries on the specified database.
|
||||
* TODO: Closes connections older than 2 minutes.
|
||||
**/
|
||||
type DBAdapter struct {
|
||||
Type string
|
||||
MaxAttempts int
|
||||
_db map[string]*sql.DB
|
||||
_connect_attempts map[string]int
|
||||
_connect_time map[string]int64
|
||||
}
|
||||
|
||||
func (a *DBAdapter) Open(url string) (*sql.DB, error) {
|
||||
if a._db == nil {
|
||||
a._db = make(map[string]*sql.DB)
|
||||
}
|
||||
if a._connect_attempts == nil {
|
||||
a._connect_attempts = make(map[string]int)
|
||||
}
|
||||
if a._connect_time == nil {
|
||||
a._connect_time = make(map[string]int64)
|
||||
}
|
||||
if a.MaxAttempts == 0 {
|
||||
a.MaxAttempts = 6
|
||||
}
|
||||
if _, ok := a._db[url]; !ok {
|
||||
a._db[url], _ = sql.Open(a.Type, url)
|
||||
} else {
|
||||
err := a._db[url].Ping()
|
||||
if err != nil {
|
||||
a._db[url] = nil
|
||||
a._connect_attempts[url]++
|
||||
time.Sleep(time.Duration(5) * time.Second)
|
||||
if a._connect_attempts[url] > a.MaxAttempts {
|
||||
return nil, fmt.Errorf(`failed to connect to "%s"`, url)
|
||||
}
|
||||
return a.Open(url)
|
||||
}
|
||||
}
|
||||
a._connect_attempts[url] = 0
|
||||
a._connect_time[url] = time.Now().Unix()
|
||||
return a._db[url], nil
|
||||
}
|
||||
|
||||
func (a *DBAdapter) GetDB(url string) *sql.DB {
|
||||
if a._db == nil {
|
||||
return nil
|
||||
}
|
||||
return a._db[url]
|
||||
}
|
||||
|
||||
func (a *DBAdapter) Close() {
|
||||
for _, db := range a._db {
|
||||
db.Close()
|
||||
}
|
||||
a._db = nil
|
||||
a._connect_attempts = nil
|
||||
}
|
||||
|
||||
func (a *DBAdapter) Query(req Query) (*sql.Rows, error) {
|
||||
db, err := a.Open(req.Db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sfmt, err := db.Prepare(req.Expression)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sfmt.Query(req.Data...)
|
||||
}
|
||||
|
||||
func (a *DBAdapter) Exec(req Query) (sql.Result, error) {
|
||||
db, err := a.Open(req.Db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sfmt, err := db.Prepare(req.Expression)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sfmt.Exec(req.Data...)
|
||||
}
|
|
@ -1,5 +1,11 @@
|
|||
package adapter
|
||||
|
||||
type Query struct {
|
||||
Db string `json:"db"`
|
||||
Expression string `json:"expr"`
|
||||
Data []interface{} `json:"data"`
|
||||
}
|
||||
|
||||
type CtxOpts map[string]string
|
||||
|
||||
type Context interface {
|
||||
|
|
|
@ -5,6 +5,13 @@ import (
|
|||
"strings"
|
||||
)
|
||||
|
||||
type Builder struct {
|
||||
TableName string
|
||||
TableAlias string
|
||||
Parts SQLParts
|
||||
Ctx Context
|
||||
}
|
||||
|
||||
type SQLParts struct {
|
||||
Operation string
|
||||
From string
|
||||
|
@ -21,13 +28,6 @@ type SQLParts struct {
|
|||
Update UpdateData
|
||||
}
|
||||
|
||||
type Builder struct {
|
||||
Parts SQLParts
|
||||
TableName string
|
||||
TableAlias string
|
||||
Ctx Context
|
||||
}
|
||||
|
||||
func New(ctx Context) *Builder {
|
||||
return &Builder{
|
||||
Parts: SQLParts{
|
||||
|
@ -135,6 +135,8 @@ func (b *Builder) OnConflict(fields ...string) *Builder {
|
|||
if b.Parts.Operation == "UPDATE" {
|
||||
b.Parts.Update.Upsert = convertConflict(b.Ctx, fields...)
|
||||
b.Parts.Update.UpsertExp = "DO NOTHING"
|
||||
} else {
|
||||
panic("OnConflict is only available for UPDATE operation")
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
@ -142,6 +144,8 @@ func (b *Builder) OnConflict(fields ...string) *Builder {
|
|||
func (b *Builder) DoUpdate(fields ...string) *Builder {
|
||||
if b.Parts.Operation == "UPDATE" {
|
||||
b.Parts.Update.UpsertExp = convertUpsert(fields)
|
||||
} else {
|
||||
panic("DoUpdate is only available for UPDATE operation")
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
@ -149,6 +153,8 @@ func (b *Builder) DoUpdate(fields ...string) *Builder {
|
|||
func (b *Builder) DoNothing() *Builder {
|
||||
if b.Parts.Operation == "UPDATE" {
|
||||
b.Parts.Update.UpsertExp = "DO NOTHING"
|
||||
} else {
|
||||
panic("DoNothing is only available for UPDATE operation")
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ module l12.xyz/dal/builder
|
|||
|
||||
go 1.22.6
|
||||
|
||||
require l12.xyz/dal/utils v0.0.0 // indirect
|
||||
require l12.xyz/dal/utils v0.0.0
|
||||
|
||||
replace l12.xyz/dal/utils v0.0.0 => ../utils
|
||||
|
||||
|
|
3
pkg/server/go.mod
Normal file
3
pkg/server/go.mod
Normal file
|
@ -0,0 +1,3 @@
|
|||
module l12.xyz/dal/server
|
||||
|
||||
go 1.22.6
|
Loading…
Reference in a new issue