0
点赞
收藏
分享

微信扫一扫

tornado异步非阻塞框架

Aliven888 2022-05-13 阅读 118

同步方式:普通阻塞。处理该请求期间,其它 client 的请求也会被阻塞。

import tornado.ioloop
import tornado.web


class MainHandler(tornado.web.RequestHandler):
    """Handler for ``/main`` that sleeps synchronously before replying.

    The sleep is a plain blocking ``time.sleep`` inside the handler; the
    surrounding article uses it to show that other clients are held up too.
    """

    def get(self):
        import time

        delay_seconds = 10
        time.sleep(delay_seconds)
        self.write("Hello, world")

class IndexHandler(tornado.web.RequestHandler):
    """Handler for ``/index`` that replies immediately with a fixed body."""

    def get(self):
        body = "Index"
        self.write(body)
# Route table: each URL pattern is served by the paired handler class.
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    # IOLoop.instance() is a deprecated alias of IOLoop.current()
    # (Tornado 5.0+), so call current() directly.
    tornado.ioloop.IOLoop.current().start()

引入 Future 之后:同样是等待(sleep),但其它客户端可以正常访问。

import tornado.ioloop
import tornado.web
from tornado import gen
from tornado.concurrent import Future
import time

class MainHandler(tornado.web.RequestHandler):
    """Handler for ``/main`` that replies after a 5-second non-blocking wait.

    The coroutine yields a ``Future`` that an IOLoop timer resolves, so the
    handler is suspended instead of sleeping inside the event loop.
    """

    @gen.coroutine
    def get(self):
        future = Future()
        # Wait 5 s the asynchronous way: the timer resolves the future.
        # Previously the timer called self.done directly and nothing ever
        # resolved the future, leaving this coroutine suspended forever.
        tornado.ioloop.IOLoop.current().add_timeout(
            time.time() + 5, future.set_result, None)
        yield future
        self.done()

    def done(self, *args, **kwargs):
        """Write the response body and finish the request."""
        self.write('Main')
        self.finish()


class IndexHandler(tornado.web.RequestHandler):
    """Handler for ``/index``: writes a constant body and returns."""

    def get(self):
        reply = "Index"
        self.write(reply)
# Route table: each URL pattern is served by the paired handler class.
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    # Use IOLoop.current(), matching the handler above; IOLoop.instance()
    # is only a deprecated alias of it (Tornado 5.0+).
    tornado.ioloop.IOLoop.current().start()

处理方法中发起同步(阻塞)的 HTTP 请求时,其它客户端的请求同样会被阻塞住。

import tornado.ioloop
import tornado.web


class MainHandler(tornado.web.RequestHandler):
    """Handler for ``/main`` that performs a synchronous outbound HTTP call.

    ``requests.get`` is a blocking call made inside the handler; the article
    uses it to show that other clients are blocked while it runs.
    """

    def get(self):
        import requests

        target = 'http://www.google.com'
        requests.get(target)
        self.write('xxxxx')

class IndexHandler(tornado.web.RequestHandler):
    """Handler for ``/index`` that answers with the literal text "Index"."""

    def get(self):
        text = "Index"
        self.write(text)
# Route table: each URL pattern is served by the paired handler class.
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    # IOLoop.instance() is a deprecated alias of IOLoop.current()
    # (Tornado 5.0+), so call current() directly.
    tornado.ioloop.IOLoop.current().start()

使用 tornado 的 httpclient.AsyncHTTPClient 异步发送请求时,其它客户端可以正常访问。

import tornado.ioloop
import tornado.web
from tornado import gen

class MainHandler(tornado.web.RequestHandler):
    """Handler for ``/main`` that makes an asynchronous outbound HTTP call.

    The coroutine is suspended while ``AsyncHTTPClient.fetch`` is in flight
    and replies once the fetch has completed.
    """

    @gen.coroutine
    def get(self):
        from tornado import httpclient

        http = httpclient.AsyncHTTPClient()
        # fetch() no longer accepts a callback argument (removed in
        # Tornado 6), so wait on the returned future and call done()
        # ourselves. raise_error=False keeps the old callback-style
        # behaviour of replying even for non-200 responses.
        # NOTE(review): connection-level failures may still raise here.
        response = yield http.fetch("http://www.google.com", raise_error=False)
        self.done(response)

    def done(self, *args, **kwargs):
        """Write the response body and finish the request."""
        self.write('Main')
        self.finish()

