[wip] cleanup unused connections

Signed-off-by: Anton Nesterov <anton@demiurg.io>
This commit is contained in:
Anton Nesterov 2024-08-12 15:47:06 +02:00
parent 43dd4e9234
commit b1f9cead34
No known key found for this signature in database
GPG key ID: 59121E8AE2851FB5

View file

@ -10,61 +10,93 @@ import (
* DBAdapter
* Automatically creates connections for each database URL.
* Executes queries on the specified database.
* TODO: Closes connections older than 2 minutes.
* Closes connections older than ConnectionLiveTime
**/
type DBAdapter struct {
Type string
MaxAttempts int
_db map[string]*sql.DB
_connect_attempts map[string]int
_connect_time map[string]int64
ConnectionLiveTime int
dbs *DBMap
}
type DBMap struct {
Connections map[string]*sql.DB
ConnectionAttempts map[string]int
ConnectionTime map[string]int64
}
func (a *DBAdapter) Open(url string) (*sql.DB, error) {
if a._db == nil {
a._db = make(map[string]*sql.DB)
}
if a._connect_attempts == nil {
a._connect_attempts = make(map[string]int)
}
if a._connect_time == nil {
a._connect_time = make(map[string]int64)
}
defer a.CleanUp()
if a.MaxAttempts == 0 {
a.MaxAttempts = 6
}
if _, ok := a._db[url]; !ok {
a._db[url], _ = sql.Open(a.Type, url)
if a.ConnectionLiveTime == 0 {
a.ConnectionLiveTime = 120
}
if a.dbs == nil {
a.dbs = &DBMap{
Connections: make(map[string]*sql.DB),
ConnectionAttempts: make(map[string]int),
ConnectionTime: make(map[string]int64),
}
}
connections := a.dbs.Connections
attempts := a.dbs.ConnectionAttempts
lastHits := a.dbs.ConnectionTime
lastHits[url] = time.Now().Unix()
if _, ok := connections[url]; !ok {
connections[url], _ = sql.Open(a.Type, url)
} else {
err := a._db[url].Ping()
err := connections[url].Ping()
if err != nil {
a._db[url] = nil
a._connect_attempts[url]++
connections[url] = nil
attempts[url]++
time.Sleep(time.Duration(5) * time.Second)
if a._connect_attempts[url] > a.MaxAttempts {
return nil, fmt.Errorf(`failed to connect to "%s"`, url)
if attempts[url] > a.MaxAttempts {
return nil, fmt.Errorf(
`failed to connect to "%s", after %v attempts`,
url,
a.MaxAttempts,
)
}
return a.Open(url)
}
}
a._connect_attempts[url] = 0
a._connect_time[url] = time.Now().Unix()
return a._db[url], nil
a.dbs.ConnectionAttempts[url] = 0
return a.dbs.Connections[url], nil
}
func (a *DBAdapter) GetDB(url string) *sql.DB {
if a._db == nil {
if a.dbs == nil {
return nil
}
return a._db[url]
return a.dbs.Connections[url]
}
func (a *DBAdapter) Close() {
for _, db := range a._db {
for _, db := range a.dbs.Connections {
db.Close()
}
a._db = nil
a._connect_attempts = nil
a.dbs = nil
}
func (a *DBAdapter) CleanUp() {
if a.dbs == nil {
return
}
lastHits := a.dbs.ConnectionTime
liveTime := a.ConnectionLiveTime
for url, db := range a.dbs.Connections {
if time.Now().Unix()-lastHits[url] > int64(liveTime) {
db.Close()
delete(a.dbs.Connections, url)
delete(a.dbs.ConnectionAttempts, url)
delete(a.dbs.ConnectionTime, url)
}
}
}
func (a *DBAdapter) Query(req Query) (*sql.Rows, error) {