Code snippet:
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/strand.hpp>
#include <boost/thread/thread.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <cstdlib>
#include <fstream>
#include <jsoncpp/json/json.h>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <sys/types.h>
#include <unistd.h>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
//------------------------------------------------------------------------------
// Log a failed operation to stderr in the form "<what>: <error message>".
//   ec   - error code whose message() text is printed
//   what - short label naming the operation that failed
void fail(beast::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << '\n';
}
// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session>
{
tcp::resolver resolver_;
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
std::string host_;
std::string message_text_;
public:
// Resolver and socket require an io_context
explicit
session(net::io_context& ioc)
: resolver_(net::make_strand(ioc))
, ws_(net::make_strand(ioc))
{
}
// Start the asynchronous operation
void
run(
char const* host,
char const* port,
Json::Value message)
{
// Save these for later
host_ = host;
message_text_ = Json_to_string(message);
// Look up the domain name
resolver_.async_resolve(
host,
port,
beast::bind_front_handler(
&session::on_resolve,
shared_from_this()));
}
void
on_resolve(
beast::error_code ec,
tcp::resolver::results_type results)
{
if(ec)
return fail(ec, "resolve");
// Set the timeout for the operation
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(
results,
beast::bind_front_handler(
&session::on_connect,
shared_from_this()));
}
void
on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep)
{
if(ec)
return fail(ec, "connect");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING)
" websocket-client-async");
}));
// Update the host_ string. This will provide the value of the
// Host HTTP header during the WebSocket handshake.
// See https://tools.ietf.org/html/rfc7230#section-5.4
host_ = ':' std::to_string(ep.port());
// Perform the websocket handshake
ws_.async_handshake(host_, "/",
beast::bind_front_handler(
&session::on_handshake,
shared_from_this()));
}
std::string Json_to_string(const Json::Value& json) {
Json::StreamWriterBuilder wbuilder;
wbuilder["indentation"] = ""; // Optional
return Json::writeString(wbuilder, json);
}
void
on_handshake(beast::error_code ec)
{
if(ec) {
return fail(ec, "handshake");
}
// Send the message
ws_.async_write(
net::buffer(message_text_),
beast::bind_front_handler(&session::on_write, shared_from_this()));
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "write");
// Read a message into our buffer
ws_.async_read(
buffer_,
beast::bind_front_handler(
&session::on_read,
shared_from_this()));
}
void
on_read(
beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "read");
ws_.async_read(buffer_,beast::bind_front_handler(
&session::on_read,shared_from_this()
));
}
void
on_close(beast::error_code ec)
{
if(ec)
return fail(ec, "close");
// If we get here then the connection is closed gracefully
// The make_printable() function helps print a ConstBufferSequence
std::cout << beast::make_printable(buffer_.data()) << std::endl;
}
};
//------------------------------------------------------------------------------
// Entry point: loads the request from details.json, starts one session
// against stream.binance.com:9443 and runs the io_context until the
// session has no more outstanding work.
// Returns EXIT_FAILURE when details.json cannot be opened or parsed.
int main()
{
    std::ifstream file("details.json");
    if (!file)
    {
        std::cerr << "cannot open details.json\n";
        return EXIT_FAILURE;
    }

    Json::Value actualjson;
    Json::Reader jsonreader;
    // A malformed file would otherwise be sent to the server as "null".
    if (!jsonreader.parse(file, actualjson))
    {
        std::cerr << "details.json: "
                  << jsonreader.getFormattedErrorMessages() << "\n";
        return EXIT_FAILURE;
    }

    // The io_context is required for all I/O
    net::io_context context;

    // Launch the asynchronous operation
    std::make_shared<session>(context)->run("stream.binance.com", "9443",
                                            actualjson);

    // Run the I/O service. The call returns when there is no more
    // outstanding asynchronous work.
    context.run();
    return EXIT_SUCCESS;
}
My details.json file for subscribing to market data streams:
{
"method": "SUBSCRIBE",
"params":
[
"btcusdt@aggTrade"
],
"id": 1
}
These are the Binance API docs I am referring to: https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
I am trying to stream aggregate trade market data over the WebSocket.
I don't know why the request is being rejected by the server. Please help, and thanks in advance!
CodePudding user response:
You should:
- use SSL, usually with SNI
- not append the port to the hostname for the WS handshake (mildly surprising)
- use a proper endpoint URL, from the same docs:
The base endpoint is:
wss://stream.binance.com:9443
- Streams can be accessed either in a single raw stream or in a combined stream
- Raw streams are accessed at
/ws/<streamName>
- Combined streams are accessed at
/stream?streams=<streamName1>/<streamName2>/<streamName3>
- Combined stream events are wrapped as follows:
{"stream":"<streamName>","data":<rawPayload>}
I just guessed a stream name (wss://stream.binance.com:9443/ws/btcusdt) and added some code to print the received/sent messages:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <jsoncpp/json/json.h>
#include <sys/types.h>
#include <unistd.h>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = net::ssl;
using tcp = net::ip::tcp; // from <boost/asio/ip/tcp.hpp>
using Stream = websocket::stream<beast::ssl_stream<beast::tcp_stream>>;
using namespace std::chrono_literals;
//------------------------------------------------------------------------------
// Write "<what>: <error message>" to stderr for a failed operation.
//   ec   - error code describing the failure
//   what - name of the step that failed
void fail(beast::error_code ec, char const* what)
{
    auto const reason = ec.message();
    std::cerr << what << ": " << reason << "\n";
}
// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session>
{
tcp::resolver resolver_;
Stream ws_;
beast::flat_buffer buffer_;
std::string host_;
std::string message_text_;
public:
// Resolver and socket require an io_context
explicit
session(net::io_context& ioc, ssl::context& ctx)
: resolver_(net::make_strand(ioc))
, ws_(net::make_strand(ioc), ctx)
{
}
// Start the asynchronous operation
void
run(
char const* host,
char const* port,
Json::Value message)
{
// Set SNI Hostname (many hosts need this to handshake successfully)
if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host)) {
throw boost::system::system_error(beast::error_code(
::ERR_get_error(), net::error::get_ssl_category()));
}
// Save these for later
host_ = host;
message_text_ = Json_to_string(message);
// Look up the domain name
resolver_.async_resolve(
host,
port,
beast::bind_front_handler(
&session::on_resolve,
shared_from_this()));
}
void
on_resolve(
beast::error_code ec,
tcp::resolver::results_type results)
{
if(ec)
return fail(ec, "resolve");
// Set SNI Hostname (many hosts need this to ssl handshake successfully)
if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(),
host_.c_str())) {
throw beast::system_error{beast::error_code(
::ERR_get_error(), net::error::get_ssl_category())};
}
// Set the timeout for the operation
get_lowest_layer(ws_).expires_after(30s);
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(
results,
beast::bind_front_handler(
&session::on_connect,
shared_from_this()));
}
void
on_connect(beast::error_code ec, [[maybe_unused]] tcp::resolver::results_type::endpoint_type ep)
{
if(ec)
return fail(ec, "connect");
// Update the host_ string. This will provide the value of the
// Host HTTP header during the WebSocket handshake.
// See https://tools.ietf.org/html/rfc7230#section-5.4
//host_ = ':' std::to_string(ep.port());
// Perform the SSL handshake
ws_.next_layer().async_handshake(
ssl::stream_base::client,
beast::bind_front_handler(&session::on_ssl_handshake,
shared_from_this()));
}
void
on_ssl_handshake(beast::error_code ec)
{
if(ec)
return fail(ec, "ssl_handshake");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING)
" websocket-client-async");
}));
// Perform the websocket handshake
std::cout << "using host_: " << host_ << std::endl;
ws_.async_handshake(host_, "/ws/btcusdt",
beast::bind_front_handler(
&session::on_handshake,
shared_from_this()));
}
std::string Json_to_string(const Json::Value& json) {
Json::StreamWriterBuilder wbuilder;
wbuilder["indentation"] = ""; // Optional
return Json::writeString(wbuilder, json);
}
void
on_handshake(beast::error_code ec)
{
if(ec) {
return fail(ec, "handshake");
}
// Send the message
std::cout << "Sending " << message_text_ << std::endl;
ws_.async_write(
net::buffer(message_text_),
beast::bind_front_handler(&session::on_write, shared_from_this()));
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "write");
// Read a message into our buffer
ws_.async_read(
buffer_,
beast::bind_front_handler(
&session::on_read,
shared_from_this()));
}
void
on_read(
beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "read");
std::cout << "Received: " << beast::buffers_to_string(buffer_.cdata())
<< std::endl;
ws_.async_read(
buffer_,
beast::bind_front_handler(&session::on_read, shared_from_this()));
}
void
on_close(beast::error_code ec)
{
if(ec)
return fail(ec, "close");
// If we get here then the connection is closed gracefully
// The make_printable() function helps print a ConstBufferSequence
std::cout << beast::make_printable(buffer_.data()) << std::endl;
}
};
//------------------------------------------------------------------------------
// Entry point: loads the request from details.json, starts one TLS session
// against stream.binance.com:9443 and runs the io_context until the
// session has no more outstanding work.
// Returns 1 when details.json cannot be opened or parsed, or when an
// exception escapes the session; 0 otherwise.
int main()
{
    std::ifstream file("details.json");
    if (!file)
    {
        std::cerr << "cannot open details.json\n";
        return 1;
    }

    Json::Value actualjson;
    Json::Reader jsonreader;
    // A malformed file would otherwise be sent to the server as "null".
    if (!jsonreader.parse(file, actualjson))
    {
        std::cerr << "details.json: "
                  << jsonreader.getFormattedErrorMessages() << "\n";
        return 1;
    }

    try
    {
        // The io_context is required for all I/O
        net::io_context ioc;

        // The SSL context is required
        ssl::context ctx{ssl::context::tlsv12_client};

        // Verify the remote server's certificate
        ctx.set_verify_mode(ssl::verify_peer);
        ctx.set_default_verify_paths();

        // Launch the asynchronous operation
        std::make_shared<session>(ioc, ctx)->run("stream.binance.com", "9443",
                                                 actualjson);

        // Run the I/O service. The call returns when there is no more
        // outstanding asynchronous work.
        ioc.run();
    }
    catch (std::exception const& e)
    {
        // session::run throws system_error when SNI cannot be configured.
        std::cerr << "error: " << e.what() << "\n";
        return 1;
    }
    return 0;
}
Now the output is:
using host_: stream.binance.com
Sending {"id":1,"method":"SUBSCRIBE","params":["btcusdt@aggTrade"]}
Received: {"result":null,"id":1}
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
Received: {"result":null,"id":1}{"e":"aggTrade","E":1650214540406,"s":"BTCUSDT","a":1143057169,"p":"40330.30000000","q":"0.00910000","f":13276
etc...
Note: you will want to fix the code so that it does not hardcode the /ws/btcusdt path.