最近一直在弄一个python的服务器端,使用了mysql数据库,因为想要访问数据库快一些,所以打算使用多线程建一个连接线程池,当有mysql数据库请求时,只要使用列队将请求数据发送给mysql连接池中的空闲线程来请求数据就好了。请求到数据后发送给客户端的用户数据缓存中。由用户逻辑处理线程处理结束后再返回结果给客户端。
当然了,实际上这个工具还可以用于其他想使用多线程处理的地方,比如像btc程序化交易系统。可以在交易平台申请很多个交易帐号,建立交易连接时每一个帐号一个线程,同时使用tor网络与交易平台建立连接,这样我就可以使用一个客户端同时使用多个IP地址管理多个btc交易帐号同时参与同一个交易平台交易。可以达到不引起btc交易平台和其他交易者察觉的方式控制币市价格了。据我所知,有的庄家就是这么干的。
下边是我写的一个mysql多线程池测试代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#mysql连接线程池,初始化时设置3个连接池,连接池保持使用率小于80%,如果连接池使用率大于等于80%时则自动增加mysql连接量直到使用率小于80%,连接池最大50个
import
import time
import
from Queue import
_CONNECTTOOL = None
def
_CONNECTTOOL.requestComplete(n)
class
def __init__(self, t_name,requstqueue,responeQueue,signal,ptime = 3):
threading.Thread.__init__(self, name=t_name)
#线程条件锁
self.mysqlobj = mysqlobj.mysqlobj()
None #mysql调用返回值
#mysql请求列队
#mysql返回列队
0 #超时定时器
'' #作数据库mysql请求的帐号
self.askType = 1 #1.线程空闲,可以接受请求,2.正在发送数据库请求,等待返回结果,3.数据库结果已返回或请求超时,正在处理请求结果
None #当前请求连接数据对象
#默认请求3秒无返回,则认为数据库请求超时
def
2
#开始发送mysql请求时间
#接收mysqly请求数据对象
self._sendRequest()
def
if self.connectobj.backfunc != None:
print 'thname=%s\n'%(self.getName())
4)
#self.connectobj.cmdstr = self.mysqlobj.execute(self.connectobj.cmdstr)
3
self.responeQueue.put(self.connectobj)
self.connectobj.comfunc(int(self.getName()))
1
def
while(True):
if self.askType == 1 and (not self.requstqueue.empty()): #线程是否空闲,请求队列中是否有请求对象
objtmp = self.requstqueue.get()
self._setNewConnect(objtmp)
else:#没有数据请求,线程进入等待唤醒模式
print 'thread wait:%s\n'%(self.getName())
self.signal.clear()
#请求结束等待下一次请求来唤醒
class
def __init__(self,account,cmdstr,backfunc,ptype = 'sreach'):#inset,del,update,sreach,分别为增加数据,删除数据,修改数据,查找数据,backfunc为查找到的数据返回
self.account = account
self.cmdstr = cmdstr
#数据库请求类型
#数据库请求返回回调函数
#数据库查找时间,用作查找超时处理,现在定义默认查找5秒未返回就超时,
''
self.comfunc = requestComplete
class
def __init__(self,maxcon = 50,mincon = 3,addpencent = 80):
self.maxcon = maxcon
self.mincon = mincon
self.singal = threading.Event()
self.addpencent = addpencent
0 #当前已连接mysql数量
self.conthreads = {}
self.singals = {}
self.runthread = []
self.waitthread = []
0
0.0 #当前线程池使用率
#mysql请求列队
#mysql返回列队
self._createConncets()
#初始化mysql连接池
def
global
_CONNECTTOOL = self
'1'] = threading.Event()
'1'] = _connectThread('1',self.requestQueue, self.responeQueue,self.singals['1'])
'1'].setDaemon(True)
'1'].start()
'2'] = threading.Event()
'2'] = _connectThread('2',self.requestQueue, self.responeQueue,self.singals['2'])
'2'].setDaemon(True)
'2'].start()
'3'] = threading.Event()
'3'] = _connectThread('3',self.requestQueue, self.responeQueue,self.singals['3'])
'3'].setDaemon(True)
'3'].start()
3
1,2,3]
#使用线程池中线程发送mysql命令
def mysqlexecute(self,account,cmdstr,backfunc,ptype = 'sreach'):
reqtmp = _askObj(account,cmdstr,backfunc,ptype)
self.requestQueue.put(reqtmp)
qs = self.requestQueue.qsize()
self.conpencent = (float)(qs/len(self.conthreads))
print 'qs=%d\n'%(qs)
if self.conpencent >= 0.8 and
1
self.singals[str(self.threadCount)] = threading.Event()
self.conthreads[str(self.threadCount)] = _connectThread(str(self.threadCount),self.requestQueue,self.responeQueue,self.singals[str(self.threadCount)])
True)
self.conthreads[str(self.threadCount)].start()
print 'thread count:%d\n'%(self.threadCount)
if len(self.waitthread) > 0:
n = self.waitthread.pop()
self.runthread.append(n)
self.singals[str(n)].set()
def
if not
reqtmp = self.responeQueue.get()
reqtmp.backfunc(reqtmp.account,reqtmp.cmdstr)
self.waitthread.append(n)
def
print 'asktest:%s,%s'%(backaccount,backdat)
#classtest
if __name__ == '__main__':
con = mysqlConnectThreads()
1
while(True):
1
'ask'+str(ac), asktest)
1)
然后是mysql访问数据库的代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import
from subprocess import
# host='localhost',
# port = 3306,
# user='root',
# passwd='7668150Tt00',
# db ='sanguogame',
# CREATE TABLE `sanguogame`.`new_table` (
# `id` INT NOT NULL AUTO_INCREMENT,
# `name` VARCHAR(45) NOT NULL,
# `class` VARCHAR(45) NOT NULL,
# `age` VARCHAR(45) NOT NULL,
# PRIMARY KEY (`id`, `name`));
class
def __init__(self,addr = 'localhost',port = 3306,usname = 'root',uspw = '123456Qq',defDB = 'test'):
#mysql地址
#mysql端口
#mysql登陆用户名
#mysql登陆密码
#mysql默认登陆数据库
None #mysql连接管理器
None #mysql消息收发器
#连接mysql数据库
def
self.connectManger = MySQLdb.connect(
host = self.mysqladdr,
port = self.mysqlport,
user = self.mysqlusername,
passwd = self.mysqlpassword,
db = self.mysqlDefaleDB,
"utf8"
)
self.mysqlcursor = self.connectManger.cursor()
#调用mysql命令
def
if
return
else:
return -999#mysql 未连接
def
'/usr/local/mysql/bin/mysql -h%s -P%s -u%s -p%s %s' %(self.mysqladdr, self.mysqlport, self.mysqlusername, self.mysqlpassword, self.mysqlDefaleDB), stdout=PIPE, stdin=PIPE, shell=True)
'source '+sqlfilepath)
多线程间等待与启动交互事件参考:
http://www.jb51.net/article/63512.htm