- `Surfer.Oban.RabbitClient` (producer & consumer)
- `Surfer.Oban.AsyncWorker` (behaviour)
---
### `Surfer.Oban.AsyncWorker`
- behaviour implemented by specific "async workers"
- builds a job that will be executed asynchronously (published & awaited)
- jobs waiting for results are moved to another queue (`async_work_awaits`) so they don't block other jobs
---
### `Surfer.Oban.RabbitClient`
- producer & consumer module
- publishes work requests to service queues with a `correlation_id` containing the job's id, to identify the source job when the results come back
- handles responses from **all** services, because every request is published with `reply_to: :async_work_results`
- thus, it **must only do two things**:
  - update the job (parse, validate & save the results in the db); this must be done here, because the instance processing the results message might be different from the one processing the job; the job process might also _not_ be running at all (it's orphaned when its node goes down mid-processing, and it takes a while to "rescue" it)
  - notify the job's process about completion; if it's running, it should be stopped immediately
- any other processing of the results must be done in **separate** workers
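A minimal, dependency-free sketch of the "store first, then notify" rule. It assumes an `Agent` stands in for the jobs table and a `Registry` with duplicate keys stands in for the Oban notifier; `ResultsSketch` and its functions are hypothetical. Because results are persisted before the broadcast, a job whose process was orphaned still finds them once it is rescued.

```elixir
defmodule ResultsSketch do
  # Hypothetical stand-ins: an Agent plays the jobs table, a Registry with
  # duplicate keys plays the notifier channel.
  @registry ResultsSketch.Listeners

  def start do
    {:ok, _} = Registry.start_link(keys: :duplicate, name: @registry)
    {:ok, _} = Agent.start_link(fn -> %{} end, name: __MODULE__)
    :ok
  end

  # Called by whichever node consumed the RabbitMQ reply.
  def finish_job(job_id, results) do
    # 1. Persist first, so an orphaned job can pick the results up later.
    Agent.update(__MODULE__, &Map.put(&1, job_id, results))

    # 2. Then wake up any process awaiting this job (there may be none).
    Registry.dispatch(@registry, :async_workers, fn listeners ->
      for {pid, _value} <- listeners, do: send(pid, {:work_completed, job_id})
    end)
  end

  # Called by the job process (or by a rescued retry of it).
  def await(job_id, timeout \\ 1_000) do
    # Subscribe before checking storage, so results landing in between
    # still produce a message we can receive.
    {:ok, _} = Registry.register(@registry, :async_workers, nil)

    case Agent.get(__MODULE__, &Map.fetch(&1, job_id)) do
      {:ok, results} ->
        {:ok, results}

      :error ->
        receive do
          {:work_completed, ^job_id} -> Agent.get(__MODULE__, &Map.fetch(&1, job_id))
        after
          timeout -> {:error, :timeout}
        end
    end
  end
end
```

The first call below models an orphaned job (results stored before anyone listens); the task models a live job that is woken by the notification.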
---
### `Surfer.Oban.RabbitClient`
```elixir
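# Publish a work request; services reply to `async_work_results` and echo
# the `correlation_id`, which carries the Oban job id.
def schedule_work(queue, payload, job_id) do
  publish(exchange(), queue, payload, reply_to: :async_work_results, correlation_id: job_id)
end

# Every service reply lands here; the correlation id identifies the source job.
def handle_deliver(_consumer, %{payload: payload, meta: meta}) do
  AsyncWorker.finish_job(meta.correlation_id, payload)
end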
```
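At the AMQP level, `correlation_id` and `reply_to` are short-string message properties, so an integer job id has to travel as a string and be parsed back when the reply arrives. The pure-Elixir sketch below models that round trip; `CorrelationSketch` and its functions are hypothetical and do not call a real AMQP client.

```elixir
defmodule CorrelationSketch do
  # Hypothetical helpers: build AMQP-style message properties for a job and
  # recover the job id from a reply. Both properties are strings on the wire,
  # so the integer job id is converted in both directions.

  @results_queue "async_work_results"

  def request_properties(job_id) when is_integer(job_id) do
    [reply_to: @results_queue, correlation_id: Integer.to_string(job_id)]
  end

  # Models a service that echoes the correlation_id back on its reply.
  def service_reply(properties, result_payload) do
    %{payload: result_payload, meta: %{correlation_id: properties[:correlation_id]}}
  end

  def job_id_from_reply(%{meta: %{correlation_id: correlation_id}}) do
    String.to_integer(correlation_id)
  end
end
```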
---
### `Surfer.Oban.AsyncWorker`
```elixir[1-15|17-25|27-37|39-61|46-61]
defmacro __using__(opts) do
  quote do
    use Oban.Pro.Worker, unquote(opts)

    @impl Oban.Pro.Worker
    def process(%Oban.Job{queue: "async_work_awaits"} = job) do
      AsyncWorker.await_work_finished(job.id)
    end

    @impl Oban.Pro.Worker
    def process(job) do
      AsyncWorker.schedule_work(job)
    end
  end
end

def schedule_work(job) do
  worker_module = get_job_worker_module(job)

  job
  |> worker_module.prepare_payload()
  |> worker_module.schedule_work(job)

  put_job_in_awaits_queue(job)
end

def await_work_finished(job_id) do
  :ok = listen(oban(), [:async_workers])

  receive do
    {:notification, :async_workers, %{"work_completed" => ^job_id}} ->
      :ok

    {:notification, :async_workers, %{"work_failed" => ^job_id, "error" => error}} ->
      {:error, error}
  end
end

def finish_job(job_id, payload) do
  job = load_job(job_id)
  worker_module = get_job_worker_module(job)

  results =
    payload
    |> worker_module.parse_results()
    |> worker_module.validate_results()
    |> case do
      {:ok, results} -> %{results: encode64(results)}
      {:error, error} -> %{work_error: error}
    end

  job
  |> store_results(results)
  |> case do
    %{meta: %{results: _results}} ->
      notify(oban(), :async_workers, %{work_completed: job.id})

    %{meta: %{work_error: error}} ->
      notify(oban(), :async_workers, %{work_failed: job.id, error: error})
  end
end
```
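The `receive` in `await_work_finished/1` has no `after` clause, so a lost notification would keep the job waiting indefinitely. The sketch below uses plain process messages shaped like the slide's notifications (string keys, since notifier payloads arrive JSON-decoded) and adds a hypothetical timeout. `AwaitSketch` is illustrative, not the deck's code. Selective receive leaves notifications for other job ids in the mailbox.

```elixir
defmodule AwaitSketch do
  # Plain-process stand-in: notifications arrive as
  # {:notification, channel, payload} with string keys, mirroring the
  # shape the slide's receive block matches on.

  def await_work_finished(job_id, timeout_ms) do
    receive do
      {:notification, :async_workers, %{"work_completed" => ^job_id}} ->
        :ok

      {:notification, :async_workers, %{"work_failed" => ^job_id, "error" => error}} ->
        {:error, error}
    after
      # Hypothetical: give up instead of occupying the awaits queue forever.
      timeout_ms -> {:error, :timeout}
    end
  end
end
```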
---
### `Surfer.CrawlUrlWorker`
```elixir[1-2|4-7|9-12|14-15|17-18|20-21]
alias Surfer.Oban.{AsyncWorker, RabbitClient}
use AsyncWorker, queue: :crawls

@impl AsyncWorker
def prepare_payload(job) do
  prepare_rabbit_message(job)
end

@impl AsyncWorker
def schedule_work(payload, job) do
  RabbitClient.schedule_work(:crawl_requests, payload, job.id)
end

@impl AsyncWorker
def parse_results(raw_data), do: Msgpax.unpack!(raw_data)

@impl AsyncWorker
def validate_results(%{"error_message" => error_message}), do: {:error, error_message}

@impl AsyncWorker
def validate_results(data), do: {:ok, data}
```
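The behaviour itself is not shown on these slides. Below is a hypothetical sketch of the `@callback`s it would need, inferred from the four callbacks `CrawlUrlWorker` implements; the types are assumptions. `FakeCrawlWorker` swaps `Msgpax.unpack!/1` for `:erlang.binary_to_term/1` so the sketch runs without dependencies. The test feeds it string-keyed maps, which is what MessagePack decoding produces.

```elixir
defmodule AsyncWorkerSketch do
  # Hypothetical behaviour inferred from CrawlUrlWorker; types are assumed.
  @callback prepare_payload(job :: map()) :: binary()
  @callback schedule_work(payload :: binary(), job :: map()) :: :ok | {:error, term()}
  @callback parse_results(raw :: binary()) :: map()
  @callback validate_results(results :: map()) :: {:ok, map()} | {:error, term()}

  # Mirrors the parse -> validate pipeline from finish_job/2.
  def handle_results(worker_module, raw) do
    raw
    |> worker_module.parse_results()
    |> worker_module.validate_results()
  end
end

defmodule FakeCrawlWorker do
  @behaviour AsyncWorkerSketch

  @impl true
  def prepare_payload(job), do: :erlang.term_to_binary(job.args)

  @impl true
  def schedule_work(_payload, _job), do: :ok

  # Stand-in for Msgpax.unpack!/1.
  @impl true
  def parse_results(raw), do: :erlang.binary_to_term(raw)

  # Decoded keys are strings, so match on "error_message", not :error_message.
  @impl true
  def validate_results(%{"error_message" => message}), do: {:error, message}
  def validate_results(data), do: {:ok, data}
end
```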
---
### Problem
```elixir[6]
# Surfer.InitializeAuditQueryWorkflow

def run(query) do
  # ...
  |> add_step(&fetch_serp/1, deps: [:crawl_audited_page])
  |> add_step(&crawl_top_10_pages_from_serp/1, deps: [:fetch_serp])
  |> add_step(&analyze_crawled_pages/1, deps: [:crawl_top_10_pages_from_serp])
  # ...
end
```
The `crawl_top_10_pages_from_serp` step must add new crawl jobs to the workflow based on the results of the `fetch_serp` step.
---
### Solution
> Sometimes all jobs aren't known when the workflow is created. In that case, you can add more jobs with optional dependency checking using `append_workflow/2`.
Inside the `crawl_top_10_pages_from_serp` step, we perform a custom `prepend` operation, which adds the new crawl jobs as dependencies of this step itself.
Thanks to that, we avoid "append waterfalls" in individual workers (e.g. scrape -> append crawls -> append next steps) and keep a clean workflow definition where all crucial steps are visible.
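A toy model of the "prepend" idea, assuming a workflow is a map of `step => [deps]` plus a set of finished steps. This is not Oban Pro's data model, and `PrependSketch` is hypothetical. Making the new crawl jobs dependencies of the step itself means `analyze_crawled_pages` keeps waiting through its existing dependency, so the workflow definition never changes.

```elixir
defmodule PrependSketch do
  # Hypothetical model: %{step => [deps]} plus a MapSet of finished steps.

  # Add new jobs and make them dependencies of `step` itself, so everything
  # downstream of `step` keeps waiting without being redefined.
  def prepend(graph, step, new_jobs) do
    graph
    |> Map.merge(Map.new(new_jobs, &{&1, []}))
    |> Map.update!(step, &(&1 ++ new_jobs))
  end

  # A step may run once all of its dependencies have finished.
  def ready?(graph, done, step) do
    Enum.all?(Map.fetch!(graph, step), &MapSet.member?(done, &1))
  end
end
```

In this model, the prepending step runs again (as `process_prepend` does on the next slide) only after its new dependencies finish. Its downstream steps are unblocked after that second run.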
---
#### `Surfer.CrawlScrapePagesWorker`
```elixir[1-2|4-10|12-21]
alias Surfer.Oban.WorkflowWorker
use WorkflowWorker

@impl Oban.Pro.Worker
def process(job) do
  scrape = load_scrape(job)
  crawl_jobs = build_crawl_jobs(scrape)

  prepend(job, :crawl_pages, crawl_jobs)
end

@impl WorkflowWorker
def process_prepend(job, :crawl_pages) do
  crawl_jobs = load_prepend(job, :crawl_pages)

  if enough_successfully_crawled_pages?(crawl_jobs) do
    :ok
  else
    {:cancel, :not_enough_successfully_crawled_pages}
  end
end
```
The prepending mechanism is implemented in `Surfer.Oban.WorkflowWorker`, but we won't go into its details in this presentation.
---
### Profit
- We still use RabbitMQ, so we can scale easily
- The whole workflow is defined in a single file, so it's super clear which steps must be performed and in what order
- We have Surfer-module-agnostic workers, e.g. `CrawlUrlWorker`, that can be plugged into any workflow that requires crawling
- We have the Oban Web dashboard, where we can monitor the whole process
- We get error handling with retries and exponential backoff for free
---
### Bonus
- Synchronous, RPC-like calls thanks to the `Oban.Pro.Relay` plugin
- It's now easy to send requests to staging/production machines from a dev environment by setting a custom results queue for `Surfer.Oban.AsyncWorker`, e.g. `async_work_results/tuna-dev`
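A hypothetical sketch of how the results queue name could be derived. With no suffix, services reply to the shared queue. With a developer suffix, replies go to a queue that only that machine consumes, so staging/production consumers never pick them up. `ResultsQueueSketch` and the suffix mechanism are assumptions; the deck does not show how the queue is configured.

```elixir
defmodule ResultsQueueSketch do
  # Hypothetical helper: choose the queue services should reply to.
  @base "async_work_results"

  # No suffix: the shared queue consumed in staging/production.
  def name(nil), do: @base

  # Developer suffix: a queue only that dev machine consumes.
  def name(suffix) when is_binary(suffix), do: @base <> "/" <> suffix
end
```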
---
### Not-so-great
- Quite a lot of custom code and macros 🙈
- Oban's dependency resolution mechanism isn't great
  - it basically performs periodic dependency checks for all awaiting jobs, and by default it blocks the queue
- There's no way to see an overview of a single workflow in Oban Web (yet)
- Additional load on the database
  - at Surfer we are running a couple million jobs daily without any hiccups so far
---
### Room for improvement
- We haven't yet removed the db entities that store intermediate step results; we could, as these can be safely stored in Oban jobs
- In theory, it'd be possible to ditch RabbitMQ completely, but this would require us to implement custom Oban (ergo Elixir) facade apps for each of our services
---
### That's it, thanks!
#### This deck 👇🏻