Skip to main content

Integrating Elixir with Rust for Advanced WebSocket Message Decoding

As developers, we often face scenarios where we need to push the boundaries of performance and efficiency. One such case is decoding complex WebSocket messages in real-time financial applications. In this blog post, we'll explore how to leverage Rust's performance within an Elixir application to decode WebSocket messages from Zerodha's Kite Connect API.

Why Integrate Elixir with Rust?

Elixir is known for its concurrency and fault-tolerance, making it an excellent choice for building scalable applications. However, Rust offers unmatched performance and memory safety, making it ideal for CPU-intensive tasks like decoding binary WebSocket messages. By integrating Rust with Elixir, we can achieve the best of both worlds.

The Challenge: Decoding Kite Connect WebSocket Messages

Zerodha's Kite Connect API provides market data via WebSocket in binary format. These messages need to be decoded efficiently to be useful. While Elixir is powerful, decoding binary data is an area where Rust shines.

Setup and Environment

To get started, ensure you have Rust and Elixir installed on your system. We'll use the Rustler library to bridge Elixir and Rust.

Setting Up Rustler: Initialize a new Rustler project:


mix new stock_price_fetcher --module StockPriceFetcher
cd stock_price_fetcher
mix rustler.new quote_decoder


Configure Your Mix Project: Update mix.exs to include the Rustler crate:


def project do
[
app: :stock_price_fetcher,
version: "0.1.0",
elixir: "~> 1.12",
start_permanent: Mix.env() == :prod,
deps: deps(),
rustler_crates: [
quote_decoder: [
path: "native/quote_decoder",
mode: (if Mix.env() == :prod, do: :release, else: :debug)
]
]
]
end

defp deps do
[
{:rustler, "~> 0.22"}
]
end


Rust Setup: In native/quote_decoder/Cargo.toml, add dependencies:


[dependencies]
rustler = "0.22.0"
byteorder = "1.4.3"


Rust Code Explanation

Now let's delve into the Rust code that performs the decoding.

1. Initial Setup and Struct Definitions

use rustler::{Encoder, Env, Term, NifResult, Binary};
use byteorder::{BigEndian, ReadBytesExt};
use std::io::Cursor;

rustler::atoms! {
ok,
error
}

#[derive(rustler::NifStruct)]
#[module = "StockPriceFetcher.QuoteDecoder.WebSocketMessage"]
struct WebSocketMessage {
tradable: bool,
mode: String,
instrument_token: i32,
last_price: f64,
last_quantity: Option<f64>,
average_price: Option<f64>,
volume: Option<f64>,
buy_quantity: Option<f64>,
sell_quantity: Option<f64>,
ohlc: Option<OHLC>,
change: Option<f64>,
last_trade_time: Option<f64>,
oi: Option<f64>,
oi_day_high: Option<f64>,
oi_day_low: Option<f64>,
timestamp: Option<f64>,
sell_depths: Option<Vec<Depth>>,
buy_depths: Option<Vec<Depth>>,
}

#[derive(rustler::NifStruct)]
#[module = "StockPriceFetcher.QuoteDecoder.OHLC"]
struct OHLC {
open: f64,
high: f64,
low: f64,
close: f64,
}

#[derive(rustler::NifStruct)]
#[module = "StockPriceFetcher.QuoteDecoder.Depth"]
struct Depth {
quantity: f64,
price: f64,
orders: f64,
}



Explanation:
  • Imports: We import necessary crates for working with Rustler and byteorder for reading binary data.
  • Atoms: Define ok and error atoms used in NIF functions.
  • Structs: Define WebSocketMessage, OHLC, and Depth structs, which represent the decoded WebSocket messages.
2. NIF Function Definition

