Websocket代码示例

WebSocket代码示例,获取实时成交明细、盘口、k线。包括自动重连,心跳检测机制。

package org.example.ws;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.websocket.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.*;

@ClientEndpoint
@Slf4j
@Component
public class WebsocketExample {

    // 本地session通道(使用volatile保证多线程可见性)
    private volatile Session session;

    // wss连接地址 business可以为stock、crypto、common;apikey为您的凭证
    private static final String WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey";

    // 定时任务池(统一管理,避免创建多个线程池)
    private ScheduledExecutorService scheduledExecutorService;

    @PostConstruct
    public void connectAll() {
        // 初始化线程池(核心线程1个,避免资源浪费)
        scheduledExecutorService = Executors.newScheduledThreadPool(1);
        
        try {
            // 建立WEBSOCKET连接
            connect(WS_URL);
            // 开启自动重连(延迟1秒启动,每10秒检查一次)
            startReconnection(WS_URL);
        } catch (Exception e) {
            log.error("初始化WebSocket连接失败: {}", e.getMessage(), e);
        }
    }

    /**
     * 自动重连机制
     * @param wsUrl WebSocket连接地址
     */
    private void startReconnection(String wsUrl) {
        Runnable reconnectTask = () -> {
            if (session == null || !session.isOpen()) {
                log.warn("WebSocket连接已断开,开始重连...");
                connect(wsUrl);
            }
        };
        // 延迟1秒启动,每10秒执行一次重连检查
        scheduledExecutorService.scheduleAtFixedRate(reconnectTask, 1, 10, TimeUnit.SECONDS);
    }