class IndexHandler(tornado.web.RequestHandler):
    """Handler for ``/index``; responds with a constant string."""

    def get(self):
        payload = "Index"
        self.write(payload)
# Route table: each URL pattern is served by the paired handler class.
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    # IOLoop.instance() is a deprecated alias of IOLoop.current()
    # (Tornado 5.0+), so call current() directly.
    tornado.ioloop.IOLoop.current().start()

引入 future.set_result:当请求因等待某个 future 而挂起时,在别处调用 future.set_result 即可结束等待,让该请求继续执行并返回。

import tornado.ioloop
import tornado.web
from tornado import gen
from tornado.concurrent import Future

# Shared slot: MainHandler stores its pending Future here and IndexHandler
# resolves it. None until /main has been requested.
future = None
class MainHandler(tornado.web.RequestHandler):
    """Handler for ``/main`` that waits until another request releases it.

    A fresh ``Future`` is published in the module-level ``future`` variable
    and the coroutine yields on it; ``done`` is registered as its callback.
    """

    @gen.coroutine
    def get(self):
        global future
        pending = Future()
        future = pending
        pending.add_done_callback(self.done)

        yield pending

    def done(self, *args, **kwargs):
        """Write the response body and finish the request."""
        self.write('Main')
        self.finish()

class IndexHandler(tornado.web.RequestHandler):
    """Handler for ``/index`` that releases the request parked on ``/main``.

    Resolves the module-level ``future`` (when one is pending) and then
    replies with "Index".
    """

    def get(self):
        global future
        # Without this guard the call fails when /main has not been
        # requested yet (future is None) or when the future was already
        # resolved by an earlier /index request.
        if future is not None and not future.done():
            future.set_result(None)
        self.write("Index")

# Route table: each URL pattern is served by the paired handler class.
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    # IOLoop.instance() is a deprecated alias of IOLoop.current()
    # (Tornado 5.0+), so call current() directly.
    tornado.ioloop.IOLoop.current().start()

最后看看 future 到底做了些什么,才能实现异步非阻塞。先看自定义异步非阻塞框架第一版(同步请求):

import socket
import select

class HttpRequest(object):
    """Parse the raw bytes of one HTTP request.

    After construction the instance exposes ``header_bytes``, ``body_bytes``,
    ``method``, ``url``, ``protocol`` and ``header_dict``.
    """

    def __init__(self, content):
        """
        :param content: raw request bytes sent by the client (headers and body)
        """
        self.content = content

        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        """Split ``content`` into header and body at the first blank line.

        When there is no blank line the whole content is the header block;
        the previous code raised TypeError there by adding a list to bytes.
        """
        head, _, body = self.content.partition(b'\r\n\r\n')
        self.header_bytes += head
        self.body_bytes += body

    @property
    def header_str(self):
        """The header block decoded as UTF-8 text."""
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        """Fill ``method``/``url``/``protocol`` and ``header_dict``."""
        headers = self.header_str.split('\r\n')
        first_line = headers[0].split(' ')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = first_line
            # The request line is not a header field; skip it below.
            header_lines = headers[1:]
        else:
            header_lines = headers
        for line in header_lines:
            # Split on the first colon only, so values that contain colons
            # (e.g. "Host: 127.0.0.1:9999") are kept instead of dropped.
            name, sep, value = line.partition(':')
            if sep:
                self.header_dict[name.strip()] = value.strip()

# class Future(object):
# def __init__(self):
# self.result = None

def main(request):
    """View for the ``/main/`` route; ``request`` is not inspected."""
    response_text = "main"
    return response_text

