websocket是一种网络通信协议。
全双工。
弊端:HTTP协议无法实现服务器主动向客户端发起消息,
握手阶段
握手是基于http协议的
数据交互阶段
前端环境搭建
使用vscode,搭建咯
客户端创建WebSocket
给websocket绑定回调函数
//onopen onopen
ws.addEventListener('open', function (event) {
ws.send('Hello Server!');
});
ws.addEventListener("close", function(event) {
var code = event.code;
var reason = event.reason;
var wasClean = event.wasClean;
// handle close event
});
ws.addEventListener("message", function(event) {
//文本数据
if(typeOf event.data === String) {
console.log("Received data string");
}
var data = event.data;
// 处理数据
});
ws.addEventListener("error", function(event) {
// handle error event
});
send()发送数据
websocket方法
ws.send('your message');
webSocket.readyState
readyState
属性返回实例对象的当前状态,共有四种。
- CONNECTING:值为0,表示正在连接。
- OPEN:值为1,表示连接成功,可以通信了。
- CLOSING:值为2,表示连接正在关闭。
- CLOSED:值为3,表示连接已经关闭,或者打开连接失败。
服务端
1、依赖引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、配置类
注入使用@ServerEndpoint的Websocket
@Configuration
public class WebSocketConfig {
/**
* 注入ServerEndpointExporter,
* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
Tomcat7.0.5支持 websocket
3、三个属性的必要性
4、消息格式
5、websocket测试
websocket在线测试
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}") // 接口路径 ws://localhost:8087/webSocket/userId;
public class WebSocket {
//存储的消息内容
public static ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<String>(100);
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
/**
* 用户ID
*/
private String userId;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
//虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
//注:底下WebSocket是当前类名
private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
// 用来存在线连接用户信息
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String, Session>();
/**
* 链接成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try {
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
//向客户端发送连接成功
sendOneMessage(userId, "建立连接成功");
} catch (Exception e) {
}
}
/**
* 链接关闭调用的方法
*/
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
} catch (Exception e) {
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message
*/
@OnMessage
public void onMessage(String message) {
log.info("【websocket消息】收到客户端消息:" + message);
//发送队列中的内容
if (!arrayBlockingQueue.isEmpty() && message.equals("1")) {//在请求数据
String msa = (String) arrayBlockingQueue.element();
sendOneMessage(userId, msa);
Object remove = arrayBlockingQueue.remove();//移除成功返回移除的数据
}
}
/**
* 发送错误时的处理
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:" + error.getMessage());
error.printStackTrace();
}
// 此为广播消息
public void sendAllMessage(String message) {
log.info("【websocket消息】广播消息:" + message);
for (WebSocket webSocket : webSockets) {
try {
if (webSocket.session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 此为单点消息
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 此为单点消息(多人)
public void sendMoreMessage(String[] userIds, String message) {
for (String userId : userIds) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}