#[rustler::nif]
fn decode_websocket_message<'a>(env: Env<'a>, binary: Binary<'a>) -> NifResult<Term<'a>> {
let mut cursor = Cursor::new(binary.as_slice());

// Check if there is enough data to read the number of packets
if cursor.get_ref().len() < 2 {
return Err(rustler::Error::RaiseAtom("failed_to_read_number_of_packets"));
}
let number_of_packets = cursor.read_i16::<BigEndian>().map_err(|_| rustler::Error::RaiseAtom("failed_to_read_number_of_packets"))?;
println!("Number of packets: {}", number_of_packets);
let mut tick_data: Vec<WebSocketMessage> = Vec::new();

for _ in 0..number_of_packets {
// Check if there is enough data to read the packet length
if cursor.get_ref().len() < (cursor.position() as usize + 2) {
return Err(rustler::Error::RaiseAtom("failed_to_read_packet_length"));
}

let packet_length = cursor.read_i16::<BigEndian>().map_err(|_| rustler::Error::RaiseAtom("failed_to_read_packet_length"))?;
println!("Packet length: {}", packet_length);

let instrument_token = cursor.read_i32::<BigEndian>().map_err(|_| rustler::Error::RaiseAtom("failed_to_read_instrument_token"))?;
let segment = instrument_token & 0xFF;
let divisor = if segment == 3 { 10000000.0 } else { 100.0 };
let tradable = segment != 9;

let message = match packet_length {
8 => {
println!("Decoding LTP packet");
decode_ltp(&mut cursor, tradable, instrument_token, divisor)?
},
28 | 32 => {
println!("Decoding Quote Full packet");
decode_quote_full(&mut cursor, packet_length, tradable, instrument_token, divisor)?
},
44 | 184 => {
println!("Decoding Full packet");
decode_full(&mut cursor, packet_length, tradable, instrument_token, divisor)?
},
_ => return Err(rustler::Error::RaiseAtom("undefined_packet_length")),
};

tick_data.push(message);
}

Ok((ok(), tick_data).encode(env))
}


Explanation:
  • Cursor Initialization: Initialize a cursor to read binary data.
  • Number of Packets: Read the number of packets from the cursor.
  • Loop Through Packets: Loop through each packet, read the packet length, and decode the packet based on its length.
  • Return Data: Return the decoded messages as an encoded term to Elixir.

3. Decoding Functions

Let's dive into the specific decoding functions, optimized and refactored using helper functions.


fn read_i32(cursor: &mut Cursor<&[u8]>, error_msg: &'static str) -> Result<f64, rustler::Error> {
cursor.read_i32::<BigEndian>().map_err(|_| rustler::Error::RaiseAtom(error_msg)).map(|val| val as f64)
}

fn read_i16(cursor: &mut Cursor<&[u8]>, error_msg: &'static str) -> Result<f64, rustler::Error> {
cursor.read_i16::<BigEndian>().map_err(|_| rustler::Error::RaiseAtom(error_msg)).map(|val| val as f64)
}

Explanation:
  • Helper Functions: read_i32 and read_i16 functions simplify repeated reading operations and error handling, converting read values to f64.

Decoding LTP Packet


fn decode_ltp(cursor: &mut Cursor<&[u8]>, tradable: bool, instrument_token: i32, divisor: f64) -> Result<WebSocketMessage, rustler::Error> {
println!("In decode_ltp");
let last_price = read_i32(cursor, "failed_to_read_last_price")? / divisor;
Ok(WebSocketMessage {
tradable,
mode: "ltp".to_string(),
instrument_token,
last_price,
last_quantity: None,
average_price: None,
volume: None,
buy_quantity: None,
sell_quantity: None,
ohlc: None,
change: None,
last_trade_time: None,
oi: None,
oi_day_high: None,
oi_day_low: None,
timestamp: None,
sell_depths: None,
buy_depths: None,
})
}



Explanation:
  • Read Last Price: Use the read_i32 helper function to read the last traded price.
  • Create WebSocketMessage: Create a WebSocketMessage struct with the decoded LTP data.

Decoding Quote/Full Packet


