# WebSocket Code Example

{% tabs %}
{% tab title="Java" %}

```java
package org.example.ws;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PostConstruct;
import jakarta.websocket.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@ClientEndpoint
@Slf4j
@Component
public class WebsocketExample {

    // Local session channel
    private static Session session;

    // WebSocket subscription address — “business” can be stock, crypto, or common; “apikey” is your credential
    private static final String WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey";

    @PostConstruct
    public void connectAll() {
        try {
            //Establish WebSocket connection
            connect(WS_URL);
            //Enable auto-reconnect
            startReconnection(WS_URL);
        } catch (Exception e) {
            log.error("Failed to connect to " + WS_URL + ": " + e.getMessage());
        }
    }

    //The auto-reconnect mechanism starts a scheduled thread to check connection status; if disconnected, it reconnects automatically
    private void startReconnection(String s) {
        ScheduledExecutorService usExecutor = Executors.newScheduledThreadPool(1);
        Runnable usTask = () -> {
            if (session == null || !session.isOpen()) {
                log.debug("reconnection...");
                connect(s);
            }
        };
        usExecutor.scheduleAtFixedRate(usTask, 1000, 10000, TimeUnit.MILLISECONDS);
    }

    //Implementation of WebSocket connection establishment
    private void connect(String s) {
        try {
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            session = container.connectToServer(WebsocketExample.class, URI.create(s));
        } catch (DeploymentException | IOException e) {
            log.error("Failed to connect to the server: {}", e.getMessage());
        }
    }

    //The following method is executed when the WebSocket connection is successfully established
    @OnOpen
    public void onOpen(Session session) throws IOException, InterruptedException {
        //This method is triggered upon successful WebSocket connection
        System.out.println("Connection opened: " + session.getId());

        // Send a latest trade details subscription request
        JSONObject tradeSendObj = new JSONObject();
        //Refer to the WebSocket API docs for different protocol codes. 10000 is for latest trade subscription
        tradeSendObj.put("code", 10000);
        //Custom trace ID
        tradeSendObj.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
        //Build the subscription request entity in JSON format
        JSONObject data = new JSONObject();
        //Subscribe to BTCUSDT
        data.put("codes", "BTCUSDT");
        tradeSendObj.put("data", data);
        //Send the latest trade subscription request
        session.getBasicRemote().sendText(tradeSendObj.toJSONString());

        //-----------------------------------------------------------------//
        //Add a delay between different requests
        Thread.sleep(5000);

        //Send the real-time order book (depth) data subscription request
        JSONObject depthSendObj = new JSONObject();
        //Refer to the WebSocket API docs for different protocol codes. 10003 is for real-time order book subscription
        depthSendObj.put("code", 10003);
        //Custom trace ID
        depthSendObj.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
        //Build the subscription request entity in JSON format
        depthSendObj.put("data", data);
        //Send subscription request
        session.getBasicRemote().sendText(depthSendObj.toJSONString());

        
        //-----------------------------------------------------------------//
        //Add a delay between different requests
        Thread.sleep(5000);

        //Send the candlestick data subscription request
        JSONObject klineSendObj = new JSONObject();
        //Protocol 10006 is for candlestick subscription
        klineSendObj.put("code", 10006);
        //Custom trace ID
        klineSendObj.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
        //Build the subscription request entity in JSON format        JSONObject klineData = new JSONObject();
        JSONArray klineDataArray = new JSONArray();

        //Build the entity for 1-minute candle subscription
        JSONObject kline1minObj = new JSONObject();
        //“type” corresponds to the klineType parameter
        kline1minObj.put("type", 1);
        kline1minObj.put("codes", "BTCUSDT");
        klineDataArray.add(kline1minObj);
        klineData.put("arr", klineDataArray);

        klineSendObj.put("data", klineData);
        //Send the subscription request
        session.getBasicRemote().sendText(klineSendObj.toJSONString());

        //Schedule a heartbeat task
        ScheduledExecutorService pingExecutor = Executors.newScheduledThreadPool(1);
        Runnable pingTask = WebsocketExample::ping;
        pingExecutor.scheduleAtFixedRate(pingTask, 30, 30, TimeUnit.SECONDS);

    }

    @OnMessage
    public void onMessage(String message, Session session) {
        //Receives messages returned by the INFOWAY server, includes subscription success/failure notifications and actual market data pushes
        try {
            System.out.println("Message received: " + message);
        } catch (Exception e) {
        }
    }

    @OnClose
    public void onClose(Session session, CloseReason reason) {
        //This method is called when the WebSocket connection is closed
        System.out.println("Connection closed: " + session.getId() + ", reason: " + reason);
    }

    @OnError
    public void onError(Throwable error) {
        error.printStackTrace();
    }

    //Continuously send heartbeat messages to prevent the server from closing the connection due to inactivity
    public static void ping() {
        try {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("code", 10010);
            jsonObject.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
            session.getBasicRemote().sendText(jsonObject.toJSONString());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}

```

{% endtab %}

{% tab title="Python" %}

