# Websocket代码示例

{% tabs %}
{% tab title="Java" %}

```java
package org.example.ws;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.websocket.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.*;

@ClientEndpoint
@Slf4j
@Component
public class WebsocketExample {

    // 本地session通道（使用volatile保证多线程可见性）
    private volatile Session session;

    // wss连接地址 business可以为stock、crypto、common；apikey为您的凭证
    private static final String WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey";

    // 定时任务池（统一管理，避免创建多个线程池）
    private ScheduledExecutorService scheduledExecutorService;

    @PostConstruct
    public void connectAll() {
        // 初始化线程池（核心线程1个，避免资源浪费）
        scheduledExecutorService = Executors.newScheduledThreadPool(1);
        
        try {
            // 建立WEBSOCKET连接
            connect(WS_URL);
            // 开启自动重连（延迟1秒启动，每10秒检查一次）
            startReconnection(WS_URL);
        } catch (Exception e) {
            log.error("初始化WebSocket连接失败: {}", e.getMessage(), e);
        }
    }

    /**
     * 自动重连机制
     * @param wsUrl WebSocket连接地址
     */
    private void startReconnection(String wsUrl) {
        Runnable reconnectTask = () -> {
            if (session == null || !session.isOpen()) {
                log.warn("WebSocket连接已断开，开始重连...");
                connect(wsUrl);
            }
        };
        // 延迟1秒启动，每10秒执行一次重连检查
        scheduledExecutorService.scheduleAtFixedRate(reconnectTask, 1, 10, TimeUnit.SECONDS);
    }

    /**
     * 建立WEBSOCKET连接具体实现
     * @param wsUrl 连接地址
     */
    private void connect(String wsUrl) {
        try {
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            // 关闭旧连接（防止连接泄露）
            if (session != null && session.isOpen()) {
                session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "重新连接"));
            }
            // 建立新连接
            session = container.connectToServer(this, URI.create(wsUrl));
            log.info("WebSocket连接成功，sessionId: {}", session.getId());
        } catch (DeploymentException | IOException e) {
            log.error("WebSocket连接失败: {}", e.getMessage(), e);
            // 连接失败时清空session，触发下次重连
            session = null;
        }
    }

    /**
     * WebSocket连接建立成功后触发
     */
    @OnOpen
    public void onOpen(Session session) {
        // 更新全局session
        this.session = session;
        log.info("Connection opened: {}", session.getId());

        // 异步发送订阅请求（避免阻塞OnOpen线程）
        scheduledExecutorService.submit(() -> {
            try {
                // 1. 订阅实时成交明细（协议号10000）
                sendTradeSubscribe(session);
                TimeUnit.MILLISECONDS.sleep(5000); // 间隔5秒

                // 2. 订阅实时盘口数据（协议号10003）
                sendDepthSubscribe(session);
                TimeUnit.MILLISECONDS.sleep(5000); // 间隔5秒

                // 3. 订阅1分钟K线数据（协议号10006）
                sendKlineSubscribe(session);

                // 4. 启动心跳任务（30秒一次）
                startHeartbeatTask();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断状态
                log.error("发送订阅请求时线程被中断: {}", e.getMessage());
            } catch (IOException e) {
                log.error("发送订阅请求失败: {}", e.getMessage(), e);
            }
        });
    }

    /**
     * 发送实时成交明细订阅请求
     */
    private void sendTradeSubscribe(Session session) throws IOException {
        JSONObject tradeSendObj = new JSONObject();
        tradeSendObj.put("code", 10000); // 订阅成交明细协议号
        tradeSendObj.put("trace", generateTraceId()); // 自定义traceId
        JSONObject data = new JSONObject();
        data.put("codes", "BTCUSDT"); // 订阅BTCUSDT
        tradeSendObj.put("data", data);
        session.getBasicRemote().sendText(tradeSendObj.toJSONString());
        log.info("发送成交明细订阅请求: {}", tradeSendObj);
    }

    /**
     * 发送实时盘口数据订阅请求
     */
    private void sendDepthSubscribe(Session session) throws IOException {
        JSONObject depthSendObj = new JSONObject();
        depthSendObj.put("code", 10003); // 订阅盘口协议号
        depthSendObj.put("trace", generateTraceId());
        JSONObject data = new JSONObject();
        data.put("codes", "BTCUSDT");
        depthSendObj.put("data", data);
        session.getBasicRemote().sendText(depthSendObj.toJSONString());
        log.info("发送盘口数据订阅请求: {}", depthSendObj);
    }

    /**
     * 发送实时K线数据订阅请求
     */
    private void sendKlineSubscribe(Session session) throws IOException {
        JSONObject klineSendObj = new JSONObject();
        klineSendObj.put("code", 10006); // 订阅K线协议号
        klineSendObj.put("trace", generateTraceId());
        
        JSONObject klineData = new JSONObject();
        JSONArray klineDataArray = new JSONArray();
        
        JSONObject kline1minObj = new JSONObject();
        kline1minObj.put("type", 1); // 1分钟K线
        kline1minObj.put("codes", "BTCUSDT");
        klineDataArray.add(kline1minObj);
        klineData.put("arr", klineDataArray);
        
        klineSendObj.put("data", klineData);
        session.getBasicRemote().sendText(klineSendObj.toJSONString());
        log.info("发送K线数据订阅请求: {}", klineSendObj);
    }

    /**
     * 启动心跳保活任务
     */
    private void startHeartbeatTask() {
        Runnable heartbeatTask = () -> {
            try {
                if (session != null && session.isOpen()) {
                    JSONObject pingObj = new JSONObject();
                    pingObj.put("code", 10010); // 心跳协议号
                    pingObj.put("trace", generateTraceId());
                    session.getBasicRemote().sendText(pingObj.toJSONString());
                    log.debug("发送心跳包: {}", pingObj);
                }
            } catch (IOException e) {
                log.error("发送心跳包失败: {}", e.getMessage(), e);
            }
        };
        // 延迟30秒启动，每30秒发送一次心跳
        scheduledExecutorService.scheduleAtFixedRate(heartbeatTask, 30, 30, TimeUnit.SECONDS);
    }

    /**
     * 接收服务端推送的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到服务端消息: {}", message);
        // 这里可以解析消息并处理业务逻辑
        // 示例：解析JSON消息
        try {
            JSONObject msgObj = JSONObject.parseObject(message);
            Integer code = msgObj.getInteger("code");
            String trace = msgObj.getString("trace");
            JSONObject data = msgObj.getJSONObject("data");
            
            // 根据协议号区分消息类型
            switch (code) {
                case 10000: // 成交明细数据
                    handleTradeData(data);
                    break;
                case 10003: // 盘口数据
                    handleDepthData(data);
                    break;
                case 10006: // K线数据
                    handleKlineData(data);
                    break;
                case 10010: // 心跳响应
                    log.debug("收到心跳响应，trace: {}", trace);
                    break;
                default:
                    log.warn("未知协议号的消息: {}", code);
            }
        } catch (Exception e) {
            log.error("解析消息失败: {}", e.getMessage(), e);
        }
    }

    /**
     * 处理成交明细数据
     */
    private void handleTradeData(JSONObject data) {
        // 实现业务逻辑，比如入库、推送前端等
        log.info("处理成交明细数据: {}", data);
    }

    /**
     * 处理盘口数据
     */
    private void handleDepthData(JSONObject data) {
        // 实现业务逻辑
        log.info("处理盘口数据: {}", data);
    }

    /**
     * 处理K线数据
     */
    private void handleKlineData(JSONObject data) {
        // 实现业务逻辑
        log.info("处理K线数据: {}", data);
    }

    /**
     * 连接关闭时触发
     */
    @OnClose
    public void onClose(Session session, CloseReason reason) {
        log.info("Connection closed: {}, reason: {}", session.getId(), reason);
        // 清空session，触发重连
        this.session = null;
    }

    /**
     * 连接出错时触发
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket错误，sessionId: {}", session.getId(), error);
        // 清空session，触发重连
        this.session = null;
    }

    /**
     * 生成唯一traceId（示例实现，可替换为UUID）
     */
    private String generateTraceId() {
        return java.util.UUID.randomUUID().toString();
    }

    /**
     * 销毁Bean时关闭线程池和连接
     */
    @PreDestroy
    public void destroy() {
        log.info("销毁WebSocket客户端，关闭资源...");
        // 关闭session
        if (session != null && session.isOpen()) {
            try {
                session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "应用关闭"));
            } catch (IOException e) {
                log.error("关闭session失败", e);
            }
        }
        // 关闭线程池
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdown();
            try {
                if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
                    scheduledExecutorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                scheduledExecutorService.shutdownNow();
            }
        }
    }
}
```