def index(request):
    """View for the ``/index/`` route; ``request`` is not inspected."""
    response_text = "indexasdfasdfasdf"
    return response_text


# Routing table: (URL pattern, view function) pairs, tried in order by run().
routers = [
    ('/main/', main),
    ('/index/', index),
]

def run():
    """Serve the ``routers`` table on 127.0.0.1:9999 from one select() loop.

    Each readable client socket is drained, parsed with ``HttpRequest`` and
    matched against ``routers`` using ``re.match``. The view's return value
    (or ``b"404"`` when no route matches) is sent back and the connection is
    closed. The loop never returns.
    """
    import re  # imported once here instead of once per request

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 9999,))
    sock.setblocking(False)
    sock.listen(128)

    inputs = [sock]

    def read_available(conn):
        """Return the bytes readable on *conn* now (b"" if it is gone)."""
        received = b""
        while True:
            try:
                chunk = conn.recv(1024)
            except BlockingIOError:
                # Non-blocking socket has nothing more for now.
                break
            except OSError:
                # Connection is broken; treat it like a disconnect.
                return b""
            if not chunk:
                break
            received += chunk
        return received

    while True:
        rlist, wlist, elist = select.select(inputs, [], [], 0.05)
        for r in rlist:
            if r == sock:
                # A new connection has arrived.
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
                continue

            # A client sent data (or disconnected).
            data = read_available(r)
            if data:
                # Parse the request, find the view for its URL and reply.
                request = HttpRequest(data)
                func = None
                for pattern, view in routers:
                    if re.match(pattern, request.url):
                        func = view
                        break
                if func is not None:
                    r.sendall(bytes(func(request), encoding='utf-8'))
                else:
                    r.sendall(b"404")
            # An empty read means the peer went away without a request;
            # there is nobody to answer, so just clean up.

            inputs.remove(r)
            r.close()

# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    run()

自定义异步非阻塞框架第2版(异步请求)server端

import socket
import select

class HttpRequest(object):
    """Parse the raw bytes of one HTTP request.

    After construction the instance exposes ``header_bytes``, ``body_bytes``,
    ``method``, ``url``, ``protocol`` and ``header_dict``.
    """

    def __init__(self, content):
        """
        :param content: raw request bytes sent by the client (headers and body)
        """
        self.content = content

        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        """Split ``content`` into header and body at the first blank line.

        When there is no blank line the whole content is the header block;
        the previous code raised TypeError there by adding a list to bytes.
        """
        head, _, body = self.content.partition(b'\r\n\r\n')
        self.header_bytes += head
        self.body_bytes += body

    @property
    def header_str(self):
        """The header block decoded as UTF-8 text."""
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        """Fill ``method``/``url``/``protocol`` and ``header_dict``."""
        headers = self.header_str.split('\r\n')
        first_line = headers[0].split(' ')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = first_line
            # The request line is not a header field; skip it below.
            header_lines = headers[1:]
        else:
            header_lines = headers
        for line in header_lines:
            # Split on the first colon only, so values that contain colons
            # (e.g. "Host: 127.0.0.1:9999") are kept instead of dropped.
            name, sep, value = line.partition(':')
            if sep:
                self.header_dict[name.strip()] = value.strip()

class Future(object):
    """Placeholder for a response that is not available yet.

    ``result`` starts as ``None``; other code assigns the response to it.
    """

    def __init__(self):
        self.result = None
# Shared slot: main() stores the Future it returns here and stop() fills in
# its result. None until main() has been called.
F = None
def main(request):
    """View for ``/main/``: publish a new ``Future`` in ``F`` and return it."""
    global F
    pending = Future()
    F = pending
    return pending

def stop(request):
    """View for ``/stop/``: set the result on the Future stored in ``F``.

    Always returns ``"stop"``. When ``F`` is still ``None`` there is nothing
    to complete; the previous code raised AttributeError in that case.
    """
    global F
    if F is not None:
        F.result = b"xxxxxxxxxxxxx"
    return "stop"


