Source code for fastr.execution.networkchunker

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
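# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.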
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the NetworkChunker class and its default implementation,
the DefaultNetworkChunker.
"""

from abc import abstractmethod
from collections import deque
import fastr
import fastr.exceptions as exceptions

class NetworkChunker(object):
    """
    The base class for NetworkChunkers. A Network chunker is a class that
    takes a Network and produces a list of chunks that can each be analyzed
    and executed in one go.
    """

    @abstractmethod
    def chunck_network(self, network):
        """
        Create a list of Network chunks that can be pre-analyzed completely.
        Each chunk needs to be executed before the next can be analyzed and
        executed.

        :param network: Network to split into chunks
        :return: list containing chunks
        :raises NotImplementedError: always; subclasses must override this
        """
        # This class does not use ABCMeta as its metaclass, so @abstractmethod
        # on its own is not enforced. Raising makes a missing override fail
        # loudly instead of silently returning None.
        raise NotImplementedError(
            '{} does not implement chunck_network'.format(type(self).__name__))
class DefaultNetworkChunker(NetworkChunker):
    """
    The default implementation of the NetworkChunker. It tries to create as
    large as possible chunks so the execution blocks as little as possible.
    """

    def __init__(self):
        # Not read by any method of this class; kept for backward compatibility.
        self.chunks = []
        # Maps node id -> status string ('unvisited', 'blocked', 'blocking'
        # or 'visited').
        self.node_status = {}
        # Used nodes that have not yet been placed in a chunk.
        self.pool = set()
        # Nodes that have already been placed in a chunk.
        self.processed_nodes = set()
        # Nodes required (directly or transitively) by the network's sinks.
        self.used_nodes = set()

    def chunck_network(self, network):
        """
        Create a list of Network chunks that can be pre-analyzed completely.
        Each chunk needs to be executed before the next can be analyzed and
        executed.

        The returned chunks are (at the moment) in the format of a tuple
        (start, nodes) which are both tuples. The tuples contain the nodes
        where to start execution (should be ready if previous chunks are
        done) and all nodes of the chunk respectively.

        :param network: Network to split into chunks
        :return: tuple containing chunks
        :raises FastrStateError: if a node is considered that was already
                                 placed in a chunk, or if an iteration fails
                                 to place any node (no progress possible)
        """
        self._set_network_used_nodes(network)
        # Reset bookkeeping from any previous call. Stale processed nodes would
        # otherwise be subtracted from the fresh pool below and trigger a
        # spurious "previously visited" error when the chunker is reused.
        self.processed_nodes.clear()

        fastr.log.debug('START Pool: {}'.format(self.used_nodes))
        fastr.log.debug('SOURCELIST: {}'.format(network.sourcelist))
        fastr.log.debug('CONSTANTLIST: {}'.format(network.constantlist))

        # Chunking starts from the sources and constants the sinks need.
        candidates = deque()
        for source in network.sourcelist.values():
            if source in self.used_nodes:
                candidates.append(source)
        for constant in network.constantlist.values():
            if constant in self.used_nodes:
                candidates.append(constant)

        fastr.log.debug('START candidates: {}'.format([x.id for x in candidates]))

        chunks = []
        self.pool = set(self.used_nodes)

        while self.used_nodes:
            chunk_start = list(candidates)
            new_chunk = []
            new_candidates = deque()

            while candidates:
                node = candidates.popleft()
                fastr.log.debug('Considering NodeRun {}'.format(node.id))

                if node in candidates:
                    fastr.log.error('NODE HAS ENTERED CANDIDATES TWICE! {}'.format(node))

                if node not in self.pool:
                    fastr.log.debug('Used nodes: {}\nPool: {}\nCandidates: {}'
                                    '\nNew candidates: {}'.format([x.id for x in self.used_nodes],
                                                                  [x.id for x in self.pool],
                                                                  [x.id for x in candidates],
                                                                  [x.id for x in new_candidates]))
                    message = 'Encountered previously visited NodeRun {}!'.format(node.id)
                    fastr.log.error(message)
                    raise exceptions.FastrStateError(message)

                # If this NodeRun definitely cannot be executed, move to pool for next chunks
                if not self._node_is_candidate(node):
                    fastr.log.debug('Moving {} to next chunk (not a candidate)'.format(node.id))
                    new_candidates.append(node)
                    continue

                # If this NodeRun is blocked by earlier nodes in the Chunk, move it to pool for next chunks
                if self._node_is_blocked(node):
                    fastr.log.debug('Moving {} to next chunk (is blocked)'.format(node.id))
                    new_candidates.append(node)
                    continue

                # Change node status and append to working chunk
                fastr.log.debug('Adding {} to chunk'.format(node.id))
                self.used_nodes.remove(node)
                self.node_status[node.id] = 'visited'
                new_chunk.append(node)

                # Recurse into following Nodes
                fastr.log.debug('Processing listeners for {}'.format(node.id))
                for listener in node.listeners:
                    # NOTE(review): the expression mapping a listener to its
                    # downstream node was lost from the scraped source;
                    # reconstructed as listener.target.node — confirm.
                    lnode = listener.target.node
                    fastr.log.debug('Considering listener {}'.format(lnode.id))

                    if lnode not in self.used_nodes:
                        fastr.log.debug('Ignoring {}'.format(lnode.id))
                        continue

                    if not self._node_is_blocked(lnode):
                        if lnode not in candidates:
                            fastr.log.debug('Adding {} to candidates'
                                            ' (blocking {})'.format(lnode.id, self._node_is_blocked(lnode)))
                            candidates.append(lnode)
                        else:
                            fastr.log.debug('Listener {} already in candidates'.format(lnode.id))
                    elif lnode not in new_candidates:
                        fastr.log.debug('Queueing {} ({})'.format(lnode.id, self._node_is_blocked(lnode)))
                        if lnode in candidates:
                            candidates.remove(lnode)
                        new_candidates.append(lnode)

            # Only the initial candidates that actually made it into this
            # chunk are start points.
            chunk_members = set(new_chunk)
            chunk_start = tuple(x for x in chunk_start if x in chunk_members)
            fastr.log.debug('Adding chunk {} with start {}'.format([x.id for x in new_chunk],
                                                                   [x.id for x in chunk_start]))

            # If nothing was placed, pool, processed nodes and candidates are
            # all unchanged, so the next iteration would be identical: the
            # loop could never terminate.
            if not new_chunk:
                raise exceptions.FastrStateError('Network chunker does not converge! It appears there is a bug!')

            chunks.append((chunk_start, tuple(new_chunk)))
            candidates = new_candidates
            self.processed_nodes.update(new_chunk)
            self.pool = self.pool - self.processed_nodes  # Remove processed nodes from pool
            fastr.log.debug('Start new chuck with candidates {} and pool {}'.format([x.id for x in candidates],
                                                                                   [x.id for x in self.pool]))

        # After chunking return a tuple of tuples, to avoid tampering later on
        return tuple(chunks)

    def _node_is_candidate(self, node):
        """
        Check if the NodeRun is considered a candidate: every node it sources
        from is either already processed or still pending in the pool.

        :param node: NodeRun to check
        :return: flag indicating the NodeRun is a candidate
        """
        return all(snode in self.processed_nodes or snode in self.pool
                   for snode in node.get_sourced_nodes())

    @staticmethod
    def _node_is_analyzable(node):
        """
        Check if it is possible to analyze a NodeRun, i.e. none of the nodes
        it sources from is blocking.

        :param node: NodeRun to check
        :return: flag indicating the NodeRun is analyzable
        """
        return not any(snode.blocking for snode in node.get_sourced_nodes())

    def _node_is_blocked(self, target):
        """
        Check if a NodeRun is blocked: some node it sources from is still in
        the pool and is either blocking itself or (recursively) blocked.

        :param target: NodeRun to check
        :return: flag indicating the NodeRun is blocked by a blocking NodeRun
        """
        for node in target.get_sourced_nodes():
            if node in self.pool and (node.blocking or self._node_is_blocked(node)):
                fastr.log.debug('NodeRun {} is the cause of the blocked node {} (pool: {})'.format(
                    node.id, target.id, [x.id for x in self.pool]))
                return True
        return False

    def _set_network_used_nodes(self, network):
        """
        Fill ``used_nodes`` and ``node_status`` with every node the network's
        sinks depend on.

        :param network: Network to analyze
        """
        self.used_nodes.clear()
        self.node_status.clear()
        for sink in network.sinklist.values():
            self.used_nodes.add(sink)
            if not self._node_is_analyzable(sink):
                self.node_status[sink.id] = 'blocked'
            else:
                self.node_status[sink.id] = 'unvisited'
            self._network_walker(sink)

    def _network_walker(self, node):
        """
        Recursive backwards search through the network to find all Nodes
        required for the Sinks.

        :param node: NodeRun to start search from
        """
        # .values() works on both Python 2 and 3 (itervalues is Python 2 only).
        for input_ in node.inputs.values():
            for parentnode in input_.get_sourced_nodes():
                already_walked = parentnode in self.used_nodes
                self.used_nodes.add(parentnode)
                if parentnode.blocking:
                    self.node_status[parentnode.id] = 'blocking'
                elif not self._node_is_analyzable(parentnode):
                    self.node_status[parentnode.id] = 'blocked'
                else:
                    self.node_status[parentnode.id] = 'unvisited'
                # Any node already in used_nodes has had its ancestors walked;
                # skipping it avoids exponential re-walking of shared ancestors.
                if not already_walked:
                    self._network_walker(parentnode)