{% endtab %}

{% tab title="Python" %}

```python
import asyncio
import json
import uuid
import logging
from typing import Optional
import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("websocket-client")

class CryptoWebsocketClient:
    """加密货币行情WebSocket客户端（对等Java版本逻辑）"""
    
    def __init__(self, api_key: str, business: str = "crypto"):
        # WebSocket连接配置
        self.ws_url = f"wss://data.infoway.io/ws?business={business}&apikey={api_key}"
        # 核心状态
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.is_connected = False
        self.reconnect_interval = 10  # 重连间隔（秒）
        self.heartbeat_interval = 30  # 心跳间隔（秒）
        # 任务对象（用于管理异步任务）
        self.reconnect_task: Optional[asyncio.Task] = None
        self.heartbeat_task: Optional[asyncio.Task] = None

    async def connect(self) -> None:
        """建立WebSocket连接"""
        try:
            # 关闭旧连接（防止连接泄露）
            if self.ws and not self.ws.closed:
                await self.ws.close()
            
            # 建立新连接
            self.ws = await websockets.connect(self.ws_url)
            self.is_connected = True
            logger.info(f"WebSocket连接成功，地址: {self.ws_url}")
            
            # 连接成功后启动订阅和心跳
            await self._send_all_subscribe_requests()
            self._start_heartbeat_task()
        
        except WebSocketException as e:
            self.is_connected = False
            logger.error(f"WebSocket连接失败: {str(e)}")
            raise

    async def _send_all_subscribe_requests(self) -> None:
        """发送所有订阅请求（成交明细、盘口、K线）"""
        if not self.ws or self.ws.closed:
            logger.error("连接未建立，无法发送订阅请求")
            return
        
        try:
            # 1. 订阅实时成交明细（协议号10000）
            await self._send_trade_subscribe()
            await asyncio.sleep(5)  # 间隔5秒
            
            # 2. 订阅实时盘口数据（协议号10003）
            await self._send_depth_subscribe()
            await asyncio.sleep(5)  # 间隔5秒
            
            # 3. 订阅1分钟K线数据（协议号10006）
            await self._send_kline_subscribe()
            
        except Exception as e:
            logger.error(f"发送订阅请求失败: {str(e)}")
            raise

    def _generate_trace_id(self) -> str:
        """生成唯一trace ID（对等Java的UUID）"""
        return str(uuid.uuid4())

    async def _send_trade_subscribe(self) -> None:
        """发送实时成交明细订阅请求"""
        subscribe_msg = {
            "code": 10000,
            "trace": self._generate_trace_id(),
            "data": {"codes": "BTCUSDT"}
        }
        await self.ws.send(json.dumps(subscribe_msg))
        logger.info(f"发送成交明细订阅请求: {subscribe_msg}")

    async def _send_depth_subscribe(self) -> None:
        """发送实时盘口数据订阅请求"""
        subscribe_msg = {
            "code": 10003,
            "trace": self._generate_trace_id(),
            "data": {"codes": "BTCUSDT"}
        }
        await self.ws.send(json.dumps(subscribe_msg))
        logger.info(f"发送盘口数据订阅请求: {subscribe_msg}")

    async def _send_kline_subscribe(self) -> None:
        """发送1分钟K线数据订阅请求"""
        subscribe_msg = {
            "code": 10006,
            "trace": self._generate_trace_id(),
            "data": {
                "arr": [
                    {"type": 1, "codes": "BTCUSDT"}  # type=1 表示1分钟K线
                ]
            }
        }
        await self.ws.send(json.dumps(subscribe_msg))
        logger.info(f"发送K线数据订阅请求: {subscribe_msg}")

    async def _send_heartbeat(self) -> None:
        """发送心跳包（协议号10010）"""
        if not self.ws or self.ws.closed:
            return
        
        heartbeat_msg = {
            "code": 10010,
            "trace": self._generate_trace_id()
        }
        try:
            await self.ws.send(json.dumps(heartbeat_msg))
            logger.debug(f"发送心跳包: {heartbeat_msg}")
        except Exception as e:
            logger.error(f"发送心跳包失败: {str(e)}")
            raise

    def _start_heartbeat_task(self) -> None:
        """启动心跳任务（后台定时发送）"""
        if self.heartbeat_task and not self.heartbeat_task.done():
            self.heartbeat_task.cancel()
        
        async def heartbeat_loop():
            while self.is_connected and self.ws and not self.ws.closed:
                try:
                    await self._send_heartbeat()
                    await asyncio.sleep(self.heartbeat_interval)
                except Exception as e:
                    logger.error(f"心跳任务异常: {str(e)}")
                    break
        
        self.heartbeat_task = asyncio.create_task(heartbeat_loop())

    async def _message_listener(self) -> None:
        """监听服务端推送的消息"""
        while self.is_connected and self.ws and not self.ws.closed:
            try:
                # 阻塞等待接收消息
                message = await self.ws.recv()
                logger.info(f"收到服务端消息: {message}")
                
                # 解析消息并处理（对等Java的OnMessage逻辑）
                self._handle_received_message(message)
                
            except ConnectionClosed:
                logger.warning("WebSocket连接已关闭，停止消息监听")
                self.is_connected = False
                break
            except Exception as e:
                logger.error(f"接收/处理消息异常: {str(e)}")

    def _handle_received_message(self, message: str) -> None:
        """处理接收到的消息（按协议号分类）"""
        try:
            msg_data = json.loads(message)
            code = msg_data.get("code")
            trace = msg_data.get("trace")
            data = msg_data.get("data", {})
            
            if code == 10000:
                logger.info(f"处理成交明细数据 [trace={trace}]: {data}")
            elif code == 10003:
                logger.info(f"处理盘口数据 [trace={trace}]: {data}")
            elif code == 10006:
                logger.info(f"处理K线数据 [trace={trace}]: {data}")
            elif code == 10010:
                logger.debug(f"收到心跳响应 [trace={trace}]")
            else:
                logger.warning(f"未知协议号消息 [code={code}]: {message}")
                
        except json.JSONDecodeError:
            logger.error(f"消息格式错误，无法解析JSON: {message}")
        except Exception as e:
            logger.error(f"处理消息异常: {str(e)}")

    async def _reconnect_loop(self) -> None:
        """自动重连循环（对等Java的startReconnection）"""
        while True:
            if not self.is_connected:
                logger.info(f"尝试重连WebSocket（间隔{self.reconnect_interval}秒）...")
                try:
                    await self.connect()
                except Exception:
                    # 重连失败，等待后重试
                    await asyncio.sleep(self.reconnect_interval)
            else:
                # 连接正常，短暂等待后再次检查
                await asyncio.sleep(1)

    async def start(self) -> None:
        """启动客户端（主入口）"""
        # 启动自动重连任务
        self.reconnect_task = asyncio.create_task(self._reconnect_loop())
        
        try:
            # 首次连接
            await self.connect()
            
            # 启动消息监听
            await self._message_listener()
            
        finally:
            # 清理资源
            self.is_connected = False
            if self.heartbeat_task:
                self.heartbeat_task.cancel()
            if self.reconnect_task:
                self.reconnect_task.cancel()
            if self.ws and not self.ws.closed:
                await self.ws.close()
            logger.info("WebSocket客户端已停止")

async def main():
    """主函数"""
    # 替换为你的实际API Key
    API_KEY = "yourApikey"
    client = CryptoWebsocketClient(api_key=API_KEY)
    await client.start()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("用户中断，退出客户端")
    except Exception as e:
        logger.error(f"客户端运行异常: {str(e)}")
```