    /**
     * 建立WEBSOCKET连接具体实现
     * @param wsUrl 连接地址
     */
    private void connect(String wsUrl) {
        try {
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            // 关闭旧连接(防止连接泄露)
            if (session != null && session.isOpen()) {
                session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "重新连接"));
            }
            // 建立新连接
            session = container.connectToServer(this, URI.create(wsUrl));
            log.info("WebSocket连接成功,sessionId: {}", session.getId());
        } catch (DeploymentException | IOException e) {
            log.error("WebSocket连接失败: {}", e.getMessage(), e);
            // 连接失败时清空session,触发下次重连
            session = null;
        }
    }

    /**
     * WebSocket连接建立成功后触发
     */
    @OnOpen
    public void onOpen(Session session) {
        // 更新全局session
        this.session = session;
        log.info("Connection opened: {}", session.getId());

        // 异步发送订阅请求(避免阻塞OnOpen线程)
        scheduledExecutorService.submit(() -> {
            try {
                // 1. 订阅实时成交明细(协议号10000)
                sendTradeSubscribe(session);
                TimeUnit.MILLISECONDS.sleep(5000); // 间隔5秒

                // 2. 订阅实时盘口数据(协议号10003)
                sendDepthSubscribe(session);
                TimeUnit.MILLISECONDS.sleep(5000); // 间隔5秒

                // 3. 订阅1分钟K线数据(协议号10006)
                sendKlineSubscribe(session);

                // 4. 启动心跳任务(30秒一次)
                startHeartbeatTask();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断状态
                log.error("发送订阅请求时线程被中断: {}", e.getMessage());
            } catch (IOException e) {
                log.error("发送订阅请求失败: {}", e.getMessage(), e);
            }
        });
    }

    /**
     * 发送实时成交明细订阅请求
     */
    private void sendTradeSubscribe(Session session) throws IOException {
        JSONObject tradeSendObj = new JSONObject();
        tradeSendObj.put("code", 10000); // 订阅成交明细协议号
        tradeSendObj.put("trace", generateTraceId()); // 自定义traceId
        JSONObject data = new JSONObject();
        data.put("codes", "BTCUSDT"); // 订阅BTCUSDT
        tradeSendObj.put("data", data);
        session.getBasicRemote().sendText(tradeSendObj.toJSONString());
        log.info("发送成交明细订阅请求: {}", tradeSendObj);
    }

    /**
     * 发送实时盘口数据订阅请求
     */
    private void sendDepthSubscribe(Session session) throws IOException {
        JSONObject depthSendObj = new JSONObject();
        depthSendObj.put("code", 10003); // 订阅盘口协议号
        depthSendObj.put("trace", generateTraceId());
        JSONObject data = new JSONObject();
        data.put("codes", "BTCUSDT");
        depthSendObj.put("data", data);
        session.getBasicRemote().sendText(depthSendObj.toJSONString());
        log.info("发送盘口数据订阅请求: {}", depthSendObj);
    }

    /**
     * 发送实时K线数据订阅请求
     */
    private void sendKlineSubscribe(Session session) throws IOException {
        JSONObject klineSendObj = new JSONObject();
        klineSendObj.put("code", 10006); // 订阅K线协议号
        klineSendObj.put("trace", generateTraceId());
        
        JSONObject klineData = new JSONObject();
        JSONArray klineDataArray = new JSONArray();
        
        JSONObject kline1minObj = new JSONObject();
        kline1minObj.put("type", 1); // 1分钟K线
        kline1minObj.put("codes", "BTCUSDT");
        klineDataArray.add(kline1minObj);
        klineData.put("arr", klineDataArray);
        
        klineSendObj.put("data", klineData);
        session.getBasicRemote().sendText(klineSendObj.toJSONString());
        log.info("发送K线数据订阅请求: {}", klineSendObj);
    }

    /**
     * 启动心跳保活任务
     */
    private void startHeartbeatTask() {
        Runnable heartbeatTask = () -> {
            try {
                if (session != null && session.isOpen()) {
                    JSONObject pingObj = new JSONObject();
                    pingObj.put("code", 10010); // 心跳协议号
                    pingObj.put("trace", generateTraceId());
                    session.getBasicRemote().sendText(pingObj.toJSONString());
                    log.debug("发送心跳包: {}", pingObj);
                }
            } catch (IOException e) {
                log.error("发送心跳包失败: {}", e.getMessage(), e);
            }
        };
        // 延迟30秒启动,每30秒发送一次心跳
        scheduledExecutorService.scheduleAtFixedRate(heartbeatTask, 30, 30, TimeUnit.SECONDS);
    }

    /**
     * 接收服务端推送的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到服务端消息: {}", message);
        // 这里可以解析消息并处理业务逻辑
        // 示例:解析JSON消息
        try {
            JSONObject msgObj = JSONObject.parseObject(message);
            Integer code = msgObj.getInteger("code");
            String trace = msgObj.getString("trace");
            JSONObject data = msgObj.getJSONObject("data");
            
            // 根据协议号区分消息类型
            switch (code) {
                case 10000: // 成交明细数据
                    handleTradeData(data);
                    break;
                case 10003: // 盘口数据
                    handleDepthData(data);
                    break;
                case 10006: // K线数据
                    handleKlineData(data);
                    break;
                case 10010: // 心跳响应
                    log.debug("收到心跳响应,trace: {}", trace);
                    break;
                default:
                    log.warn("未知协议号的消息: {}", code);
            }
        } catch (Exception e) {
            log.error("解析消息失败: {}", e.getMessage(), e);
        }
    }

    /**
     * 处理成交明细数据
     */
    private void handleTradeData(JSONObject data) {
        // 实现业务逻辑,比如入库、推送前端等
        log.info("处理成交明细数据: {}", data);
    }

    /**
     * 处理盘口数据
     */
    private void handleDepthData(JSONObject data) {
        // 实现业务逻辑
        log.info("处理盘口数据: {}", data);
    }

    /**
     * 处理K线数据
     */
    private void handleKlineData(JSONObject data) {
        // 实现业务逻辑
        log.info("处理K线数据: {}", data);
    }

    /**
     * 连接关闭时触发
     */
    @OnClose
    public void onClose(Session session, CloseReason reason) {
        log.info("Connection closed: {}, reason: {}", session.getId(), reason);
        // 清空session,触发重连
        this.session = null;
    }

    /**
     * 连接出错时触发
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket错误,sessionId: {}", session.getId(), error);
        // 清空session,触发重连
        this.session = null;
    }

    /**
     * 生成唯一traceId(示例实现,可替换为UUID)
     */
    private String generateTraceId() {
        return java.util.UUID.randomUUID().toString();
    }

    /**
     * 销毁Bean时关闭线程池和连接
     */
    @PreDestroy
    public void destroy() {
        log.info("销毁WebSocket客户端,关闭资源...");
        // 关闭session
        if (session != null && session.isOpen()) {
            try {
                session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "应用关闭"));
            } catch (IOException e) {
                log.error("关闭session失败", e);
            }
        }
        // 关闭线程池
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdown();
            try {
                if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
                    scheduledExecutorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                scheduledExecutorService.shutdownNow();
            }
        }
    }
}

最后更新于