> For the complete documentation index, see [llms.txt](https://docs.infoway.io/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://docs.infoway.io/websocket-api/code-examples.md).

# Websocket代码示例

{% tabs %}
{% tab title="Java" %}

```java
package com.ws.server;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.websocket.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.*;

@ClientEndpoint
@Slf4j
@Component
public class WebsocketExample {

    // 本地session通道（使用volatile保证多线程可见性）
    private volatile Session session;

    // wss连接地址 business可以为stock、crypto、common；apikey为您的凭证
    private static final String WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey";

    // 定时任务池（统一管理，避免创建多个线程池）
    private ScheduledExecutorService scheduledExecutorService;

    // 心跳定时任务（全局仅保留一份，重连前须取消）
    private volatile ScheduledFuture<?> heartbeatFuture;

    @PostConstruct
    public void connectAll() {
        // 初始化线程池（核心线程1个，避免资源浪费）
        scheduledExecutorService = Executors.newScheduledThreadPool(1);

        try {
            // 建立WEBSOCKET连接
            connect(WS_URL);
            // 开启自动重连（延迟1秒启动，每10秒检查一次）
            startReconnection(WS_URL);
        } catch (Exception e) {
            log.error("初始化WebSocket连接失败: {}", e.getMessage(), e);
        }
    }

    /**
     * 自动重连机制
     * @param wsUrl WebSocket连接地址
     */
    private void startReconnection(String wsUrl) {
        Runnable reconnectTask = () -> {
            if (session == null || !session.isOpen()) {
                log.warn("WebSocket连接已断开，开始重连...");
                connect(wsUrl);
            }
        };
        // 延迟1秒启动，每10秒执行一次重连检查
        scheduledExecutorService.scheduleAtFixedRate(reconnectTask, 1, 10, TimeUnit.SECONDS);
    }

    /**
     * 建立WEBSOCKET连接具体实现
     * @param wsUrl 连接地址
     */
    private void connect(String wsUrl) {
        try {
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();

            cancelHeartbeatTask();

            // 关闭旧连接（防止连接泄露）
            if (session != null && session.isOpen()) {
                session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "重新连接"));
            }
            // 建立新连接
            session = container.connectToServer(this, URI.create(wsUrl));
            log.info("WebSocket连接成功，sessionId: {}", session.getId());
        } catch (DeploymentException | IOException e) {
            log.error("WebSocket连接失败: {}", e.getMessage(), e);
            // 连接失败时清空session，触发下次重连
            session = null;
        }
    }

    /**
     * WebSocket连接建立成功后触发
     */
    @OnOpen
    public void onOpen(Session session) {
        // 更新全局session
        this.session = session;
        log.info("Connection opened: {}", session.getId());

        // 异步发送订阅请求（避免阻塞OnOpen线程）
        scheduledExecutorService.submit(() -> {
            try {
                // 1. 订阅实时成交明细（协议号10000）
                sendTradeSubscribe(session);
                TimeUnit.MILLISECONDS.sleep(5000); // 间隔5秒

                // 2. 订阅实时盘口数据（协议号10003）
                sendDepthSubscribe(session);
                TimeUnit.MILLISECONDS.sleep(5000); // 间隔5秒

                // 3. 订阅1分钟K线数据（协议号10006）
                sendKlineSubscribe(session);

                // 4. 启动心跳任务（30秒一次，绑定当前 session）
                startHeartbeatTask(session);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断状态
                log.error("发送订阅请求时线程被中断: {}", e.getMessage());
            } catch (IOException e) {
                log.error("发送订阅请求失败: {}", e.getMessage(), e);
            }
        });
    }

    /**
     * 发送实时成交明细订阅请求
     */
    private void sendTradeSubscribe(Session session) throws IOException {
        JSONObject tradeSendObj = new JSONObject();
        tradeSendObj.put("code", 10000); // 订阅成交明细协议号
        tradeSendObj.put("trace", generateTraceId()); // 自定义traceId
        JSONObject data = new JSONObject();
        data.put("codes", "BTCUSDT"); // 订阅BTCUSDT
        tradeSendObj.put("data", data);
        session.getBasicRemote().sendText(tradeSendObj.toJSONString());
        log.info("发送成交明细订阅请求: {}", tradeSendObj);
    }

    /**
     * 发送实时盘口数据订阅请求
     */
    private void sendDepthSubscribe(Session session) throws IOException {
        JSONObject depthSendObj = new JSONObject();
        depthSendObj.put("code", 10003); // 订阅盘口协议号
        depthSendObj.put("trace", generateTraceId());
        JSONObject data = new JSONObject();
        data.put("codes", "BTCUSDT");
        depthSendObj.put("data", data);
        session.getBasicRemote().sendText(depthSendObj.toJSONString());
        log.info("发送盘口数据订阅请求: {}", depthSendObj);
    }

    /**
     * 发送实时K线数据订阅请求
     */
    private void sendKlineSubscribe(Session session) throws IOException {
        JSONObject klineSendObj = new JSONObject();
        klineSendObj.put("code", 10006); // 订阅K线协议号
        klineSendObj.put("trace", generateTraceId());

        JSONObject klineData = new JSONObject();
        JSONArray klineDataArray = new JSONArray();

        JSONObject kline1minObj = new JSONObject();
        kline1minObj.put("type", 1); // 1分钟K线
        kline1minObj.put("codes", "BTCUSDT");
        klineDataArray.add(kline1minObj);
        klineData.put("arr", klineDataArray);

        klineSendObj.put("data", klineData);
        session.getBasicRemote().sendText(klineSendObj.toJSONString());
        log.info("发送K线数据订阅请求: {}", klineSendObj);
    }

    /**
     * 取消心跳定时任务
     */
    private void cancelHeartbeatTask() {
        ScheduledFuture<?> future = heartbeatFuture;
        if (future != null) {
            future.cancel(false);
            heartbeatFuture = null;
        }
    }

    /**
     * 启动心跳保活任务（重连时会先取消旧任务，保证全局只有一个心跳调度）
     */
    private void startHeartbeatTask(Session activeSession) {
        cancelHeartbeatTask();

        Runnable heartbeatTask = () -> {
            try {
                if (activeSession.isOpen() && activeSession == this.session) {
                    JSONObject pingObj = new JSONObject();
                    pingObj.put("code", 10010); // 心跳协议号
                    pingObj.put("trace", generateTraceId());
                    activeSession.getBasicRemote().sendText(pingObj.toJSONString());
                    log.debug("发送心跳包: {}", pingObj);
                }
            } catch (IOException e) {
                log.error("发送心跳包失败: {}", e.getMessage(), e);
            }
        };
        // 延迟30秒启动，每30秒发送一次心跳
        heartbeatFuture = scheduledExecutorService.scheduleAtFixedRate(heartbeatTask, 30, 30, TimeUnit.SECONDS);
    }

    /**
     * 接收服务端推送的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到服务端消息: {}", message);
        // 这里可以解析消息并处理业务逻辑
        // 示例：解析JSON消息
        try {
            JSONObject msgObj = JSONObject.parseObject(message);
            Integer code = msgObj.getInteger("code");
            String trace = msgObj.getString("trace");
            JSONObject data = msgObj.getJSONObject("data");

            // 根据协议号区分消息类型
            switch (code) {
                case 10000: // 成交明细数据
                    handleTradeData(data);
                    break;
                case 10003: // 盘口数据
                    handleDepthData(data);
                    break;
                case 10006: // K线数据
                    handleKlineData(data);
                    break;
                case 10010: // 心跳响应
                    log.debug("收到心跳响应，trace: {}", trace);
                    break;
                default:
                    log.warn("未知协议号的消息: {}", code);
            }
        } catch (Exception e) {
            log.error("解析消息失败: {}", e.getMessage(), e);
        }
    }

    /**
     * 处理成交明细数据
     */
    private void handleTradeData(JSONObject data) {
        // 实现业务逻辑，比如入库、推送前端等
        log.info("处理成交明细数据: {}", data);
    }

    /**
     * 处理盘口数据
     */
    private void handleDepthData(JSONObject data) {
        // 实现业务逻辑
        log.info("处理盘口数据: {}", data);
    }

    /**
     * 处理K线数据
     */
    private void handleKlineData(JSONObject data) {
        // 实现业务逻辑
        log.info("处理K线数据: {}", data);
    }

    /**
     * 连接关闭时触发
     */
    @OnClose
    public void onClose(Session session, CloseReason reason) {
        log.info("Connection closed: {}, reason: {}", session.getId(), reason);
        cancelHeartbeatTask();
        // 清空session，触发重连
        this.session = null;
    }

    /**
     * 连接出错时触发
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket错误，sessionId: {}", session.getId(), error);
        cancelHeartbeatTask();
        // 清空session，触发重连
        this.session = null;
    }

    /**
     * 生成唯一traceId（示例实现，可替换为UUID）
     */
    private String generateTraceId() {
        return java.util.UUID.randomUUID().toString();
    }

    /**
     * 销毁Bean时关闭线程池和连接
     */
    @PreDestroy
    public void destroy() {
        log.info("销毁WebSocket客户端，关闭资源...");
        cancelHeartbeatTask();
        // 关闭session
        if (session != null && session.isOpen()) {
            try {
                session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "应用关闭"));
            } catch (IOException e) {
                log.error("关闭session失败", e);
            }
        }
        // 关闭线程池
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdown();
            try {
                if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
                    scheduledExecutorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                scheduledExecutorService.shutdownNow();
            }
        }
    }
}
```

{% endtab %}

{% tab title="Python" %}

```python
import os
import asyncio
import json
import uuid
import logging
from typing import Optional

import websockets
from websockets.asyncio.client import ClientConnection
from websockets.exceptions import ConnectionClosed

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("websocket-client")

# --- 协议号 -----------------------------------------------------------------
# 服务端遵循 “请求码 +1 = 订阅确认，请求码 +2 = 数据推送” 的约定。
# 订阅请求码（由客户端发送）：
REQ_TRADE = 10000   # 实时成交明细
REQ_DEPTH = 10003   # 盘口 / 深度
REQ_KLINE = 10006   # K线
REQ_HEARTBEAT = 10010

# 数据推送码（由服务端发送）：
PUSH_CONNECTED = 200     # 连接建立成功
PUSH_TRADE = 10002       # 成交数据
PUSH_DEPTH = 10005       # 盘口数据
PUSH_KLINE = 10008       # K线数据
# 订阅确认码（10001 / 10004 / 10007）仅表示订阅成功。
ACK_CODES = {10001, 10004, 10007}


class CryptoWebsocketClient:
    """加密货币行情 WebSocket 客户端（兼容 websockets >= 14）。

    单连接设计：由一个主循环统一管理连接生命周期
    （连接 -> 订阅 -> 监听），断开后按指数退避重连。
    不再有并行的重连任务，避免重复开启连接（这是触发服务端
    429 限流的根因之一）。
    """

    def __init__(self, api_key: str, business: str = "crypto"):
        # WebSocket 连接配置
        self.ws_url = f"wss://data.infoway.io/ws?business={business}&apikey={api_key}"
        # 核心状态
        self.ws: Optional[ClientConnection] = None
        self.running = True
        # 重连退避配置（秒）
        self.reconnect_base = 5
        self.reconnect_max = 60
        # 心跳
        self.heartbeat_interval = 30  # 秒
        self.heartbeat_task: Optional[asyncio.Task] = None

    @staticmethod
    def _generate_trace_id() -> str:
        """生成唯一 trace ID（对等 Java 的 UUID）。"""
        return str(uuid.uuid4())

    # --- 订阅 ---------------------------------------------------------------
    async def _send(self, msg: dict) -> None:
        """序列化并通过当前连接发送消息。"""
        assert self.ws is not None
        await self.ws.send(json.dumps(msg))

    async def _send_all_subscribe_requests(self) -> None:
        """发送所有订阅请求（成交明细、盘口、K线）。"""
        # 1. 实时成交明细（协议号 10000）
        await self._send({
            "code": REQ_TRADE,
            "trace": self._generate_trace_id(),
            "data": {"codes": "BTCUSDT"},
        })
        logger.info("发送成交明细订阅 (code=%s)", REQ_TRADE)

        # 2. 实时盘口数据（协议号 10003）
        await self._send({
            "code": REQ_DEPTH,
            "trace": self._generate_trace_id(),
            "data": {"codes": "BTCUSDT"},
        })
        logger.info("发送盘口数据订阅 (code=%s)", REQ_DEPTH)

        # 3. 1 分钟 K 线（协议号 10006，type=1 表示 1 分钟）
        await self._send({
            "code": REQ_KLINE,
            "trace": self._generate_trace_id(),
            "data": {"arr": [{"type": 1, "codes": "BTCUSDT"}]},
        })
        logger.info("发送K线数据订阅 (code=%s)", REQ_KLINE)

    # --- 心跳 ---------------------------------------------------------------
    def _start_heartbeat_task(self) -> None:
        """为当前连接启动后台心跳任务。"""
        self._cancel_heartbeat_task()

        async def heartbeat_loop():
            try:
                while True:
                    await asyncio.sleep(self.heartbeat_interval)
                    if self.ws is None or self.ws.close_code is not None:
                        break
                    await self._send({
                        "code": REQ_HEARTBEAT,
                        "trace": self._generate_trace_id(),
                    })
                    logger.debug("发送心跳包")
            except (ConnectionClosed, asyncio.CancelledError):
                pass
            except Exception as e:
                logger.error("心跳任务异常: %s", e)

        self.heartbeat_task = asyncio.create_task(heartbeat_loop())

    def _cancel_heartbeat_task(self) -> None:
        """取消正在运行的心跳任务（如果有）。"""
        if self.heartbeat_task and not self.heartbeat_task.done():
            self.heartbeat_task.cancel()
        self.heartbeat_task = None

    # --- 消息处理 -----------------------------------------------------------
    def _handle_received_message(self, message) -> None:
        """按协议号分发处理收到的消息。

        ``message`` 可能是 ``str`` 或 ``bytes``，取决于帧类型。
        """
        try:
            msg_data = json.loads(message)
        except json.JSONDecodeError:
            logger.error("消息格式错误，无法解析 JSON: %s", message)
            return

        code = msg_data.get("code")
        trace = msg_data.get("trace")
        data = msg_data.get("data", {})

        if code == PUSH_CONNECTED:
            logger.info("连接建立成功: %s", msg_data.get("msg"))
        elif code == PUSH_TRADE:
            logger.info("成交数据: %s", data)
        elif code == PUSH_DEPTH:
            logger.info("盘口数据: %s", data)
        elif code == PUSH_KLINE:
            logger.info("K线数据: %s", data)
        elif code in ACK_CODES:
            logger.info("订阅确认 [code=%s, trace=%s]: %s",
                        code, trace, msg_data.get("msg"))
        elif code == REQ_HEARTBEAT:
            logger.debug("收到心跳响应 [trace=%s]", trace)
        else:
            logger.warning("未处理的消息 [code=%s]: %s", code, message)

    # --- 连接生命周期 -------------------------------------------------------
    async def _connect_once(self) -> None:
        """建立一次连接，完成订阅，并监听直到连接关闭。"""
        async with websockets.connect(self.ws_url) as ws:
            self.ws = ws
            logger.info("WebSocket 连接成功，地址: %s", self.ws_url)

            await self._send_all_subscribe_requests()
            self._start_heartbeat_task()

            try:
                # 持续迭代直到连接关闭（会抛出 ConnectionClosed）
                async for message in ws:
                    self._handle_received_message(message)
            finally:
                self._cancel_heartbeat_task()
                self.ws = None

    async def start(self) -> None:
        """启动客户端，并以指数退避保持连接。"""
        backoff = self.reconnect_base
        try:
            while self.running:
                try:
                    await self._connect_once()
                    # 服务端正常结束流；重置退避。
                    backoff = self.reconnect_base
                    logger.warning("连接被服务端关闭")
                except ConnectionClosed as e:
                    logger.warning("连接已关闭: %s", e)
                    backoff = self.reconnect_base
                except Exception as e:
                    logger.error("连接异常: %s", e)

                if not self.running:
                    break

                logger.info("将在 %s 秒后重连...", backoff)
                await asyncio.sleep(backoff)
                # 指数退避，上限为 reconnect_max。
                backoff = min(backoff * 2, self.reconnect_max)
        finally:
            self.running = False
            self._cancel_heartbeat_task()
            if self.ws is not None and self.ws.close_code is None:
                await self.ws.close()
            logger.info("WebSocket 客户端已停止")

    def stop(self) -> None:
        """通知客户端停止重连。"""
        self.running = False


async def main():
    """主函数。"""
    # API Key 从环境变量读取，请勿将真实 Key 硬编码到代码中：
    #   export INFOWAY_API_KEY="你的API Key"
    api_key = os.environ.get("INFOWAY_API_KEY", "YOUR_API_KEY")
    if api_key == "YOUR_API_KEY":
        logger.warning("未设置 INFOWAY_API_KEY 环境变量，请填入你的真实 API Key")
    client = CryptoWebsocketClient(api_key=api_key)
    await client.start()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("用户中断，退出客户端")
    except Exception as e:
        logger.error("客户端运行异常: %s", e)

```

{% endtab %}

{% tab title="Go" %}

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/url"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

// CryptoWebsocketClient 加密货币行情WebSocket客户端
type CryptoWebsocketClient struct {
	apiKey            string
	business          string
	wsURL             string
	conn              *websocket.Conn
	isConnected       bool
	reconnectInterval time.Duration
	heartbeatInterval time.Duration
	done              chan struct{}
	connCancel        context.CancelFunc // 取消当前连接关联的后台任务（心跳/监听/订阅）
}

// NewCryptoWebsocketClient 创建客户端实例
func NewCryptoWebsocketClient(apiKey, business string) *CryptoWebsocketClient {
	params := url.Values{}
	params.Set("business", business)
	params.Set("apikey", apiKey)
	wsURL := fmt.Sprintf("wss://data.infoway.io/ws?%s", params.Encode())

	return &CryptoWebsocketClient{
		apiKey:            apiKey,
		business:          business,
		wsURL:             wsURL,
		reconnectInterval: 10 * time.Second,
		heartbeatInterval: 30 * time.Second,
		done:              make(chan struct{}),
	}
}

func (c *CryptoWebsocketClient) generateTraceID() string {
	return uuid.New().String()
}

// stopConnectionTasks 停止当前连接的所有后台 goroutine，防止重连后心跳任务累积
func (c *CryptoWebsocketClient) stopConnectionTasks() {
	if c.connCancel != nil {
		c.connCancel()
		c.connCancel = nil
	}
}

func wait(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

func (c *CryptoWebsocketClient) connect() error {
	c.stopConnectionTasks()

	if c.conn != nil {
		c.conn.Close()
		c.conn = nil
	}

	conn, _, err := websocket.DefaultDialer.Dial(c.wsURL, nil)
	if err != nil {
		c.isConnected = false
		return fmt.Errorf("连接失败: %w", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	c.connCancel = cancel
	c.conn = conn
	c.isConnected = true
	log.Printf("WebSocket连接成功: %s", c.wsURL)

	go c.sendAllSubscribeRequests(ctx)
	go c.startHeartbeat(ctx)
	go c.listenMessages(ctx)

	return nil
}

func (c *CryptoWebsocketClient) sendAllSubscribeRequests(ctx context.Context) {
	if !c.isConnected || c.conn == nil {
		log.Println("连接未建立，无法发送订阅请求")
		return
	}

	if err := c.sendTradeSubscribe(); err != nil {
		log.Printf("发送成交明细订阅失败: %v", err)
		return
	}
	if !wait(ctx, 5*time.Second) {
		return
	}

	if err := c.sendDepthSubscribe(); err != nil {
		log.Printf("发送盘口订阅失败: %v", err)
		return
	}
	if !wait(ctx, 5*time.Second) {
		return
	}

	if err := c.sendKlineSubscribe(); err != nil {
		log.Printf("发送K线订阅失败: %v", err)
	}
}

func (c *CryptoWebsocketClient) sendTradeSubscribe() error {
	msg := map[string]interface{}{
		"code":  10000,
		"trace": c.generateTraceID(),
		"data": map[string]string{
			"codes": "BTCUSDT",
		},
	}
	return c.sendJSONMessage(msg)
}

func (c *CryptoWebsocketClient) sendDepthSubscribe() error {
	msg := map[string]interface{}{
		"code":  10003,
		"trace": c.generateTraceID(),
		"data": map[string]string{
			"codes": "BTCUSDT",
		},
	}
	return c.sendJSONMessage(msg)
}

func (c *CryptoWebsocketClient) sendKlineSubscribe() error {
	msg := map[string]interface{}{
		"code":  10006,
		"trace": c.generateTraceID(),
		"data": map[string]interface{}{
			"arr": []map[string]interface{}{
				{
					"type":  1,
					"codes": "BTCUSDT",
				},
			},
		},
	}
	return c.sendJSONMessage(msg)
}

func (c *CryptoWebsocketClient) sendHeartbeat() error {
	msg := map[string]interface{}{
		"code":  10010,
		"trace": c.generateTraceID(),
	}
	return c.sendJSONMessage(msg)
}

func (c *CryptoWebsocketClient) sendJSONMessage(msg interface{}) error {
	if !c.isConnected || c.conn == nil {
		return fmt.Errorf("连接已断开")
	}

	data, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("消息序列化失败: %w", err)
	}

	if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
		c.isConnected = false
		return fmt.Errorf("发送消息失败: %w", err)
	}

	log.Printf("发送消息: %s", string(data))
	return nil
}

func (c *CryptoWebsocketClient) startHeartbeat(ctx context.Context) {
	ticker := time.NewTicker(c.heartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-c.done:
			return
		case <-ticker.C:
			if err := c.sendHeartbeat(); err != nil {
				log.Printf("发送心跳失败: %v", err)
				return
			}
			log.Printf("发送心跳包成功")
		}
	}
}

func (c *CryptoWebsocketClient) listenMessages(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-c.done:
			return
		default:
		}

		if !c.isConnected || c.conn == nil {
			return
		}

		_, msgData, err := c.conn.ReadMessage()
		if err != nil {
			c.isConnected = false
			log.Printf("读取消息失败/连接断开: %v", err)
			return
		}

		log.Printf("收到服务端消息: %s", string(msgData))
		c.handleReceivedMessage(msgData)
	}
}

