Overview
What is a ROH?
ROH is a distributed Python tasks manager.
It is possible to distribute python tasks across the network to different nodes
When to Use ROH?
To distribute loads
Let’s see some python code
The following snippet shows some internals.
the AMQP module
channel_.exchange_declare(exchange='test_exchange_1', durable=True,
exchange_type="fanout") # (1)
result = channel_.queue_declare(queue='my_queue', durable=True)
queue_name = result.method.queue
channel_.queue_bind(exchange="test_exchange_1", queue=queue_name,
routing_key="") # (2)
channel_.basic_qos(prefetch_count=1)
channel_.basic_consume(on_message, queue=queue_name,
no_ack=False) # (3)
channel_.start_consuming()
-
We define an exchange
-
We bind a queue
-
We send the ACK
Name |
Description |
|
The exchange to bind to |
|
The queue to bind to |
|
The routing key to bind with |
|
Other properties (construction arguments) for the binding |
|
Do not wait for the response |
|
A callback method taking one argument, the bound queue |
|
The ticket number |
Let’s see some erlang code
The following snippet shows some internals.
erlang module
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, % (1)
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({add_task, Task}, _From,
State = #state{running_workers = MRW, waiting_queue = QWQ, supervisor = Sup}) ->
case is_watermark_processes(MRW) of % (2)
true ->
QWQ2 = queue:in(Task, QWQ),
roh_console_log:info("Added in waiting list, current size: ~w", [queue:len(QWQ2)]),
{reply, ok, State#state{waiting_queue = QWQ2, global = State#state.global + 1}};
false -> MRW2 = execute_new_worker(Task, Sup, MRW),
{reply, ok, State#state{running_workers = MRW2, global = State#state.global + 1}} % (3)
end;
-
Define the type for the analyzer
-
check the watermark
-
execute the task