```python
import json
import time
import schedule
import threading
import websocket
from loguru import logger

class WebsocketExample:
    """Example WebSocket client for the INFOWAY real-time market data feed.

    Connects to the feed, subscribes to trades, order-book depth and 1-minute
    candlesticks for BTCUSDT, sends a heartbeat every 30 seconds and checks
    every 10 seconds whether a reconnect is needed.
    """

    def __init__(self):
        self.session = None
        self.ws_url = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey"
        self.reconnecting = False
        self.is_ws_connected = False  # Connection status flag
        self._heartbeat_job = None  # Scheduled heartbeat job; registered at most once

    def connect_all(self):
        """Establish WebSocket connection and start auto-reconnect mechanism"""
        try:
            self.connect(self.ws_url)
            self.start_reconnection(self.ws_url)
        except Exception as e:
            logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")

    def start_reconnection(self, url):
        """Start scheduled reconnection checks (every 10 seconds) on a daemon thread"""
        def check_connection():
            if not self.is_connected():
                logger.debug("Reconnection attempt...")
                self.connect(url)

        # Use a thread to periodically check connection status
        schedule.every(10).seconds.do(check_connection)

        def run_scheduler():
            while True:
                schedule.run_pending()
                time.sleep(1)

        threading.Thread(target=run_scheduler, daemon=True).start()

    def is_connected(self):
        """Return True when a session exists and its connection is flagged as open"""
        return self.session is not None and bool(self.is_ws_connected)

    def connect(self, url):
        """Establish a WebSocket connection, replacing any previous one"""
        try:
            previous = self.session
            if previous is not None:
                # Close the previous app even if the flag already says "disconnected"
                # (on_error clears the flag without closing), so it is not left running.
                try:
                    previous.close()
                except Exception as e:
                    logger.debug(f"Error closing previous connection: {str(e)}")
            self.is_ws_connected = False

            self.session = websocket.WebSocketApp(
                url,
                on_open=self.on_open,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close
            )

            # Start the WebSocket connection (non-blocking)
            threading.Thread(target=self.session.run_forever, daemon=True).start()
        except Exception as e:
            logger.error(f"Failed to connect to the server: {str(e)}")

    def on_open(self, ws):
        """Callback when WebSocket connection is successfully established"""
        logger.info(f"Connection opened")
        self.is_ws_connected = True  # Set connection status to True

        try:
            # Send realtime trade subscription request
            trade_send_obj = {
                "code": 10000,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": {"codes": "BTCUSDT"}
            }
            self.send_message(trade_send_obj)

            # Wait a while between requests
            time.sleep(5)

            # Send realtime order book subscription request
            depth_send_obj = {
                "code": 10003,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": {"codes": "BTCUSDT"}
            }
            self.send_message(depth_send_obj)

            # Wait a while between requests
            time.sleep(5)

            # Send realtime candlestick (K-line) subscription request
            kline_data = {
                "arr": [
                    {
                        "type": 1,
                        "codes": "BTCUSDT"
                    }
                ]
            }
            kline_send_obj = {
                "code": 10006,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": kline_data
            }
            self.send_message(kline_send_obj)

            # Register the heartbeat only once; on_open runs again after every
            # reconnect and would otherwise add one more heartbeat job each time.
            if self._heartbeat_job is None:
                self._heartbeat_job = schedule.every(30).seconds.do(self.ping)

        except Exception as e:
            logger.error(f"Error sending initial messages: {str(e)}")

    def on_message(self, ws, message):
        """Callback for incoming messages"""
        try:
            logger.info(f"Message received: {message}")
        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")

    def on_close(self, ws, close_status_code, close_msg):
        """Callback when the connection is closed"""
        logger.info(f"Connection closed: {close_status_code} - {close_msg}")
        # Ignore close events from a session that has already been replaced
        if ws is self.session:
            self.is_ws_connected = False

    def on_error(self, ws, error):
        """Error handling callback"""
        logger.error(f"WebSocket error: {str(error)}")
        # Ignore errors from a session that has already been replaced
        if ws is self.session:
            self.is_ws_connected = False

    def send_message(self, message_obj):
        """Serialize message_obj to JSON and send it; logs a warning when not connected"""
        if self.is_connected():
            try:
                self.session.send(json.dumps(message_obj))
            except Exception as e:
                logger.error(f"Error sending message: {str(e)}")
        else:
            logger.warning("Cannot send message: Not connected")

    def ping(self):
        """Send heartbeat packet"""
        ping_obj = {
            "code": 10010,
            "trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
        }
        self.send_message(ping_obj)

# Usage example
if __name__ == "__main__":
    ws_client = WebsocketExample()
    ws_client.connect_all()

    # Keep the main thread alive. Scheduled jobs are already driven by the
    # daemon thread started in start_reconnection(); calling
    # schedule.run_pending() here as well could run the same job twice.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Exiting...")
        if ws_client.is_connected():
            ws_client.session.close()



```

{% endtab %}

{% tab title="Go" %}

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/url"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

// CryptoWebsocketClient is a WebSocket client for the crypto market-data feed.
type CryptoWebsocketClient struct {
	apiKey             string        // API key used for authentication
	business           string        // business type: stock/crypto/common
	wsURL              string        // WebSocket connection URL
	conn               *websocket.Conn // current WebSocket connection
	isConnected        bool          // connection status flag
	reconnectInterval  time.Duration // wait between reconnect attempts
	heartbeatInterval  time.Duration // interval between heartbeats
	done               chan struct{} // closed to signal shutdown
}

// NewCryptoWebsocketClient creates a client for the given API key and business
// type. The WebSocket URL is built with both values URL-encoded as query
// parameters; reconnect and heartbeat intervals default to 10s and 30s.
func NewCryptoWebsocketClient(apiKey, business string) *CryptoWebsocketClient {
	query := url.Values{}
	query.Set("business", business)
	query.Set("apikey", apiKey)

	client := new(CryptoWebsocketClient)
	client.apiKey = apiKey
	client.business = business
	client.wsURL = "wss://data.infoway.io/ws?" + query.Encode()
	client.reconnectInterval = 10 * time.Second // wait 10 seconds between reconnect attempts
	client.heartbeatInterval = 30 * time.Second // send a heartbeat every 30 seconds
	client.done = make(chan struct{})
	return client
}