def index(request):
    """View for the ``/index/`` route; ``request`` is not inspected."""
    response_text = "indexasdfasdfasdf"
    return response_text


# Routing table: (URL pattern, view function) pairs, tried in order by run().
routers = [
    ('/main/', main),
    ('/index/', index),
    ('/stop/', stop),
]

def run():
    """Serve the ``routers`` table on 127.0.0.1:9999 from one select() loop.

    Views may return either a string, which is sent at once and the
    connection closed, or a ``Future``. A connection whose view returned a
    ``Future`` is parked in ``async_request_dict`` and answered on a later
    loop pass, once ``future.result`` has been set. Unmatched URLs get
    ``b"404"``. The loop never returns.
    """
    import re  # imported once here instead of once per request

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 9999,))
    sock.setblocking(False)
    sock.listen(128)

    inputs = [sock]

    # Parked connections awaiting a result: {socket: future}
    async_request_dict = {}

    def read_available(conn):
        """Return the bytes readable on *conn* now (b"" if it is gone)."""
        received = b""
        while True:
            try:
                chunk = conn.recv(1024)
            except BlockingIOError:
                # Non-blocking socket has nothing more for now.
                break
            except OSError:
                # Connection is broken; treat it like a disconnect.
                return b""
            if not chunk:
                break
            received += chunk
        return received

    def drop(conn):
        """Close *conn* and forget it in both bookkeeping structures."""
        # Also un-park it, so a closed socket is never written to later.
        async_request_dict.pop(conn, None)
        if conn in inputs:
            inputs.remove(conn)
        conn.close()

    while True:
        rlist, wlist, elist = select.select(inputs, [], [], 0.05)
        for r in rlist:
            if r == sock:
                # A new connection has arrived.
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
                continue

            # A client sent data (or disconnected).
            data = read_available(r)
            if not data:
                # Peer went away (possibly while parked): nobody to answer.
                drop(r)
                continue

            # Parse the request and find the view for its URL.
            request = HttpRequest(data)
            func = None
            for pattern, view in routers:
                if re.match(pattern, request.url):
                    func = view
                    break

            if func is None:
                r.sendall(b"404")
                drop(r)
                continue

            result = func(request)
            if isinstance(result, Future):
                # Park the connection until the future gets a result.
                async_request_dict[r] = result
            else:
                r.sendall(bytes(result, encoding='utf-8'))
                drop(r)

        # Iterate over a snapshot: entries are removed inside the loop, and
        # deleting from a dict while iterating it raises RuntimeError.
        for conn, future in list(async_request_dict.items()):
            # "is not None" so that an empty-bytes result is delivered too.
            if future.result is not None:
                conn.sendall(future.result)
                drop(conn)

# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    run()

第二版的做法是:视图函数返回一个 future 对象,该请求随即被挂起(hang 住),其它请求照常返回数据;

然后把返回 future 对象的连接(conn)存进字典 {conn: future},每轮循环遍历该字典,判断 future 的 result 属性,如果有值则通过 conn.sendall(...) 把数据发送给客户端,

随后关闭连接 conn.close(),并把该连接从字典中移除。

最终版本中对future对象增加了超时时间,如果请求超时返回timeout

tornado异步非阻塞框架_tornadotornado异步非阻塞框架_客户端_02

1 import socket
2 import select
3 import time
4
class HttpRequest(object):
    """Parse the raw bytes of one HTTP request.

    After construction the instance exposes ``header_bytes``, ``body_bytes``,
    ``method``, ``url``, ``protocol`` and ``header_dict``.
    """

    def __init__(self, content):
        """
        :param content: raw request bytes sent by the client (headers and body)
        """
        self.content = content

        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        """Split ``content`` into header and body at the first blank line.

        When there is no blank line the whole content is the header block;
        the previous code raised TypeError there by adding a list to bytes.
        """
        head, _, body = self.content.partition(b'\r\n\r\n')
        self.header_bytes += head
        self.body_bytes += body

    @property
    def header_str(self):
        """The header block decoded as UTF-8 text."""
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        """Fill ``method``/``url``/``protocol`` and ``header_dict``."""
        headers = self.header_str.split('\r\n')
        first_line = headers[0].split(' ')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = first_line
            # The request line is not a header field; skip it below.
            header_lines = headers[1:]
        else:
            header_lines = headers
        for line in header_lines:
            # Split on the first colon only, so values that contain colons
            # (e.g. "Host: 127.0.0.1:9999") are kept instead of dropped.
            name, sep, value = line.partition(':')
            if sep:
                self.header_dict[name.strip()] = value.strip()