fn decode_quote_full(cursor: &mut Cursor<&[u8]>, packet_length: i16, tradable: bool, instrument_token: i32, divisor: f64) -> Result<WebSocketMessage, rustler::Error> {
println!("In decode_quote");
let last_price = read_i32(cursor, "failed_to_read_last_price")? / divisor;
let ohlc = OHLC {
high: read_i32(cursor, "failed_to_read_high_price")? / divisor,
low: read_i32(cursor, "failed_to_read_low_price")? / divisor,
open: read_i32(cursor, "failed_to_read_open_price")? / divisor,
close: read_i32(cursor, "failed_to_read_close_price")? / divisor,
};
let change = if ohlc.close != 0.0 {
Some((last_price - ohlc.close) * 100.0 / ohlc.close)
} else {
None
};
let mut message = WebSocketMessage {
tradable,
mode: if packet_length == 28 { "quote".to_string() } else { "full".to_string() },
instrument_token,
last_price,
last_quantity: None,
average_price: None,
volume: None,
buy_quantity: None,
sell_quantity: None,
ohlc: Some(ohlc),
change,
last_trade_time: None,
oi: None,
oi_day_high: None,
oi_day_low: None,
timestamp: None,
sell_depths: None,
buy_depths: None,
};

if packet_length == 32 {
message.timestamp = Some(read_i32(cursor, "failed_to_read_timestamp")? / divisor);
}

Ok(message)
}


Explanation:
  • Read Prices: Use the read_i32 helper function to read various prices (last, high, low, open, close).
  • Calculate Change: Calculate the percentage change in price.
  • Create WebSocketMessage: Create a WebSocketMessage struct with the decoded data.

Decoding Full Packet with Market Depth


fn decode_full(cursor: &mut Cursor<&[u8]>, packet_length: i16, tradable: bool, instrument_token: i32, divisor: f64) -> Result<WebSocketMessage, rustler::Error> {
println!("In decode full");
let last_price = read_i32(cursor, "failed_to_read_last_price")? / divisor;
let last_quantity = read_i32(cursor, "failed_to_read_last_quantity")?;
let average_price = read_i32(cursor, "failed_to_read_average_price")? / divisor;
let volume = read_i32(cursor, "failed_to_read_volume")?;
let buy_quantity = read_i32(cursor, "failed_to_read_buy_quantity")?;
let sell_quantity = read_i32(cursor, "failed_to_read_sell_quantity")?;
let ohlc = OHLC {
open: read_i32(cursor, "failed_to_read_open_price")? / divisor,
high: read_i32(cursor, "failed_to_read_high_price")? / divisor,
low: read_i32(cursor, "failed_to_read_low_price")? / divisor,
close: read_i32(cursor, "failed_to_read_close_price")? / divisor,
};
let change = if ohlc.close != 0.0 {
Some((last_price - ohlc.close) * 100.0 / ohlc.close)
} else {
None
};

let mut message = WebSocketMessage {
tradable,
mode: if packet_length == 44 { "quote".to_string() } else { "full".to_string() },
instrument_token,
last_price,
last_quantity: Some(last_quantity),
average_price: Some(average_price),
volume: Some(volume),
buy_quantity: Some(buy_quantity),
sell_quantity: Some(sell_quantity),
ohlc: Some(ohlc),
change,
last_trade_time: None,
oi: None,
oi_day_high: None,
oi_day_low: None,
timestamp: None,
sell_depths: None,
buy_depths: None,
};

if packet_length == 184 {
message.last_trade_time = Some(read_i32(cursor, "failed_to_read_last_trade_time")?);
message.oi = Some(read_i32(cursor, "failed_to_read_oi")?);
message.oi_day_high = Some(read_i32(cursor, "failed_to_read_oi_day_high")?);
message.oi_day_low = Some(read_i32(cursor, "failed_to_read_oi_day_low")?);
message.timestamp = Some(read_i32(cursor, "failed_to_read_timestamp")?);

let mut buy_depth_data: Vec<Depth> = Vec::with_capacity(5);
let mut sell_depth_data: Vec<Depth> = Vec::with_capacity(5);
for index in 0..10 {
let depth_data = Depth {
quantity: read_i32(cursor, "failed_to_read_depth_quantity")?,
price: read_i32(cursor, "failed_to_read_depth_price")? / divisor,
orders: read_i16(cursor, "failed_to_read_depth_orders")?,
};

if index < 5 {
buy_depth_data.push(depth_data);
} else {
sell_depth_data.push(depth_data);
}

// Skip 2 bytes padding
cursor.read_i16::<BigEndian>().map_err(|_| rustler::Error::RaiseAtom("failed_to_skip_padding"))?;
}
message.sell_depths = Some(sell_depth_data);
message.buy_depths = Some(buy_depth_data);
}

Ok(message)
}

