Relayer worker architecture
The relayer's internal worker is a poll-driven event loop: a main thread polls every stored interface for new transactions and spins up secondary threads to handle transaction routing. The internals of every interface are strictly hidden from the relayer.
def poll_for_transactions(self):
    """
    Polls for transactions on all interfaces
    Updates task list with found events

    For every entry in ``self.dict_of_names_to_interfaces`` the blocks after
    the last recorded height, up to and including the height reported by
    ``get_last_block()``, are queried with the contract's address. Every task
    parsed from the returned transactions is appended to ``self.task_list``,
    and the recorded height in ``self.dict_of_names_to_blocks`` is then set
    to the height that was just polled.
    """
    for name, (chain_interface, contract_interface, evt_name, _) in self.dict_of_names_to_interfaces.items():
        prev_height = self.dict_of_names_to_blocks[name]
        curr_height = chain_interface.get_last_block()
        if prev_height is None:
            # Nothing recorded yet for this interface: scan only the latest block.
            prev_height = curr_height - 1
        for block_num in range(prev_height + 1, curr_height + 1):
            transactions = chain_interface.get_transactions(contract_interface.address, height=block_num)
            for transaction in transactions:
                self.task_list.extend(contract_interface.parse_event_from_txn(evt_name, transaction))
        # Recorded only after every block in the range was scanned.
        self.dict_of_names_to_blocks[name] = curr_height
def route_transaction(self, task: Task):
    """
    Given a Task, routes it where it's supposed to go

    The task's destination network is looked up in
    ``self.dict_of_names_to_interfaces``. When the destination is missing or
    unknown, a warning is logged and the task's status in
    ``self.task_ids_to_statuses`` is set to 'Failed to route'. Otherwise the
    destination contract's function is called with the string form of the
    task.

    Args:
        task: the Task to be routed
    """
    self.logger.info(f'Routing task {task}')
    destination = task.task_destination_network
    if destination is None:
        self.logger.warning(f'Task {task} has no destination network, not routing')
        self.task_ids_to_statuses[task.task_data['task_id']] = 'Failed to route'
        return
    if destination not in self.dict_of_names_to_interfaces:
        self.logger.warning(f'Network {destination} is unknown, not routing')
        self.task_ids_to_statuses[task.task_data['task_id']] = 'Failed to route'
        return
    # Interface entries are tuples: index 1 is the contract interface,
    # index 3 the name of the function to call on it.
    interface_entry = self.dict_of_names_to_interfaces[destination]
    contract_for_txn = interface_entry[1]
    function_name = interface_entry[3]
    contract_for_txn.call_function(function_name, str(task))
def task_list_handle(self):
    """
    Spins up threads to handle each task in the task list

    Each call starts at most one new worker thread, and only while fewer
    than ``max_threads`` workers are alive and the task list is non-empty.
    A worker keeps popping tasks from the end of ``self.task_list`` and
    passing them to ``self.route_transaction`` until the list is empty.
    """
    max_threads = 5

    def _thread_func():
        while True:
            # Pop directly instead of checking the length first: another
            # worker may take the last task between the check and the pop.
            try:
                task = self.task_list.pop()
            except IndexError:
                return
            self.route_transaction(task)

    # Drop finished workers first so they do not count against the limit.
    self.task_threads = [thread_live for thread_live in self.task_threads if thread_live.is_alive()]
    if len(self.task_threads) < max_threads and len(self.task_list) > 0:
        thread = Thread(target=_thread_func)
        thread.start()
        self.task_threads.append(thread)
def run(self):
    """
    Runs the central relayer event loop:
    poll for transactions,
    handle transactions
    sleep

    Logs a start-up message once, then repeats the cycle forever with a
    5 second pause between iterations. The loop has no exit condition; it
    only stops when one of the called methods raises.
    """
    self.logger.info('Starting relayer')
    while True:
        self.poll_for_transactions()
        self.task_list_handle()
        sleep(5)
Last updated