Module: pipeline.engine.utils

Utility routines for workflow graphs


nipype.pipeline.engine.utils.clean_working_directory(outputs, cwd, inputs, needed_outputs, config, files2keep=None, dirs2keep=None)

Removes all files not needed for further analysis from the directory

nipype.pipeline.engine.utils.count_iterables(iterables, synchronize=False)

Return the number of iterable expansion nodes.

If synchronize is True, then the count is the length of the longest iterables value list. Otherwise, the count is the product of the iterables value list sizes.
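The two counting rules can be illustrated with a minimal sketch. The helper `count_expansions` is hypothetical and is not nipype's implementation; it assumes a non-empty dict of plain value lists, whereas the doctests on this page pass callables that return lists.

```python
import math


def count_expansions(value_lists, synchronize=False):
    """Hypothetical helper: count expansions for a dict of value lists.

    Assumes value_lists is a non-empty {field: list} mapping.
    """
    sizes = [len(values) for values in value_lists.values()]
    if synchronize:
        # Item-wise pairing: the longest list determines the count.
        return max(sizes)
    # Full cross product: multiply the list sizes together.
    return math.prod(sizes)


value_lists = {'a': [1, 2], 'b': [3, 4, 5]}
crossed = count_expansions(value_lists)                   # 2 * 3
paired = count_expansions(value_lists, synchronize=True)  # max(2, 3)
```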

nipype.pipeline.engine.utils.evaluate_connect_function(function_source, args, first_arg)
nipype.pipeline.engine.utils.expand_iterables(iterables, synchronize=False)
nipype.pipeline.engine.utils.export_graph(graph_in, base_dir=None, show=False, use_execgraph=False, show_connectinfo=False, dotfilename='graph.dot', format='png', simple_form=True)

Displays the graph layout of the pipeline

This function requires that pygraphviz and matplotlib are available on the system.

show : boolean
Indicates whether to generate pygraphviz output from
networkx. default [False]
use_execgraph : boolean
Indicates whether to use the specification graph or the
execution graph. default [False]
show_connectinfo : boolean
Indicates whether to show the edge data on the graph. This
makes the graph rather cluttered. default [False]
nipype.pipeline.engine.utils.format_dot(dotfilename, format='png')

Dump a directed graph (Linux only; install via brew on OSX)

nipype.pipeline.engine.utils.format_node(node, format='python', include_config=False)

Format a node in a given output syntax.


nipype.pipeline.engine.utils.generate_expanded_graph(graph_in)

Generates an expanded graph based on node parameterization

Parameterization is controlled using the iterables field of the pipeline elements. Thus if there are two nodes with iterables a=[1,2] and b=[3,4] this procedure will generate a graph with sub-graphs parameterized as (a=1,b=3), (a=1,b=4), (a=2,b=3) and (a=2,b=4).
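The combinations listed above are the Cartesian product of the iterables' value lists. The following sketch reproduces only that enumeration with `itertools.product`; the helper `expand_parameters` is hypothetical and does not build or modify any graph.

```python
from itertools import product


def expand_parameters(iterables):
    """Hypothetical helper: list every {field: value} combination."""
    fields = sorted(iterables)
    value_lists = [iterables[field] for field in fields]
    # product() varies the last field fastest, matching the order above.
    return [dict(zip(fields, combo)) for combo in product(*value_lists)]


combos = expand_parameters({'a': [1, 2], 'b': [3, 4]})
# [{'a': 1, 'b': 3}, {'a': 1, 'b': 4}, {'a': 2, 'b': 3}, {'a': 2, 'b': 4}]
```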

nipype.pipeline.engine.utils.get_print_name(node, simple_form=True)

Get the name of the node

For example, a node containing an instance of interfaces.fsl.BET would be called nodename.BET.fsl

nipype.pipeline.engine.utils.load_resultfile(path, name)

Load InterfaceResult file from path

result : InterfaceResult structure
aggregate : boolean indicating whether node should aggregate_outputs
attribute error : boolean indicating whether there was some mismatch in
versions of traits used to store result and hence node needs to rerun

nipype.pipeline.engine.utils.merge_bundles(g1, g2)
nipype.pipeline.engine.utils.merge_dict(d1, d2, merge=lambda x, y: y)

Merges two dictionaries, non-destructively, combining values on duplicate keys as defined by the optional merge function. The default behavior replaces the values in d1 with corresponding values in d2. (There is no other generally applicable merge strategy, but often you’ll have homogeneous types in your dicts, so specifying a merge technique can be valuable.)


>>> from nipype.pipeline.engine.utils import merge_dict
>>> d1 = {'a': 1, 'c': 3, 'b': 2}
>>> d2 = merge_dict(d1, d1)
>>> len(d2)
3
>>> [d2[k] for k in ['a', 'b', 'c']]
[1, 2, 3]
>>> d3 = merge_dict(d1, d1, lambda x,y: x+y)
>>> len(d3)
3
>>> [d3[k] for k in ['a', 'b', 'c']]
[2, 4, 6]
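
To show why a custom merge function helps with homogeneous value types, here is a simplified stand-in for the behavior described above. `merge_dict_sketch` and `combine` are hypothetical names, the sketch handles only flat dictionaries, and it is not the library's code.

```python
def merge_dict_sketch(d1, d2, merge=lambda old, new: new):
    """Hypothetical flat merge: returns a new dict, leaving d1 and d2 untouched."""
    result = dict(d1)  # shallow copy so d1 is not modified
    for key, value in d2.items():
        if key in result:
            # Duplicate key: let the merge function decide the final value.
            result[key] = merge(result[key], value)
        else:
            result[key] = value
    return result


def combine(old, new):
    """Concatenate lists; for any other type the newer value wins."""
    return old + new if isinstance(old, list) else new


settings = {'threads': 2, 'tags': ['fsl']}
override = {'threads': 8, 'tags': ['bet'], 'verbose': True}

replaced = merge_dict_sketch(settings, override)           # default: d2 wins
combined = merge_dict_sketch(settings, override, combine)  # lists are joined
```

With the default, `replaced['tags']` is `['bet']`; with `combine`, `combined['tags']` is `['fsl', 'bet']`.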
nipype.pipeline.engine.utils.modify_paths(object, relative=True, basedir=None)

Convert paths in data structure to either full paths or relative paths

Supports combinations of lists, dicts, tuples, strs

relative : boolean indicating whether paths should be set relative to the
current directory
basedir : default os.getcwd()
what base directory to use as default
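
The recursive traversal over lists, dicts, tuples and strs can be sketched as follows. `relativize_paths` is a hypothetical illustration covering only the conversion to relative paths; it treats every absolute-path string as a path, which is an assumption and may differ from how the library decides what counts as a path.

```python
import os


def relativize_paths(obj, basedir=None):
    """Hypothetical sketch: make absolute paths in a nested structure relative."""
    if basedir is None:
        basedir = os.getcwd()
    if isinstance(obj, dict):
        return {key: relativize_paths(val, basedir) for key, val in obj.items()}
    if isinstance(obj, list):
        return [relativize_paths(val, basedir) for val in obj]
    if isinstance(obj, tuple):
        return tuple(relativize_paths(val, basedir) for val in obj)
    if isinstance(obj, str) and os.path.isabs(obj):
        # Assumption: any absolute-path string is a path to convert.
        return os.path.relpath(obj, basedir)
    return obj  # numbers, relative strings, etc. pass through unchanged


base = os.path.join(os.sep, 'work')
data = {
    'out': [os.path.join(base, 'a.nii'), (os.path.join(base, 'sub', 'b.nii'), 3)],
    'label': 'bet',
}
relative = relativize_paths(data, basedir=base)
```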

nipype.pipeline.engine.utils.nodelist_runner(nodes, updatehash=False, stop_first=False)

A generator that iterates over a list of nodes and executes them.

nipype.pipeline.engine.utils.save_hashfile(hashfile, hashed_inputs)

Store a hashfile

nipype.pipeline.engine.utils.save_resultfile(result, cwd, name)

Save a result pklz file to cwd

nipype.pipeline.engine.utils.strip_temp(files, wd)

Remove temp from a list of file paths


nipype.pipeline.engine.utils.synchronize_iterables(iterables)

Synchronize the given iterables in item-wise order.

Return: the {field: value} dictionary list


>>> from nipype.pipeline.engine.utils import synchronize_iterables
>>> iterables = dict(a=lambda: [1, 2], b=lambda: [3, 4])
>>> synced = synchronize_iterables(iterables)
>>> synced == [{'a': 1, 'b': 3}, {'a': 2, 'b': 4}]
True
>>> iterables = dict(a=lambda: [1, 2], b=lambda: [3], c=lambda: [4, 5, 6])
>>> synced = synchronize_iterables(iterables)
>>> synced == [{'a': 1, 'b': 3, 'c': 4}, {'a': 2, 'c': 5}, {'c': 6}]
True
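
The item-wise pairing, including the way shorter lists drop out of later entries, resembles `itertools.zip_longest` with exhausted fields omitted. The sketch below uses plain lists rather than callables, and `synchronize_sketch` is a hypothetical name, not the library's implementation.

```python
from itertools import zip_longest

_MISSING = object()  # sentinel marking an exhausted value list


def synchronize_sketch(value_lists):
    """Hypothetical sketch: pair values item-wise, skipping exhausted fields."""
    fields = sorted(value_lists)
    rows = zip_longest(*(value_lists[f] for f in fields), fillvalue=_MISSING)
    return [
        {field: value for field, value in zip(fields, row) if value is not _MISSING}
        for row in rows
    ]


synced_sketch = synchronize_sketch({'a': [1, 2], 'b': [3], 'c': [4, 5, 6]})
# [{'a': 1, 'b': 3, 'c': 4}, {'a': 2, 'c': 5}, {'c': 6}]
```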
nipype.pipeline.engine.utils.topological_sort(graph, depth_first=False)

Returns a depth first sorted order if depth_first is True

nipype.pipeline.engine.utils.walk(children, level=0, path=None, usename=True)

Generate all the full paths in a tree, as a dict.


>>> from nipype.pipeline.engine.utils import walk
>>> iterables = [('a', lambda: [1, 2]), ('b', lambda: [3, 4])]
>>> [val['a'] for val in walk(iterables)]
[1, 1, 2, 2]
>>> [val['b'] for val in walk(iterables)]
[3, 4, 3, 4]

nipype.pipeline.engine.utils.walk_outputs(object)

Extract every file and directory from a Python structure

nipype.pipeline.engine.utils.write_report(node, report_type=None, is_mapnode=False)

Write a report file for a node

nipype.pipeline.engine.utils.write_workflow_prov(graph, filename=None, format='all')

Write W3C PROV Model JSON file

nipype.pipeline.engine.utils.write_workflow_resources(graph, filename=None, append=None)

Generate a JSON file with profiling traces that can be loaded in a pandas DataFrame or processed with JavaScript like D3.js