pipeline.engine.nodes

Module: pipeline.engine.nodes

[Inheritance diagram for nipype.pipeline.engine.nodes]

Defines functionality for pipelined execution of interfaces

The Node class provides core functionality for batch processing.

Classes

JoinNode

class nipype.pipeline.engine.nodes.JoinNode(interface, name, joinsource, joinfield=None, unique=False, **kwargs)

Bases: nipype.pipeline.engine.nodes.Node

Wraps interface objects that join inputs into a list.

Examples

>>> import nipype.pipeline.engine as pe
>>> from nipype import Node, JoinNode, Workflow
>>> from nipype.interfaces.utility import IdentityInterface
>>> from nipype.interfaces import (ants, dcm2nii, fsl)
>>> wf = Workflow(name='preprocess')
>>> inputspec = Node(IdentityInterface(fields=['image']),
...                     name='inputspec')
>>> inputspec.iterables = [('image',
...                        ['img1.nii', 'img2.nii', 'img3.nii'])]
>>> img2flt = Node(fsl.ImageMaths(out_data_type='float'),
...                   name='img2flt')
>>> wf.connect(inputspec, 'image', img2flt, 'in_file')
>>> average = JoinNode(ants.AverageImages(), joinsource='inputspec',
...                       joinfield='images', name='average')
>>> wf.connect(img2flt, 'out_file', average, 'images')
>>> realign = Node(fsl.FLIRT(), name='realign')
>>> wf.connect(img2flt, 'out_file', realign, 'in_file')
>>> wf.connect(average, 'output_average_image', realign, 'reference')
>>> strip = Node(fsl.BET(), name='strip')
>>> wf.connect(realign, 'out_file', strip, 'in_file')
Attributes:
fullname
inputs

The JoinNode inputs include the join field overrides.

interface

Return the underlying interface object

itername

Name for expanded iterable

joinsource
mem_gb

Get estimated memory (GB)

n_procs

Get the estimated number of processes/threads

name
needed_outputs
outputs

Return the output fields of the underlying interface

result

Get result from result file (do not hold it in memory)

Methods

clone(name) Clone an EngineBase object
get_output(parameter) Retrieve a particular output of the node
hash_exists([updatehash]) Decorate the new is_cached method with hash updating to maintain backwards compatibility.
help() Print interface help
is_cached([rm_outdated]) Check if the interface has been run previously, and whether cached results are up-to-date.
output_dir() Return the location of the output directory for the node
run([updatehash]) Execute the node in its directory.
set_input(parameter, val) Set interface input value
update(**opts) Update inputs
load  
save  
__init__(interface, name, joinsource, joinfield=None, unique=False, **kwargs)
Parameters:
interface : interface object

node-specific interface (e.g. fsl.BET(), spm.Coregister())

name : alphanumeric string

node specific name

joinsource : node name

name of the join predecessor iterable node

joinfield : string or list of strings

name(s) of list input fields that will be aggregated. The default is all of the join node input fields.

unique : boolean

flag indicating whether to ignore duplicate input values

See Node docstring for additional keyword arguments.
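
To make the join semantics concrete, here is a minimal pure-Python sketch of what a join field ends up holding. It is an illustration only, not nipype's implementation: the helper collect_join_field is hypothetical, and the assumption that unique=True keeps the first occurrence of each value, in order, is ours.

```python
def collect_join_field(upstream_values, unique=False):
    """Gather one value per upstream iteration into a single list.

    Hypothetical helper: mimics how a join field receives the outputs
    of every expansion of the join source. Order-preserving removal of
    duplicates for unique=True is an assumption of this sketch.
    """
    collected = list(upstream_values)
    if unique:
        # dict preserves insertion order, so first occurrences are kept
        collected = list(dict.fromkeys(collected))
    return collected


# One output per value of the join source's iterables
outputs = ['img1_flt.nii', 'img2_flt.nii', 'img1_flt.nii']
joined = collect_join_field(outputs)
joined_unique = collect_join_field(outputs, unique=True)
```

In the JoinNode example above, the list in joined corresponds to what the 'images' field of the average node would receive from the three img2flt expansions.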
clone(name)

Clone an EngineBase object

Parameters:
name : string (mandatory)

A clone of node or workflow must have a new name

fullname
get_output(parameter)

Retrieve a particular output of the node

hash_exists(updatehash=False)

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

inputs

The JoinNode inputs include the join field overrides.

interface

Return the underlying interface object

is_cached(rm_outdated=False)

Check if the interface has been run previously, and whether cached results are up-to-date.

itername

Name for expanded iterable

joinfield = None

the fields to join

joinsource

the join predecessor iterable node

load(filename)
mem_gb

Get estimated memory (GB)

n_procs

Get the estimated number of processes/threads

name
needed_outputs
output_dir()

Return the location of the output directory for the node

outputs

Return the output fields of the underlying interface

result

Get result from result file (do not hold it in memory)

run(updatehash=False)

Execute the node in its directory.

Parameters:
updatehash: boolean

When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.

save(filename=None)
set_input(parameter, val)

Set interface input value

update(**opts)

Update inputs

MapNode

class nipype.pipeline.engine.nodes.MapNode(interface, iterfield, name, serial=False, nested=False, **kwargs)

Bases: nipype.pipeline.engine.nodes.Node

Wraps interface objects that need to be iterated on a list of inputs.

Examples

>>> from nipype import MapNode
>>> from nipype.interfaces import fsl
>>> realign = MapNode(fsl.MCFLIRT(), 'in_file', 'realign')
>>> realign.inputs.in_file = ['functional.nii',
...                           'functional2.nii',
...                           'functional3.nii']
>>> realign.run() 
Attributes:
fullname
inputs

Return the inputs of the underlying interface

interface

Return the underlying interface object

itername

Name for expanded iterable

mem_gb

Get estimated memory (GB)

n_procs

Get the estimated number of processes/threads

name
needed_outputs
outputs

Return the output fields of the underlying interface

result

Get result from result file (do not hold it in memory)

Methods

clone(name) Clone an EngineBase object
get_output(parameter) Retrieve a particular output of the node
get_subnodes() Generate subnodes of a mapnode and write pre-execution report
hash_exists([updatehash]) Decorate the new is_cached method with hash updating to maintain backwards compatibility.
help() Print interface help
is_cached([rm_outdated]) Check if the interface has been run previously, and whether cached results are up-to-date.
num_subnodes() Get the number of subnodes to iterate in this MapNode
output_dir() Return the location of the output directory for the node
run([updatehash]) Execute the node in its directory.
set_input(parameter, val) Set interface input value or nodewrapper attribute. Priority goes to interface.
update(**opts) Update inputs
load  
save  
__init__(interface, iterfield, name, serial=False, nested=False, **kwargs)
Parameters:
interface : interface object

node-specific interface (e.g. fsl.BET(), spm.Coregister())

iterfield : string or list of strings

Name(s) of input fields that will receive a list of whatever kind of input they take. The node will be run separately for each value in these lists. For more than one input, the values are paired (i.e. it does not compute a combinatorial product).

name : alphanumeric string

node specific name

serial : boolean

flag to enforce executing the jobs of the mapnode in a serial manner rather than parallel

nested : boolean

Support for nested lists. If set, the input list will be flattened before running and the nested list structure of the outputs will be restored.

See Node docstring for additional keyword arguments.
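
The iterfield pairing and the nested option can be sketched in plain Python. This is an illustration of the behaviour described in the parameters above, not nipype's code: the helpers paired_runs, flatten and restore are hypothetical, and the sketch handles only one level of nesting.

```python
from itertools import product


def paired_runs(iterfields):
    """Return one dict of inputs per run, pairing values by position."""
    names = list(iterfields)
    return [dict(zip(names, values)) for values in zip(*iterfields.values())]


def flatten(nested):
    """Flatten a list of lists and remember each sublist's length."""
    lengths = [len(sub) for sub in nested]
    flat = [item for sub in nested for item in sub]
    return flat, lengths


def restore(flat, lengths):
    """Rebuild the nested structure recorded by flatten()."""
    nested, start = [], 0
    for n in lengths:
        nested.append(flat[start:start + n])
        start += n
    return nested


fields = {'in_file': ['a.nii', 'b.nii'], 'frac': [0.5, 0.6]}
runs = paired_runs(fields)                    # 2 runs, paired by index
all_combos = list(product(*fields.values()))  # 4 combinations: NOT what iterfield does

flat, lengths = flatten([['a.nii', 'b.nii'], ['c.nii']])
results = [name.upper() for name in flat]     # stand-in for per-item processing
nested_results = restore(results, lengths)
```

With two iterfields of length two, the paired form yields two runs, whereas a combinatorial product would yield four.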
clone(name)

Clone an EngineBase object

Parameters:
name : string (mandatory)

A clone of node or workflow must have a new name

fullname
get_output(parameter)

Retrieve a particular output of the node

get_subnodes()

Generate subnodes of a mapnode and write pre-execution report

hash_exists(updatehash=False)

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

inputs

Return the inputs of the underlying interface

interface

Return the underlying interface object

is_cached(rm_outdated=False)

Check if the interface has been run previously, and whether cached results are up-to-date.

itername

Name for expanded iterable

load(filename)
mem_gb

Get estimated memory (GB)

n_procs

Get the estimated number of processes/threads

name
needed_outputs
num_subnodes()

Get the number of subnodes to iterate in this MapNode

output_dir()

Return the location of the output directory for the node

outputs

Return the output fields of the underlying interface

result

Get result from result file (do not hold it in memory)

run(updatehash=False)

Execute the node in its directory.

Parameters:
updatehash: boolean

When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.

save(filename=None)
set_input(parameter, val)

Set interface input value or nodewrapper attribute. Priority goes to interface.

update(**opts)

Update inputs

Node

class nipype.pipeline.engine.nodes.Node(interface, name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=0.2, **kwargs)

Bases: nipype.pipeline.engine.base.EngineBase

Wraps interface objects for use in pipeline

A Node creates a sandbox-like directory for executing the underlying interface. It will copy or link inputs into this directory to ensure that input data are not overwritten. A hash of the input state is used to determine if the Node inputs have changed and whether the node needs to be re-executed.
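
The idea of hashing the input state can be illustrated with a small standard-library sketch. The function input_hash, the JSON serialization and the choice of SHA-256 are assumptions made for illustration; they are not taken from nipype, which has its own hashing of interface inputs.

```python
import hashlib
import json


def input_hash(inputs):
    """Return a stable digest of a dict of JSON-serializable inputs."""
    # sort_keys makes the digest independent of insertion order
    payload = json.dumps(inputs, sort_keys=True).encode('utf-8')
    return hashlib.sha256(payload).hexdigest()


first = input_hash({'in_files': 'functional.nii', 'register_to_mean': True})
same = input_hash({'register_to_mean': True, 'in_files': 'functional.nii'})
changed = input_hash({'in_files': 'functional.nii', 'register_to_mean': False})

# Inputs changed, so a result cached under `first` would be stale
needs_rerun = first != changed
```

Identical inputs give identical digests regardless of the order in which they were set, while changing any input value gives a different digest and so signals that the node must be re-executed.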

Examples

>>> from nipype import Node
>>> from nipype.interfaces import spm
>>> realign = Node(spm.Realign(), 'realign')
>>> realign.inputs.in_files = 'functional.nii'
>>> realign.inputs.register_to_mean = True
>>> realign.run() 
Attributes:
fullname
inputs

Return the inputs of the underlying interface

interface

Return the underlying interface object

itername

Name for expanded iterable

mem_gb

Get estimated memory (GB)

n_procs

Get the estimated number of processes/threads

name
needed_outputs
outputs

Return the output fields of the underlying interface

result

Get result from result file (do not hold it in memory)

Methods

clone(name) Clone an EngineBase object
get_output(parameter) Retrieve a particular output of the node
hash_exists([updatehash]) Decorate the new is_cached method with hash updating to maintain backwards compatibility.
help() Print interface help
is_cached([rm_outdated]) Check if the interface has been run previously, and whether cached results are up-to-date.
output_dir() Return the location of the output directory for the node
run([updatehash]) Execute the node in its directory.
set_input(parameter, val) Set interface input value
update(**opts) Update inputs
load  
save  
__init__(interface, name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=0.2, **kwargs)
Parameters:
interface : interface object

node-specific interface (e.g. fsl.BET(), spm.Coregister())

name : alphanumeric string

node specific name

iterables : generator

Input field and list of values to iterate over using the pipeline engine, for example to iterate over different frac values in fsl.BET(). For a single field the input can be a tuple; otherwise it is a list of tuples:

node.iterables = ('frac',[0.5,0.6,0.7])
node.iterables = [('fwhm',[2,4]),('fieldx',[0.5,0.6,0.7])]

If this node has an itersource, then the iterables values is a dictionary which maps an iterable source field value to the target iterables field values, e.g.:

inputspec.iterables = ('images', ['img1.nii', 'img2.nii'])
node.itersource = ('inputspec', ['images'])
node.iterables = ('frac', {'img1.nii': [0.5, 0.6],
                           'img2.nii': [0.6, 0.7]})

If this node’s synchronize flag is set, then an alternate form of the iterables is a [fields, values] list, where fields is the list of iterated fields and values is the list of value tuples for the given fields, e.g.:

node.synchronize = True
node.iterables = [('frac', 'threshold'),
                  [(0.5, True),
                   (0.6, False)]]
itersource : tuple

The (name, fields) iterables source, which specifies the name of the predecessor iterable node and the input fields to use from that source node. The values of those fields form the key into the iterables parameter value mapping dictionary.

synchronize : boolean

Flag indicating whether iterables are synchronized. If the iterables are synchronized, then this iterable node is expanded once per iteration over all of the iterables values. Otherwise, this iterable node is expanded once for each combination of the iterables values.

overwrite : boolean

Whether to overwrite the contents of the output directory if it already exists. If the directory exists and the hash matches, the node assumes that the process has already been executed.

needed_outputs : list of output_names

Force the node to keep only specific outputs. By default all outputs are kept. Setting this attribute will delete any output files and directories from the node’s working directory that are not part of the needed_outputs.

run_without_submitting : boolean

Run the node without submitting to a job engine or to a multiprocessing pool
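
The three forms of iterables described in the parameters above can be compared with a short plain-Python sketch. It only models how many parameterizations result and what each contains; the helpers expand, expand_synchronized and expand_from_source are hypothetical and are not part of nipype.

```python
from itertools import product


def expand(iterables):
    """Expand [(field, values), ...] into one input dict per combination."""
    fields = [field for field, _ in iterables]
    value_lists = [values for _, values in iterables]
    return [dict(zip(fields, combo)) for combo in product(*value_lists)]


def expand_synchronized(fields, value_tuples):
    """Expand the [fields, values] form: one input dict per value tuple."""
    return [dict(zip(fields, values)) for values in value_tuples]


def expand_from_source(field, lookup, source_value):
    """Pick the values for `field` keyed by the source node's current value."""
    return [{field: value} for value in lookup[source_value]]


# Default: every combination of the listed values (2 x 3 = 6 expansions)
combos = expand([('fwhm', [2, 4]), ('fieldx', [0.5, 0.6, 0.7])])

# synchronize=True: values advance together (2 expansions)
synced = expand_synchronized(('frac', 'threshold'), [(0.5, True), (0.6, False)])

# itersource: the values depend on which image the source node is iterating
per_image = expand_from_source(
    'frac', {'img1.nii': [0.5, 0.6], 'img2.nii': [0.6, 0.7]}, 'img2.nii')
```

The unsynchronized form grows multiplicatively with each added field, which is why the synchronized form is useful when the values are meant to vary together.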

clone(name)

Clone an EngineBase object

Parameters:
name : string (mandatory)

A clone of node or workflow must have a new name

fullname
get_output(parameter)

Retrieve a particular output of the node

hash_exists(updatehash=False)

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

inputs

Return the inputs of the underlying interface

interface

Return the underlying interface object

is_cached(rm_outdated=False)

Check if the interface has been run previously, and whether cached results are up-to-date.

itername

Name for expanded iterable

load(filename)
mem_gb

Get estimated memory (GB)

n_procs

Get the estimated number of processes/threads

name
needed_outputs
output_dir()

Return the location of the output directory for the node

outputs

Return the output fields of the underlying interface

result

Get result from result file (do not hold it in memory)

run(updatehash=False)

Execute the node in its directory.

Parameters:
updatehash: boolean

When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.
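
The effect of updatehash can be summarized as a small decision function. This is a sketch of the behaviour described above, not nipype's control flow: the function decide_action and its return labels are hypothetical, and treating a missing previous result (stored_hash is None) as a plain execution is an assumption.

```python
def decide_action(stored_hash, current_hash, updatehash=False):
    """Return what a run would do: 'use_cache', 'update_hash' or 'execute'."""
    if stored_hash == current_hash:
        return 'use_cache'      # cached result is up to date
    if stored_hash is not None and updatehash:
        return 'update_hash'    # rewrite the stored hash, skip execution
    return 'execute'            # no usable cache: run the interface


unchanged = decide_action('abc', 'abc')
stale = decide_action('abc', 'def')
stale_update = decide_action('abc', 'def', updatehash=True)
first_run = decide_action(None, 'def', updatehash=True)
```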

save(filename=None)
set_input(parameter, val)

Set interface input value

update(**opts)

Update inputs