{% endtab %}

{% tab title="Go" %}

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/url"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

// CryptoWebsocketClient 加密货币行情WebSocket客户端
type CryptoWebsocketClient struct {
	apiKey             string        // 认证API Key
	business           string        // 业务类型：stock/crypto/common
	wsURL              string        // WebSocket连接地址
	conn               *websocket.Conn // WebSocket连接实例
	isConnected        bool          // 连接状态
	reconnectInterval  time.Duration // 重连间隔
	heartbeatInterval  time.Duration // 心跳间隔
	done               chan struct{} // 退出信号通道
}

// NewCryptoWebsocketClient 创建客户端实例
func NewCryptoWebsocketClient(apiKey, business string) *CryptoWebsocketClient {
	// 构建WS URL
	params := url.Values{}
	params.Set("business", business)
	params.Set("apikey", apiKey)
	wsURL := fmt.Sprintf("wss://data.infoway.io/ws?%s", params.Encode())

	return &CryptoWebsocketClient{
		apiKey:             apiKey,
		business:           business,
		wsURL:              wsURL,
		reconnectInterval:  10 * time.Second,  // 10秒重连间隔
		heartbeatInterval:  30 * time.Second,  // 30秒心跳间隔
		done:               make(chan struct{}),
	}
}

// generateTraceID 生成唯一的trace ID（对齐Java/Python的UUID）
func (c *CryptoWebsocketClient) generateTraceID() string {
	return uuid.New().String()
}