// generateTraceID returns a freshly generated UUID string to use as the
// request trace ID (same format as the trace IDs in the Java/Python examples).
func (c *CryptoWebsocketClient) generateTraceID() string {
	id := uuid.New()
	return id.String()
}

// connect closes any existing connection, dials the WebSocket URL and, on
// success, starts the subscription, heartbeat and message-listening goroutines.
// It returns an error when the dial fails.
func (c *CryptoWebsocketClient) connect() error {
	// Drop the previous connection, if any
	if old := c.conn; old != nil {
		old.Close()
		c.conn = nil
	}

	// Dial a new connection
	fresh, _, dialErr := websocket.DefaultDialer.Dial(c.wsURL, nil)
	if dialErr != nil {
		c.isConnected = false
		return fmt.Errorf("连接失败: %w", dialErr)
	}

	c.conn, c.isConnected = fresh, true
	log.Printf("WebSocket连接成功: %s", c.wsURL)

	go c.sendAllSubscribeRequests() // send the subscription requests
	go c.startHeartbeat()           // keep the connection alive
	go c.listenMessages()           // read messages pushed by the server

	return nil
}

// sendAllSubscribeRequests sends the trade, order-book and candlestick
// subscription requests in that order, pausing 5 seconds between requests.
// It stops at the first request that fails.
func (c *CryptoWebsocketClient) sendAllSubscribeRequests() {
	if c.conn == nil || !c.isConnected {
		log.Println("连接未建立，无法发送订阅请求")
		return
	}

	steps := []struct {
		send      func() error
		failLabel string
	}{
		{c.sendTradeSubscribe, "发送成交明细订阅失败"}, // 1. latest trades (protocol code 10000)
		{c.sendDepthSubscribe, "发送盘口订阅失败"},   // 2. order book (protocol code 10003)
		{c.sendKlineSubscribe, "发送K线订阅失败"},   // 3. 1-minute candlesticks (protocol code 10006)
	}

	for i, step := range steps {
		if err := step.send(); err != nil {
			log.Printf("%s: %v", step.failLabel, err)
			return
		}
		// Pause between requests, but not after the last one
		if i < len(steps)-1 {
			time.Sleep(5 * time.Second)
		}
	}
}

// sendTradeSubscribe sends the latest-trade subscription request
// (protocol code 10000) for BTCUSDT.
func (c *CryptoWebsocketClient) sendTradeSubscribe() error {
	payload := map[string]string{"codes": "BTCUSDT"}
	request := map[string]interface{}{
		"code":  10000,
		"trace": c.generateTraceID(),
		"data":  payload,
	}
	return c.sendJSONMessage(request)
}

// sendDepthSubscribe sends the real-time order-book subscription request
// (protocol code 10003) for BTCUSDT.
func (c *CryptoWebsocketClient) sendDepthSubscribe() error {
	payload := map[string]string{"codes": "BTCUSDT"}
	request := map[string]interface{}{
		"code":  10003,
		"trace": c.generateTraceID(),
		"data":  payload,
	}
	return c.sendJSONMessage(request)
}

// sendKlineSubscribe sends the candlestick subscription request
// (protocol code 10006) for 1-minute BTCUSDT candles.
func (c *CryptoWebsocketClient) sendKlineSubscribe() error {
	// "type": 1 selects the 1-minute candlestick
	oneMinute := map[string]interface{}{
		"type":  1,
		"codes": "BTCUSDT",
	}
	payload := map[string]interface{}{
		"arr": []map[string]interface{}{oneMinute},
	}
	request := map[string]interface{}{
		"code":  10006,
		"trace": c.generateTraceID(),
		"data":  payload,
	}
	return c.sendJSONMessage(request)
}

// sendHeartbeat sends a heartbeat message (protocol code 10010).
func (c *CryptoWebsocketClient) sendHeartbeat() error {
	heartbeat := make(map[string]interface{}, 2)
	heartbeat["code"] = 10010
	heartbeat["trace"] = c.generateTraceID()
	return c.sendJSONMessage(heartbeat)
}

// sendJSONMessage marshals msg to JSON and writes it to the connection as a
// text message. It returns an error when the client is not connected, when
// marshalling fails, or when the write fails; a failed write also marks the
// client as disconnected.
func (c *CryptoWebsocketClient) sendJSONMessage(msg interface{}) error {
	if c.conn == nil || !c.isConnected {
		return fmt.Errorf("连接已断开")
	}

	// Serialize to JSON
	payload, marshalErr := json.Marshal(msg)
	if marshalErr != nil {
		return fmt.Errorf("消息序列化失败: %w", marshalErr)
	}

	// Write the message
	writeErr := c.conn.WriteMessage(websocket.TextMessage, payload)
	if writeErr != nil {
		c.isConnected = false
		return fmt.Errorf("发送消息失败: %w", writeErr)
	}

	log.Printf("发送消息: %s", payload)
	return nil
}

