0
点赞
收藏
分享

微信扫一扫

Go-MySQL(二)Go实现MySQL连接池

老罗话编程 2022-07-29 阅读 92


文章目录

  • ​​Go-MySQL(二)Go实现MySQL连接池​​
  • ​​连接池数据结构​​
  • ​​获取连接​​
  • ​​释放连接​​
  • ​​关闭连接池​​
  • ​​测试​​
  • ​​完整代码​​

Go-MySQL(二)Go实现MySQL连接池

连接池数据结构

利用channel来存储数据库连接,消费channel中的消息获取连接,连接池未满时则新建连接后将连接放入channel,采用的带缓冲区的channel,缓冲区大小就是连接池的最大容纳的连接数,如果缓冲区还有空间,那么获取和释放连接都不会阻塞,如果缓冲区为空,那么就是阻塞连接获取,从而走新建连接的逻辑;同理,缓冲区满了,就阻塞向channel放入连接的过程,需要先消费.

var (
PoolUnInvaildSizeError = errors.New("pool size is unvaild")
PoolIsClosedError = errors.New("pool had closed")
)

// 连接池定义
type Pool struct {
sync.Mutex // 保证连接池线程安全
Size int // 连接池连接数量
ConnChan chan io.Closer // 存储连接的管道
IsClose bool
ctx context.Context
}

初始化连接池:

// 初始化
func New(size int) (*Pool,error){
if size <= 0{
return nil,PoolUnInvaildSizeError
}
return &Pool{
ConnChan: make(chan io.Closer,size),
ctx: context.Background(),
},nil
}

获取连接

获取连接:

  • 从管道中消费,如果没有连接则新建连接
  • 新建连接后放入连接池管道中

// 获取连接
func(pool *Pool) GetConnFromPool() (io.Closer,error){
if pool.IsClose == true{
return nil,PoolIsClosedError
}
select {
// 从管道中消费
case conn,ok := <- pool.ConnChan:
if !ok{
return nil,PoolIsClosedError
}
fmt.Println("获取到连接:",conn)
return conn,nil
default:
// 连接池中没有连接,新建连接
return pool.getNewConn(pool.ctx)
}
}

构造新连接,这里使用的是go自带的sql.open获取到的DB对象的conn作为连接,其实DB对象自己也是由维护了一个连接池的,这里只是作为练习,获取其中的连接

// 构造新连接
func (pool *Pool) getNewConn(ctx context.Context) (io.Closer, error) {
db,err := sql.Open("mysql","root:123@tcp(127.0.0.1:3306)/test?charset=utf8&parseTime=True")
if err != nil{
log.Fatal("数据库连接失败",err)
return nil, err
}
conn,_ := db.Conn(ctx)
select {
case pool.ConnChan <- conn:
fmt.Println("连接放入连接池")
default:
fmt.Println("连接池满了,连接丢弃")
conn.Close()
}
return conn,nil
}

释放连接

释放连接,如果连接池没满则直接放入原来的channel中,满了则直接close丢弃

// 释放连接
func(pool *Pool) ReleaseConn(conn io.Closer) error{
pool.Lock()
defer pool.Unlock()

if pool.IsClose == true{
return PoolIsClosedError
}

select {
case pool.ConnChan <- conn:
fmt.Println("连接已放回",conn)
default:
fmt.Println("连接池满了,连接丢弃")
conn.Close()
}
return nil
}

关闭连接池

关闭连接池除了需要修改连接池状态,关闭管道,还需要遍历管道中还存在的连接,将其一一关闭。

// 关闭连接池
func (pool *Pool) ClosePool() error{
pool.Lock()
defer pool.Unlock()

if pool.IsClose == true{
return PoolIsClosedError
}

pool.IsClose = true
close(pool.ConnChan)

for conn := range pool.ConnChan{
conn.Close()
}
return nil
}

测试

开启20个协程模拟从连接池获取连接和释放连接的过程:

func Test() {
wg := sync.WaitGroup{}
pool, err := New(10)
if err != nil {
log.Fatal(err)
}
wg.Add(20)
fmt.Println("开启20个协程获取连接")
for i := 0; i < 20; i++ {
go TestReleaseAndGetConn(pool,wg)
}
wg.Wait()
fmt.Println("main end")
}

func TestReleaseAndGetConn(pool *Pool,wg sync.WaitGroup) {
s := rand.Int63n(2)
time.Sleep(time.Duration(s) * time.Second)
conn,err := pool.GetConnFromPool()
if err != nil{
log.Fatal(err)
}
fmt.Println("连接池连接数:",len(pool.ConnChan))
time.Sleep(time.Duration(s) * time.Second)
pool.ReleaseConn(conn)
wg.Done()
}

结果:

Go-MySQL(二)Go实现MySQL连接池_连接池

完整代码

package go_mysql

import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"log"
"math/rand"
"sync"
"time"
)

var (
PoolUnInvaildSizeError = errors.New("pool size is unvaild")
PoolIsClosedError = errors.New("pool had closed")
)

// 连接池定义
type Pool struct {
sync.Mutex // 保证连接池线程安全
Size int // 连接池连接数量
ConnChan chan io.Closer // 存储连接的管道
IsClose bool
ctx context.Context
}

// 初始化
func New(size int) (*Pool,error){
if size <= 0{
return nil,PoolUnInvaildSizeError
}
return &Pool{
ConnChan: make(chan io.Closer,size),
ctx: context.Background(),
},nil
}

// 获取连接
func(pool *Pool) GetConnFromPool() (io.Closer,error){
if pool.IsClose == true{
return nil,PoolIsClosedError
}
select {
case conn,ok := <- pool.ConnChan:
if !ok{
return nil,PoolIsClosedError
}
fmt.Println("获取到连接:",conn)
return conn,nil
default:
return pool.getNewConn(pool.ctx)
}
}

// 关闭连接池
func (pool *Pool) ClosePool() error{
pool.Lock()
defer pool.Unlock()

if pool.IsClose == true{
return PoolIsClosedError
}

pool.IsClose = true
close(pool.ConnChan)

for conn := range pool.ConnChan{
conn.Close()
}
return nil
}

// 释放连接
func(pool *Pool) ReleaseConn(conn io.Closer) error{
pool.Lock()
defer pool.Unlock()

if pool.IsClose == true{
return PoolIsClosedError
}

select {
case pool.ConnChan <- conn:
fmt.Println("连接已放回",conn)
default:
fmt.Println("连接池满了,连接丢弃")
conn.Close()
}
return nil
}

// 构造新连接
func (pool *Pool) getNewConn(ctx context.Context) (io.Closer, error) {
db,err := sql.Open("mysql","root:123@tcp(127.0.0.1:3306)/test?charset=utf8&parseTime=True")
if err != nil{
log.Fatal("数据库连接失败",err)
return nil, err
}
conn,_ := db.Conn(ctx)
select {
case pool.ConnChan <- conn:
fmt.Println("连接放入连接池")
default:
fmt.Println("连接池满了,连接丢弃")
conn.Close()
}
return conn,nil
}

func Test() {
wg := sync.WaitGroup{}
pool, err := New(10)
if err != nil {
log.Fatal(err)
}
wg.Add(20)
fmt.Println("开启20个协程获取连接")
for i := 0; i < 20; i++ {
go TestReleaseAndGetConn(pool,wg)
}
wg.Wait()
fmt.Println("main end")
}

func TestReleaseAndGetConn(pool *Pool,wg sync.WaitGroup) {
s := rand.Int63n(2)
time.Sleep(time.Duration(s) * time.Second)
conn,err := pool.GetConnFromPool()
if err != nil{
log.Fatal(err)
}
fmt.Println("连接池连接数:",len(pool.ConnChan))
time.Sleep(time.Duration(s) * time.Second)
pool.ReleaseConn(conn)
wg.Done()
}


举报

相关推荐

0 条评论