This repository was archived by the owner on Feb 10, 2021. It is now read-only.

Commit bd76002: Add file based locks
Parent: aba745a

4 files changed: +80, -88 lines

continuous_integration/Dockerfile

Lines changed: 2 additions & 2 deletions
@@ -9,10 +9,10 @@ ENV PATH /opt/conda/bin:$PATH
 
 # hdfs3 - python
 ENV LIBHDFS3_CONF /etc/hadoop/conf/hdfs-site.xml
-RUN conda install -y -q ipython pytest
+RUN conda install -y -q ipython pytest locket
 RUN conda install -y -q libhdfs3 -c conda-forge
 RUN conda create -y -n py3 python=3
-RUN conda install -y -n py3 ipython pytest
+RUN conda install -y -n py3 ipython pytest locket
 RUN conda install -y -n py3 libhdfs3 -c conda-forge
 
 # Cloudera repositories

docs/source/limitations.rst

Lines changed: 2 additions & 20 deletions
@@ -6,24 +6,6 @@ Forked processes
 
 The ``libhdfs3`` library may fail when an ``HDFileSystem`` is copied to a new
 forked process. This happens in some cases when using ``hdfs3`` with
-``multiprocessing`` in Python 2. Common solutions include the following:
+``multiprocessing`` in Python 2.
 
-* Use threads
-* Use Python 3 and a multiprocessing context using spawn with
-  ``multiprocessing.get_context(method='spawn')`` see `multiprocessing docs`_
-* Only instantiate ``HDFileSystem`` within the forked processes, do not start
-  an ``HDFileSystem`` within the parent processes or do not use that
-  ``HDFileSystem`` within the child processes.
-* Use a file based lock. We recommend ``locket``::
-
-    $ pip install locket
-
-.. code-block:: python
-
-    import locket
-
-    with locket.lock_file('.lock'):
-        # do hdfs3 work
-
-
-.. _`multiprocessing docs`: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
+We get around this by using file-based locks, which slightly limit concurrency.
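
The advice removed from the docs here is, in effect, moved into the library itself: instead of asking users to wrap their own code in a file-based lock, hdfs3 now takes one internally around every ``libhdfs3`` call (see ``hdfs3/core.py`` below). A minimal sketch of the mechanism, reusing ``locket``'s ``lock_file`` API from the removed example (the ``.lock`` filename is arbitrary):

    import locket

    # A file-based lock: every process that opens the same path contends
    # for the same OS-level file lock, so this serializes critical
    # sections across processes as well as threads.
    lock = locket.lock_file('.lock')

    with lock:
        pass  # do hdfs3 work here; one process at a time holds the lock

Because all callers on a machine queue on the one lock file, concurrency is reduced, which is the limitation the new docs sentence notes.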

hdfs3/core.py

Lines changed: 75 additions & 66 deletions
@@ -16,6 +16,9 @@
 from .compatibility import FileNotFoundError, urlparse, ConnectionError
 from .utils import read_block
 
+import locket
+
+lock = locket.lock_file('.libhdfs3.lock')
 
 logger = logging.getLogger(__name__)
 
@@ -132,9 +135,6 @@ class HDFileSystem(object):
 
     >>> hdfs = HDFileSystem(host='127.0.0.1', port=8020) # doctest: +SKIP
     """
-
-    CONNECT_RETRIES = 5
-
     def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, user=None,
                  ticket_cache=None, token=None, pars=None, connect=True):
         """
@@ -185,44 +185,41 @@ def connect(self):
         if self._handle:
             return
 
-        o = _lib.hdfsNewBuilder()
-        if self.port is not None:
-            _lib.hdfsBuilderSetNameNodePort(o, self.port)
-        _lib.hdfsBuilderSetNameNode(o, ensure_bytes(self.host))
-        if self.user:
-            _lib.hdfsBuilderSetUserName(o, ensure_bytes(self.user))
+        with lock:
+            o = _lib.hdfsNewBuilder()
+            if self.port is not None:
+                _lib.hdfsBuilderSetNameNodePort(o, self.port)
+            _lib.hdfsBuilderSetNameNode(o, ensure_bytes(self.host))
+            if self.user:
+                _lib.hdfsBuilderSetUserName(o, ensure_bytes(self.user))
 
-        if self.ticket_cache:
-            _lib.hdfsBuilderSetKerbTicketCachePath(o, ensure_bytes(self.ticket_cache))
+            if self.ticket_cache:
+                _lib.hdfsBuilderSetKerbTicketCachePath(o, ensure_bytes(self.ticket_cache))
 
-        if self.token:
-            _lib.hdfsBuilderSetToken(o, ensure_bytes(self.token))
+            if self.token:
+                _lib.hdfsBuilderSetToken(o, ensure_bytes(self.token))
 
-        for par, val in self.pars.items():
-            if not _lib.hdfsBuilderConfSetStr(o, ensure_bytes(par), ensure_bytes(val)) == 0:
-                warnings.warn('Setting conf parameter %s failed' % par)
+            for par, val in self.pars.items():
+                if not _lib.hdfsBuilderConfSetStr(o, ensure_bytes(par), ensure_bytes(val)) == 0:
+                    warnings.warn('Setting conf parameter %s failed' % par)
 
-        trial = 0
-        while trial < self.CONNECT_RETRIES:
             fs = _lib.hdfsBuilderConnect(o)
-            trial += 1
             if fs:
-                break
-        if fs:
-            logger.debug("Connect to handle %d", fs.contents.filesystem)
-            self._handle = fs
-            #if self.token: # TODO: find out what a delegation token is
-            #    self._token = _lib.hdfsGetDelegationToken(self._handle,
-            #                                              ensure_bytes(self.user))
-        else:
-            msg = ensure_string(_lib.hdfsGetLastError())
-            raise ConnectionError('Connection Failed: {}'.format(msg))
+                logger.debug("Connect to handle %d", fs.contents.filesystem)
+                self._handle = fs
+                #if self.token: # TODO: find out what a delegation token is
+                #    self._token = _lib.hdfsGetDelegationToken(self._handle,
+                #                                              ensure_bytes(self.user))
+            else:
+                msg = ensure_string(_lib.hdfsGetLastError())
+                raise ConnectionError('Connection Failed: {}'.format(msg))
 
     def disconnect(self):
         """ Disconnect from name node """
         if self._handle:
             logger.debug("Disconnect from handle %d", self._handle.contents.filesystem)
-            _lib.hdfsDisconnect(self._handle)
+            with lock:
+                _lib.hdfsDisconnect(self._handle)
         self._handle = None
 
     def open(self, path, mode='rb', replication=0, buff=0, block_size=0):
@@ -276,8 +273,9 @@ def du(self, path, total=False, deep=False):
 
     def df(self):
         """ Used/free disc space on the HDFS system """
-        cap = _lib.hdfsGetCapacity(self._handle)
-        used = _lib.hdfsGetUsed(self._handle)
+        with lock:
+            cap = _lib.hdfsGetCapacity(self._handle)
+            used = _lib.hdfsGetUsed(self._handle)
         return {'capacity': cap, 'used': used, 'percent-free': 100*(cap-used)/cap}
 
     def get_block_locations(self, path, start=0, length=0):
@@ -287,26 +285,29 @@ def get_block_locations(self, path, start=0, length=0):
         start = int(start) or 0
         length = int(length) or self.info(path)['size']
         nblocks = ctypes.c_int(0)
-        out = _lib.hdfsGetFileBlockLocations(self._handle, ensure_bytes(path),
-                ctypes.c_int64(start), ctypes.c_int64(length),
-                ctypes.byref(nblocks))
+        with lock:
+            out = _lib.hdfsGetFileBlockLocations(self._handle, ensure_bytes(path),
+                    ctypes.c_int64(start), ctypes.c_int64(length),
+                    ctypes.byref(nblocks))
         locs = []
         for i in range(nblocks.value):
             block = out[i]
             hosts = [block.hosts[i] for i in
                      range(block.numOfNodes)]
             locs.append({'hosts': hosts, 'length': block.length,
                          'offset': block.offset})
-        _lib.hdfsFreeFileBlockLocations(out, nblocks)
+        with lock:
+            _lib.hdfsFreeFileBlockLocations(out, nblocks)
         return locs
 
     def info(self, path):
         """ File information (as a dict) """
         if not self.exists(path):
             raise FileNotFoundError(path)
-        fi = _lib.hdfsGetPathInfo(self._handle, ensure_bytes(path)).contents
-        out = info_to_dict(fi)
-        _lib.hdfsFreeFileInfo(ctypes.byref(fi), 1)
+        with lock:
+            fi = _lib.hdfsGetPathInfo(self._handle, ensure_bytes(path)).contents
+            out = info_to_dict(fi)
+            _lib.hdfsFreeFileInfo(ctypes.byref(fi), 1)
         return ensure_string(out)
 
     def walk(self, path):
@@ -358,9 +359,10 @@ def ls(self, path, detail=True):
         if not self.exists(path):
             raise FileNotFoundError(path)
         num = ctypes.c_int(0)
-        fi = _lib.hdfsListDirectory(self._handle, ensure_bytes(path), ctypes.byref(num))
-        out = [ensure_string(info_to_dict(fi[i])) for i in range(num.value)]
-        _lib.hdfsFreeFileInfo(fi, num.value)
+        with lock:
+            fi = _lib.hdfsListDirectory(self._handle, ensure_bytes(path), ctypes.byref(num))
+            out = [ensure_string(info_to_dict(fi[i])) for i in range(num.value)]
+            _lib.hdfsFreeFileInfo(fi, num.value)
         if detail:
             return out
         else:
@@ -376,10 +378,11 @@ def __del__(self):
 
     def mkdir(self, path):
         """ Make directory at path """
-        out = _lib.hdfsCreateDirectory(self._handle, ensure_bytes(path))
-        if out != 0:
-            msg = ensure_string(_lib.hdfsGetLastError())
-            raise IOError('Create directory failed: {}'.format(msg))
+        with lock:
+            out = _lib.hdfsCreateDirectory(self._handle, ensure_bytes(path))
+            if out != 0:
+                msg = ensure_string(_lib.hdfsGetLastError())
+                raise IOError('Create directory failed: {}'.format(msg))
 
     def set_replication(self, path, replication):
         """ Instruct HDFS to set the replication for the given file.
@@ -391,31 +394,35 @@ def set_replication(self, path, replication):
         """
         if replication < 0:
             raise ValueError('Replication must be positive, or 0 for system default')
-        out = _lib.hdfsSetReplication(self._handle, ensure_bytes(path),
-                                      ctypes.c_int16(int(replication)))
-        if out != 0:
-            msg = ensure_string(_lib.hdfsGetLastError())
-            raise IOError('Set replication failed: {}'.format(msg))
+        with lock:
+            out = _lib.hdfsSetReplication(self._handle, ensure_bytes(path),
+                                          ctypes.c_int16(int(replication)))
+            if out != 0:
+                msg = ensure_string(_lib.hdfsGetLastError())
+                raise IOError('Set replication failed: {}'.format(msg))
 
     def mv(self, path1, path2):
         """ Move file at path1 to path2 """
         if not self.exists(path1):
             raise FileNotFoundError(path1)
-        out = _lib.hdfsRename(self._handle, ensure_bytes(path1), ensure_bytes(path2))
+        with lock:
+            out = _lib.hdfsRename(self._handle, ensure_bytes(path1), ensure_bytes(path2))
         return out == 0
 
     def rm(self, path, recursive=True):
         "Use recursive for `rm -r`, i.e., delete directory and contents"
         if not self.exists(path):
             raise FileNotFoundError(path)
-        out = _lib.hdfsDelete(self._handle, ensure_bytes(path), bool(recursive))
-        if out != 0:
-            msg = ensure_string(_lib.hdfsGetLastError())
-            raise IOError('Remove failed on %s %s' % (path, msg))
+        with lock:
+            out = _lib.hdfsDelete(self._handle, ensure_bytes(path), bool(recursive))
+            if out != 0:
+                msg = ensure_string(_lib.hdfsGetLastError())
+                raise IOError('Remove failed on %s %s' % (path, msg))
 
     def exists(self, path):
         """ Is there an entry at path? """
-        out = _lib.hdfsExists(self._handle, ensure_bytes(path) )
+        with lock:
+            out = _lib.hdfsExists(self._handle, ensure_bytes(path) )
         return out == 0
 
     def chmod(self, path, mode):
@@ -441,20 +448,22 @@ def chmod(self, path, mode):
         """
        if not self.exists(path):
             raise FileNotFoundError(path)
-        out = _lib.hdfsChmod(self._handle, ensure_bytes(path), ctypes.c_short(mode))
-        if out != 0:
-            msg = ensure_string(_lib.hdfsGetLastError())
-            raise IOError("chmod failed on %s %s" % (path, msg))
+        with lock:
+            out = _lib.hdfsChmod(self._handle, ensure_bytes(path), ctypes.c_short(mode))
+            if out != 0:
+                msg = ensure_string(_lib.hdfsGetLastError())
+                raise IOError("chmod failed on %s %s" % (path, msg))
 
     def chown(self, path, owner, group):
         """ Change owner/group """
         if not self.exists(path):
             raise FileNotFoundError(path)
-        out = _lib.hdfsChown(self._handle, ensure_bytes(path), ensure_bytes(owner),
-                             ensure_bytes(group))
-        if out != 0:
-            msg = ensure_string(_lib.hdfsGetLastError())
-            raise IOError("chown failed on %s %s" % (path, msg))
+        with lock:
+            out = _lib.hdfsChown(self._handle, ensure_bytes(path), ensure_bytes(owner),
+                                 ensure_bytes(group))
+            if out != 0:
+                msg = ensure_string(_lib.hdfsGetLastError())
+                raise IOError("chown failed on %s %s" % (path, msg))
 
     def cat(self, path):
         """ Return contents of file """

hdfs3/tests/test_hdfs3.py

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ def test_token_and_ticket_cache_in_same_time():
         assert msg in str(ctx.value)
 
 
+@pytest.mark.slow
 def test_connection_error():
     with pytest.raises(ConnectionError) as ctx:
         hdfs = HDFileSystem(host='localhost', port=9999, connect=False)
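
``slow`` is a custom pytest marker, so runs can deselect this connection-failure test with ``pytest -m "not slow"``. On newer pytest versions, unregistered markers produce warnings; if the repo's test config (not shown in this diff) does not already register it, a conftest.py along these lines would (a sketch, not part of this commit):

    # conftest.py -- hypothetical; assumes no existing marker registration
    def pytest_configure(config):
        # Register the custom "slow" marker so pytest recognizes it.
        config.addinivalue_line(
            "markers", "slow: marks a test as slow (deselect with -m 'not slow')")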
