
Handling Massive Concurrency with Elixir and OTP: Advanced Practical Example


For advanced Elixir developers, handling massive concurrency involves not just understanding the basics of GenServers, Tasks, and Supervisors, but also effectively utilizing more complex patterns and strategies to optimize performance and ensure fault tolerance. In this blog post, we'll dive deeper into advanced techniques and provide a practical example involving a distributed, fault-tolerant system.

Practical Example: Distributed Web Crawler
We'll build a distributed web crawler that handles massive concurrency by distributing tasks across multiple nodes, dynamically supervising crawling processes, and applying rate limiting to control the crawling rate.

To simulate massive concurrency and backpressure, we will:
  1. Generate 100 unique API URLs that will be processed by our system.
  2. Create an API within the application that simulates slow responses using :timer.sleep to introduce artificial delay.
  3. Leverage GenStage for backpressure management to ensure the system can handle a large number of concurrent requests without being overwhelmed.
  4. Distribute tasks across multiple nodes to observe how the system manages concurrency and scalability.
By the end of this post, you'll understand how to handle massive concurrency using advanced Elixir and OTP techniques and how to simulate and manage backpressure in a distributed environment.

Step-by-Step Implementation

Step 1: Setting Up the Phoenix Project
First, create a new Phoenix project. We'll name our project distributed_crawler.

mix phx.new distributed_crawler --no-ecto
cd distributed_crawler

Add the dependencies for clustering, GenStage, and HTTP requests (the crawler in Step 4 uses HTTPoison) to the deps list in the mix.exs file:

{:gen_stage, "~> 1.2"},
{:libcluster, "~> 3.3"},
{:httpoison, "~> 2.0"}


Run mix deps.get to install the dependencies.

Step 2: Generate Unique API URLs
Create a Producer module that generates unique URLs pointing to our API endpoint and ensures each URL is produced only once.

lib/distributed_crawler/producer.ex:


defmodule DistributedCrawler.Producer do
  use GenStage

  def start_link(init_arg) do
    GenStage.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_) do
    # Generate 100 unique URLs for our slow API endpoint
    urls =
      for n <- 1..100 do
        "http://localhost:4000/api/crawl?url=https://example.com/page#{n}"
      end

    {:producer, {urls, MapSet.new()}}
  end

  @impl true
  def handle_demand(demand, {urls, processed}) do
    {urls_to_send, remaining_urls} = Enum.split(urls, demand)
    new_processed = MapSet.union(processed, MapSet.new(urls_to_send))
    {:noreply, urls_to_send, {remaining_urls, new_processed}}
  end
end

  • Initialization: The producer initializes with a list of URLs and an empty MapSet to track processed URLs.
  • Handle Demand: When handling demand, the producer splits the list of URLs based on the requested demand and updates the MapSet to ensure URLs are not produced again.
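
To see the demand-driven behavior in isolation, you can pull from the producer directly in IEx. This is a quick sanity check rather than part of the application; note that the supervised consumer added in Step 7 competes for the same events, so this is clearest in a session started before that step:

# With the application running, the producer is registered under its module name
GenStage.stream([DistributedCrawler.Producer]) |> Enum.take(5)
# Returns the first five URLs; the producer emits only what is demanded
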
Step 3: Create a Phoenix Endpoint
Create a controller and route for your API endpoint to simulate slow responses:

lib/distributed_crawler_web/controllers/crawler_controller.ex:


defmodule DistributedCrawlerWeb.CrawlerController do
  use DistributedCrawlerWeb, :controller

  def slow_api(conn, %{"url" => url}) do
    # Simulate a slow API response
    :timer.sleep(4000)
    json(conn, %{message: "Processed #{url}"})
  end
end


lib/distributed_crawler_web/router.ex:


scope "/api", DistributedCrawlerWeb do
pipe_through :api

get "/crawl", CrawlerController, :slow_api
end
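
You can verify the endpoint before wiring up GenStage; the request below should block for roughly four seconds and then return JSON:

curl "http://localhost:4000/api/crawl?url=https://example.com/page1"
# After ~4 seconds:
# {"message":"Processed https://example.com/page1"}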


Step 4: Create the Crawler to Handle Unique API URLs
Each Crawler GenServer is started for one of these unique API URLs:

lib/distributed_crawler/crawler.ex:


defmodule DistributedCrawler.Crawler do
  use GenServer
  require Logger

  def start_link(url) do
    GenServer.start_link(__MODULE__, url, name: via_tuple(url))
  end

  defp via_tuple(url) do
    {:via, Registry, {DistributedCrawler.Registry, url}}
  end

  @impl true
  def init(url) do
    schedule_work()
    {:ok, %{url: url, visited: []}}
  end

  @impl true
  def handle_info(:work, state) do
    new_state = call_slow_api(state.url, state.visited)
    schedule_work()
    Logger.info("Processed: #{state.url}")
    {:noreply, new_state}
  end

  defp schedule_work() do
    Process.send_after(self(), :work, 1000)
  end

  defp call_slow_api(url, visited) do
    # Call the slow API endpoint; get!/1 raises on failure,
    # letting the supervisor restart this crawler
    HTTPoison.get!(url)
    new_visited = [url | visited]
    %{url: url, visited: new_visited}
  end
end
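
Because each crawler registers under its URL in the Registry, you can look up a specific crawler by URL alone. A quick IEx check (assuming a crawler for page1 has already been started):

url = "http://localhost:4000/api/crawl?url=https://example.com/page1"
Registry.lookup(DistributedCrawler.Registry, url)
# => [{pid, nil}] if a crawler for this URL is alive, [] otherwise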


Step 5: Set Up Clustering
Configure clustering using libcluster to distribute tasks across multiple nodes:

config/config.exs:


config :libcluster,
  topologies: [
    example: [
      strategy: Cluster.Strategy.Gossip,
      config: [port: 45892]
    ]
  ]
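
Gossip discovers peers via UDP multicast, which works well on a single machine or LAN. If multicast is unavailable in your environment, libcluster's Epmd strategy with an explicit host list is a drop-in alternative (the node names below are assumptions matching the --sname flags used later):

config :libcluster,
  topologies: [
    example: [
      strategy: Cluster.Strategy.Epmd,
      config: [hosts: [:"node1@your-host", :"node2@your-host"]]
    ]
  ]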


Step 6: Add the Consumer to the Supervision Tree
To leverage GenStage's ability to handle concurrent processing, add the producer and consumer, along with the registry and dynamic supervisor they rely on, to the supervision tree.

lib/distributed_crawler/application.ex:


defmodule DistributedCrawler.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      DistributedCrawlerWeb.Telemetry,
      {DNSCluster, query: Application.get_env(:distributed_crawler, :dns_cluster_query) || :ignore},
      {Phoenix.PubSub, name: DistributedCrawler.PubSub},
      # Start the Finch HTTP client for sending emails
      {Finch, name: DistributedCrawler.Finch},
      # Start to serve requests
      DistributedCrawlerWeb.Endpoint,
      # Clustering, crawler registry, dynamic supervision, and the GenStage pipeline
      {Cluster.Supervisor, [Application.get_env(:libcluster, :topologies), [name: DistributedCrawler.ClusterSupervisor]]},
      DistributedCrawler.DynamicSupervisor,
      {Registry, keys: :unique, name: DistributedCrawler.Registry},
      DistributedCrawler.Producer,
      DistributedCrawler.Consumer
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: DistributedCrawler.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  @impl true
  def config_change(changed, _new, removed) do
    DistributedCrawlerWeb.Endpoint.config_change(changed, removed)
    :ok
  end
end
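
The children list references DistributedCrawler.DynamicSupervisor, which we haven't defined. A minimal sketch is shown below; the start_crawler/1 helper matches the call the consumer makes in the next step, and the one_for_one strategy is the only real choice made here:

lib/distributed_crawler/dynamic_supervisor.ex:

defmodule DistributedCrawler.DynamicSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  # Start a supervised crawler process for the given URL
  def start_crawler(url) do
    DynamicSupervisor.start_child(__MODULE__, {DistributedCrawler.Crawler, url})
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end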


Step 7: Implement the Consumer
The consumer handles the URLs emitted by the producer. GenStage's default dispatcher sends each event to exactly one subscriber, so each URL is processed only once even with multiple consumers.

lib/distributed_crawler/consumer.ex:


defmodule DistributedCrawler.Consumer do
  use GenStage

  def start_link(init_arg) do
    GenStage.start_link(__MODULE__, init_arg)
  end

  @impl true
  def init(_) do
    {:consumer, :ok, subscribe_to: [DistributedCrawler.Producer]}
  end

  @impl true
  def handle_events(events, _from, state) do
    for url <- events do
      DistributedCrawler.DynamicSupervisor.start_crawler(url)
      # Introduce a 2-second delay to simulate slower processing
      :timer.sleep(2000)
    end

    {:noreply, [], state}
  end
end
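
The supervision tree in Step 6 starts a single consumer. To run several consumers on one node, each child needs a unique id, which Supervisor.child_spec/2 provides; a sketch of how the children list could be extended (the ids are arbitrary):

# In lib/distributed_crawler/application.ex, replace the single consumer entry with:
Supervisor.child_spec({DistributedCrawler.Consumer, []}, id: :consumer_1),
Supervisor.child_spec({DistributedCrawler.Consumer, []}, id: :consumer_2)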


Running the Application
Start the application on two nodes to observe how the cluster handles concurrent processing.

Start the First Node:


PORT=4000 iex --sname node1 -S mix phx.server


Start the Second Node:


PORT=4001 iex --sname node2 -S mix phx.server

Note: a freshly generated Phoenix project may hardcode port: 4000 in config/dev.exs; if so, update it (e.g. port: String.to_integer(System.get_env("PORT") || "4000")) so the PORT variable takes effect.


Verify clustering by running Node.list() in each IEx shell:

# Terminal 1
Node.list()

# Terminal 2
Node.list()
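
If clustering succeeded, each shell reports the other node. The exact names depend on your hostname; with --sname on a machine called your-host, the output would look like:

# Terminal 1
iex(node1@your-host)> Node.list()
[:"node2@your-host"]

# Terminal 2
iex(node2@your-host)> Node.list()
[:"node1@your-host"]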

Observing Concurrent Processing
With multiple consumers, the workload will be distributed among them, allowing concurrent processing of events. You should see the logs showing different URLs being processed by different consumers.

This practical example of a distributed web crawler demonstrates handling massive concurrency through several advanced techniques and components. Here’s a detailed explanation:

1. Dynamic Supervision
Dynamic supervision allows us to dynamically start and stop crawling processes as needed, without predefining all the children in a static supervision tree.
  • DynamicSupervisor: Manages crawler processes dynamically. Each URL to be crawled is handled by a separate process, ensuring that each crawling task operates concurrently.
  • Concurrency: Each GenServer instance (crawler) runs independently, allowing multiple crawling tasks to occur simultaneously. This avoids blocking and enables high levels of concurrency.
2. GenStage for Rate Limiting and Flow Control
GenStage provides a mechanism for handling backpressure, which is crucial for controlling the rate at which URLs are processed and ensuring the system is not overwhelmed.
  • Producer and Consumer: GenStage is used to create a producer that generates URLs to be crawled and a consumer that processes these URLs.
  • Rate Limiting: The producer only sends as many URLs as the consumer demands at a given time, effectively rate-limiting the crawling tasks and preventing overload (see the subscription sketch after this list).
  • Flow Control: GenStage allows fine-grained control over the flow of messages between stages, ensuring efficient resource utilization.
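
The demand window is tunable per subscription. A sketch of how the consumer's init/1 could cap in-flight URLs using GenStage's max_demand and min_demand options (the values are illustrative):

@impl true
def init(_) do
  # Ask for at most 10 URLs at a time; re-request once outstanding demand drops to 5
  {:consumer, :ok,
   subscribe_to: [{DistributedCrawler.Producer, max_demand: 10, min_demand: 5}]}
end
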
3. Clustered Nodes with libcluster
Distributing tasks across a cluster of nodes increases the system's capacity to handle massive concurrency by leveraging multiple machines.
  • Clustering: Using libcluster, the application runs on multiple nodes that automatically form a cluster, allowing the workload to be distributed across several machines (a cross-node dispatch sketch follows this list).
  • Scalability: By distributing the crawling tasks across a cluster, the system can handle a larger number of concurrent crawling processes than a single node could.
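
In this example each node runs its own producer/consumer pair; if you want to explicitly push work to a specific remote node, Erlang's built-in :rpc.call/4 is one option (a sketch, not part of the application above; the node name is an assumption):

# From IEx on node1, start a crawler on node2
url = "http://localhost:4000/api/crawl?url=https://example.com/page42"
:rpc.call(:"node2@your-host", DistributedCrawler.DynamicSupervisor, :start_crawler, [url])
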
4. Partitioned GenServers
Distributing the workload across multiple GenServer instances ensures that no single process becomes a bottleneck.
  • Partitioning: Each GenServer handles a specific URL or set of URLs. This partitioning distributes the load evenly and allows the system to process many URLs concurrently.
  • Concurrency: Multiple GenServers running in parallel enable the system to process numerous URLs simultaneously, enhancing throughput and reducing latency.
Summary of Concurrency Handling
  1. Dynamic Process Management: Each crawling task is managed by its own GenServer, dynamically supervised, enabling independent and concurrent execution.
  2. Backpressure Management: GenStage controls the rate of URL processing, ensuring the system remains stable and efficient under high load.
  3. Distributed Processing: Clustering spreads the workload across multiple nodes, significantly increasing the capacity to handle concurrent tasks.
  4. Load Distribution: Partitioning tasks across multiple GenServers prevents bottlenecks and ensures smooth concurrent processing.
By introducing multiple consumers and ensuring unique URL production in the producer, we leverage GenStage's concurrent processing and make the system easy to scale horizontally. Each consumer processes events independently, distributing the workload and improving the system's overall throughput and resilience under load.

This setup demonstrates how GenStage can efficiently manage concurrency and backpressure in a real-world application, providing a robust and scalable solution for handling massive concurrency.
