Skip to content

Commit fe354b8

Browse files
committed
ENH: Avoid loading result from file when writing reports
Minimize the access to the ``result`` property when writing pre/post-execution reports. This modification should particularly preempt nipy#3009 (comment)
1 parent 0470641 commit fe354b8

File tree

2 files changed

+46
-54
lines changed

2 files changed

+46
-54
lines changed

nipype/pipeline/engine/nodes.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from .utils import (
3838
_parameterization_dir, save_hashfile as _save_hashfile, load_resultfile as
3939
_load_resultfile, save_resultfile as _save_resultfile, nodelist_runner as
40-
_node_runner, strip_temp as _strip_temp, write_report,
40+
_node_runner, strip_temp as _strip_temp, write_node_report,
4141
clean_working_directory, merge_dict, evaluate_connect_function)
4242
from .base import EngineBase
4343

@@ -464,8 +464,7 @@ def run(self, updatehash=False):
464464

465465
# Store runtime-hashfile, pre-execution report, the node and the inputs set.
466466
_save_hashfile(hashfile_unfinished, self._hashed_inputs)
467-
write_report(
468-
self, report_type='preexec', is_mapnode=isinstance(self, MapNode))
467+
write_node_report(self, is_mapnode=isinstance(self, MapNode))
469468
savepkl(op.join(outdir, '_node.pklz'), self)
470469
savepkl(op.join(outdir, '_inputs.pklz'), self.inputs.get_traitsfree())
471470

@@ -484,8 +483,7 @@ def run(self, updatehash=False):
484483
# Tear-up after success
485484
shutil.move(hashfile_unfinished,
486485
hashfile_unfinished.replace('_unfinished', ''))
487-
write_report(
488-
self, report_type='postexec', is_mapnode=isinstance(self, MapNode))
486+
write_node_report(self, result=result, is_mapnode=isinstance(self, MapNode))
489487
logger.info('[Node] Finished "%s".', self.fullname)
490488
return result
491489

@@ -1196,7 +1194,7 @@ def get_subnodes(self):
11961194
"""Generate subnodes of a mapnode and write pre-execution report"""
11971195
self._get_inputs()
11981196
self._check_iterfield()
1199-
write_report(self, report_type='preexec', is_mapnode=True)
1197+
write_node_report(self, result=None, is_mapnode=True)
12001198
return [node for _, node in self._make_nodes()]
12011199

12021200
def num_subnodes(self):

nipype/pipeline/engine/utils.py

Lines changed: 42 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -115,68 +115,60 @@ def nodelist_runner(nodes, updatehash=False, stop_first=False):
115115
yield i, result, err
116116

117117

118-
def write_report(node, report_type=None, is_mapnode=False):
119-
"""Write a report file for a node"""
118+
def write_node_report(node, result=None, is_mapnode=False):
119+
"""Write a report file for a node."""
120120
if not str2bool(node.config['execution']['create_report']):
121121
return
122122

123-
if report_type not in ['preexec', 'postexec']:
124-
logger.warning('[Node] Unknown report type "%s".', report_type)
125-
return
126-
127123
cwd = node.output_dir()
128-
report_dir = os.path.join(cwd, '_report')
129-
report_file = os.path.join(report_dir, 'report.rst')
130-
makedirs(report_dir, exist_ok=True)
131-
132-
logger.debug('[Node] Writing %s-exec report to "%s"', report_type[:-4],
133-
report_file)
134-
if report_type.startswith('pre'):
135-
lines = [
136-
write_rst_header('Node: %s' % get_print_name(node), level=0),
137-
write_rst_list(
138-
['Hierarchy : %s' % node.fullname,
139-
'Exec ID : %s' % node._id]),
140-
write_rst_header('Original Inputs', level=1),
141-
write_rst_dict(node.inputs.trait_get()),
142-
]
143-
with open(report_file, 'wt') as fp:
144-
fp.write('\n'.join(lines))
145-
return
124+
report_file = Path(cwd) / '_report' / 'report.rst'
125+
report_file.parent.mkdir(exist_ok=True)
146126