// connect 建立WebSocket连接
func (c *CryptoWebsocketClient) connect() error {
	// 关闭旧连接
	if c.conn != nil {
		c.conn.Close()
		c.conn = nil
	}

	// 建立新连接
	conn, _, err := websocket.DefaultDialer.Dial(c.wsURL, nil)
	if err != nil {
		c.isConnected = false
		return fmt.Errorf("连接失败: %w", err)
	}

	c.conn = conn
	c.isConnected = true
	log.Printf("WebSocket连接成功: %s", c.wsURL)

	// 连接成功后发送订阅请求
	go c.sendAllSubscribeRequests()

	// 启动心跳任务
	go c.startHeartbeat()

	// 启动消息监听
	go c.listenMessages()

	return nil
}

// sendAllSubscribeRequests 发送所有订阅请求（成交明细、盘口、K线）
func (c *CryptoWebsocketClient) sendAllSubscribeRequests() {
	if !c.isConnected || c.conn == nil {
		log.Println("连接未建立，无法发送订阅请求")
		return
	}

	// 1. 订阅实时成交明细（协议号10000）
	if err := c.sendTradeSubscribe(); err != nil {
		log.Printf("发送成交明细订阅失败: %v", err)
		return
	}
	time.Sleep(5 * time.Second) // 间隔5秒

	// 2. 订阅实时盘口数据（协议号10003）
	if err := c.sendDepthSubscribe(); err != nil {
		log.Printf("发送盘口订阅失败: %v", err)
		return
	}
	time.Sleep(5 * time.Second) // 间隔5秒

	// 3. 订阅1分钟K线数据（协议号10006）
	if err := c.sendKlineSubscribe(); err != nil {
		log.Printf("发送K线订阅失败: %v", err)
		return
	}
}

// sendTradeSubscribe 发送实时成交明细订阅请求
func (c *CryptoWebsocketClient) sendTradeSubscribe() error {
	msg := map[string]interface{}{
		"code":  10000,
		"trace": c.generateTraceID(),
		"data": map[string]string{
			"codes": "BTCUSDT",
		},
	}
	return c.sendJSONMessage(msg)
}

// sendDepthSubscribe 发送实时盘口数据订阅请求
func (c *CryptoWebsocketClient) sendDepthSubscribe() error {
	msg := map[string]interface{}{
		"code":  10003,
		"trace": c.generateTraceID(),
		"data": map[string]string{
			"codes": "BTCUSDT",
		},
	}
	return c.sendJSONMessage(msg)
}

// sendKlineSubscribe 发送1分钟K线数据订阅请求
func (c *CryptoWebsocketClient) sendKlineSubscribe() error {
	msg := map[string]interface{}{
		"code":  10006,
		"trace": c.generateTraceID(),
		"data": map[string]interface{}{
			"arr": []map[string]interface{}{
				{
					"type":  1,        // 1分钟K线
					"codes": "BTCUSDT",
				},
			},
		},
	}
	return c.sendJSONMessage(msg)
}

// sendHeartbeat 发送心跳包（协议号10010）
func (c *CryptoWebsocketClient) sendHeartbeat() error {
	msg := map[string]interface{}{
		"code":  10010,
		"trace": c.generateTraceID(),
	}
	return c.sendJSONMessage(msg)
}

// sendJSONMessage 发送JSON格式消息
func (c *CryptoWebsocketClient) sendJSONMessage(msg interface{}) error {
	if !c.isConnected || c.conn == nil {
		return fmt.Errorf("连接已断开")
	}

	// 序列化为JSON
	data, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("消息序列化失败: %w", err)
	}

	// 发送消息
	if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
		c.isConnected = false
		return fmt.Errorf("发送消息失败: %w", err)
	}

	log.Printf("发送消息: %s", string(data))
	return nil
}