rustler::init!("Elixir.StockPriceFetcher.QuoteDecoder", [decode_websocket_message]);


Explanation:
  • Read Additional Fields: In addition to the fields read in the quote/full packet, we also read the last trade time, open interest (OI), and market depth data.
  • Market Depth Data: For full packets with market depth, we read 10 depth entries (5 buy and 5 sell) and store them in the buy_depths and sell_depths fields.
  • Return Decoded Message: Return the fully decoded WebSocketMessage struct.

Understanding the Rust Code

Let's break down the key parts of our Rust code:

  1. Rustler Integration:

    • We define Rustler structs to match the Elixir structs for WebSocketMessage, OHLC, and Depth.
    • The decode_websocket_message function is our NIF that Elixir will call to decode binary WebSocket messages.
  2. Reading Binary Data:

    • We use the byteorder crate to read the binary data in big-endian format.
    • We start by reading the number of packets and then iterate over each packet, reading its length and data.
  3. Decoding Packets:

    • Depending on the packet length, we call different functions (decode_ltp, decode_quote_full, decode_full) to decode the packet.
    • Each function reads the relevant fields from the binary data and constructs a WebSocketMessage struct.
  4. Error Handling:

    • We handle errors by returning Rustler atoms, which are then propagated back to Elixir.

Benefits of Using Rust for Decoding

  • Performance: Rust's low-level control over memory and performance optimizations make it ideal for computationally intensive tasks like binary decoding.
  • Safety: Rust's ownership model ensures memory safety, preventing common bugs like buffer overflows.
  • Integration: Rustler makes it easy to integrate Rust with Elixir, allowing us to leverage Rust's strengths while benefiting from Elixir's concurrency and fault tolerance.

Compiling the Rust NIF:


cd native/kiteconnect_websocket_message
cargo build --release

Integrating Rust NIF in Elixir

Now, let’s integrate the Rust NIF with our Elixir application.

Elixir Code for WebSocket Connection

In your Elixir project, create a module for handling the WebSocket connection and decoding messages using the Rust NIF:


defmodule StockPriceFetcher.TickerWebSocket do
use WebSockex
require Logger

@api_key "your_api_key"
@access_token "your_access_token"
@url "wss://ws.kite.trade?api_key=#{@api_key}&access_token=#{@access_token}"

@subscribe_msg Jason.encode!(%{"a" => "subscribe", "mode" => "full", "v" => [408065, 884737, 415745, 416513]})

def start_link() do
{:ok, pid} = WebSockex.start_link(@url, __MODULE__, :fake_state, name: __MODULE__)
WebSockex.send_frame(pid, {:text, @subscribe_msg})
{:ok, pid}
end

def handle_connect(_conn, state) do
Logger.info("Connected to WebSocket.")
{:ok, state}
end

def handle_frame({:text, msg}, state) do
Logger.info("Received Text Message - #{msg}")
case Jason.decode(msg) do
{:ok, %{"type" => "error", "data" => error_message}} ->
Logger.error("Error from server: #{error_message}")
_other ->
Logger.debug("Received Message: #{msg}")
end
{:ok, state}
end