147127
lines = [
128+
write_rst_header('Node: %s' % get_print_name(node), level=0),
129+
write_rst_list(
130+
['Hierarchy : %s' % node.fullname,
131+
'Exec ID : %s' % node._id]),
132+
write_rst_header('Original Inputs', level=1),
133+
write_rst_dict(node.inputs.trait_get()),
134+
]
135+
136+
if result is None:
137+
logger.debug('[Node] Writing pre-exec report to "%s"', report_file)
138+
report_file.write_text('\n'.join(lines))
139+
return
140+
141+
logger.debug('[Node] Writing post-exec report to "%s"', report_file)
142+
lines += [
148143
write_rst_header('Execution Inputs', level=1),
149144
write_rst_dict(node.inputs.trait_get()),
145+
write_rst_header('Execution Outputs', level=1)
150146
]
151147

152-
result = node.result # Locally cache result
153148
outputs = result.outputs
154-
155149
if outputs is None:
156-
with open(report_file, 'at') as fp:
157-
fp.write('\n'.join(lines))
150+
lines += ['None']
151+
report_file.write_text('\n'.join(lines))
158152
return
159153

160-
lines.append(write_rst_header('Execution Outputs', level=1))
161-
162154
if isinstance(outputs, Bunch):
163155
lines.append(write_rst_dict(outputs.dictcopy()))
164156
elif outputs:
165157
lines.append(write_rst_dict(outputs.trait_get()))
158+
else:
159+
lines += ['Outputs object was empty.']
166160

167161
if is_mapnode:
168162
lines.append(write_rst_header('Subnode reports', level=1))
169163
nitems = len(ensure_list(getattr(node.inputs, node.iterfield[0])))
170164
subnode_report_files = []
171165
for i in range(nitems):
172-
nodecwd = os.path.join(cwd, 'mapflow', '_%s%d' % (node.name, i),
173-
'_report', 'report.rst')
174-
subnode_report_files.append('subnode %d : %s' % (i, nodecwd))
166+
subnode_file = Path(cwd) / 'mapflow' / (
167+
'_%s%d' % (node.name, i)) / '_report' / 'report.rst'
168+
subnode_report_files.append('subnode %d : %s' % (i, subnode_file))
175169

176170
lines.append(write_rst_list(subnode_report_files))
177-
178-
with open(report_file, 'at') as fp:
179-
fp.write('\n'.join(lines))
171+
report_file.write_text('\n'.join(lines))
180172
return
181173

182174
lines.append(write_rst_header('Runtime info', level=1))
@@ -188,15 +180,9 @@ def write_report(node, report_type=None, is_mapnode=False):
188180
'prev_wd': getattr(result.runtime, 'prevcwd', '<not-set>'),
189181
}
190182

191-
if hasattr(result.runtime, 'cmdline'):
192-
rst_dict['command'] = result.runtime.cmdline
193-
194-
# Try and insert memory/threads usage if available
195-
if hasattr(result.runtime, 'mem_peak_gb'):
196-
rst_dict['mem_peak_gb'] = result.runtime.mem_peak_gb
197-
198-
if hasattr(result.runtime, 'cpu_percent'):
199-
rst_dict['cpu_percent'] = result.runtime.cpu_percent
183+
for prop in ('cmdline', 'mem_peak_gb', 'cpu_percent'):
184+
if hasattr(result.runtime, prop):
185+
rst_dict[prop] = getattr(result.runtime, prop)
200186

201187
lines.append(write_rst_dict(rst_dict))
202188

@@ -224,9 +210,17 @@ def write_report(node, report_type=None, is_mapnode=False):
224210
write_rst_dict(result.runtime.environ),
225211
]
226212

227-
with open(report_file, 'at') as fp:
228-
fp.write('\n'.join(lines))
229-
return
213+
report_file.write_text('\n'.join(lines))
214+
215+
216+
def write_report(node, report_type=None, is_mapnode=False):
217+
"""Write a report file for a node - DEPRECATED"""
218+
if report_type not in ('preexec', 'postexec'):
219+
logger.warning('[Node] Unknown report type "%s".', report_type)
220+
return
221+
222+
write_node_report(node, is_mapnode=is_mapnode,
223+
result=node.result if report_type == 'postexec' else None)
230224

231225

232226
def save_resultfile(result, cwd, name, rebase=None):

0 commit comments

Comments
 (0)