// startHeartbeat 启动心跳保活任务
func (c *CryptoWebsocketClient) startHeartbeat() {
	ticker := time.NewTicker(c.heartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// 定时发送心跳
			if err := c.sendHeartbeat(); err != nil {
				log.Printf("发送心跳失败: %v", err)
				return
			}
			log.Printf("发送心跳包成功")
		case <-c.done:
			// 收到退出信号，停止心跳
			return
		}
	}
}

// listenMessages 监听服务端推送的消息
func (c *CryptoWebsocketClient) listenMessages() {
	for {
		if !c.isConnected || c.conn == nil {
			return
		}

		// 读取消息
		_, msgData, err := c.conn.ReadMessage()
		if err != nil {
			c.isConnected = false
			log.Printf("读取消息失败/连接断开: %v", err)
			return
		}

		// 打印原始消息
		log.Printf("收到服务端消息: %s", string(msgData))

		// 解析并处理消息
		c.handleReceivedMessage(msgData)
	}
}

// handleReceivedMessage 处理接收到的消息（按协议号分类）
func (c *CryptoWebsocketClient) handleReceivedMessage(msgData []byte) {
	var msg map[string]interface{}
	if err := json.Unmarshal(msgData, &msg); err != nil {
		log.Printf("解析消息失败: %v", err)
		return
	}

	// 提取核心字段
	code := int(msg["code"].(float64)) // JSON数字默认解析为float64
	trace := msg["trace"].(string)
	data := msg["data"]

	// 按协议号处理不同类型数据
	switch code {
	case 10000:
		log.Printf("处理成交明细数据 [trace=%s]: %+v", trace, data)
	case 10003:
		log.Printf("处理盘口数据 [trace=%s]: %+v", trace, data)
	case 10006:
		log.Printf("处理K线数据 [trace=%s]: %+v", trace, data)
	case 10010:
		log.Printf("收到心跳响应 [trace=%s]", trace)
	default:
		log.Printf("未知协议号消息 [code=%d]: %s", code, string(msgData))
	}
}

// startReconnectLoop 启动自动重连循环
func (c *CryptoWebsocketClient) startReconnectLoop() {
	for {
		select {
		case <-c.done:
			// 收到退出信号，停止重连
			return
		default:
			if !c.isConnected {
				log.Printf("尝试重连WebSocket（间隔%v）...", c.reconnectInterval)
				// 重连前随机延迟（避免高频重试）
				time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
				if err := c.connect(); err != nil {
					log.Printf("重连失败: %v", err)
					time.Sleep(c.reconnectInterval)
					continue
				}
			}
			// 连接正常时，短暂休眠后再次检查
			time.Sleep(1 * time.Second)
		}
	}
}

// Start 启动客户端
func (c *CryptoWebsocketClient) Start() {
	// 启动自动重连循环
	go c.startReconnectLoop()

	// 首次连接
	if err := c.connect(); err != nil {
		log.Printf("首次连接失败: %v", err)
	}

	// 监听系统退出信号（Ctrl+C）
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	// 收到退出信号，清理资源
	log.Println("收到退出信号，关闭客户端...")
	close(c.done)
	if c.conn != nil {
		c.conn.Close()
	}
	c.isConnected = false
}

