Skip to content

ENH: Minimize the number of calls to _load_results when populating inputs #3075

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 11, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 45 additions & 34 deletions nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
absolute_import)
from builtins import range, str, bytes, open

from collections import OrderedDict
from collections import OrderedDict, defaultdict

import os
import os.path as op
Expand Down Expand Up @@ -510,49 +510,60 @@ def _get_hashval(self):
return self._hashed_inputs, self._hashvalue

def _get_inputs(self):
"""Retrieve inputs from pointers to results file
"""
Retrieve inputs from pointers to results files.

This mechanism can be easily extended/replaced to retrieve data from
other data sources (e.g., XNAT, HTTP, etc.,.)
"""
if self._got_inputs:
if self._got_inputs: # Inputs cached
return

if not self.input_source: # No previous nodes
self._got_inputs = True
return

logger.debug('Setting node inputs')
prev_results = defaultdict(list)
for key, info in list(self.input_source.items()):
logger.debug('input: %s', key)
results_file = info[0]
logger.debug('results file: %s', results_file)
outputs = _load_resultfile(results_file).outputs
prev_results[info[0]].append((key, info[1]))

logger.debug(
'[Node] Setting %d connected inputs of node "%s" from %d previous nodes.',
len(self.input_source), self.name, len(prev_results))

for results_fname, connections in list(prev_results.items()):
outputs = None
try:
outputs = _load_resultfile(results_fname).outputs
except AttributeError as e:
logger.critical('%s', e)

if outputs is None:
raise RuntimeError("""\
Error populating the input "%s" of node "%s": the results file of the source node \
(%s) does not contain any outputs.""" % (key, self.name, results_file))
output_value = Undefined
if isinstance(info[1], tuple):
output_name = info[1][0]
value = getattr(outputs, output_name)
if isdefined(value):
output_value = evaluate_connect_function(
info[1][1], info[1][2], value)
else:
output_name = info[1]
Error populating the inputs of node "%s": the results file of the source node \
(%s) does not contain any outputs.""" % (self.name, results_fname))

for key, conn in connections:
output_value = Undefined
if isinstance(conn, tuple):
value = getattr(outputs, conn[0])
if isdefined(value):
output_value = evaluate_connect_function(
conn[1], conn[2], value)
else:
output_value = getattr(outputs, conn)

try:
output_value = outputs.trait_get()[output_name]
except AttributeError:
output_value = outputs.dictcopy()[output_name]
logger.debug('output: %s', output_name)
try:
self.set_input(key, deepcopy(output_value))
except traits.TraitError as e:
msg = (
e.args[0], '', 'Error setting node input:',
'Node: %s' % self.name, 'input: %s' % key,
'results_file: %s' % results_file,
'value: %s' % str(output_value),
)
e.args = ('\n'.join(msg), )
raise
self.set_input(key, deepcopy(output_value))
except traits.TraitError as e:
msg = (
e.args[0], '', 'Error setting node input:',
'Node: %s' % self.name, 'input: %s' % key,
'results_file: %s' % results_fname,
'value: %s' % str(output_value),
)
e.args = ('\n'.join(msg), )
raise

# Successfully set inputs
self._got_inputs = True
Expand Down