
Parallelism of some nodes across processes. #821


Closed
oesteban opened this issue Nov 10, 2017 · 20 comments

@oesteban
Member

When running subjects in parallel, certain nodes (such as fsdir) are run with the same directory as their base_dir, which introduces races.

It'd be nice to have a locking mechanism for these nodes.

FileNotFoundError: [Errno 2] No such file or directory: '/scratch/users/oesteban/fmriprep-phase1/work/ds000007/fmriprep_wf/fsdir/_0x6f97333768f8175f647591004cc0cd21_unfinished.json' -> '/scratch/users/oesteban/fmriprep-phase1/work/ds000007/fmriprep_wf/fsdir/_0x6f97333768f8175f647591004cc0cd21.json'
@oesteban added the bug, impact: low, task, and optimization labels on Nov 10, 2017
@oesteban
Member Author

Or, add parameters to the node name, so that base_dir is not shared.
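
For illustration only (IdentityInterface stands in for whatever interface fsdir actually uses, and subject_id is assumed to be in scope), that could look like:

from nipype.interfaces import utility as niu
from nipype.pipeline import engine as pe

# Sketch: parameterizing the node name gives every subject its own working
# directory under base_dir, so concurrent per-subject runs stop colliding.
# IdentityInterface is only a placeholder for the real fsdir interface.
subject_id = 'sub-01'
fsdir = pe.Node(niu.IdentityInterface(fields=['subjects_dir']),
                name='fsdir_%s' % subject_id)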

@effigies
Member

I wonder if this would be better in nipype, where a running node locks its working directory with, e.g., fasteners. By the time we get to code we control, nipype has already decided that the node hasn't already been run. While we may be able to figure something out here, the time to lock seems to be at the point of that decision.
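
As a rough sketch of that idea (not actual nipype code; `node` stands for an existing nipype Node, and the lock-file name is made up), the check-and-run step could be wrapped like this:

import os
import fasteners

# Sketch: take an inter-process file lock inside the node's working directory
# before nipype decides whether the node still needs to run, and hold it for
# the duration of the run. `node` is assumed to be an existing nipype Node.
workdir = node.output_dir()
os.makedirs(workdir, exist_ok=True)
with fasteners.InterProcessLock(os.path.join(workdir, '.nodelock')):
    result = node.run()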

@oesteban
Member Author

Yes, that would be awesome. WDYT @satra?

@effigies
Member

Here's a quick implementation of a DirectoryBasedLock. It will only work on filesystems that explicitly emulate local filesystem atomic semantics.

Since nipype can't provide guarantees with any single lock (the contexts vary too widely), we could make it optional and have users provide a lock that implements the following protocol:

class LockDir(object):
    """Lock protocol: usable directly or as a context manager."""

    def __init__(self, outdir):
        self.outdir = outdir

    def acquire(self, *args, **kwargs):
        raise NotImplementedError

    def release(self):
        raise NotImplementedError

    # Context-manager protocol; defined on the class because Python looks up
    # dunder methods on the type, not on the instance.
    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.release()

Then a node that needs to be protected could be instantiated:

node = pe.Node(Interface(), name='node', lock=DirectoryBasedLock)

The default lock would be a no-op, and it would be up to users to provide a lock with sufficient mutual-exclusion guarantees for their environment.
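
As an illustration only (these are not the DirectoryBasedLock above, just sketches that implement the acquire/release protocol), the no-op default and a lock that relies on mkdir atomicity could look roughly like this:

import errno
import os
import time


class NoOpLock(object):
    """Default lock: satisfies the protocol but provides no mutual exclusion."""

    def __init__(self, outdir):
        self.outdir = outdir

    def acquire(self, *args, **kwargs):
        return True

    def release(self):
        pass


class MkdirLock(object):
    """Sketch of a directory-based lock: os.mkdir is atomic on local POSIX
    filesystems, so whichever process creates ``<outdir>.lock`` first wins."""

    def __init__(self, outdir):
        self.lockdir = outdir.rstrip('/') + '.lock'

    def acquire(self, timeout=60, poll=1):
        start = time.time()
        while True:
            try:
                os.mkdir(self.lockdir)
                return True
            except OSError as exc:
                if exc.errno != errno.EEXIST:
                    raise
                if time.time() - start > timeout:
                    return False
                time.sleep(poll)

    def release(self):
        os.rmdir(self.lockdir)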

@effigies
Member

Come to think of it, with a protocol like that, we could set up a little TCP service that does nothing but sit and listen for requests with directory names, and let callers know if they get the lock. So depending on filesystem properties would become entirely optional.
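
A minimal sketch of such a service (entirely hypothetical: a one-line text protocol, a single-threaded server, no persistence or crash recovery) might be:

import socketserver

HELD = set()  # directory names currently locked


class LockHandler(socketserver.StreamRequestHandler):
    """Answer 'LOCK <dir>' with OK or BUSY, and 'UNLOCK <dir>' with OK."""

    def handle(self):
        verb, _, path = self.rfile.readline().decode().strip().partition(' ')
        if verb == 'LOCK':
            if path in HELD:
                self.wfile.write(b'BUSY\n')
            else:
                HELD.add(path)
                self.wfile.write(b'OK\n')
        elif verb == 'UNLOCK':
            HELD.discard(path)
            self.wfile.write(b'OK\n')


if __name__ == '__main__':
    # The default TCPServer handles one request at a time, so the
    # check-and-set on HELD needs no extra synchronization.
    server = socketserver.TCPServer(('0.0.0.0', 9999), LockHandler)
    server.serve_forever()

Callers that get BUSY would simply retry, so no assumptions about the shared filesystem are needed.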

@oesteban
Member Author

Another node suffering from this: mri_coreg. The crash report reads:

Node: fmriprep_wf.single_subject_MSC08_wf.func_preproc_ses_func10_task_memorywords_wf.bold_reg_wf.bbreg_wf.mri_coreg
Working directory: /scratch/users/oesteban/fmriprep-phase1/work/ds000224/fmriprep_wf/single_subject_MSC08_wf/func_preproc_ses_func10_task_memorywords_wf/bold_reg_wf/bbreg_wf/mri_coreg

Node inputs:

args = <undefined>
brute_force_limit = <undefined>
brute_force_samples = <undefined>
compress_report = auto
conform_reference = <undefined>
dof = 9
environ = {'SUBJECTS_DIR': '/opt/freesurfer/subjects'}
ftol = 0.0001
generate_report = True
ignore_exception = False
initial_rotation = <undefined>
initial_scale = <undefined>
initial_shear = <undefined>
initial_translation = <undefined>
linmintol = 0.01
max_iters = <undefined>
no_brute_force = <undefined>
no_coord_dithering = <undefined>
no_cras0 = <undefined>
no_intensity_dithering = <undefined>
no_smooth = <undefined>
num_threads = 8
out_lta_file = True
out_params_file = <undefined>
out_reg_file = <undefined>
out_report = report.svg
ref_fwhm = <undefined>
reference_file = <undefined>
reference_mask = <undefined>
saturation_threshold = <undefined>
sep = [4]
source_file = /scratch/users/oesteban/fmriprep-phase1/work/ds000224/fmriprep_wf/single_subject_MSC08_wf/func_preproc_ses_func10_task_memorywords_wf/nonlinear_sdc_wf/skullstrip_bold_wf/apply_mask/ants_susceptibility_Warped_masked.nii.gz
source_mask = <undefined>
source_oob = <undefined>
subject_id = sub-MSC08
subjects_dir = /oak/stanford/groups/russpold/data/openfmri/derivatives/ds000224/freesurfer
terminal_output = <undefined>

Traceback (most recent call last):
  File "/usr/local/miniconda/lib/python3.6/site-packages/niworkflows/nipype/pipeline/plugins/multiproc.py", line 51, in run_node
    result['result'] = node.run(updatehash=updatehash)
  File "/usr/local/miniconda/lib/python3.6/site-packages/niworkflows/nipype/pipeline/engine/nodes.py", line 407, in run
    self._run_interface()
  File "/usr/local/miniconda/lib/python3.6/site-packages/niworkflows/nipype/pipeline/engine/nodes.py", line 515, in _run_interface
    old_cwd = os.getcwd()
FileNotFoundError: [Errno 2] No such file or directory

@effigies
Member

Why would this node be run on two different compute nodes? It looks like a directory got deleted out from under a running process.

@oesteban
Member Author

Yep, you are right. I reached that conclusion too but missed updating this issue. This FileNotFoundError also happened to the update_metadata node (just in case that sparks some ideas).

@oesteban
Member Author

This problem is back:

Node: fmriprep_wf.single_subject_13_wf.func_preproc_task_dis_run_02_wf.bold_reg_wf.bbreg_wf.bbregister
Working directory: /scratch/users/oesteban/fmriprep-phase2/work/ds000212/fmriprep_wf/single_subject_13_wf/func_preproc_task_dis_run_02_wf/bold_reg_wf/bbreg_wf/bbregister

Node inputs:

args = <undefined>
compress_report = auto
contrast_type = t2
dof = 9
environ = {'SUBJECTS_DIR': '/opt/freesurfer/subjects'}
epi_mask = <undefined>
fsldof = <undefined>
generate_report = True
ignore_exception = False
init = <undefined>
init_cost_file = <undefined>
init_reg_file = /scratch/users/oesteban/fmriprep-phase2/work/ds000212/fmriprep_wf/single_subject_13_wf/func_preproc_task_dis_run_02_wf/bold_reg_wf/bbreg_wf/mri_coreg/registration.lta
intermediate_file = <undefined>
out_fsl_file = <undefined>
out_lta_file = True
out_reg_file = <undefined>
out_report = report.svg
reg_frame = <undefined>
reg_middle_frame = <undefined>
registered_file = True
source_file = /scratch/users/oesteban/fmriprep-phase2/work/ds000212/fmriprep_wf/single_subject_13_wf/func_preproc_task_dis_run_02_wf/nonlinear_sdc_wf/skullstrip_bold_wf/apply_mask/ants_susceptibility_Warped_masked.nii.gz
spm_nifti = <undefined>
subject_id = sub-13
subjects_dir = /oak/stanford/groups/russpold/data/openfmri/derivatives/ds000212/freesurfer
terminal_output = <undefined>

Traceback (most recent call last):
  File "/usr/local/miniconda/lib/python3.6/site-packages/niworkflows/nipype/pipeline/plugins/multiproc.py", line 51, in run_node
    result['result'] = node.run(updatehash=updatehash)
  File "/usr/local/miniconda/lib/python3.6/site-packages/niworkflows/nipype/pipeline/engine/nodes.py", line 407, in run
    self._run_interface()
  File "/usr/local/miniconda/lib/python3.6/site-packages/niworkflows/nipype/pipeline/engine/nodes.py", line 515, in _run_interface
    old_cwd = os.getcwd()
FileNotFoundError: [Errno 2] No such file or directory

I'll try to debug this, but it may be necessary to escalate to nipype.

@effigies
Member

So, are you assuming that one process is erasing the node out from under the other? Do you have two nodes running the same BOLD files at the same time?

@oesteban
Member Author

Yes - I'm sure I only have one instance of fmriprep for a given subject. So these nodes should not be deleted while executing.

@effigies
Member

effigies commented Dec 1, 2017

So this doesn't seem to be a parallelism of nodes across processes issue, does it? I agree it looks like a bug. Just not related to the fsdir issue.

@oesteban
Member Author

oesteban commented Dec 1, 2017

Correct. I'll update the issue description and title, or create a new one.

@oesteban
Member Author

oesteban commented Dec 1, 2017

Created, I think this is not an issue anymore. Let's keep an eye on Chris' PR to nipype.

@oesteban oesteban closed this as completed Dec 1, 2017
@effigies
Member

effigies commented Dec 4, 2017

AFAIK this is still an issue. Even if nipy/nipype#2278 goes in, we'll still need to create a lock that works reasonably reliably and test it.

@effigies effigies reopened this Dec 4, 2017
@satra

satra commented Dec 4, 2017

@effigies - for multiproc, one can easily create a local lock, and portalocker (which we already have) would be fine. I'm mostly worried about locks across compute nodes. Also, different clusters with different filesystems enable different kinds of locking mechanisms.
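
For illustration, a local lock of that kind with portalocker (the lock-file path and the `node` variable are placeholders) might look like:

import portalocker

# Sketch: an exclusive lock on a file next to the shared working directory.
# Whether this actually excludes processes on *other* machines depends on the
# filesystem's flock/fcntl semantics, which is exactly the caveat above.
with portalocker.Lock('/scratch/work/fsdir.lock', timeout=300):
    result = node.run()  # `node` stands for the nipype node to protect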

@effigies
Member

effigies commented Dec 4, 2017

Yes, this issue is specifically about locks across compute nodes.

@stale

stale bot commented Mar 12, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Mar 12, 2019
@stale stale bot closed this as completed Apr 1, 2019
@dmd

dmd commented Sep 17, 2019

I'm getting what I believe is this issue, in 1.5.0:

$ cat /data/ddrucker/testing/derivatives/fmriprep/sub-acj/log/20190917-102450_234554e0-3572-4773-9070-09ebb07bea01/crash-20190917-103025-ddrucker-bids_info-5eccfd72-fb37-442c-82d0-d2127cde5582.txt
Node: fmriprep_wf.single_subject_acj_wf.bids_info
Working directory: /data/ddrucker/testing/fmriprep-work/fmriprep_wf/single_subject_acj_wf/bids_info

Node inputs:

bids_dir = /data/ddrucker/testing
bids_validate = False
in_file = /data/ddrucker/testing/sub-acj/anat/sub-acj_T1w.nii.gz

Traceback (most recent call last):
  File "/usr/local/miniconda/lib/python3.7/site-packages/nipype/pipeline/plugins/legacymultiproc.py", line 69, in run_node
    result['result'] = node.run(updatehash=updatehash)
  File "/usr/local/miniconda/lib/python3.7/site-packages/nipype/pipeline/engine/nodes.py", line 473, in run
    result = self._run_interface(execute=True)
  File "/usr/local/miniconda/lib/python3.7/site-packages/nipype/pipeline/engine/nodes.py", line 564, in _run_interface
    return self._run_command(execute)
  File "/usr/local/miniconda/lib/python3.7/site-packages/nipype/pipeline/engine/nodes.py", line 649, in _run_command
    result = self._interface.run(cwd=outdir)
  File "/usr/local/miniconda/lib/python3.7/site-packages/nipype/interfaces/base/core.py", line 376, in run
    runtime = self._run_interface(runtime)
  File "/usr/local/miniconda/lib/python3.7/site-packages/niworkflows/interfaces/bids.py", line 165, in _run_interface
    self.inputs.bids_validate)
  File "/usr/local/miniconda/lib/python3.7/site-packages/niworkflows/utils/bids.py", line 213, in _init_layout
    layout = BIDSLayout(str(bids_dir), validate=validate)
  File "/usr/local/miniconda/lib/python3.7/site-packages/bids/layout/layout.py", line 212, in __init__
    indexer.index_metadata()
  File "/usr/local/miniconda/lib/python3.7/site-packages/bids/layout/index.py", line 207, in index_metadata
    with open(bf.path, 'r') as handle:
FileNotFoundError: [Errno 2] No such file or directory: '/data/ddrucker/testing/fmriprep-work/fmriprep_wf/single_subject_acj_wf/func_preproc_task_cue_wf/bold_split/_0xca3dc0d25055795133385b837b4d35b0_unfinished.json'

What should I do?

@effigies
Member

effigies commented Sep 17, 2019

@dmd I think this is a different but related issue. Could you open a new issue, please?