func main() {
	// 替换为你的实际API Key
	apiKey := "yourApikey"
	// 业务类型：crypto/stock/common
	business := "crypto"

	// 创建客户端
	client := NewCryptoWebsocketClient(apiKey, business)

	// 启动客户端
	client.Start()
}
```

{% endtab %}

{% tab title="Php" %}

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use WebSocket\Client;
use WebSocket\ConnectionException;

/**
 * 加密货币行情WebSocket客户端
 */
class CryptoWebsocketClient
{
    // 配置项
    private $apiKey;             // 认证API Key
    private $business;           // 业务类型：stock/crypto/common
    private $wsUrl;              // WebSocket连接地址
    private $reconnectInterval;  // 重连间隔（秒）
    private $heartbeatInterval;  // 心跳间隔（秒）
    
    // 运行状态
    private $client;             // WebSocket客户端实例
    private $isConnected = false;// 连接状态
    private $running = true;     // 客户端运行状态

    /**
     * 构造函数
     * @param string $apiKey API密钥
     * @param string $business 业务类型
     */
    public function __construct(string $apiKey, string $business = 'crypto')
    {
        $this->apiKey = $apiKey;
        $this->business = $business;
        // 构建WS URL
        $this->wsUrl = sprintf(
            'wss://data.infoway.io/ws?business=%s&apikey=%s',
            urlencode($business),
            urlencode($apiKey)
        );
        $this->reconnectInterval = 10;  // 10秒重连间隔
        $this->heartbeatInterval = 30;  // 30秒心跳间隔
    }

    /**
     * 生成唯一Trace ID
     * @return string
     */
    private function generateTraceId(): string
    {
        return sprintf(
            '%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0x0fff) | 0x4000,
            mt_rand(0, 0x3fff) | 0x8000,
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff)
        );
    }

    /**
     * 建立WebSocket连接
     * @return bool
     */
    private function connect(): bool
    {
        try {
            // 关闭旧连接
            if ($this->client) {
                try {
                    $this->client->close();
                } catch (Exception $e) {
                    // 忽略关闭异常
                }
                $this->client = null;
            }

            // 创建新连接
            $this->client = new Client($this->wsUrl, [
                'timeout' => 30,
                'fragment_size' => 8192,
            ]);
            $this->isConnected = true;
            echo date('Y-m-d H:i:s') . " - WebSocket连接成功: {$this->wsUrl}\n";
            
            // 发送所有订阅请求
            $this->sendAllSubscribeRequests();
            
            return true;
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - WebSocket连接失败: {$e->getMessage()}\n";
            return false;
        } catch (Exception $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 连接异常: {$e->getMessage()}\n";
            return false;
        }
    }

    /**
     * 发送所有订阅请求
     */
    private function sendAllSubscribeRequests(): void
    {
        if (!$this->isConnected || !$this->client) {
            echo date('Y-m-d H:i:s') . " - 连接未建立，无法发送订阅请求\n";
            return;
        }

        try {
            // 1. 订阅实时成交明细（协议号10000）
            $this->sendTradeSubscribe();
            sleep(5); // 间隔5秒

            // 2. 订阅实时盘口数据（协议号10003）
            $this->sendDepthSubscribe();
            sleep(5); // 间隔5秒

            // 3. 订阅1分钟K线数据（协议号10006）
            $this->sendKlineSubscribe();
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 发送订阅请求失败: {$e->getMessage()}\n";
            $this->isConnected = false;
        }
    }

    /**
     * 发送成交明细订阅请求
     */
    private function sendTradeSubscribe(): void
    {
        $msg = [
            'code'  => 10000,
            'trace' => $this->generateTraceId(),
            'data'  => ['codes' => 'BTCUSDT']
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * 发送盘口数据订阅请求
     */
    private function sendDepthSubscribe(): void
    {
        $msg = [
            'code'  => 10003,
            'trace' => $this->generateTraceId(),
            'data'  => ['codes' => 'BTCUSDT']
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * 发送K线数据订阅请求
     */
    private function sendKlineSubscribe(): void
    {
        $msg = [
            'code'  => 10006,
            'trace' => $this->generateTraceId(),
            'data'  => [
                'arr' => [
                    ['type' => 1, 'codes' => 'BTCUSDT'] // 1分钟K线
                ]
            ]
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * 发送心跳包
     */
    private function sendHeartbeat(): void
    {
        $msg = [
            'code'  => 10010,
            'trace' => $this->generateTraceId()
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * 发送JSON格式消息
     * @param array $msg 消息数组
     */
    private function sendJsonMessage(array $msg): void
    {
        if (!$this->isConnected || !$this->client) {
            return;
        }

        try {
            $jsonStr = json_encode($msg, JSON_UNESCAPED_UNICODE);
            $this->client->send($jsonStr);
            echo date('Y-m-d H:i:s') . " - 发送消息: {$jsonStr}\n";
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 发送消息失败: {$e->getMessage()}\n";
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 消息序列化失败: {$e->getMessage()}\n";
        }
    }

    /**
     * 监听服务端消息
     */
    private function listenMessages(): void
    {
        if (!$this->isConnected || !$this->client) {
            return;
        }

        try {
            // 设置非阻塞读取（避免卡死）
            $socket = $this->client->getSocket();
            stream_set_blocking($socket, false);

            $message = $this->client->receive();
            if ($message !== '') {
                echo date('Y-m-d H:i:s') . " - 收到消息: {$message}\n";
                $this->handleReceivedMessage($message);
            }
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 连接断开/读取消息失败: {$e->getMessage()}\n";
        } catch (Exception $e) {
            // 忽略非阻塞读取的空消息异常
            if (strpos($e->getMessage(), 'no data received') === false) {
                echo date('Y-m-d H:i:s') . " - 监听消息异常: {$e->getMessage()}\n";
            }
        }
    }

    /**
     * 处理接收到的消息
     * @param string $message 原始消息字符串
     */
    private function handleReceivedMessage(string $message): void
    {
        try {
            $msg = json_decode($message, true);
            if (json_last_error() !== JSON_ERROR_NONE) {
                echo date('Y-m-d H:i:s') . " - 解析消息失败: 格式错误\n";
                return;
            }

            $code = $msg['code'] ?? 0;
            $trace = $msg['trace'] ?? '';
            $data = $msg['data'] ?? [];

            switch ($code) {
                case 10000:
                    echo date('Y-m-d H:i:s') . " - 处理成交明细 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10003:
                    echo date('Y-m-d H:i:s') . " - 处理盘口数据 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10006:
                    echo date('Y-m-d H:i:s') . " - 处理K线数据 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10010:
                    echo date('Y-m-d H:i:s') . " - 收到心跳响应 [trace={$trace}]\n";
                    break;
                default:
                    echo date('Y-m-d H:i:s') . " - 未知协议号 [code={$code}]: {$message}\n";
            }
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 处理消息异常: {$e->getMessage()}\n";
        }
    }

    /**
     * 启动心跳任务
     * @param int $lastHeartbeatTime 上次心跳时间戳引用
     */
    private function checkHeartbeat(&$lastHeartbeatTime): void
    {
        $currentTime = time();
        if ($currentTime - $lastHeartbeatTime >= $this->heartbeatInterval) {
            $this->sendHeartbeat();
            $lastHeartbeatTime = $currentTime;
        }
    }

    /**
     * 启动客户端
     */
    public function start(): void
    {
        echo date('Y-m-d H:i:s') . " - 启动WebSocket客户端...\n";
        
        // 注册退出信号处理
        pcntl_async_signals(true);
        pcntl_signal(SIGINT, function () {
            echo "\n" . date('Y-m-d H:i:s') . " - 收到退出信号，停止客户端...\n";
            $this->running = false;
            if ($this->client) {
                $this->client->close();
            }
            exit(0);
        });

        $lastHeartbeatTime = 0;
        $reconnectDelay = 0;

        // 主循环
        while ($this->running) {
            // 处理重连逻辑
            if (!$this->isConnected) {
                if ($reconnectDelay <= 0) {
                    $this->connect();
                    $reconnectDelay = $this->reconnectInterval;
                    $lastHeartbeatTime = time();
                } else {
                    $reconnectDelay--;
                    echo date('Y-m-d H:i:s') . " - 等待重连... ({$reconnectDelay}秒)\n";
                }
            } else {
                // 监听消息
                $this->listenMessages();
                // 检查心跳
                $this->checkHeartbeat($lastHeartbeatTime);
            }

            // 主循环延迟（避免CPU占用过高）
            usleep(100000); // 100毫秒
        }

        // 清理资源
        if ($this->client) {
            $this->client->close();
        }
        echo date('Y-m-d H:i:s') . " - 客户端已停止\n";
    }

    /**
     * 析构函数
     */
    public function __destruct()
    {
        if ($this->client) {
            try {
                $this->client->close();
            } catch (Exception $e) {
                // 忽略
            }
        }
    }
}

// 主程序入口
if (php_sapi_name() !== 'cli') {
    die("该脚本仅支持CLI模式运行！");
}

// 配置（替换为你的实际API Key）
$apiKey = 'yourApikey';
$business = 'crypto';

// 创建并启动客户端
$client = new CryptoWebsocketClient($apiKey, $business);
$client->start();
```

