From b1f9cead3474e5c3bfc2363b1cc6ad7310c02afa Mon Sep 17 00:00:00 2001 From: Anton Nesterov Date: Mon, 12 Aug 2024 15:47:06 +0200 Subject: [PATCH] [wip] cleanup unused connections Signed-off-by: Anton Nesterov --- pkg/adapter/DBAdapter.go | 92 +++++++++++++++++++++++++++------------- 1 file changed, 62 insertions(+), 30 deletions(-) diff --git a/pkg/adapter/DBAdapter.go b/pkg/adapter/DBAdapter.go index b9563bb..fb37595 100644 --- a/pkg/adapter/DBAdapter.go +++ b/pkg/adapter/DBAdapter.go @@ -10,61 +10,93 @@ import ( * DBAdapter * Automatically creates connections for each database URL. * Executes queries on the specified database. - * TODO: Closes connections older than 2 minutes. + * Closes connections older than ConnectionLiveTime **/ type DBAdapter struct { - Type string - MaxAttempts int - _db map[string]*sql.DB - _connect_attempts map[string]int - _connect_time map[string]int64 + Type string + MaxAttempts int + ConnectionLiveTime int + dbs *DBMap +} + +type DBMap struct { + Connections map[string]*sql.DB + ConnectionAttempts map[string]int + ConnectionTime map[string]int64 } func (a *DBAdapter) Open(url string) (*sql.DB, error) { - if a._db == nil { - a._db = make(map[string]*sql.DB) - } - if a._connect_attempts == nil { - a._connect_attempts = make(map[string]int) - } - if a._connect_time == nil { - a._connect_time = make(map[string]int64) - } + defer a.CleanUp() + if a.MaxAttempts == 0 { a.MaxAttempts = 6 } - if _, ok := a._db[url]; !ok { - a._db[url], _ = sql.Open(a.Type, url) + if a.ConnectionLiveTime == 0 { + a.ConnectionLiveTime = 120 + } + if a.dbs == nil { + a.dbs = &DBMap{ + Connections: make(map[string]*sql.DB), + ConnectionAttempts: make(map[string]int), + ConnectionTime: make(map[string]int64), + } + } + + connections := a.dbs.Connections + attempts := a.dbs.ConnectionAttempts + lastHits := a.dbs.ConnectionTime + + lastHits[url] = time.Now().Unix() + if _, ok := connections[url]; !ok { + connections[url], _ = sql.Open(a.Type, url) } else { - err := a._db[url].Ping() + err := connections[url].Ping() if err != nil { - a._db[url] = nil - a._connect_attempts[url]++ + connections[url] = nil + attempts[url]++ time.Sleep(time.Duration(5) * time.Second) - if a._connect_attempts[url] > a.MaxAttempts { - return nil, fmt.Errorf(`failed to connect to "%s"`, url) + if attempts[url] > a.MaxAttempts { + return nil, fmt.Errorf( + `failed to connect to "%s", after %v attempts`, + url, + a.MaxAttempts, + ) } return a.Open(url) } } - a._connect_attempts[url] = 0 - a._connect_time[url] = time.Now().Unix() - return a._db[url], nil + a.dbs.ConnectionAttempts[url] = 0 + return a.dbs.Connections[url], nil } func (a *DBAdapter) GetDB(url string) *sql.DB { - if a._db == nil { + if a.dbs == nil { return nil } - return a._db[url] + return a.dbs.Connections[url] } func (a *DBAdapter) Close() { - for _, db := range a._db { + for _, db := range a.dbs.Connections { db.Close() } - a._db = nil - a._connect_attempts = nil + a.dbs = nil +} + +func (a *DBAdapter) CleanUp() { + if a.dbs == nil { + return + } + lastHits := a.dbs.ConnectionTime + liveTime := a.ConnectionLiveTime + for url, db := range a.dbs.Connections { + if time.Now().Unix()-lastHits[url] > int64(liveTime) { + db.Close() + delete(a.dbs.Connections, url) + delete(a.dbs.ConnectionAttempts, url) + delete(a.dbs.ConnectionTime, url) + } + } } func (a *DBAdapter) Query(req Query) (*sql.Rows, error) {