# WebSocket Code Example

{% tabs %}
{% tab title="Java" %}

```java
package org.example.ws;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PostConstruct;
import jakarta.websocket.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@ClientEndpoint
@Slf4j
@Component
public class WebsocketExample {

    // Local session channel
    private static Session session;

    // WebSocket subscription address — “business” can be stock, crypto, or common; “apikey” is your credential
    private static final String WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey";

    @PostConstruct
    public void connectAll() {
        try {
            //Establish WebSocket connection
            connect(WS_URL);
            //Enable auto-reconnect
            startReconnection(WS_URL);
        } catch (Exception e) {
            log.error("Failed to connect to " + WS_URL + ": " + e.getMessage());
        }
    }

    //The auto-reconnect mechanism starts a scheduled thread to check connection status; if disconnected, it reconnects automatically
    private void startReconnection(String s) {
        ScheduledExecutorService usExecutor = Executors.newScheduledThreadPool(1);
        Runnable usTask = () -> {
            if (session == null || !session.isOpen()) {
                log.debug("reconnection...");
                connect(s);
            }
        };
        usExecutor.scheduleAtFixedRate(usTask, 1000, 10000, TimeUnit.MILLISECONDS);
    }

    //Implementation of WebSocket connection establishment
    private void connect(String s) {
        try {
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            session = container.connectToServer(WebsocketExample.class, URI.create(s));
        } catch (DeploymentException | IOException e) {
            log.error("Failed to connect to the server: {}", e.getMessage());
        }
    }

    //The following method is executed when the WebSocket connection is successfully established
    @OnOpen
    public void onOpen(Session session) throws IOException, InterruptedException {
        //This method is triggered upon successful WebSocket connection
        System.out.println("Connection opened: " + session.getId());

        // Send a latest trade details subscription request
        JSONObject tradeSendObj = new JSONObject();
        //Refer to the WebSocket API docs for different protocol codes. 10000 is for latest trade subscription
        tradeSendObj.put("code", 10000);
        //Custom trace ID
        tradeSendObj.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
        //Build the subscription request entity in JSON format
        JSONObject data = new JSONObject();
        //Subscribe to BTCUSDT
        data.put("codes", "BTCUSDT");
        tradeSendObj.put("data", data);
        //Send the latest trade subscription request
        session.getBasicRemote().sendText(tradeSendObj.toJSONString());

        //-----------------------------------------------------------------//
        //Add a delay between different requests
        Thread.sleep(5000);

        //Send the real-time order book (depth) data subscription request
        JSONObject depthSendObj = new JSONObject();
        //Refer to the WebSocket API docs for different protocol codes. 10003 is for real-time order book subscription
        depthSendObj.put("code", 10003);
        //Custom trace ID
        depthSendObj.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
        //Build the subscription request entity in JSON format
        depthSendObj.put("data", data);
        //Send subscription request
        session.getBasicRemote().sendText(depthSendObj.toJSONString());

        
        //-----------------------------------------------------------------//
        //Add a delay between different requests
        Thread.sleep(5000);

        //Send the candlestick data subscription request
        JSONObject klineSendObj = new JSONObject();
        //Protocol 10006 is for candlestick subscription
        klineSendObj.put("code", 10006);
        //Custom trace ID
        klineSendObj.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
        //Build the subscription request entity in JSON format        JSONObject klineData = new JSONObject();
        JSONArray klineDataArray = new JSONArray();

        //Build the entity for 1-minute candle subscription
        JSONObject kline1minObj = new JSONObject();
        //“type” corresponds to the klineType parameter
        kline1minObj.put("type", 1);
        kline1minObj.put("codes", "BTCUSDT");
        klineDataArray.add(kline1minObj);
        klineData.put("arr", klineDataArray);

        klineSendObj.put("data", klineData);
        //Send the subscription request
        session.getBasicRemote().sendText(klineSendObj.toJSONString());

        //Schedule a heartbeat task
        ScheduledExecutorService pingExecutor = Executors.newScheduledThreadPool(1);
        Runnable pingTask = WebsocketExample::ping;
        pingExecutor.scheduleAtFixedRate(pingTask, 30, 30, TimeUnit.SECONDS);

    }

    @OnMessage
    public void onMessage(String message, Session session) {
        //Receives messages returned by the INFOWAY server, includes subscription success/failure notifications and actual market data pushes
        try {
            System.out.println("Message received: " + message);
        } catch (Exception e) {
        }
    }

    @OnClose
    public void onClose(Session session, CloseReason reason) {
        //This method is called when the WebSocket connection is closed
        System.out.println("Connection closed: " + session.getId() + ", reason: " + reason);
    }

    @OnError
    public void onError(Throwable error) {
        error.printStackTrace();
    }

    //Continuously send heartbeat messages to prevent the server from closing the connection due to inactivity
    public static void ping() {
        try {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("code", 10010);
            jsonObject.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
            session.getBasicRemote().sendText(jsonObject.toJSONString());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}

```

{% endtab %}

{% tab title="Python" %}

```python
import json
import time
import schedule
import threading
import websocket
from loguru import logger

class WebsocketExample:
    """Example WebSocket client with auto-reconnect, subscriptions and heartbeat.

    The connection runs in a background thread; a second background thread
    drives the `schedule` job queue used for reconnect checks and heartbeats.
    """

    def __init__(self):
        self.session = None
        self.ws_url = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey"
        self.reconnecting = False
        self.is_ws_connected = False  # Connection status flag, maintained by the callbacks
        self.heartbeat_scheduled = False  # True once the heartbeat job has been registered

    def connect_all(self):
        """Establish WebSocket connection and start auto-reconnect mechanism"""
        try:
            self.connect(self.ws_url)
            self.start_reconnection(self.ws_url)
        except Exception as e:
            logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")

    def start_reconnection(self, url):
        """Start scheduled reconnection checks (every 10 seconds)"""
        def check_connection():
            if not self.is_connected():
                logger.debug("Reconnection attempt...")
                self.connect(url)

        # Use a thread to periodically check connection status
        schedule.every(10).seconds.do(check_connection)

        def run_scheduler():
            while True:
                schedule.run_pending()
                time.sleep(1)

        threading.Thread(target=run_scheduler, daemon=True).start()

    def is_connected(self):
        """Return True only when a session exists and is flagged as connected"""
        # Explicit comparison so callers always get a bool, never None or the session object.
        return self.session is not None and bool(self.is_ws_connected)

    def connect(self, url):
        """Establish a WebSocket connection, closing the current one first if it is open"""
        try:
            if self.is_connected():
                self.session.close()

            self.session = websocket.WebSocketApp(
                url,
                on_open=self.on_open,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close
            )

            # Start the WebSocket connection (non-blocking)
            threading.Thread(target=self.session.run_forever, daemon=True).start()
        except Exception as e:
            logger.error(f"Failed to connect to the server: {str(e)}")

    def on_open(self, ws):
        """Callback when WebSocket connection is successfully established"""
        logger.info("Connection opened")
        self.is_ws_connected = True  # Set connection status to True

        try:
            # Send realtime trade subscription request
            trade_send_obj = {
                "code": 10000,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": {"codes": "BTCUSDT"}
            }
            self.send_message(trade_send_obj)

            # Wait a while between requests
            time.sleep(5)

            # Send realtime order book subscription request
            depth_send_obj = {
                "code": 10003,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": {"codes": "BTCUSDT"}
            }
            self.send_message(depth_send_obj)

            # Wait a while between requests
            time.sleep(5)

            # Send realtime candlestick (K-line) subscription request
            kline_data = {
                "arr": [
                    {
                        "type": 1,
                        "codes": "BTCUSDT"
                    }
                ]
            }
            kline_send_obj = {
                "code": 10006,
                "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
                "data": kline_data
            }
            self.send_message(kline_send_obj)

            # Start the scheduled heartbeat task. on_open fires again after every
            # reconnect, so register the job only once; ping() goes through
            # send_message(), which always targets the current session.
            if not self.heartbeat_scheduled:
                schedule.every(30).seconds.do(self.ping)
                self.heartbeat_scheduled = True

        except Exception as e:
            logger.error(f"Error sending initial messages: {str(e)}")

    def on_message(self, ws, message):
        """Callback for incoming messages"""
        try:
            logger.info(f"Message received: {message}")
        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")

    def on_close(self, ws, close_status_code, close_msg):
        """Callback when the connection is closed"""
        logger.info(f"Connection closed: {close_status_code} - {close_msg}")
        self.is_ws_connected = False  # Set connection status to False

    def on_error(self, ws, error):
        """Error handling callback"""
        logger.error(f"WebSocket error: {str(error)}")
        self.is_ws_connected = False  # Set connection status to False on error

    def send_message(self, message_obj):
        """Serialize message_obj to JSON and send it; logs instead of raising on failure"""
        if self.is_connected():
            try:
                self.session.send(json.dumps(message_obj))
            except Exception as e:
                logger.error(f"Error sending message: {str(e)}")
        else:
            logger.warning("Cannot send message: Not connected")

    def ping(self):
        """Send heartbeat packet"""
        ping_obj = {
            "code": 10010,
            "trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
        }
        self.send_message(ping_obj)

# Usage example
if __name__ == "__main__":
    ws_client = WebsocketExample()
    ws_client.connect_all()

    # Keep the main thread alive. The scheduled jobs (reconnect check, heartbeat)
    # are already driven by the daemon scheduler thread started in
    # start_reconnection(); calling schedule.run_pending() here as well would
    # run the same job queue from two threads at once.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Exiting...")
        if ws_client.is_connected():
            ws_client.session.close()



```

{% endtab %}

{% tab title="Go" %}

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/url"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

// CryptoWebsocketClient is a WebSocket client for cryptocurrency market data.
type CryptoWebsocketClient struct {
	apiKey             string        // API key used for authentication
	business           string        // business type: stock/crypto/common
	wsURL              string        // WebSocket endpoint URL
	conn               *websocket.Conn // WebSocket connection instance
	isConnected        bool          // connection state
	reconnectInterval  time.Duration // interval between reconnect attempts
	heartbeatInterval  time.Duration // interval between heartbeats
	done               chan struct{} // shutdown signal channel
}

// NewCryptoWebsocketClient creates a client for the given API key and business type.
// Both values are URL-encoded into the query string of the WebSocket URL.
func NewCryptoWebsocketClient(apiKey, business string) *CryptoWebsocketClient {
	// Query parameters of the WS URL
	query := url.Values{
		"business": {business},
		"apikey":   {apiKey},
	}

	client := &CryptoWebsocketClient{
		apiKey:   apiKey,
		business: business,
		wsURL:    "wss://data.infoway.io/ws?" + query.Encode(),
		done:     make(chan struct{}),
	}
	client.reconnectInterval = 10 * time.Second // 10-second reconnect interval
	client.heartbeatInterval = 30 * time.Second // 30-second heartbeat interval
	return client
}

// generateTraceID returns a freshly generated UUID string to use as a request trace ID.
func (c *CryptoWebsocketClient) generateTraceID() string {
	traceID := uuid.New()
	return traceID.String()
}

// connect closes any existing connection, dials wsURL and, on success, starts
// the subscription, heartbeat and message-listening goroutines.
// It returns a wrapped error when the dial fails.
func (c *CryptoWebsocketClient) connect() error {
	// Drop the previous connection, if any, before dialing again.
	if previous := c.conn; previous != nil {
		previous.Close()
		c.conn = nil
	}

	// Establish the new connection.
	fresh, _, dialErr := websocket.DefaultDialer.Dial(c.wsURL, nil)
	if dialErr != nil {
		c.isConnected = false
		return fmt.Errorf("连接失败: %w", dialErr)
	}

	c.conn, c.isConnected = fresh, true
	log.Printf("WebSocket连接成功: %s", c.wsURL)

	// Each successful dial starts its own set of goroutines:
	go c.sendAllSubscribeRequests() // send the subscription requests
	go c.startHeartbeat()           // keep the connection alive
	go c.listenMessages()           // read server pushes

	return nil
}

// sendAllSubscribeRequests sends the trade (10000), order-book (10003) and
// 1-minute K-line (10006) subscriptions in that order, pausing 5 seconds after
// each of the first two. It stops at the first request that fails.
func (c *CryptoWebsocketClient) sendAllSubscribeRequests() {
	if c.conn == nil || !c.isConnected {
		log.Println("连接未建立，无法发送订阅请求")
		return
	}

	const gap = 5 * time.Second // pause between consecutive requests

	steps := []struct {
		send      func() error
		failFmt   string
		pauseNext bool
	}{
		{c.sendTradeSubscribe, "发送成交明细订阅失败: %v", true},
		{c.sendDepthSubscribe, "发送盘口订阅失败: %v", true},
		{c.sendKlineSubscribe, "发送K线订阅失败: %v", false},
	}

	for _, step := range steps {
		if err := step.send(); err != nil {
			log.Printf(step.failFmt, err)
			return
		}
		if step.pauseNext {
			time.Sleep(gap)
		}
	}
}

// sendTradeSubscribe sends the real-time trade subscription request (code 10000) for BTCUSDT.
func (c *CryptoWebsocketClient) sendTradeSubscribe() error {
	symbols := map[string]string{"codes": "BTCUSDT"}
	request := map[string]interface{}{
		"code":  10000,
		"trace": c.generateTraceID(),
		"data":  symbols,
	}
	return c.sendJSONMessage(request)
}

// sendDepthSubscribe sends the real-time order-book subscription request (code 10003) for BTCUSDT.
func (c *CryptoWebsocketClient) sendDepthSubscribe() error {
	symbols := map[string]string{"codes": "BTCUSDT"}
	request := map[string]interface{}{
		"code":  10003,
		"trace": c.generateTraceID(),
		"data":  symbols,
	}
	return c.sendJSONMessage(request)
}

// sendKlineSubscribe sends the K-line subscription request (code 10006) for
// BTCUSDT with type 1, i.e. 1-minute candles.
func (c *CryptoWebsocketClient) sendKlineSubscribe() error {
	oneMinute := map[string]interface{}{
		"type":  1, // 1-minute K-line
		"codes": "BTCUSDT",
	}
	payload := map[string]interface{}{
		"arr": []map[string]interface{}{oneMinute},
	}
	request := map[string]interface{}{
		"code":  10006,
		"trace": c.generateTraceID(),
		"data":  payload,
	}
	return c.sendJSONMessage(request)
}

// sendHeartbeat sends one heartbeat packet (code 10010).
func (c *CryptoWebsocketClient) sendHeartbeat() error {
	beat := make(map[string]interface{}, 2)
	beat["code"] = 10010
	beat["trace"] = c.generateTraceID()
	return c.sendJSONMessage(beat)
}

// sendJSONMessage marshals msg to JSON and writes it as a text frame.
// It returns an error when there is no live connection, when marshaling fails,
// or when the write fails (in which case the client is marked disconnected).
func (c *CryptoWebsocketClient) sendJSONMessage(msg interface{}) error {
	if usable := c.isConnected && c.conn != nil; !usable {
		return fmt.Errorf("连接已断开")
	}

	// Serialize to JSON
	payload, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("消息序列化失败: %w", err)
	}

	// Write the frame; a failed write marks the connection as down
	writeErr := c.conn.WriteMessage(websocket.TextMessage, payload)
	if writeErr != nil {
		c.isConnected = false
		return fmt.Errorf("发送消息失败: %w", writeErr)
	}

	log.Printf("发送消息: %s", string(payload))
	return nil
}

// startHeartbeat sends a heartbeat every heartbeatInterval until a send fails
// or the done channel is closed.
func (c *CryptoWebsocketClient) startHeartbeat() {
	beat := time.NewTicker(c.heartbeatInterval)
	defer beat.Stop()

	for {
		select {
		case <-c.done:
			// Shutdown requested: stop the heartbeat
			return
		case <-beat.C:
			// Periodic heartbeat
			err := c.sendHeartbeat()
			if err != nil {
				log.Printf("发送心跳失败: %v", err)
				return
			}
			log.Printf("发送心跳包成功")
		}
	}
}

// listenMessages reads server messages in a loop and hands each one to
// handleReceivedMessage. It returns when the client is no longer connected or
// a read fails (which also marks the client disconnected).
func (c *CryptoWebsocketClient) listenMessages() {
	for c.isConnected && c.conn != nil {
		// Read the next message
		_, payload, readErr := c.conn.ReadMessage()
		if readErr != nil {
			c.isConnected = false
			log.Printf("读取消息失败/连接断开: %v", readErr)
			return
		}

		// Log the raw message, then parse and dispatch it
		log.Printf("收到服务端消息: %s", string(payload))
		c.handleReceivedMessage(payload)
	}
}

// handleReceivedMessage parses one server message and logs it according to its
// protocol code. Messages that are not a JSON object, or that carry no numeric
// "code" field, are logged and dropped instead of panicking the goroutine.
func (c *CryptoWebsocketClient) handleReceivedMessage(msgData []byte) {
	var msg map[string]interface{}
	if err := json.Unmarshal(msgData, &msg); err != nil {
		log.Printf("解析消息失败: %v", err)
		return
	}

	// Extract the core fields. JSON numbers decode to float64; use checked
	// assertions because a missing or differently-typed field would otherwise panic.
	rawCode, ok := msg["code"].(float64)
	if !ok {
		log.Printf("解析消息失败: 缺少数字类型的code字段: %s", string(msgData))
		return
	}
	code := int(rawCode)
	trace, _ := msg["trace"].(string) // empty when absent or not a string
	data := msg["data"]

	// Dispatch on the protocol code
	switch code {
	case 10000:
		log.Printf("处理成交明细数据 [trace=%s]: %+v", trace, data)
	case 10003:
		log.Printf("处理盘口数据 [trace=%s]: %+v", trace, data)
	case 10006:
		log.Printf("处理K线数据 [trace=%s]: %+v", trace, data)
	case 10010:
		log.Printf("收到心跳响应 [trace=%s]", trace)
	default:
		log.Printf("未知协议号消息 [code=%d]: %s", code, string(msgData))
	}
}

// startReconnectLoop polls the connection state until done is closed. While
// disconnected it waits a random 0-2 seconds and redials; after a failed dial
// it waits reconnectInterval before the next check, otherwise one second.
func (c *CryptoWebsocketClient) startReconnectLoop() {
	for {
		// Stop as soon as shutdown has been signalled
		select {
		case <-c.done:
			return
		default:
		}

		if c.isConnected {
			// Connection is healthy: check again shortly
			time.Sleep(1 * time.Second)
			continue
		}

		log.Printf("尝试重连WebSocket（间隔%v）...", c.reconnectInterval)
		// Random delay before redialing (avoids high-frequency retries)
		jitter := time.Duration(rand.Intn(3)) * time.Second
		time.Sleep(jitter)

		if err := c.connect(); err != nil {
			log.Printf("重连失败: %v", err)
			time.Sleep(c.reconnectInterval)
			continue
		}
		time.Sleep(1 * time.Second)
	}
}

// Start connects, starts the auto-reconnect loop and then blocks until SIGINT
// or SIGTERM is received, at which point it closes the done channel and the
// connection.
func (c *CryptoWebsocketClient) Start() {
	// Dial first, and only then start the reconnect loop. Started earlier, the
	// loop would see isConnected == false while this dial is still in flight
	// and dial as well, replacing the fresh connection and duplicating the
	// per-connection goroutines.
	if err := c.connect(); err != nil {
		log.Printf("首次连接失败: %v", err)
	}

	// Auto-reconnect loop (also retries a failed first connection)
	go c.startReconnectLoop()

	// Wait for a termination signal (Ctrl+C)
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	// Termination requested: release resources
	log.Println("收到退出信号，关闭客户端...")
	close(c.done)
	if c.conn != nil {
		c.conn.Close()
	}
	c.isConnected = false
}

func main() {
	const (
		apiKey   = "yourApikey" // replace with your actual API key
		business = "crypto"     // business type: crypto/stock/common
	)

	// Create the client and run it until the process is asked to stop
	NewCryptoWebsocketClient(apiKey, business).Start()
}
```

{% endtab %}

{% tab title="PHP" %}

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use WebSocket\Client;
use WebSocket\ConnectionException;

/**
 * WebSocket client for cryptocurrency market data.
 *
 * Runs a single-threaded loop that connects (and reconnects), subscribes to
 * trade, order-book and 1-minute K-line data, polls for incoming messages and
 * sends a periodic heartbeat.
 */
class CryptoWebsocketClient
{
    // Configuration
    private $apiKey;             // API key used for authentication
    private $business;           // Business type: stock/crypto/common
    private $wsUrl;              // WebSocket endpoint URL
    private $reconnectInterval;  // Minimum time between connection attempts (seconds)
    private $heartbeatInterval;  // Heartbeat interval (seconds)

    // Runtime state
    private $client;             // WebSocket client instance
    private $isConnected = false;// Connection state
    private $running = true;     // Whether the main loop should keep running

    /**
     * @param string $apiKey   API key
     * @param string $business Business type
     */
    public function __construct(string $apiKey, string $business = 'crypto')
    {
        $this->apiKey = $apiKey;
        $this->business = $business;
        // Build the WS URL
        $this->wsUrl = sprintf(
            'wss://data.infoway.io/ws?business=%s&apikey=%s',
            urlencode($business),
            urlencode($apiKey)
        );
        $this->reconnectInterval = 10;  // 10-second reconnect interval
        $this->heartbeatInterval = 30;  // 30-second heartbeat interval
    }

    /**
     * Generate a trace ID formatted like a version-4 UUID.
     * @return string
     */
    private function generateTraceId(): string
    {
        return sprintf(
            '%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0x0fff) | 0x4000,
            mt_rand(0, 0x3fff) | 0x8000,
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff),
            mt_rand(0, 0xffff)
        );
    }

    /**
     * Create a new WebSocket client (closing the previous one) and send the
     * subscription requests.
     * @return bool true when the client was created, false on any exception
     */
    private function connect(): bool
    {
        try {
            // Close the previous connection
            if ($this->client) {
                try {
                    $this->client->close();
                } catch (Exception $e) {
                    // Ignore errors while closing
                }
                $this->client = null;
            }

            // Create the new connection
            $this->client = new Client($this->wsUrl, [
                'timeout' => 30,
                'fragment_size' => 8192,
            ]);
            $this->isConnected = true;
            echo date('Y-m-d H:i:s') . " - WebSocket连接成功: {$this->wsUrl}\n";

            // Send all subscription requests
            $this->sendAllSubscribeRequests();

            return true;
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - WebSocket连接失败: {$e->getMessage()}\n";
            return false;
        } catch (Exception $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 连接异常: {$e->getMessage()}\n";
            return false;
        }
    }

    /**
     * Send the trade, order-book and K-line subscriptions, 5 seconds apart.
     * Stops early once a send has marked the connection as down, instead of
     * sleeping through the remaining steps.
     */
    private function sendAllSubscribeRequests(): void
    {
        if (!$this->isConnected || !$this->client) {
            echo date('Y-m-d H:i:s') . " - 连接未建立，无法发送订阅请求\n";
            return;
        }

        try {
            // 1. Real-time trades (protocol 10000)
            $this->sendTradeSubscribe();
            if (!$this->isConnected) {
                return;
            }
            sleep(5); // 5-second gap

            // 2. Real-time order book (protocol 10003)
            $this->sendDepthSubscribe();
            if (!$this->isConnected) {
                return;
            }
            sleep(5); // 5-second gap

            // 3. 1-minute K-line (protocol 10006)
            $this->sendKlineSubscribe();
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 发送订阅请求失败: {$e->getMessage()}\n";
            $this->isConnected = false;
        }
    }

    /**
     * Send the trade subscription request.
     */
    private function sendTradeSubscribe(): void
    {
        $msg = [
            'code'  => 10000,
            'trace' => $this->generateTraceId(),
            'data'  => ['codes' => 'BTCUSDT']
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * Send the order-book subscription request.
     */
    private function sendDepthSubscribe(): void
    {
        $msg = [
            'code'  => 10003,
            'trace' => $this->generateTraceId(),
            'data'  => ['codes' => 'BTCUSDT']
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * Send the K-line subscription request.
     */
    private function sendKlineSubscribe(): void
    {
        $msg = [
            'code'  => 10006,
            'trace' => $this->generateTraceId(),
            'data'  => [
                'arr' => [
                    ['type' => 1, 'codes' => 'BTCUSDT'] // 1-minute K-line
                ]
            ]
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * Send a heartbeat packet.
     */
    private function sendHeartbeat(): void
    {
        $msg = [
            'code'  => 10010,
            'trace' => $this->generateTraceId()
        ];
        $this->sendJsonMessage($msg);
    }

    /**
     * Encode a message as JSON and send it. Does nothing when not connected;
     * failures are printed, and a ConnectionException marks the client as
     * disconnected.
     * @param array $msg Message to send
     */
    private function sendJsonMessage(array $msg): void
    {
        if (!$this->isConnected || !$this->client) {
            return;
        }

        // json_encode() reports failure by returning false (no exception is
        // thrown without JSON_THROW_ON_ERROR), so check the result explicitly.
        $jsonStr = json_encode($msg, JSON_UNESCAPED_UNICODE);
        if ($jsonStr === false) {
            echo date('Y-m-d H:i:s') . " - 消息序列化失败: " . json_last_error_msg() . "\n";
            return;
        }

        try {
            $this->client->send($jsonStr);
            echo date('Y-m-d H:i:s') . " - 发送消息: {$jsonStr}\n";
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 发送消息失败: {$e->getMessage()}\n";
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 发送消息失败: {$e->getMessage()}\n";
        }
    }

    /**
     * Poll once for a server message and dispatch it if one was received.
     */
    private function listenMessages(): void
    {
        if (!$this->isConnected || !$this->client) {
            return;
        }

        try {
            // Switch the socket to non-blocking reads so the main loop does not stall.
            // NOTE(review): relies on the client library exposing getSocket() — confirm for the installed version.
            $socket = $this->client->getSocket();
            stream_set_blocking($socket, false);

            $message = $this->client->receive();
            // Only dispatch real text: handleReceivedMessage() is typed string,
            // so any other value would raise a TypeError that no catch below handles.
            if (is_string($message) && $message !== '') {
                echo date('Y-m-d H:i:s') . " - 收到消息: {$message}\n";
                $this->handleReceivedMessage($message);
            }
        } catch (ConnectionException $e) {
            $this->isConnected = false;
            echo date('Y-m-d H:i:s') . " - 连接断开/读取消息失败: {$e->getMessage()}\n";
        } catch (Exception $e) {
            // Ignore the "no data" exception produced by non-blocking reads
            if (strpos($e->getMessage(), 'no data received') === false) {
                echo date('Y-m-d H:i:s') . " - 监听消息异常: {$e->getMessage()}\n";
            }
        }
    }

    /**
     * Decode a received message and print it according to its protocol code.
     * @param string $message Raw message string
     */
    private function handleReceivedMessage(string $message): void
    {
        try {
            $msg = json_decode($message, true);
            if (json_last_error() !== JSON_ERROR_NONE) {
                echo date('Y-m-d H:i:s') . " - 解析消息失败: 格式错误\n";
                return;
            }

            $code = $msg['code'] ?? 0;
            $trace = $msg['trace'] ?? '';
            $data = $msg['data'] ?? [];

            switch ($code) {
                case 10000:
                    echo date('Y-m-d H:i:s') . " - 处理成交明细 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10003:
                    echo date('Y-m-d H:i:s') . " - 处理盘口数据 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10006:
                    echo date('Y-m-d H:i:s') . " - 处理K线数据 [trace={$trace}]: " . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n";
                    break;
                case 10010:
                    echo date('Y-m-d H:i:s') . " - 收到心跳响应 [trace={$trace}]\n";
                    break;
                default:
                    echo date('Y-m-d H:i:s') . " - 未知协议号 [code={$code}]: {$message}\n";
            }
        } catch (Exception $e) {
            echo date('Y-m-d H:i:s') . " - 处理消息异常: {$e->getMessage()}\n";
        }
    }

    /**
     * Send a heartbeat when at least heartbeatInterval seconds have passed
     * since the last one, and update the timestamp.
     * @param int $lastHeartbeatTime Timestamp of the last heartbeat (by reference)
     */
    private function checkHeartbeat(&$lastHeartbeatTime): void
    {
        $currentTime = time();
        if ($currentTime - $lastHeartbeatTime >= $this->heartbeatInterval) {
            $this->sendHeartbeat();
            $lastHeartbeatTime = $currentTime;
        }
    }

    /**
     * Run the client: connect, then loop polling messages and heartbeats,
     * reconnecting at most once every reconnectInterval seconds.
     */
    public function start(): void
    {
        echo date('Y-m-d H:i:s') . " - 启动WebSocket客户端...\n";

        // Register the exit-signal handler. pcntl is an optional extension,
        // so only hook SIGINT when it is actually loaded.
        if (function_exists('pcntl_async_signals') && function_exists('pcntl_signal')) {
            pcntl_async_signals(true);
            pcntl_signal(SIGINT, function () {
                echo "\n" . date('Y-m-d H:i:s') . " - 收到退出信号，停止客户端...\n";
                $this->running = false;
                if ($this->client) {
                    $this->client->close();
                }
                exit(0);
            });
        }

        $lastHeartbeatTime = 0;
        $nextConnectAt = 0;       // Unix time before which no new connection attempt is made
        $lastAnnounced = null;    // Last "seconds remaining" value printed

        // Main loop
        while ($this->running) {
            if (!$this->isConnected) {
                // Reconnect handling. The wait is measured with the clock: the
                // loop ticks every 100 ms, so counting iterations would make a
                // "10 second" interval last about one second.
                $now = time();
                if ($now >= $nextConnectAt) {
                    $this->connect();
                    $nextConnectAt = time() + $this->reconnectInterval;
                    $lastHeartbeatTime = time();
                    $lastAnnounced = null;
                } else {
                    $remaining = $nextConnectAt - $now;
                    // Print the countdown once per second, not once per tick
                    if ($remaining !== $lastAnnounced) {
                        echo date('Y-m-d H:i:s') . " - 等待重连... ({$remaining}秒)\n";
                        $lastAnnounced = $remaining;
                    }
                }
            } else {
                // Poll for messages
                $this->listenMessages();
                // Send a heartbeat when due
                $this->checkHeartbeat($lastHeartbeatTime);
            }

            // Loop delay (keeps CPU usage low)
            usleep(100000); // 100 ms
        }

        // Release resources
        if ($this->client) {
            $this->client->close();
        }
        echo date('Y-m-d H:i:s') . " - 客户端已停止\n";
    }

    /**
     * Close the connection, ignoring errors, when the object is destroyed.
     */
    public function __destruct()
    {
        if ($this->client) {
            try {
                $this->client->close();
            } catch (Exception $e) {
                // Ignore
            }
        }
    }
}

// Entry point: this script is meant to be run from the command line only
if (PHP_SAPI !== 'cli') {
    exit("该脚本仅支持CLI模式运行！");
}

// Configuration (replace with your actual API key)
$apiKey = 'yourApikey';
$business = 'crypto';

// Create the client and run it
$client = new CryptoWebsocketClient($apiKey, $business);
$client->start();
```

{% endtab %}

{% tab title="JavaScript" %}

```javascript
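/**
 * Cryptocurrency market-data WebSocket client (generic JS version).
 * Works in browsers and in Node.js. It relies on the global `WebSocket` class,
 * which Node.js only provides by default from v22; on older Node.js versions
 * supply a compatible implementation (for example the `ws` package).
 */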
class CryptoWebsocketClient {
  /**
   * 构造函数
   * @param {string} apiKey - 认证API Key
   * @param {string} business - 业务类型：stock/crypto/common
   */
  constructor(apiKey, business = 'crypto') {
    // 配置项
    this.apiKey = apiKey;
    this.business = business;
    this.wsUrl = `wss://data.infoway.io/ws?business=${encodeURIComponent(business)}&apikey=${encodeURIComponent(apiKey)}`;
    this.reconnectInterval = 10000; // 重连间隔（10秒，单位ms）
    this.heartbeatInterval = 30000; // 心跳间隔（30秒，单位ms）

    // 运行状态
    this.ws = null; // WebSocket实例
    this.isConnected = false; // 连接状态
    this.heartbeatTimer = null; // 心跳定时器
    this.reconnectTimer = null; // 重连定时器
    this.subscribeSent = false; // 订阅请求是否已发送

    // 绑定方法上下文（避免回调中this丢失）
    this.onOpen = this.onOpen.bind(this);
    this.onMessage = this.onMessage.bind(this);
    this.onClose = this.onClose.bind(this);
    this.onError = this.onError.bind(this);
    this.sendHeartbeat = this.sendHeartbeat.bind(this);
  }

  /**
   * 生成唯一Trace ID（模拟UUID）
   * @returns {string}
   */
  generateTraceId() {
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
      const r = Math.random() * 16 | 0;
      const v = c === 'x' ? r : (r & 0x3 | 0x8);
      return v.toString(16);
    });
  }

  /**
   * 建立WebSocket连接
   */
  connect() {
    // 清理旧连接
    if (this.ws) {
      try {
        this.ws.close(1000, 'Reconnecting');
      } catch (e) {
        console.error('关闭旧连接失败:', e);
      }
      this.ws = null;
    }

    try {
      // 创建新连接
      this.ws = new WebSocket(this.wsUrl);
      // 注册事件回调
      this.ws.onopen = this.onOpen;
      this.ws.onmessage = this.onMessage;
      this.ws.onclose = this.onClose;
      this.ws.onerror = this.onError;
      console.log(`[${new Date().toLocaleString()}] 正在连接WebSocket: ${this.wsUrl}`);
    } catch (e) {
      this.isConnected = false;
      console.error(`[${new Date().toLocaleString()}] 连接失败:`, e);
      this.scheduleReconnect();
    }
  }

  /**
   * 连接成功回调
   */
  onOpen() {
    this.isConnected = true;
    this.subscribeSent = false;
    console.log(`[${new Date().toLocaleString()}] WebSocket连接成功`);

    // 清理重连定时器
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    // 发送订阅请求
    this.sendAllSubscribeRequests();

    // 启动心跳
    this.startHeartbeat();
  }

  /**
   * 发送所有订阅请求
   */
  sendAllSubscribeRequests() {
    if (this.subscribeSent || !this.isConnected) return;
    this.subscribeSent = true;

    // 1. 订阅实时成交明细（协议号10000）
    this.sendTradeSubscribe();
    
    // 间隔5秒发送盘口订阅
    setTimeout(() => {
      if (this.isConnected) {
        this.sendDepthSubscribe();
      }
    }, 5000);

    // 间隔10秒发送K线订阅（5+5）
    setTimeout(() => {
      if (this.isConnected) {
        this.sendKlineSubscribe();
      }
    }, 10000);
  }

  /**
   * 发送成交明细订阅请求
   */
  sendTradeSubscribe() {
    const msg = {
      code: 10000,
      trace: this.generateTraceId(),
      data: { codes: 'BTCUSDT' }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送盘口数据订阅请求
   */
  sendDepthSubscribe() {
    const msg = {
      code: 10003,
      trace: this.generateTraceId(),
      data: { codes: 'BTCUSDT' }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送K线数据订阅请求
   */
  sendKlineSubscribe() {
    const msg = {
      code: 10006,
      trace: this.generateTraceId(),
      data: {
        arr: [{ type: 1, codes: 'BTCUSDT' }] // 1分钟K线
      }
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送心跳包
   */
  sendHeartbeat() {
    const msg = {
      code: 10010,
      trace: this.generateTraceId()
    };
    this.sendJsonMessage(msg);
  }

  /**
   * 发送JSON格式消息
   * @param {object} msg - 消息对象
   */
  sendJsonMessage(msg) {
    if (!this.isConnected || !this.ws || this.ws.readyState !== WebSocket.OPEN) {
      console.warn(`[${new Date().toLocaleString()}] 连接未就绪，跳过消息发送`);
      return;
    }

    try {
      const jsonStr = JSON.stringify(msg);
      this.ws.send(jsonStr);
      console.log(`[${new Date().toLocaleString()}] 发送消息:`, jsonStr);
    } catch (e) {
      console.error(`[${new Date().toLocaleString()}] 发送消息失败:`, e);
      this.isConnected = false;
      this.scheduleReconnect();
    }
  }

  /**
   * 接收消息回调
   * @param {MessageEvent} event - 消息事件
   */
  onMessage(event) {
    const message = event.data;
    console.log(`[${new Date().toLocaleString()}] 收到消息:`, message);
    this.handleReceivedMessage(message);
  }

  /**
   * 处理接收到的消息
   * @param {string} message - 原始消息字符串
   */
  handleReceivedMessage(message) {
    try {
      const msg = JSON.parse(message);
      const code = msg.code || 0;
      const trace = msg.trace || '';
      const data = msg.data || {};

      switch (code) {
        case 10000:
          console.log(`[${new Date().toLocaleString()}] 处理成交明细 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10003:
          console.log(`[${new Date().toLocaleString()}] 处理盘口数据 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10006:
          console.log(`[${new Date().toLocaleString()}] 处理K线数据 [trace=${trace}]:`, JSON.stringify(data));
          break;
        case 10010:
          console.log(`[${new Date().toLocaleString()}] 收到心跳响应 [trace=${trace}]`);
          break;
        default:
          console.warn(`[${new Date().toLocaleString()}] 未知协议号 [code=${code}]:`, message);
      }
    } catch (e) {
      console.error(`[${new Date().toLocaleString()}] 处理消息失败:`, e);
    }
  }

  /**
   * 连接关闭回调
   * @param {CloseEvent} event - 关闭事件
   */
  onClose(event) {
    this.isConnected = false;
    this.subscribeSent = false;
    console.log(`[${new Date().toLocaleString()}] 连接关闭: 代码=${event.code}, 原因=${event.reason}`);
    
    // 清理心跳定时器
    this.stopHeartbeat();
    
    // 触发重连
    this.scheduleReconnect();
  }

  /**
   * 错误回调
   * @param {Event} error - 错误事件
   */
  onError(error) {
    console.error(`[${new Date().toLocaleString()}] WebSocket错误:`, error);
    this.isConnected = false;
    this.scheduleReconnect();
  }

  /**
   * 启动心跳任务
   */
  startHeartbeat() {
    // 清理旧定时器
    this.stopHeartbeat();
    
    // 立即发送一次心跳，然后定时发送
    this.sendHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (this.isConnected) {
        this.sendHeartbeat();
      }
    }, this.heartbeatInterval);
  }

  /**
   * 停止心跳任务
   */
  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  /**
   * 调度重连
   */
  scheduleReconnect() {
    if (this.reconnectTimer) return;
    
    console.log(`[${new Date().toLocaleString()}] ${this.reconnectInterval/1000}秒后尝试重连...`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, this.reconnectInterval);
  }

  /**
   * 启动客户端
   */
  start() {
    console.log(`[${new Date().toLocaleString()}] 启动WebSocket客户端...`);
    this.connect();

    // Node.js环境下监听退出信号
    if (typeof process !== 'undefined' && process.on) {
      process.on('SIGINT', () => {
        console.log(`\n[${new Date().toLocaleString()}] 收到退出信号，关闭客户端...`);
        this.stop();
        process.exit(0);
      });
    }
  }

  /**
   * 停止客户端
   */
  stop() {
    this.isConnected = false;
    this.subscribeSent = false;
    
    // 清理定时器
    this.stopHeartbeat();
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    // 关闭连接
    if (this.ws) {
      this.ws.close(1000, 'Client stopped');
      this.ws = null;
    }

    console.log(`[${new Date().toLocaleString()}] 客户端已停止`);
  }
}

// ==================== Usage example ====================
// Browser: paste the code below into the console.
// Node.js: run this script directly.

// Replace apiKey with your real API key.
const config = {
  apiKey: 'yourApikey',
  business: 'crypto'
};

// Create the client and start it.
const client = new CryptoWebsocketClient(config.apiKey, config.business);
client.start();

// To stop the client, run: client.stop();
```

{% endtab %}
{% endtabs %}