{% endtab %}

{% tab title="JavaScript" %}

```javascript
/**
 * 加密货币行情WebSocket客户端（JS通用版）
 * 支持浏览器/Node.js环境（Node.js需确保版本≥10.0.0）
 */
class CryptoWebsocketClient {
  /**
   * 构造函数
   * @param {string} apiKey - 认证API Key
   * @param {string} business - 业务类型：stock/crypto/common
   */
  constructor(apiKey, business = 'crypto') {
    // 配置项
    this.apiKey = apiKey;
    this.business = business;
    this.wsUrl = `wss://data.infoway.io/ws?business=${encodeURIComponent(business)}&apikey=${encodeURIComponent(apiKey)}`;
    this.reconnectInterval = 10000; // 重连间隔（10秒，单位ms）
    this.heartbeatInterval = 30000; // 心跳间隔（30秒，单位ms）

    // 运行状态
    this.ws = null; // WebSocket实例
    this.isConnected = false; // 连接状态
    this.heartbeatTimer = null; // 心跳定时器
    this.reconnectTimer = null; // 重连定时器
    this.subscribeSent = false; // 订阅请求是否已发送

    // 绑定方法上下文（避免回调中this丢失）
    this.onOpen = this.onOpen.bind(this);
    this.onMessage = this.onMessage.bind(this);
    this.onClose = this.onClose.bind(this);
    this.onError = this.onError.bind(this);
    this.sendHeartbeat = this.sendHeartbeat.bind(this);
  }

  /**
   * 生成唯一Trace ID（模拟UUID）
   * @returns {string}
   */
  generateTraceId() {
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
      const r = Math.random() * 16 | 0;
      const v = c === 'x' ? r : (r & 0x3 | 0x8);
      return v.toString(16);
    });
  }

  /**
   * 建立WebSocket连接
   */
  connect() {
    // 清理旧连接
    if (this.ws) {
      try {
        this.ws.close(1000, 'Reconnecting');
      } catch (e) {
        console.error('关闭旧连接失败:', e);
      }
      this.ws = null;
    }

    try {
      // 创建新连接
      this.ws = new WebSocket(this.wsUrl);
      // 注册事件回调
      this.ws.onopen = this.onOpen;
      this.ws.onmessage = this.onMessage;
      this.ws.onclose = this.onClose;
      this.ws.onerror = this.onError;
      console.log(`[${new Date().toLocaleString()}] 正在连接WebSocket: ${this.wsUrl}`);
    } catch (e) {
      this.isConnected = false;
      console.error(`[${new Date().toLocaleString()}] 连接失败:`, e);
      this.scheduleReconnect();
    }
  }

  /**
   * 连接成功回调
   */
  onOpen() {
    this.isConnected = true;
    this.subscribeSent = false;
    console.log(`[${new Date().toLocaleString()}] WebSocket连接成功`);

    // 清理重连定时器
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    // 发送订阅请求
    this.sendAllSubscribeRequests();

    // 启动心跳
    this.startHeartbeat();
  }

  /**
   * 发送所有订阅请求
   */
  sendAllSubscribeRequests() {
    if (this.subscribeSent || !this.isConnected) return;
    this.subscribeSent = true;

    // 1. 订阅实时成交明细（协议号10000）
    this.sendTradeSubscribe();
    
    // 间隔5秒发送盘口订阅
    setTimeout(() => {
      if (this.isConnected) {
        this.sendDepthSubscribe();
      }
    }, 5000);

    // 间隔10秒发送K线订阅（5+5）
    setTimeout(() => {
      if (this.isConnected) {
        this.sendKlineSubscribe();
      }
    }, 10000);
  }

  /**
   * 发送成交明细订阅请求
   */
  sendTradeSubscribe() {
    const msg = {
      code: 10000,
      trace: this.generateTraceId(),
      data: { codes: 'BTCUSDT' }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送盘口数据订阅请求
   */
  sendDepthSubscribe() {
    const msg = {
      code: 10003,
      trace: this.generateTraceId(),
      data: { codes: 'BTCUSDT' }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送K线数据订阅请求
   */
  sendKlineSubscribe() {
    const msg = {
      code: 10006,
      trace: this.generateTraceId(),
      data: {
        arr: [{ type: 1, codes: 'BTCUSDT' }] // 1分钟K线
      }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送心跳包
   */
  sendHeartbeat() {
    const msg = {
      code: 10010,
      trace: this.generateTraceId()
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送JSON格式消息
   * @param {object} msg - 消息对象
   */
  sendJsonMessage(msg) {
    if (!this.isConnected || !this.ws || this.ws.readyState !== WebSocket.OPEN) {
      console.warn(`[${new Date().toLocaleString()}] 连接未就绪，跳过消息发送`);
      return;
    }

    try {
      const jsonStr = JSON.stringify(msg);
      this.ws.send(jsonStr);
      console.log(`[${new Date().toLocaleString()}] 发送消息:`, jsonStr);
    } catch (e) {
      console.error(`[${new Date().toLocaleString()}] 发送消息失败:`, e);
      this.isConnected = false;
      this.scheduleReconnect();
    }
  }

  /**
   * 接收消息回调
   * @param {MessageEvent} event - 消息事件
   */
  onMessage(event) {
    const message = event.data;
    console.log(`[${new Date().toLocaleString()}] 收到消息:`, message);
    this.handleReceivedMessage(message);
  }

  /**
   * 处理接收到的消息
   * @param {string} message - 原始消息字符串
   */
  handleReceivedMessage(message) {
    try {
      const msg = JSON.parse(message);
      const code = msg.code || 0;
      const trace = msg.trace || '';
      const data = msg.data || {};

      switch (code) {
        case 10000:
          console.log(`[${new Date().toLocaleString()}] 处理成交明细 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10003:
          console.log(`[${new Date().toLocaleString()}] 处理盘口数据 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10006:
          console.log(`[${new Date().toLocaleString()}] 处理K线数据 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10010:
          console.log(`[${new Date().toLocaleString()}] 收到心跳响应 [trace=${trace}]`);
          break;
        default:
          console.warn(`[${new Date().toLocaleString()}] 未知协议号 [code=${code}]:`, message);
      }
    } catch (e) {
      console.error(`[${new Date().toLocaleString()}] 处理消息失败:`, e);
    }
  }

  /**
   * 连接关闭回调
   * @param {CloseEvent} event - 关闭事件
   */
  onClose(event) {
    this.isConnected = false;
    this.subscribeSent = false;
    console.log(`[${new Date().toLocaleString()}] 连接关闭: 代码=${event.code}, 原因=${event.reason}`);
    
    // 清理心跳定时器
    this.stopHeartbeat();
    
    // 触发重连
    this.scheduleReconnect();
  }

  /**
   * 错误回调
   * @param {Event} error - 错误事件
   */
  onError(error) {
    console.error(`[${new Date().toLocaleString()}] WebSocket错误:`, error);
    this.isConnected = false;
    this.scheduleReconnect();
  }

  /**
   * 启动心跳任务
   */
  startHeartbeat() {
    // 清理旧定时器
    this.stopHeartbeat();
    
    // 立即发送一次心跳，然后定时发送
    this.sendHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (this.isConnected) {
        this.sendHeartbeat();
      }
    }, this.heartbeatInterval);
  }

  /**
   * 停止心跳任务
   */
  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  /**
   * 调度重连
   */
  scheduleReconnect() {
    if (this.reconnectTimer) return;
    
    console.log(`[${new Date().toLocaleString()}] ${this.reconnectInterval/1000}秒后尝试重连...`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, this.reconnectInterval);
  }

  /**
   * 启动客户端
   */
  start() {
    console.log(`[${new Date().toLocaleString()}] 启动WebSocket客户端...`);
    this.connect();

    // Node.js环境下监听退出信号
    if (typeof process !== 'undefined' && process.on) {
      process.on('SIGINT', () => {
        console.log(`\n[${new Date().toLocaleString()}] 收到退出信号，关闭客户端...`);
        this.stop();
        process.exit(0);
      });
    }
  }

  /**
   * 停止客户端
   */
  stop() {
    this.isConnected = false;
    this.subscribeSent = false;
    
    // 清理定时器
    this.stopHeartbeat();
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    // 关闭连接
    if (this.ws) {
      this.ws.close(1000, 'Client stopped');
      this.ws = null;
    }

    console.log(`[${new Date().toLocaleString()}] 客户端已停止`);
  }
}

// ==================== 运行示例 ====================
// 浏览器环境：直接在控制台执行以下代码
// Node.js环境：直接运行该脚本

// 替换为你的实际API Key
const API_KEY = 'yourApikey';
const BUSINESS = 'crypto';

// 创建并启动客户端
const client = new CryptoWebsocketClient(API_KEY, BUSINESS);
client.start();

// 如需停止客户端，执行：client.stop();
```

{% endtab %}
{% endtabs %}


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.infoway.io/websocket-api/code-examples.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