// startHeartbeat sends a heartbeat at every heartbeatInterval tick until a
// send fails, the done channel is closed, or the connection it was started
// for has been replaced.
//
// connect() starts one heartbeat goroutine per connection. Without the
// replaced-connection check, a goroutine from an earlier connection would keep
// running after a quick reconnect and send duplicate heartbeats on the new one.
func (c *CryptoWebsocketClient) startHeartbeat() {
	// Remember which connection this heartbeat belongs to
	owned := c.conn

	ticker := time.NewTicker(c.heartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// A newer connection has its own heartbeat goroutine; stop this one
			if c.conn != owned {
				return
			}
			// Send the periodic heartbeat
			if err := c.sendHeartbeat(); err != nil {
				log.Printf("发送心跳失败: %v", err)
				return
			}
			log.Printf("发送心跳包成功")
		case <-c.done:
			// Shutdown requested; stop the heartbeat
			return
		}
	}
}

// listenMessages reads messages from the connection that was current when it
// was started and passes each one to handleReceivedMessage. It returns when
// the client is flagged as disconnected, when the connection has been
// replaced, or when a read fails.
//
// A read error marks the client as disconnected only if this listener's
// connection is still the current one. Otherwise the listener of a connection
// that connect() has just closed would flip the flag and tear down the healthy
// replacement connection.
func (c *CryptoWebsocketClient) listenMessages() {
	owned := c.conn
	if owned == nil {
		return
	}

	for {
		if !c.isConnected || c.conn != owned {
			return
		}

		// Read the next message
		_, msgData, err := owned.ReadMessage()
		if err != nil {
			if c.conn == owned {
				c.isConnected = false
			}
			log.Printf("读取消息失败/连接断开: %v", err)
			return
		}

		// Log the raw message
		log.Printf("收到服务端消息: %s", string(msgData))

		// Parse and handle the message
		c.handleReceivedMessage(msgData)
	}
}

// handleReceivedMessage parses a raw server message and logs it according to
// its protocol code. Messages that are not valid JSON, or that lack a numeric
// "code" field, are logged and skipped instead of panicking; a missing or
// non-string "trace" is treated as an empty string.
func (c *CryptoWebsocketClient) handleReceivedMessage(msgData []byte) {
	var msg map[string]interface{}
	if err := json.Unmarshal(msgData, &msg); err != nil {
		log.Printf("解析消息失败: %v", err)
		return
	}

	// Extract the core fields. JSON numbers decode to float64; the comma-ok
	// form avoids a panic when a field is missing or has another type.
	rawCode, ok := msg["code"].(float64)
	if !ok {
		log.Printf("消息缺少有效的code字段: %s", string(msgData))
		return
	}
	code := int(rawCode)
	trace, _ := msg["trace"].(string)
	data := msg["data"]

	// Dispatch on the protocol code
	switch code {
	case 10000:
		log.Printf("处理成交明细数据 [trace=%s]: %+v", trace, data)
	case 10003:
		log.Printf("处理盘口数据 [trace=%s]: %+v", trace, data)
	case 10006:
		log.Printf("处理K线数据 [trace=%s]: %+v", trace, data)
	case 10010:
		log.Printf("收到心跳响应 [trace=%s]", trace)
	default:
		log.Printf("未知协议号消息 [code=%d]: %s", code, string(msgData))
	}
}

// startReconnectLoop runs until the done channel is closed. While the client
// is flagged as connected it re-checks once per second; otherwise it waits a
// random 0-2 seconds and tries to reconnect, sleeping reconnectInterval after
// a failed attempt.
func (c *CryptoWebsocketClient) startReconnectLoop() {
	for {
		// Stop as soon as shutdown has been signalled
		select {
		case <-c.done:
			return
		default:
		}

		if c.isConnected {
			// Connection looks healthy; check again shortly
			time.Sleep(1 * time.Second)
			continue
		}

		log.Printf("尝试重连WebSocket（间隔%v）...", c.reconnectInterval)
		// Random delay before reconnecting, to avoid rapid-fire retries
		jitter := time.Duration(rand.Intn(3)) * time.Second
		time.Sleep(jitter)

		if err := c.connect(); err != nil {
			log.Printf("重连失败: %v", err)
			time.Sleep(c.reconnectInterval)
			continue
		}
		time.Sleep(1 * time.Second)
	}
}

// Start connects the client, starts the automatic reconnect loop and then
// blocks until SIGINT or SIGTERM is received, at which point it signals
// shutdown and closes the connection.
func (c *CryptoWebsocketClient) Start() {
	// Make the first connection attempt before the reconnect loop starts.
	// Starting the loop first lets both dial at the same time while
	// isConnected is still false, which can open two connections.
	if err := c.connect(); err != nil {
		log.Printf("首次连接失败: %v", err)
	}

	// Start the automatic reconnect loop; it also retries a failed first attempt
	go c.startReconnectLoop()

	// Wait for an exit signal (Ctrl+C)
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	// Exit signal received; release resources
	log.Println("收到退出信号，关闭客户端...")
	close(c.done)
	if c.conn != nil {
		c.conn.Close()
	}
	c.isConnected = false
}