func (c *CryptoWebsocketClient) handleReceivedMessage(msgData []byte) {
	var msg map[string]interface{}
	if err := json.Unmarshal(msgData, &msg); err != nil {
		log.Printf("解析消息失败: %v", err)
		return
	}

	code := int(msg["code"].(float64))
	trace := msg["trace"].(string)
	data := msg["data"]

	switch code {
	case 10000:
		log.Printf("处理成交明细数据 [trace=%s]: %+v", trace, data)
	case 10003:
		log.Printf("处理盘口数据 [trace=%s]: %+v", trace, data)
	case 10006:
		log.Printf("处理K线数据 [trace=%s]: %+v", trace, data)
	case 10010:
		log.Printf("收到心跳响应 [trace=%s]", trace)
	default:
		log.Printf("未知协议号消息 [code=%d]: %s", code, string(msgData))
	}
}

func (c *CryptoWebsocketClient) startReconnectLoop() {
	for {
		select {
		case <-c.done:
			return
		default:
			if !c.isConnected {
				log.Printf("尝试重连WebSocket（间隔%v）...", c.reconnectInterval)
				time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
				if err := c.connect(); err != nil {
					log.Printf("重连失败: %v", err)
					time.Sleep(c.reconnectInterval)
					continue
				}
			}
			time.Sleep(1 * time.Second)
		}
	}
}