def handle_frame({:binary, binary}, state) do
Logger.debug("Received Binary Message - #{inspect(binary, binaries: :as_binaries)}")
case StockPriceFetcher.QuoteDecoder.decode(binary) do
{:ok, decoded} ->
Logger.info("Decoded Message: #{inspect(decoded)}")
_ ->
Logger.error("Decoding Error: error")
end
{:ok, state}
rescue
e -> Logger.error("Error processing frame: #{inspect(e)}")
{:error, e}
end
end


defmodule StockPriceFetcher.QuoteDecoder do
use Rustler, otp_app: :play, crate: "quote_decoder"

@spec decode(binary()) :: {:ok, map()} | {:error, String.t()}
def decode(binary), do: decode_websocket_message(binary)

# When NIF is not loaded, this will be called
defp decode_websocket_message(_binary), do: :erlang.nif_error(:nif_not_loaded)
end

This Elixir module handles the WebSocket connection, subscribes to market data, and processes incoming messages using the Rust NIF for decoding.

Conclusion

By combining Elixir and Rust, we've created a robust and efficient system for decoding complex WebSocket messages. This approach leverages the best of both worlds: Rust's performance and safety, and Elixir's concurrency and ease of use. Whether you're dealing with financial data, real-time analytics, or any other data-intensive application, this combination can provide the performance and reliability you need.


Comments

Popular posts from this blog

Handling Massive Concurrency with Elixir and OTP: Advanced Practical Example

For advanced Elixir developers, handling massive concurrency involves not just understanding the basics of GenServers, Tasks, and Supervisors, but also effectively utilizing more complex patterns and strategies to optimize performance and ensure fault tolerance. In this blog post, we'll dive deeper into advanced techniques and provide a practical example involving a distributed, fault-tolerant system. Practical Example: Distributed Web Crawler We'll build a distributed web crawler that can handle massive concurrency by distributing tasks across multiple nodes, dynamically supervising crawling processes, and implementing rate limiting to control the crawling rate. In this example, we will build a distributed web crawler that simulates handling massive concurrency and backpressure. To achieve this, we will: Generate 100 unique API URLs that will be processed by our system. Create an API within the application that simulates slow responses using :timer.sleep to introduce artificia...

Building a Real-Time Collaborative Editing Application with Phoenix LiveView and Presence

In this blog post, we will walk through the process of building a real-time collaborative document editing application using Phoenix LiveView. We will cover everything from setting up the project, creating the user interface, implementing real-time updates, and handling user presence. By the end of this tutorial, you'll have a fully functional collaborative editor that allows multiple users to edit a document simultaneously, with real-time updates and presence tracking. User Flow  Before diving into the code, let's outline the user flow and wireframes for our application. This will help us understand the overall structure and functionality we aim to achieve. Landing Page: The user is greeted with a landing page that prompts them to enter their name. Upon entering their name and clicking "Submit", they are redirected to the document list page. Document List Page: The user sees a list of available documents. Each document title is a clickable link that takes the user to...

How Phoenix Live View Works and what are its advantage

If we take a look at the Phoenix live view Docs this statement actually comes directly from it Phoenix live view is a library that enables rich real-time user experiences all from server rendered HTML and you might be thinking that this is quite an ambitious claim is that even possible what does actually mean to you what this actually means is you don't have to immediately reach for some sort of front-end library or framework to have rich user experiences what it actually means is that you can truly write some rich interactive user interfaces using the tools that you're already familiar with in the elixir ecosystem without needing to write JavaScript you might need to write JavaScript but definitely a lot less so but don't get me wrong when I make this statement you still might want client side JavaScript it still does have its use cases and you shouldn't discard it completely but one of the benefits is that we can lessen the technical burden of developers by not making...