52
class Future(object):
    """Placeholder for a response that is not available yet.

    Records the creation time (``start``) and a ``timeout`` in seconds
    alongside ``result``, which starts as ``None``.
    """

    def __init__(self, timeout=0):
        self.start = time.time()
        self.timeout = timeout
        self.result = None
def main(request):
    """View for ``/main/``: return a new ``Future`` with a 5-second timeout."""
    return Future(5)
61
def index(request):
    """View for the ``/index/`` route; ``request`` is not inspected."""
    response_text = "indexasdfasdfasdf"
    return response_text
64
65
# Routing table: (URL pattern, view function) pairs, tried in order by run().
routers = [
    ('/main/', main),
    ('/index/', index),
]
70
def run():
    """Serve the ``routers`` table on 127.0.0.1:9999 from one select() loop.

    Views may return either a string, which is sent at once and the
    connection closed, or a ``Future``. A connection whose view returned a
    ``Future`` is parked in ``async_request_dict`` and answered on a later
    loop pass, either with ``future.result`` once it has been set or with
    ``b"timeout"`` once ``future.start + future.timeout`` has passed.
    Unmatched URLs get ``b"404"``. The loop never returns.
    """
    import re  # imported once here instead of once per request

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 9999,))
    sock.setblocking(False)
    sock.listen(128)

    inputs = [sock]

    # Parked connections awaiting a result: {socket: future}
    async_request_dict = {}

    def read_available(conn):
        """Return the bytes readable on *conn* now (b"" if it is gone)."""
        received = b""
        while True:
            try:
                chunk = conn.recv(1024)
            except BlockingIOError:
                # Non-blocking socket has nothing more for now.
                break
            except OSError:
                # Connection is broken; treat it like a disconnect.
                return b""
            if not chunk:
                break
            received += chunk
        return received

    def drop(conn):
        """Close *conn* and forget it in both bookkeeping structures."""
        # Also un-park it, so a closed socket is never written to later.
        async_request_dict.pop(conn, None)
        if conn in inputs:
            inputs.remove(conn)
        conn.close()

    while True:
        rlist, wlist, elist = select.select(inputs, [], [], 0.05)
        for r in rlist:
            if r == sock:
                # A new connection has arrived.
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
                continue

            # A client sent data (or disconnected).
            data = read_available(r)
            if not data:
                # Peer went away (possibly while parked): nobody to answer.
                drop(r)
                continue

            # Parse the request and find the view for its URL.
            request = HttpRequest(data)
            func = None
            for pattern, view in routers:
                if re.match(pattern, request.url):
                    func = view
                    break

            if func is None:
                r.sendall(b"404")
                drop(r)
                continue

            result = func(request)
            if isinstance(result, Future):
                # Park the connection until the future resolves or expires.
                async_request_dict[r] = result
            else:
                r.sendall(bytes(result, encoding='utf-8'))
                drop(r)

        ctime = time.time()
        # Iterate over a snapshot: entries are removed inside the loop, and
        # deleting from a dict while iterating it raises RuntimeError.
        for conn, future in list(async_request_dict.items()):
            # Expire only futures that still have no result, so a result
            # that was already set is not overwritten by b"timeout".
            if future.result is None and (future.start + future.timeout) <= ctime:
                future.result = b"timeout"
            if future.result is not None:
                conn.sendall(future.result)
                drop(conn)
143
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    run()

最终版本


举报

相关推荐

0 条评论