func (c *CryptoWebsocketClient) Start() {
	go c.startReconnectLoop()

	if err := c.connect(); err != nil {
		log.Printf("首次连接失败: %v", err)
	}

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	log.Println("收到退出信号，关闭客户端...")
	close(c.done)
	c.stopConnectionTasks()
	if c.conn != nil {
		c.conn.Close()
	}
	c.isConnected = false
}

func main() {
	apiKey := "yourApikey"
	business := "crypto"

	client := NewCryptoWebsocketClient(apiKey, business)
	client.Start()
}

```

{% endtab %}

{% tab title="Php" %}

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use WebSocket\Client;
use WebSocket\ConnectionException;

/**
 * 加密货币行情WebSocket客户端
 */
class CryptoWebsocketClient
{
    // 配置项
    private $apiKey;             // 认证API Key
    private $business;           // 业务类型：stock/crypto/common
    private $wsUrl;              // WebSocket连接地址
    private $reconnectInterval;  // 重连间隔（秒）
    private $heartbeatInterval;  // 心跳间隔（秒）
    
    // 运行状态
    private $client;             // WebSocket客户端实例
    private $isConnected = false;// 连接状态
    private $running = true;     // 客户端运行状态

    /**
     * 构造函数
     * @param string $apiKey API密钥
     * @param string $business 业务类型
     */
    public function __construct(string $apiKey, string $business = 'crypto')
    {
        $this->apiKey = $apiKey;
        $this->business = $business;
        // 构建WS URL
        $this->wsUrl = sprintf(
            'wss://data.infoway.io/ws?business=%s&apikey=%s',
            urlencode($business),
            urlencode($apiKey)
        );
        $this->reconnectInterval = 10;  // 10秒重连间隔
        $this->heartbeatInterval = 30;  // 30秒心跳间隔
    }

    /**
     * 生成唯一Trace ID
     * @return string
     */
    private function generateTraceId(): string
    {
        return sprintf(
            '%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0x0fff) | 0x4000,
            mt_rand(0, 0x3fff) | 0x8000,
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff)
        );
    }

    /**
     * 建立WebSocket连接
     * @return bool
     */
    private function connect(): bool
    {
        try {
            // 关闭旧连接
            if ($this->client) {
                try {
                    $this->client->close();
                } catch (Exception $e) {
                    // 忽略关闭异常
                }
                $this->client = null;
            }

            // 创建新连接
            $this->client = new Client($this->wsUrl, [
                'timeout' => 30,
                'fragment_size' => 8192,
            ]);
            $this->isConnected = true;
            echo date('Y-m-d H:i:s') . " - WebSocket连接成功: {$this->wsUrl}\n";
            
            // 发送所有订阅请求
            $this->sendAllSubscribeRequests();
            
            return true;
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - WebSocket连接失败: {$e->getMessage()}\n";
            return false;
        } catch (Exception $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 连接异常: {$e->getMessage()}\n";
            return false;
        }
    }

    /**
     * 发送所有订阅请求
     */
    private function sendAllSubscribeRequests(): void
    {
        if (!$this->isConnected || !$this->client) {
            echo date('Y-m-d H:i:s') . " - 连接未建立，无法发送订阅请求\n";
            return;
        }

        try {
            // 1. 订阅实时成交明细（协议号10000）
            $this->sendTradeSubscribe();
            sleep(5); // 间隔5秒

            // 2. 订阅实时盘口数据（协议号10003）
            $this->sendDepthSubscribe();
            sleep(5); // 间隔5秒

            // 3. 订阅1分钟K线数据（协议号10006）
            $this->sendKlineSubscribe();
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 发送订阅请求失败: {$e->getMessage()}\n";
            $this->isConnected = false;
        }
    }

    /**
     * 发送成交明细订阅请求
     */
    private function sendTradeSubscribe(): void
    {
        $msg = [
            'code'  => 10000,
            'trace' => $this->generateTraceId(),
            'data'  => ['codes' => 'BTCUSDT']
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * 发送盘口数据订阅请求
     */
    private function sendDepthSubscribe(): void
    {
        $msg = [
            'code'  => 10003,
            'trace' => $this->generateTraceId(),
            'data'  => ['codes' => 'BTCUSDT']
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * 发送K线数据订阅请求
     */
    private function sendKlineSubscribe(): void
    {
        $msg = [
            'code'  => 10006,
            'trace' => $this->generateTraceId(),
            'data'  => [
                'arr' => [
                    ['type' => 1, 'codes' => 'BTCUSDT'] // 1分钟K线
                ]
            ]
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * 发送心跳包
     */
    private function sendHeartbeat(): void
    {
        $msg = [
            'code'  => 10010,
            'trace' => $this->generateTraceId()
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * 发送JSON格式消息
     * @param array $msg 消息数组
     */
    private function sendJsonMessage(array $msg): void
    {
        if (!$this->isConnected || !$this->client) {
            return;
        }

        try {
            $jsonStr = json_encode($msg, JSON_UNESCAPED_UNICODE);
            $this->client->send($jsonStr);
            echo date('Y-m-d H:i:s') . " - 发送消息: {$jsonStr}\n";
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 发送消息失败: {$e->getMessage()}\n";
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 消息序列化失败: {$e->getMessage()}\n";
        }
    }

    /**
     * 监听服务端消息
     */
    private function listenMessages(): void
    {
        if (!$this->isConnected || !$this->client) {
            return;
        }

        try {
            // 设置非阻塞读取（避免卡死）
            $socket = $this->client->getSocket();
            stream_set_blocking($socket, false);

            $message = $this->client->receive();
            if ($message !== '') {
                echo date('Y-m-d H:i:s') . " - 收到消息: {$message}\n";
                $this->handleReceivedMessage($message);
            }
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 连接断开/读取消息失败: {$e->getMessage()}\n";
        } catch (Exception $e) {
            // 忽略非阻塞读取的空消息异常
            if (strpos($e->getMessage(), 'no data received') === false) {
                echo date('Y-m-d H:i:s') . " - 监听消息异常: {$e->getMessage()}\n";
            }
        }
    }

    /**
     * 处理接收到的消息
     * @param string $message 原始消息字符串
     */
    private function handleReceivedMessage(string $message): void
    {
        try {
            $msg = json_decode($message, true);
            if (json_last_error() !== JSON_ERROR_NONE) {
                echo date('Y-m-d H:i:s') . " - 解析消息失败: 格式错误\n";
                return;
            }

            $code = $msg['code'] ?? 0;
            $trace = $msg['trace'] ?? '';
            $data = $msg['data'] ?? [];

            switch ($code) {
                case 10000:
                    echo date('Y-m-d H:i:s') . " - 处理成交明细 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10003:
                    echo date('Y-m-d H:i:s') . " - 处理盘口数据 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10006:
                    echo date('Y-m-d H:i:s') . " - 处理K线数据 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10010:
                    echo date('Y-m-d H:i:s') . " - 收到心跳响应 [trace={$trace}]\n";
                    break;
                default:
                    echo date('Y-m-d H:i:s') . " - 未知协议号 [code={$code}]: {$message}\n";
            }
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 处理消息异常: {$e->getMessage()}\n";
        }
    }

    /**
     * 启动心跳任务
     * @param int $lastHeartbeatTime 上次心跳时间戳引用
     */
    private function checkHeartbeat(&$lastHeartbeatTime): void
    {
        $currentTime = time();
        if ($currentTime - $lastHeartbeatTime >= $this->heartbeatInterval) {
            $this->sendHeartbeat();
            $lastHeartbeatTime = $currentTime;
        }
    }

    /**
     * 启动客户端
     */
    public function start(): void
    {
        echo date('Y-m-d H:i:s') . " - 启动WebSocket客户端...\n";
        
        // 注册退出信号处理
        pcntl_async_signals(true);
        pcntl_signal(SIGINT, function () {
            echo "\n" . date('Y-m-d H:i:s') . " - 收到退出信号，停止客户端...\n";
            $this->running = false;
            if ($this->client) {
                $this->client->close();
            }
            exit(0);
        });

        $lastHeartbeatTime = 0;
        $reconnectDelay = 0;

        // 主循环
        while ($this->running) {
            // 处理重连逻辑
            if (!$this->isConnected) {
                if ($reconnectDelay <= 0) {
                    $this->connect();
                    $reconnectDelay = $this->reconnectInterval;
                    $lastHeartbeatTime = time();
                } else {
                    $reconnectDelay--;
                    echo date('Y-m-d H:i:s') . " - 等待重连... ({$reconnectDelay}秒)\n";
                }
            } else {
                // 监听消息
                $this->listenMessages();
                // 检查心跳
                $this->checkHeartbeat($lastHeartbeatTime);
            }

            // 主循环延迟（避免CPU占用过高）
            usleep(100000); // 100毫秒
        }

        // 清理资源
        if ($this->client) {
            $this->client->close();
        }
        echo date('Y-m-d H:i:s') . " - 客户端已停止\n";
    }

    /**
     * 析构函数
     */
    public function __destruct()
    {
        if ($this->client) {
            try {
                $this->client->close();
            } catch (Exception $e) {
                // 忽略
            }
        }
    }
}

// 主程序入口
if (php_sapi_name() !== 'cli') {
    die("该脚本仅支持CLI模式运行！");
}

// 配置（替换为你的实际API Key）
$apiKey = 'yourApikey';
$business = 'crypto';

// 创建并启动客户端
$client = new CryptoWebsocketClient($apiKey, $business);
$client->start();
```

