Websocket代码示例
WebSocket代码示例,获取实时成交明细、盘口、k线。包括自动重连,心跳检测机制。
package org.example.ws;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.websocket.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.*;
@ClientEndpoint
@Slf4j
@Component
public class WebsocketExample {
// 本地session通道(使用volatile保证多线程可见性)
private volatile Session session;
// wss连接地址 business可以为stock、crypto、common;apikey为您的凭证
private static final String WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey";
// 定时任务池(统一管理,避免创建多个线程池)
private ScheduledExecutorService scheduledExecutorService;
@PostConstruct
public void connectAll() {
// 初始化线程池(核心线程1个,避免资源浪费)
scheduledExecutorService = Executors.newScheduledThreadPool(1);
try {
// 建立WEBSOCKET连接
connect(WS_URL);
// 开启自动重连(延迟1秒启动,每10秒检查一次)
startReconnection(WS_URL);
} catch (Exception e) {
log.error("初始化WebSocket连接失败: {}", e.getMessage(), e);
}
}
/**
* 自动重连机制
* @param wsUrl WebSocket连接地址
*/
private void startReconnection(String wsUrl) {
Runnable reconnectTask = () -> {
if (session == null || !session.isOpen()) {
log.warn("WebSocket连接已断开,开始重连...");
connect(wsUrl);
}
};
// 延迟1秒启动,每10秒执行一次重连检查
scheduledExecutorService.scheduleAtFixedRate(reconnectTask, 1, 10, TimeUnit.SECONDS);
}
/**
* 建立WEBSOCKET连接具体实现
* @param wsUrl 连接地址
*/
private void connect(String wsUrl) {
try {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
// 关闭旧连接(防止连接泄露)
if (session != null && session.isOpen()) {
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "重新连接"));
}
// 建立新连接
session = container.connectToServer(this, URI.create(wsUrl));
log.info("WebSocket连接成功,sessionId: {}", session.getId());
} catch (DeploymentException | IOException e) {
log.error("WebSocket连接失败: {}", e.getMessage(), e);
// 连接失败时清空session,触发下次重连
session = null;
}
}
/**
* WebSocket连接建立成功后触发
*/
@OnOpen
public void onOpen(Session session) {
// 更新全局session
this.session = session;
log.info("Connection opened: {}", session.getId());
// 异步发送订阅请求(避免阻塞OnOpen线程)
scheduledExecutorService.submit(() -> {
try {
// 1. 订阅实时成交明细(协议号10000)
sendTradeSubscribe(session);
TimeUnit.MILLISECONDS.sleep(5000); // 间隔5秒
// 2. 订阅实时盘口数据(协议号10003)
sendDepthSubscribe(session);
TimeUnit.MILLISECONDS.sleep(5000); // 间隔5秒
// 3. 订阅1分钟K线数据(协议号10006)
sendKlineSubscribe(session);
// 4. 启动心跳任务(30秒一次)
startHeartbeatTask();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
log.error("发送订阅请求时线程被中断: {}", e.getMessage());
} catch (IOException e) {
log.error("发送订阅请求失败: {}", e.getMessage(), e);
}
});
}
/**
* 发送实时成交明细订阅请求
*/
private void sendTradeSubscribe(Session session) throws IOException {
JSONObject tradeSendObj = new JSONObject();
tradeSendObj.put("code", 10000); // 订阅成交明细协议号
tradeSendObj.put("trace", generateTraceId()); // 自定义traceId
JSONObject data = new JSONObject();
data.put("codes", "BTCUSDT"); // 订阅BTCUSDT
tradeSendObj.put("data", data);
session.getBasicRemote().sendText(tradeSendObj.toJSONString());
log.info("发送成交明细订阅请求: {}", tradeSendObj);
}
/**
* 发送实时盘口数据订阅请求
*/
private void sendDepthSubscribe(Session session) throws IOException {
JSONObject depthSendObj = new JSONObject();
depthSendObj.put("code", 10003); // 订阅盘口协议号
depthSendObj.put("trace", generateTraceId());
JSONObject data = new JSONObject();
data.put("codes", "BTCUSDT");
depthSendObj.put("data", data);
session.getBasicRemote().sendText(depthSendObj.toJSONString());
log.info("发送盘口数据订阅请求: {}", depthSendObj);
}
/**
* 发送实时K线数据订阅请求
*/
private void sendKlineSubscribe(Session session) throws IOException {
JSONObject klineSendObj = new JSONObject();
klineSendObj.put("code", 10006); // 订阅K线协议号
klineSendObj.put("trace", generateTraceId());
JSONObject klineData = new JSONObject();
JSONArray klineDataArray = new JSONArray();
JSONObject kline1minObj = new JSONObject();
kline1minObj.put("type", 1); // 1分钟K线
kline1minObj.put("codes", "BTCUSDT");
klineDataArray.add(kline1minObj);
klineData.put("arr", klineDataArray);
klineSendObj.put("data", klineData);
session.getBasicRemote().sendText(klineSendObj.toJSONString());
log.info("发送K线数据订阅请求: {}", klineSendObj);
}
/**
* 启动心跳保活任务
*/
private void startHeartbeatTask() {
Runnable heartbeatTask = () -> {
try {
if (session != null && session.isOpen()) {
JSONObject pingObj = new JSONObject();
pingObj.put("code", 10010); // 心跳协议号
pingObj.put("trace", generateTraceId());
session.getBasicRemote().sendText(pingObj.toJSONString());
log.debug("发送心跳包: {}", pingObj);
}
} catch (IOException e) {
log.error("发送心跳包失败: {}", e.getMessage(), e);
}
};
// 延迟30秒启动,每30秒发送一次心跳
scheduledExecutorService.scheduleAtFixedRate(heartbeatTask, 30, 30, TimeUnit.SECONDS);
}
/**
* 接收服务端推送的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到服务端消息: {}", message);
// 这里可以解析消息并处理业务逻辑
// 示例:解析JSON消息
try {
JSONObject msgObj = JSONObject.parseObject(message);
Integer code = msgObj.getInteger("code");
String trace = msgObj.getString("trace");
JSONObject data = msgObj.getJSONObject("data");
// 根据协议号区分消息类型
switch (code) {
case 10000: // 成交明细数据
handleTradeData(data);
break;
case 10003: // 盘口数据
handleDepthData(data);
break;
case 10006: // K线数据
handleKlineData(data);
break;
case 10010: // 心跳响应
log.debug("收到心跳响应,trace: {}", trace);
break;
default:
log.warn("未知协议号的消息: {}", code);
}
} catch (Exception e) {
log.error("解析消息失败: {}", e.getMessage(), e);
}
}
/**
* 处理成交明细数据
*/
private void handleTradeData(JSONObject data) {
// 实现业务逻辑,比如入库、推送前端等
log.info("处理成交明细数据: {}", data);
}
/**
* 处理盘口数据
*/
private void handleDepthData(JSONObject data) {
// 实现业务逻辑
log.info("处理盘口数据: {}", data);
}
/**
* 处理K线数据
*/
private void handleKlineData(JSONObject data) {
// 实现业务逻辑
log.info("处理K线数据: {}", data);
}
/**
* 连接关闭时触发
*/
@OnClose
public void onClose(Session session, CloseReason reason) {
log.info("Connection closed: {}, reason: {}", session.getId(), reason);
// 清空session,触发重连
this.session = null;
}
/**
* 连接出错时触发
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("WebSocket错误,sessionId: {}", session.getId(), error);
// 清空session,触发重连
this.session = null;
}
/**
* 生成唯一traceId(示例实现,可替换为UUID)
*/
private String generateTraceId() {
return java.util.UUID.randomUUID().toString();
}
/**
* 销毁Bean时关闭线程池和连接
*/
@PreDestroy
public void destroy() {
log.info("销毁WebSocket客户端,关闭资源...");
// 关闭session
if (session != null && session.isOpen()) {
try {
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "应用关闭"));
} catch (IOException e) {
log.error("关闭session失败", e);
}
}
// 关闭线程池
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
try {
if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
scheduledExecutorService.shutdownNow();
}
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
}
}
}
}最后更新于