// main creates a client for the crypto feed and runs it until the process is
// interrupted.
func main() {
	const (
		apiKey   = "yourApikey" // replace with your real API key
		business = "crypto"     // business type: crypto/stock/common
	)

	NewCryptoWebsocketClient(apiKey, business).Start()
}
```

{% endtab %}

{% tab title="PHP" %}

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use WebSocket\Client;
use WebSocket\ConnectionException;

/**
 * WebSocket client for the crypto market-data feed.
 *
 * Connects to the feed, subscribes to trades, order-book depth and 1-minute
 * candlesticks for BTCUSDT, sends periodic heartbeats and reconnects after a
 * disconnect.
 */
class CryptoWebsocketClient
{
    // Configuration
    private $apiKey;             // API key used for authentication
    private $business;           // Business type: stock/crypto/common
    private $wsUrl;              // WebSocket connection URL
    private $reconnectInterval;  // Reconnect interval (seconds)
    private $heartbeatInterval;  // Heartbeat interval (seconds)

    // Runtime state
    private $client;             // WebSocket client instance
    private $isConnected = false;// Connection status
    private $running = true;     // Whether the main loop should keep running

    /**
     * Constructor.
     * @param string $apiKey API key
     * @param string $business Business type
     */
    public function __construct(string $apiKey, string $business = 'crypto')
    {
        $this->apiKey = $apiKey;
        $this->business = $business;
        // Build the WebSocket URL
        $this->wsUrl = sprintf(
            'wss://data.infoway.io/ws?business=%s&apikey=%s',
            urlencode($business),
            urlencode($apiKey)
        );
        $this->reconnectInterval = 10;  // Reconnect every 10 seconds
        $this->heartbeatInterval = 30;  // Heartbeat every 30 seconds
    }

    /**
     * Generate a random trace ID in UUID (version 4) text format.
     * @return string
     */
    private function generateTraceId(): string
    {
        return sprintf(
            '%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0x0fff) | 0x4000,
            mt_rand(0, 0x3fff) | 0x8000,
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff)
        );
    }

    /**
     * Establish the WebSocket connection and send the subscription requests.
     * @return bool true on success, false when the connection attempt failed
     */
    private function connect(): bool
    {
        try {
            // Close the previous connection, if any
            if ($this->client) {
                try {
                    $this->client->close();
                } catch (Exception $e) {
                    // Errors while closing the old connection are ignored on purpose
                }
                $this->client = null;
            }

            // Create the new connection
            $this->client = new Client($this->wsUrl, [
                'timeout' => 30,
                'fragment_size' => 8192,
            ]);
            $this->isConnected = true;
            echo date('Y-m-d H:i:s') . " - WebSocket连接成功: {$this->wsUrl}\n";

            // Send all subscription requests
            $this->sendAllSubscribeRequests();

            return true;
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - WebSocket连接失败: {$e->getMessage()}\n";
            return false;
        } catch (Exception $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 连接异常: {$e->getMessage()}\n";
            return false;
        }
    }

    /**
     * Send all subscription requests, pausing 5 seconds between them.
     */
    private function sendAllSubscribeRequests(): void
    {
        if (!$this->isConnected || !$this->client) {
            echo date('Y-m-d H:i:s') . " - 连接未建立，无法发送订阅请求\n";
            return;
        }

        try {
            // 1. Latest trades (protocol code 10000)
            $this->sendTradeSubscribe();
            sleep(5); // Wait 5 seconds

            // 2. Real-time order book (protocol code 10003)
            $this->sendDepthSubscribe();
            sleep(5); // Wait 5 seconds

            // 3. 1-minute candlesticks (protocol code 10006)
            $this->sendKlineSubscribe();
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 发送订阅请求失败: {$e->getMessage()}\n";
            $this->isConnected = false;
        }
    }

    /**
     * Send the latest-trade subscription request.
     */
    private function sendTradeSubscribe(): void
    {
        $msg = [
            'code'  => 10000,
            'trace' => $this->generateTraceId(),
            'data'  => ['codes' => 'BTCUSDT']
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * Send the order-book subscription request.
     */
    private function sendDepthSubscribe(): void
    {
        $msg = [
            'code'  => 10003,
            'trace' => $this->generateTraceId(),
            'data'  => ['codes' => 'BTCUSDT']
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * Send the candlestick subscription request.
     */
    private function sendKlineSubscribe(): void
    {
        $msg = [
            'code'  => 10006,
            'trace' => $this->generateTraceId(),
            'data'  => [
                'arr' => [
                    ['type' => 1, 'codes' => 'BTCUSDT'] // 1-minute candlestick
                ]
            ]
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * Send a heartbeat message (protocol code 10010).
     */
    private function sendHeartbeat(): void
    {
        $msg = [
            'code'  => 10010,
            'trace' => $this->generateTraceId()
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * Encode a message as JSON and send it. Does nothing when not connected.
     * @param array $msg Message to send
     */
    private function sendJsonMessage(array $msg): void
    {
        if (!$this->isConnected || !$this->client) {
            return;
        }

        // json_encode() reports failure by returning false (it does not throw
        // without JSON_THROW_ON_ERROR), so check the result before sending.
        $jsonStr = json_encode($msg, JSON_UNESCAPED_UNICODE);
        if ($jsonStr === false) {
            echo date('Y-m-d H:i:s') . " - 消息序列化失败: " . json_last_error_msg() . "\n";
            return;
        }

        try {
            $this->client->send($jsonStr);
            echo date('Y-m-d H:i:s') . " - 发送消息: {$jsonStr}\n";
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 发送消息失败: {$e->getMessage()}\n";
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 发送消息失败: {$e->getMessage()}\n";
        }
    }

    /**
     * Try to read one message from the server and handle it.
     */
    private function listenMessages(): void
    {
        if (!$this->isConnected || !$this->client) {
            return;
        }

        try {
            // Switch the socket to non-blocking reads so the main loop does not hang.
            // NOTE(review): relies on the client exposing getSocket() — confirm
            // against the installed WebSocket library version.
            $socket = $this->client->getSocket();
            stream_set_blocking($socket, false);

            $message = $this->client->receive();
            if ($message !== '') {
                echo date('Y-m-d H:i:s') . " - 收到消息: {$message}\n";
                $this->handleReceivedMessage($message);
            }
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 连接断开/读取消息失败: {$e->getMessage()}\n";
        } catch (Exception $e) {
            // Ignore the "no data" exception raised by an empty non-blocking read
            if (strpos($e->getMessage(), 'no data received') === false) {
                echo date('Y-m-d H:i:s') . " - 监听消息异常: {$e->getMessage()}\n";
            }
        }
    }

    /**
     * Decode a received message and print it according to its protocol code.
     * @param string $message Raw message string
     */
    private function handleReceivedMessage(string $message): void
    {
        try {
            $msg = json_decode($message, true);
            if (json_last_error() !== JSON_ERROR_NONE) {
                echo date('Y-m-d H:i:s') . " - 解析消息失败: 格式错误\n";
                return;
            }

            $code = $msg['code'] ?? 0;
            $trace = $msg['trace'] ?? '';
            $data = $msg['data'] ?? [];

            switch ($code) {
                case 10000:
                    echo date('Y-m-d H:i:s') . " - 处理成交明细 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10003:
                    echo date('Y-m-d H:i:s') . " - 处理盘口数据 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10006:
                    echo date('Y-m-d H:i:s') . " - 处理K线数据 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10010:
                    echo date('Y-m-d H:i:s') . " - 收到心跳响应 [trace={$trace}]\n";
                    break;
                default:
                    echo date('Y-m-d H:i:s') . " - 未知协议号 [code={$code}]: {$message}\n";
            }
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 处理消息异常: {$e->getMessage()}\n";
        }
    }

    /**
     * Send a heartbeat when at least heartbeatInterval seconds have passed
     * since the last one.
     * @param int $lastHeartbeatTime Timestamp of the last heartbeat (passed by reference and updated)
     */
    private function checkHeartbeat(&$lastHeartbeatTime): void
    {
        $currentTime = time();
        if ($currentTime - $lastHeartbeatTime >= $this->heartbeatInterval) {
            $this->sendHeartbeat();
            $lastHeartbeatTime = $currentTime;
        }
    }

    /**
     * Run the client: connect, then loop reading messages, sending heartbeats
     * and reconnecting after a disconnect, until interrupted with SIGINT.
     */
    public function start(): void
    {
        echo date('Y-m-d H:i:s') . " - 启动WebSocket客户端...\n";

        // Register the exit-signal handler
        pcntl_async_signals(true);
        pcntl_signal(SIGINT, function () {
            echo "\n" . date('Y-m-d H:i:s') . " - 收到退出信号，停止客户端...\n";
            $this->running = false;
            if ($this->client) {
                $this->client->close();
            }
            exit(0);
        });

        $lastHeartbeatTime = 0;
        // Earliest Unix timestamp at which the next (re)connect attempt may run.
        // The wait is tracked in wall-clock seconds rather than loop iterations:
        // the loop ticks every 100 ms, so counting iterations would shrink the
        // configured interval by a factor of ten.
        $nextReconnectAt = 0;
        $lastAnnouncedWait = null;

        // Main loop
        while ($this->running) {
            if (!$this->isConnected) {
                // Reconnect handling
                $now = time();
                if ($now >= $nextReconnectAt) {
                    $this->connect();
                    $lastHeartbeatTime = time();
                    $nextReconnectAt = $lastHeartbeatTime + $this->reconnectInterval;
                    $lastAnnouncedWait = null;
                } else {
                    // Announce the remaining wait once per second, not on every tick
                    $remaining = $nextReconnectAt - $now;
                    if ($remaining !== $lastAnnouncedWait) {
                        echo date('Y-m-d H:i:s') . " - 等待重连... ({$remaining}秒)\n";
                        $lastAnnouncedWait = $remaining;
                    }
                }
            } else {
                // Read incoming messages
                $this->listenMessages();
                // Send a heartbeat when due
                $this->checkHeartbeat($lastHeartbeatTime);
            }

            // Short pause per iteration to avoid burning CPU
            usleep(100000); // 100 milliseconds
        }

        // Release resources
        if ($this->client) {
            $this->client->close();
        }
        echo date('Y-m-d H:i:s') . " - 客户端已停止\n";
    }

    /**
     * Destructor: close the connection if one is still held.
     */
    public function __destruct()
    {
        if ($this->client) {
            try {
                $this->client->close();
            } catch (Exception $e) {
                // Ignored: nothing useful can be done during destruction
            }
        }
    }
}

// Entry point: this script is meant to be run from the command line only
if (PHP_SAPI !== 'cli') {
    exit("该脚本仅支持CLI模式运行！");
}

// Configuration (replace with your real API key)
$business = 'crypto';
$apiKey = 'yourApikey';

// Create the client and run it
$client = new CryptoWebsocketClient($apiKey, $business);
$client->start();
```