{% endtab %}

{% tab title="JavaScript" %}

```javascript
/**
 * 加密货币行情WebSocket客户端（JS通用版）
 * 支持浏览器/Node.js环境（Node.js需确保版本≥10.0.0）
 */
class CryptoWebsocketClient {
  /**
   * 构造函数
   * @param {string} apiKey - 认证API Key
   * @param {string} business - 业务类型：stock/crypto/common
   */
  constructor(apiKey, business = 'crypto') {
    // 配置项
    this.apiKey = apiKey;
    this.business = business;
    this.wsUrl = `wss://data.infoway.io/ws?business=${encodeURIComponent(business)}&apikey=${encodeURIComponent(apiKey)}`;
    this.reconnectInterval = 10000; // 重连间隔（10秒，单位ms）
    this.heartbeatInterval = 30000; // 心跳间隔（30秒，单位ms）

    // 运行状态
    this.ws = null; // WebSocket实例
    this.isConnected = false; // 连接状态
    this.heartbeatTimer = null; // 心跳定时器
    this.reconnectTimer = null; // 重连定时器
    this.subscribeSent = false; // 订阅请求是否已发送

    // 绑定方法上下文（避免回调中this丢失）
    this.onOpen = this.onOpen.bind(this);
    this.onMessage = this.onMessage.bind(this);
    this.onClose = this.onClose.bind(this);
    this.onError = this.onError.bind(this);
    this.sendHeartbeat = this.sendHeartbeat.bind(this);
  }

  /**
   * 生成唯一Trace ID（模拟UUID）
   * @returns {string}
   */
  generateTraceId() {
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
      const r = Math.random() * 16 | 0;
      const v = c === 'x' ? r : (r & 0x3 | 0x8);
      return v.toString(16);
    });
  }

  /**
   * 建立WebSocket连接
   */
  connect() {
    // 清理旧连接
    if (this.ws) {
      try {
        this.ws.close(1000, 'Reconnecting');
      } catch (e) {
        console.error('关闭旧连接失败:', e);
      }
      this.ws = null;
    }

    try {
      // 创建新连接
      this.ws = new WebSocket(this.wsUrl);
      // 注册事件回调
      this.ws.onopen = this.onOpen;
      this.ws.onmessage = this.onMessage;
      this.ws.onclose = this.onClose;
      this.ws.onerror = this.onError;
      console.log(`[${new Date().toLocaleString()}] 正在连接WebSocket: ${this.wsUrl}`);
    } catch (e) {
      this.isConnected = false;
      console.error(`[${new Date().toLocaleString()}] 连接失败:`, e);
      this.scheduleReconnect();
    }
  }

  /**
   * 连接成功回调
   */
  onOpen() {
    this.isConnected = true;
    this.subscribeSent = false;
    console.log(`[${new Date().toLocaleString()}] WebSocket连接成功`);

    // 清理重连定时器
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    // 发送订阅请求
    this.sendAllSubscribeRequests();

    // 启动心跳
    this.startHeartbeat();
  }

  /**
   * 发送所有订阅请求
   */
  sendAllSubscribeRequests() {
    if (this.subscribeSent || !this.isConnected) return;
    this.subscribeSent = true;

    // 1. 订阅实时成交明细（协议号10000）
    this.sendTradeSubscribe();
    
    // 间隔5秒发送盘口订阅
    setTimeout(() => {
      if (this.isConnected) {
        this.sendDepthSubscribe();
      }
    }, 5000);

    // 间隔10秒发送K线订阅（5+5）
    setTimeout(() => {
      if (this.isConnected) {
        this.sendKlineSubscribe();
      }
    }, 10000);
  }

  /**
   * 发送成交明细订阅请求
   */
  sendTradeSubscribe() {
    const msg = {
      code: 10000,
      trace: this.generateTraceId(),
      data: { codes: 'BTCUSDT' }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送盘口数据订阅请求
   */
  sendDepthSubscribe() {
    const msg = {
      code: 10003,
      trace: this.generateTraceId(),
      data: { codes: 'BTCUSDT' }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送K线数据订阅请求
   */
  sendKlineSubscribe() {
    const msg = {
      code: 10006,
      trace: this.generateTraceId(),
      data: {
        arr: [{ type: 1, codes: 'BTCUSDT' }] // 1分钟K线
      }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送心跳包
   */
  sendHeartbeat() {
    const msg = {
      code: 10010,
      trace: this.generateTraceId()
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送JSON格式消息
   * @param {object} msg - 消息对象
   */
  sendJsonMessage(msg) {
    if (!this.isConnected || !this.ws || this.ws.readyState !== WebSocket.OPEN) {
      console.warn(`[${new Date().toLocaleString()}] 连接未就绪，跳过消息发送`);
      return;
    }

    try {
      const jsonStr = JSON.stringify(msg);
      this.ws.send(jsonStr);
      console.log(`[${new Date().toLocaleString()}] 发送消息:`, jsonStr);
    } catch (e) {
      console.error(`[${new Date().toLocaleString()}] 发送消息失败:`, e);
      this.isConnected = false;
      this.scheduleReconnect();
    }
  }

  /**
   * 接收消息回调
   * @param {MessageEvent} event - 消息事件
   */
  onMessage(event) {
    const message = event.data;
    console.log(`[${new Date().toLocaleString()}] 收到消息:`, message);
    this.handleReceivedMessage(message);
  }

  /**
   * 处理接收到的消息
   * @param {string} message - 原始消息字符串
   */
  handleReceivedMessage(message) {
    try {
      const msg = JSON.parse(message);
      const code = msg.code || 0;
      const trace = msg.trace || '';
      const data = msg.data || {};

      switch (code) {
        case 10000:
          console.log(`[${new Date().toLocaleString()}] 处理成交明细 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10003:
          console.log(`[${new Date().toLocaleString()}] 处理盘口数据 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10006:
          console.log(`[${new Date().toLocaleString()}] 处理K线数据 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10010:
          console.log(`[${new Date().toLocaleString()}] 收到心跳响应 [trace=${trace}]`);
          break;
        default:
          console.warn(`[${new Date().toLocaleString()}] 未知协议号 [code=${code}]:`, message);
      }
    } catch (e) {
      console.error(`[${new Date().toLocaleString()}] 处理消息失败:`, e);
    }
  }

  /**
   * 连接关闭回调
   * @param {CloseEvent} event - 关闭事件
   */
  onClose(event) {
    this.isConnected = false;
    this.subscribeSent = false;
    console.log(`[${new Date().toLocaleString()}] 连接关闭: 代码=${event.code}, 原因=${event.reason}`);
    
    // 清理心跳定时器
    this.stopHeartbeat();
    
    // 触发重连
    this.scheduleReconnect();
  }

  /**
   * 错误回调
   * @param {Event} error - 错误事件
   */
  onError(error) {
    console.error(`[${new Date().toLocaleString()}] WebSocket错误:`, error);
    this.isConnected = false;
    this.scheduleReconnect();
  }

  /**
   * 启动心跳任务
   */
  startHeartbeat() {
    // 清理旧定时器
    this.stopHeartbeat();
    
    // 立即发送一次心跳，然后定时发送
    this.sendHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (this.isConnected) {
        this.sendHeartbeat();
      }
    }, this.heartbeatInterval);
  }

  /**
   * 停止心跳任务
   */
  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  /**
   * 调度重连
   */
  scheduleReconnect() {
    if (this.reconnectTimer) return;
    
    console.log(`[${new Date().toLocaleString()}] ${this.reconnectInterval/1000}秒后尝试重连...`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, this.reconnectInterval);
  }

  /**
   * 启动客户端
   */
  start() {
    console.log(`[${new Date().toLocaleString()}] 启动WebSocket客户端...`);
    this.connect();

    // Node.js环境下监听退出信号
    if (typeof process !== 'undefined' && process.on) {
      process.on('SIGINT', () => {
        console.log(`\n[${new Date().toLocaleString()}] 收到退出信号，关闭客户端...`);
        this.stop();
        process.exit(0);
      });
    }
  }

  /**
   * 停止客户端
   */
  stop() {
    this.isConnected = false;
    this.subscribeSent = false;
    
    // 清理定时器
    this.stopHeartbeat();
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    // 关闭连接
    if (this.ws) {
      this.ws.close(1000, 'Client stopped');
      this.ws = null;
    }

    console.log(`[${new Date().toLocaleString()}] 客户端已停止`);
  }
}

// ==================== 运行示例 ====================
// 浏览器环境：直接在控制台执行以下代码
// Node.js环境：直接运行该脚本

// 替换为你的实际API Key
const API_KEY = 'yourApikey';
const BUSINESS = 'crypto';

// 创建并启动客户端
const client = new CryptoWebsocketClient(API_KEY, BUSINESS);
client.start();

// 如需停止客户端，执行：client.stop();
```

{% endtab %}
{% endtabs %}


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.infoway.io/websocket-api/code-examples.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
