Relayer worker architecture

The relayer's internal worker is a poll-driven event loop: a main thread polls all stored interfaces and spins up secondary threads to handle transaction routing. The internals of every interface are kept strictly hidden from the relayer.

    def poll_for_transactions(self):
        """
        Polls for transactions on all interfaces
        Updates task list with found events

        For every entry in ``self.dict_of_names_to_interfaces`` the blocks after
        the last polled height, up to and including the chain's current height,
        are scanned for transactions sent to the entry's contract address.
        Every task parsed from those transactions for the entry's event name is
        appended to ``self.task_list``, and the current height is then stored
        in ``self.dict_of_names_to_blocks`` as the new last polled height.

        An interface whose stored height is None only has its current block
        scanned.
        """
        for name, (chain_interface, contract_interface, evt_name, _) in self.dict_of_names_to_interfaces.items():
            prev_height = self.dict_of_names_to_blocks[name]
            curr_height = chain_interface.get_last_block()
            if prev_height is None:
                # No recorded height yet: start from the current block only.
                prev_height = curr_height - 1
            for block_num in range(prev_height + 1, curr_height + 1):
                transactions = chain_interface.get_transactions(contract_interface.address, height=block_num)
                for transaction in transactions:
                    # The parsed tasks are consumed exactly once, so a one-shot
                    # iterator returned by the parser is queued in full.
                    self.task_list.extend(contract_interface.parse_event_from_txn(evt_name, transaction))
            self.dict_of_names_to_blocks[name] = curr_height

    def route_transaction(self, task: Task):
        """
        Forwards a Task to the contract registered for its destination network

        The task is handed, in string form, to the function registered for that
        network. If the task names no destination network, or one that is not
        present in ``self.dict_of_names_to_interfaces``, nothing is sent: a
        warning is logged and the task's status is recorded as
        'Failed to route'.

        Args:
            task: the Task to be routed
        """
        self.logger.info('Routing task {}'.format(task))

        # Work out whether the task can be routed at all; both failure cases
        # are reported and recorded the same way.
        failure_message = None
        if task.task_destination_network is None:
            failure_message = f'Task {task} has no destination network, not routing'
        elif task.task_destination_network not in self.dict_of_names_to_interfaces:
            failure_message = f'Network {task.task_destination_network} is unknown, not routing'

        if failure_message is not None:
            self.logger.warning(failure_message)
            self.task_ids_to_statuses[task.task_data['task_id']] = 'Failed to route'
            return

        # Slot 1 of the interface entry is the contract, slot 3 the name of the
        # function to call on it.
        interface_entry = self.dict_of_names_to_interfaces[task.task_destination_network]
        target_contract = interface_entry[1]
        target_function = interface_entry[3]
        target_contract.call_function(target_function, str(task))

    def task_list_handle(self, max_threads=5):
        """
        Spins up a worker thread to drain the task list

        Each call first drops finished threads from ``self.task_threads`` and
        then starts at most one new worker, provided tasks are pending and
        fewer than ``max_threads`` workers are still alive. A worker pops tasks
        off the end of ``self.task_list`` and routes each one until the list is
        empty.

        Args:
            max_threads: upper bound on the number of live worker threads
        """

        def _thread_func():
            while True:
                # Several workers drain the same list, so an emptiness check
                # made before the pop could be stale by the time the pop runs.
                # Attempt the pop and treat an empty list as "nothing left".
                try:
                    task = self.task_list.pop()
                except IndexError:
                    return
                self.route_transaction(task)

        # Prune before counting so that workers which have already finished do
        # not hold a slot and delay the start of a new worker.
        self.task_threads = [thread_live for thread_live in self.task_threads if thread_live.is_alive()]
        if len(self.task_threads) < max_threads and len(self.task_list) > 0:
            thread = Thread(target=_thread_func)
            thread.start()
            self.task_threads.append(thread)

    def run(self, poll_interval=5):
        """
        Runs the central relayer event loop:
        poll for transactions,
        handle the queued tasks,
        sleep

        The loop has no exit condition of its own; it only ends when one of
        the steps raises.

        Args:
            poll_interval: number of seconds to sleep between iterations
        """
        self.logger.info('Starting relayer')
        while True:
            self.poll_for_transactions()
            self.task_list_handle()
            sleep(poll_interval)

Last updated