{% endtab %}

{% tab title="JavaScript" %}

```javascript
/**
 * 加密货币行情WebSocket客户端（JS通用版）
 * 支持浏览器/Node.js环境（Node.js需确保版本≥10.0.0）
 */
class CryptoWebsocketClient {
  /**
   * Create a client. No network activity happens until connect()/start().
   * @param {string} apiKey - API key used to authenticate the connection
   * @param {string} business - Business type: stock / crypto / common
   */
  constructor(apiKey, business = 'crypto') {
    // Configuration
    this.apiKey = apiKey;
    this.business = business;
    this.wsUrl = `wss://data.infoway.io/ws?business=${encodeURIComponent(business)}&apikey=${encodeURIComponent(apiKey)}`;
    this.reconnectInterval = 10000; // delay before a reconnect attempt, in ms
    this.heartbeatInterval = 30000; // delay between heartbeats, in ms

    // Runtime state
    this.ws = null; // current WebSocket instance
    this.isConnected = false; // true between onOpen and close/error/stop
    this.heartbeatTimer = null; // setInterval handle for heartbeats
    this.reconnectTimer = null; // setTimeout handle for the pending reconnect
    this.subscribeSent = false; // whether the subscription sequence was started
    this.subscribeTimers = []; // setTimeout handles of the delayed subscriptions

    // Bind callbacks so `this` survives being passed around as a handler.
    this.onOpen = this.onOpen.bind(this);
    this.onMessage = this.onMessage.bind(this);
    this.onClose = this.onClose.bind(this);
    this.onError = this.onError.bind(this);
    this.sendHeartbeat = this.sendHeartbeat.bind(this);
  }

  /**
   * Generate a trace id shaped like a version-4 UUID.
   * Built from Math.random(), so it is not cryptographically strong.
   * @returns {string}
   */
  generateTraceId() {
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
      const r = Math.random() * 16 | 0;
      const v = c === 'x' ? r : (r & 0x3 | 0x8);
      return v.toString(16);
    });
  }

  /**
   * Open a new WebSocket connection, replacing any existing one.
   *
   * Events are routed to onOpen/onMessage/onClose/onError only while the
   * socket that raised them is still the current `this.ws`. A socket that was
   * replaced here or dropped by stop() can still deliver a late `close` or
   * `error` event; without this check such an event would tear down the new
   * connection's state or schedule a reconnect after the client was stopped.
   */
  connect() {
    // State owned by the previous socket is reset here because that socket's
    // own close event will be ignored once it is no longer current.
    this.stopHeartbeat();
    this.clearSubscribeTimers();
    this.isConnected = false;
    this.subscribeSent = false;

    // Drop the old connection.
    if (this.ws) {
      const stale = this.ws;
      this.ws = null;
      try {
        stale.close(1000, 'Reconnecting');
      } catch (e) {
        console.error('关闭旧连接失败:', e);
      }
    }

    try {
      const socket = new WebSocket(this.wsUrl);
      this.ws = socket;

      // Forward an event only if `socket` is still the active connection.
      const whenCurrent = (handler) => (event) => {
        if (this.ws === socket) {
          handler(event);
        }
      };
      socket.onopen = whenCurrent(this.onOpen);
      socket.onmessage = whenCurrent(this.onMessage);
      socket.onclose = whenCurrent(this.onClose);
      socket.onerror = whenCurrent(this.onError);
      console.log(`[${new Date().toLocaleString()}] 正在连接WebSocket: ${this.wsUrl}`);
    } catch (e) {
      this.isConnected = false;
      console.error(`[${new Date().toLocaleString()}] 连接失败:`, e);
      this.scheduleReconnect();
    }
  }

  /**
   * Handler for a successfully opened connection: cancels any pending
   * reconnect, starts the subscription sequence and the heartbeat.
   */
  onOpen() {
    this.isConnected = true;
    this.subscribeSent = false;
    console.log(`[${new Date().toLocaleString()}] WebSocket连接成功`);

    // A reconnect is no longer needed.
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    this.sendAllSubscribeRequests();
    this.startHeartbeat();
  }

  /**
   * Send the subscription requests: trades immediately, depth after 5 s,
   * K-line after 10 s. Runs at most once per connection.
   */
  sendAllSubscribeRequests() {
    if (this.subscribeSent || !this.isConnected) return;
    this.subscribeSent = true;

    // 1. Trade details (protocol code 10000), sent right away.
    this.sendTradeSubscribe();

    // 2. Depth after 5 s, 3. K-line after 10 s. The handles are kept so the
    // timers can be cancelled when the connection closes or the client stops.
    this.subscribeTimers.push(
      setTimeout(() => {
        if (this.isConnected) {
          this.sendDepthSubscribe();
        }
      }, 5000),
      setTimeout(() => {
        if (this.isConnected) {
          this.sendKlineSubscribe();
        }
      }, 10000)
    );
  }

  /**
   * Cancel delayed subscription requests that have not fired yet.
   */
  clearSubscribeTimers() {
    this.subscribeTimers.forEach((timerId) => clearTimeout(timerId));
    this.subscribeTimers = [];
  }

  /**
   * Send the trade-detail subscription request (code 10000).
   */
  sendTradeSubscribe() {
    const msg = {
      code: 10000,
      trace: this.generateTraceId(),
      data: { codes: 'BTCUSDT' }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * Send the order-book depth subscription request (code 10003).
   */
  sendDepthSubscribe() {
    const msg = {
      code: 10003,
      trace: this.generateTraceId(),
      data: { codes: 'BTCUSDT' }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * Send the K-line subscription request (code 10006).
   */
  sendKlineSubscribe() {
    const msg = {
      code: 10006,
      trace: this.generateTraceId(),
      data: {
        arr: [{ type: 1, codes: 'BTCUSDT' }] // 1-minute K-line
      }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * Send a heartbeat message (code 10010).
   */
  sendHeartbeat() {
    const msg = {
      code: 10010,
      trace: this.generateTraceId()
    };
    this.sendJsonMessage(msg);
  }

  /**
   * Serialize `msg` as JSON and send it. The message is skipped with a
   * warning when the socket is not open; a failed send marks the client
   * disconnected and schedules a reconnect.
   * @param {object} msg - Message object
   */
  sendJsonMessage(msg) {
    if (!this.isConnected || !this.ws || this.ws.readyState !== WebSocket.OPEN) {
      console.warn(`[${new Date().toLocaleString()}] 连接未就绪，跳过消息发送`);
      return;
    }

    try {
      const jsonStr = JSON.stringify(msg);
      this.ws.send(jsonStr);
      console.log(`[${new Date().toLocaleString()}] 发送消息:`, jsonStr);
    } catch (e) {
      console.error(`[${new Date().toLocaleString()}] 发送消息失败:`, e);
      this.isConnected = false;
      this.scheduleReconnect();
    }
  }

  /**
   * Handler for an incoming message: logs it and dispatches it.
   * @param {MessageEvent} event - Message event
   */
  onMessage(event) {
    const message = event.data;
    console.log(`[${new Date().toLocaleString()}] 收到消息:`, message);
    this.handleReceivedMessage(message);
  }

  /**
   * Parse a raw message and log it according to its protocol code.
   * Parse errors are caught and logged rather than propagated.
   * @param {string} message - Raw message text
   */
  handleReceivedMessage(message) {
    try {
      const msg = JSON.parse(message);
      const code = msg.code || 0;
      const trace = msg.trace || '';
      const data = msg.data || {};

      switch (code) {
        case 10000:
          console.log(`[${new Date().toLocaleString()}] 处理成交明细 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10003:
          console.log(`[${new Date().toLocaleString()}] 处理盘口数据 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10006:
          console.log(`[${new Date().toLocaleString()}] 处理K线数据 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10010:
          console.log(`[${new Date().toLocaleString()}] 收到心跳响应 [trace=${trace}]`);
          break;
        default:
          console.warn(`[${new Date().toLocaleString()}] 未知协议号 [code=${code}]:`, message);
      }
    } catch (e) {
      console.error(`[${new Date().toLocaleString()}] 处理消息失败:`, e);
    }
  }

  /**
   * Handler for the active connection closing: resets state, cancels timers
   * tied to the connection and schedules a reconnect.
   * @param {CloseEvent} event - Close event
   */
  onClose(event) {
    this.isConnected = false;
    this.subscribeSent = false;
    console.log(`[${new Date().toLocaleString()}] 连接关闭: 代码=${event.code}, 原因=${event.reason}`);

    // Timers tied to this connection are no longer useful.
    this.stopHeartbeat();
    this.clearSubscribeTimers();

    this.scheduleReconnect();
  }

  /**
   * Handler for a socket error: marks the client disconnected and schedules
   * a reconnect.
   * @param {Event} error - Error event
   */
  onError(error) {
    console.error(`[${new Date().toLocaleString()}] WebSocket错误:`, error);
    this.isConnected = false;
    this.scheduleReconnect();
  }

  /**
   * Start the heartbeat: one heartbeat immediately, then one every
   * `heartbeatInterval` ms while connected.
   */
  startHeartbeat() {
    // Never run two heartbeat intervals at once.
    this.stopHeartbeat();

    this.sendHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (this.isConnected) {
        this.sendHeartbeat();
      }
    }, this.heartbeatInterval);
  }

  /**
   * Stop the heartbeat interval, if one is running.
   */
  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  /**
   * Schedule one reconnect attempt after `reconnectInterval` ms.
   * Does nothing if an attempt is already pending.
   */
  scheduleReconnect() {
    if (this.reconnectTimer) return;

    console.log(`[${new Date().toLocaleString()}] ${this.reconnectInterval/1000}秒后尝试重连...`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, this.reconnectInterval);
  }

  /**
   * Start the client: connect and, where a Node.js-style `process` object
   * exists, stop cleanly on SIGINT.
   */
  start() {
    console.log(`[${new Date().toLocaleString()}] 启动WebSocket客户端...`);
    this.connect();

    // Only environments exposing `process.on` (e.g. Node.js) get a SIGINT hook.
    if (typeof process !== 'undefined' && process.on) {
      process.on('SIGINT', () => {
        console.log(`\n[${new Date().toLocaleString()}] 收到退出信号，关闭客户端...`);
        this.stop();
        process.exit(0);
      });
    }
  }

  /**
   * Stop the client: cancel every timer and close the connection.
   * No reconnect is scheduled afterwards.
   */
  stop() {
    this.isConnected = false;
    this.subscribeSent = false;

    // Cancel all timers.
    this.stopHeartbeat();
    this.clearSubscribeTimers();
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    // Clear `this.ws` before closing so the resulting close event is treated
    // as stale and cannot trigger a reconnect.
    if (this.ws) {
      const socket = this.ws;
      this.ws = null;
      socket.close(1000, 'Client stopped');
    }

    console.log(`[${new Date().toLocaleString()}] 客户端已停止`);
  }
}

// ==================== Usage example ====================
// Browser: paste the code below into the developer console
// Node.js: run this script directly

// Replace with your actual API key
const API_KEY = 'yourApikey';
const BUSINESS = 'crypto';

// Create and start the client
const client = new CryptoWebsocketClient(API_KEY, BUSINESS);
client.start();

// To stop the client, run: client.stop();
```

{% endtab %}
{% endtabs %}


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.infoway.io/en-docs/websocket/websocket-code-example.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
