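1、安装websocket相关库（只需安装 websocket-client，它提供代码中导入的 `websocket` 模块）
pip install websocket-client
注意：无需执行 pip install websocket。该包与 websocket-client 使用相同的模块名 `websocket`，同时安装可能导致 `websocket.WebSocketApp` 无法使用。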
2、代码案例
# coding=utf-8
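# Install the WebSocket client library (it provides the `websocket` module):
#   pip install websocket-client
# NOTE: the separate PyPI package `websocket` (pip install websocket) is not
# needed; it uses the same module name and can hide `websocket.WebSocketApp`
# when installed alongside websocket-client.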
import json
import websocket
from threading import Thread
import time
def on_message(ws, message):
    """Print a message received on the WebSocket connection.

    Args:
        ws: The connection object the callback was invoked for (unused).
        message: The received payload; written to stdout unchanged.
    """
    print(message)
def on_error(ws, error):
    """Print an error reported for the WebSocket connection.

    Args:
        ws: The connection object the callback was invoked for (unused).
        error: The reported error; written to stdout via ``print``.
    """
    print(error)
def on_close(ws, close_status_code=None, close_msg=None):
    """Print a marker line when the WebSocket connection is closed.

    websocket-client 1.0 and later invoke this callback with three
    arguments, while older releases may pass only ``ws``; the two extra
    parameters are optional so that both calling conventions work.

    Args:
        ws: The connection object the callback was invoked for (unused).
        close_status_code: Close status code supplied by the library, if
            any (unused).
        close_msg: Close reason supplied by the library, if any (unused).
    """
    print("### closed ###")
def on_open(ws):
    """Start a worker thread that sends the JSON request over ``ws``.

    The worker sends the same JSON payload three times, sleeping after
    each send, then closes the connection and prints a final message.
    The callback itself returns as soon as the thread has been started.

    Args:
        ws: The open connection; must provide ``send()`` and ``close()``.
    """
    # The payload never changes, so serialize it once instead of per send.
    payload = json.dumps({
        "namespace": "middleware-test",
        "container": "mysql",
        "clusterId": "default--test",
    })

    def run(*args):
        for _ in range(3):
            # Send the message, then wait so the thread doesn't exit and
            # the socket isn't closed.
            ws.send(payload)
            # NOTE(review): time.sleep() takes seconds, so this pauses for
            # about 16.7 minutes after every send -- confirm this is the
            # intended keep-alive interval and not meant as milliseconds.
            time.sleep(1000)
        time.sleep(1)
        ws.close()
        print("Thread terminating...")

    Thread(target=run).start()
if __name__ == "__main__":
    # Enable websocket-client's trace output to help debug the connection.
    websocket.enableTrace(True)
    url = "ws://localhost:8080/terminal"
    # Register every callback through the constructor so they are all wired
    # up in one place before the connection is started.
    ws = websocket.WebSocketApp(
        url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    # Run the client's event loop; the callbacks above are invoked from it.
    ws.run_forever()