gh-101566: Sync with zipp 3.14. #102018

Merged: 2 commits, Feb 20, 2023
30 changes: 30 additions & 0 deletions Lib/test/test_zipfile/_context.py
@@ -0,0 +1,30 @@
import contextlib
import time


class DeadlineExceeded(Exception):
pass


class TimedContext(contextlib.ContextDecorator):
"""
A context that will raise DeadlineExceeded if the
max duration is reached during the execution.

>>> TimedContext(1)(time.sleep)(.1)
>>> TimedContext(0)(time.sleep)(.1)
Traceback (most recent call last):
...
tests._context.DeadlineExceeded: (..., 0)
"""

def __init__(self, max_duration: int):
self.max_duration = max_duration

def __enter__(self):
self.start = time.monotonic()

def __exit__(self, *err):
duration = time.monotonic() - self.start
if duration > self.max_duration:
raise DeadlineExceeded(duration, self.max_duration)
8 changes: 8 additions & 0 deletions Lib/test/test_zipfile/_func_timeout_compat.py
@@ -0,0 +1,8 @@
try:
from func_timeout import func_set_timeout as set_timeout
except ImportError: # pragma: no cover
# provide a fallback that doesn't actually time out
from ._context import TimedContext as set_timeout


__all__ = ['set_timeout']
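
A note on what this shim buys: with func_timeout installed, set_timeout interrupts a test that runs past its limit; with the TimedContext fallback, the test runs to completion and DeadlineExceeded is raised only afterwards, when the decorator's __exit__ checks the elapsed time. A minimal sketch, assuming the test-package import path introduced by this PR:

import time
from test.test_zipfile._func_timeout_compat import set_timeout

@set_timeout(1)
def quick():
    time.sleep(0.1)        # well under the limit; passes either way

@set_timeout(0)
def slow():
    time.sleep(0.1)

quick()
# With func_timeout: slow() is aborted mid-sleep and its FunctionTimedOut is raised.
# With the fallback: slow() finishes, then DeadlineExceeded(duration, 0) is raised.
slow()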
29 changes: 29 additions & 0 deletions Lib/test/test_zipfile/_itertools.py
@@ -1,3 +1,32 @@
import itertools


# from jaraco.itertools 6.3.0
class Counter:
"""
Wrap an iterable in an object that stores the count of items
that pass through it.

>>> items = Counter(range(20))
>>> items.count
0
>>> values = list(items)
>>> items.count
20
"""

def __init__(self, i):
self.count = 0
self.iter = zip(itertools.count(1), i)

def __iter__(self):
return self

def __next__(self):
self.count, result = next(self.iter)
return result


# from more_itertools v8.13.0
def always_iterable(obj, base_type=(str, bytes)):
if obj is None:
137 changes: 84 additions & 53 deletions Lib/test/test_zipfile/test_path.py
@@ -4,36 +4,25 @@
import pathlib
import pickle
import string
from test.support.script_helper import assert_python_ok
import sys
import unittest
import zipfile

from ._test_params import parameterize, Invoked
from ._functools import compose
from ._itertools import Counter

from ._test_params import parameterize, Invoked
from ._func_timeout_compat import set_timeout

from test.support.os_helper import temp_dir


# Poor man's technique to consume a (smallish) iterable.
consume = tuple


# from jaraco.itertools 5.0
class jaraco:
class itertools:
class Counter:
def __init__(self, i):
self.count = 0
self._orig_iter = iter(i)
Counter = Counter

def __iter__(self):
return self

def __next__(self):
result = next(self._orig_iter)
self.count += 1
return result
consume = tuple


def add_dirs(zf):
@@ -161,10 +150,10 @@ def test_open_encoding_utf16(self):
u16 = path.joinpath("16.txt")
with u16.open('r', "utf-16") as strm:
data = strm.read()
self.assertEqual(data, "This was utf-16")
assert data == "This was utf-16"
with u16.open(encoding="utf-16") as strm:
data = strm.read()
self.assertEqual(data, "This was utf-16")
assert data == "This was utf-16"

def test_open_encoding_errors(self):
in_memory_file = io.BytesIO()
@@ -177,9 +166,9 @@ def test_open_encoding_errors(self):

# encoding= as a positional argument for gh-101144.
data = u16.read_text("utf-8", errors="ignore")
self.assertEqual(data, "invalid utf-8: .")
assert data == "invalid utf-8: ."
with u16.open("r", "utf-8", errors="surrogateescape") as f:
self.assertEqual(f.read(), "invalid utf-8: \udcff\udcff.")
assert f.read() == "invalid utf-8: \udcff\udcff."

# encoding= both positional and keyword is an error; gh-101144.
with self.assertRaisesRegex(TypeError, "encoding"):
@@ -191,24 +180,21 @@ def test_open_encoding_errors(self):
with self.assertRaises(UnicodeDecodeError):
f.read()

def test_encoding_warnings(self):
@unittest.skipIf(
not getattr(sys.flags, 'warn_default_encoding', 0),
"Requires warn_default_encoding",
)
@pass_alpharep
def test_encoding_warnings(self, alpharep):
"""EncodingWarning must blame the read_text and open calls."""
code = '''\
import io, zipfile
with zipfile.ZipFile(io.BytesIO(), "w") as zf:
zf.filename = '<test_encoding_warnings in memory zip file>'
zf.writestr("path/file.txt", b"Spanish Inquisition")
root = zipfile.Path(zf)
(path,) = root.iterdir()
file_path = path.joinpath("file.txt")
unused = file_path.read_text() # should warn
file_path.open("r").close() # should warn
'''
proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
warnings = proc.err.splitlines()
self.assertEqual(len(warnings), 2, proc.err)
self.assertRegex(warnings[0], rb"^<string>:8: EncodingWarning:")
self.assertRegex(warnings[1], rb"^<string>:9: EncodingWarning:")
assert sys.flags.warn_default_encoding
root = zipfile.Path(alpharep)
with self.assertWarns(EncodingWarning) as wc:
root.joinpath("a.txt").read_text()
assert __file__ == wc.filename
with self.assertWarns(EncodingWarning) as wc:
root.joinpath("a.txt").open("r").close()
assert __file__ == wc.filename

def test_open_write(self):
"""
@@ -250,7 +236,8 @@ def test_read(self, alpharep):
root = zipfile.Path(alpharep)
a, b, g = root.iterdir()
assert a.read_text(encoding="utf-8") == "content of a"
a.read_text("utf-8") # No positional arg TypeError per gh-101144.
# Also check positional encoding arg (gh-101144).
assert a.read_text("utf-8") == "content of a"
assert a.read_bytes() == b"content of a"

@pass_alpharep
@@ -275,19 +262,6 @@ def test_traverse_truediv(self, alpharep):
e = root / "b" / "d" / "e.txt"
assert e.read_text(encoding="utf-8") == "content of e"

@pass_alpharep
def test_traverse_simplediv(self, alpharep):
"""
Disable the __future__.division when testing traversal.
"""
code = compile(
source="zipfile.Path(alpharep) / 'a'",
filename="(test)",
mode="eval",
dont_inherit=True,
)
eval(code)

@pass_alpharep
def test_pathlike_construction(self, alpharep):
"""
@@ -356,7 +330,7 @@ def test_joinpath_constant_time(self):
# Check the file iterated all items
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES

# @func_timeout.func_set_timeout(3)
@set_timeout(3)
def test_implied_dirs_performance(self):
data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
zipfile.CompleteDirs._implied_dirs(data)
@@ -472,6 +446,52 @@ def test_root_unnamed(self, alpharep):
assert sub.name == "b"
assert sub.parent

@pass_alpharep
def test_match_and_glob(self, alpharep):
root = zipfile.Path(alpharep)
assert not root.match("*.txt")

assert list(root.glob("b/c.*")) == [zipfile.Path(alpharep, "b/c.txt")]

files = root.glob("**/*.txt")
assert all(each.match("*.txt") for each in files)

assert list(root.glob("**/*.txt")) == list(root.rglob("*.txt"))

def test_glob_empty(self):
root = zipfile.Path(zipfile.ZipFile(io.BytesIO(), 'w'))
with self.assertRaises(ValueError):
root.glob('')

@pass_alpharep
def test_eq_hash(self, alpharep):
root = zipfile.Path(alpharep)
assert root == zipfile.Path(alpharep)

assert root != (root / "a.txt")
assert (root / "a.txt") == (root / "a.txt")

root = zipfile.Path(alpharep)
assert root in {root}

@pass_alpharep
def test_is_symlink(self, alpharep):
"""
See python/cpython#82102 for symlink support beyond this object.
"""

root = zipfile.Path(alpharep)
assert not root.is_symlink()

@pass_alpharep
def test_relative_to(self, alpharep):
root = zipfile.Path(alpharep)
relative = root.joinpath("b", "c.txt").relative_to(root / "b")
assert str(relative) == "c.txt"

relative = root.joinpath("b", "d", "e.txt").relative_to(root / "b")
assert str(relative) == "d/e.txt"

@pass_alpharep
def test_inheritance(self, alpharep):
cls = type('PathChild', (zipfile.Path,), {})
@@ -493,3 +513,14 @@ def test_pickle(self, alpharep, path_type, subpath):
restored_1 = pickle.loads(saved_1)
first, *rest = restored_1.iterdir()
assert first.read_text().startswith('content of ')

@pass_alpharep
def test_extract_orig_with_implied_dirs(self, alpharep):
"""
A zip file wrapped in a Path should extract even with implied dirs.
"""
source_path = self.zipfile_ondisk(alpharep)
zf = zipfile.ZipFile(source_path)
# wrap the zipfile for its side effect
zipfile.Path(zf)
zf.extractall(source_path.parent)
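
The extraction test above depends on the getinfo() override added to CompleteDirs below: wrapping a read-mode ZipFile in zipfile.Path swaps its class to FastLookup, so namelist() starts reporting implied directories, and extractall() then asks getinfo() for entries that were never written to the archive. A minimal sketch of that behaviour, using only stdlib names (the archive contents are illustrative):

import io, zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("b/c.txt", b"content")   # "b/" itself is never written

zf = zipfile.ZipFile(buf)
zipfile.Path(zf)              # side effect: zf.__class__ becomes FastLookup
"b/" in zf.namelist()         # True: the implied directory is now listed
zf.getinfo("b/")              # synthesized ZipInfo; raised KeyError before this patch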
63 changes: 60 additions & 3 deletions Lib/zipfile/_path.py
@@ -4,6 +4,8 @@
import itertools
import contextlib
import pathlib
import re
import fnmatch


__all__ = ['Path']
@@ -93,7 +95,7 @@ def _implied_dirs(names):
return _dedupe(_difference(as_dirs, names))

def namelist(self):
names = super(CompleteDirs, self).namelist()
names = super().namelist()
return names + list(self._implied_dirs(names))

def _name_set(self):
@@ -109,6 +111,17 @@ def resolve_dir(self, name):
dir_match = name not in names and dirname in names
return dirname if dir_match else name

def getinfo(self, name):
"""
Supplement getinfo for implied dirs.
"""
try:
return super().getinfo(name)
except KeyError:
if not name.endswith('/') or name not in self._name_set():
raise
return zipfile.ZipInfo(filename=name)

@classmethod
def make(cls, source):
"""
@@ -138,13 +151,13 @@ class FastLookup(CompleteDirs):
def namelist(self):
with contextlib.suppress(AttributeError):
return self.__names
self.__names = super(FastLookup, self).namelist()
self.__names = super().namelist()
return self.__names

def _name_set(self):
with contextlib.suppress(AttributeError):
return self.__lookup
self.__lookup = super(FastLookup, self)._name_set()
self.__lookup = super()._name_set()
return self.__lookup


@@ -246,6 +259,18 @@ def __init__(self, root, at=""):
self.root = FastLookup.make(root)
self.at = at

def __eq__(self, other):
"""
>>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
False
"""
if self.__class__ is not other.__class__:
return NotImplemented
return (self.root, self.at) == (other.root, other.at)

def __hash__(self):
return hash((self.root, self.at))

def open(self, mode='r', *args, pwd=None, **kwargs):
"""
Open this entry as text or binary following the semantics
@@ -316,6 +341,38 @@ def iterdir(self):
subs = map(self._next, self.root.namelist())
return filter(self._is_child, subs)

def match(self, path_pattern):
return pathlib.Path(self.at).match(path_pattern)

def is_symlink(self):
"""
Return whether this path is a symlink. Always false (python/cpython#82102).
"""
return False

def _descendants(self):
for child in self.iterdir():
yield child
if child.is_dir():
yield from child._descendants()

def glob(self, pattern):
if not pattern:
raise ValueError(f"Unacceptable pattern: {pattern!r}")

matches = re.compile(fnmatch.translate(pattern)).fullmatch
return (
child
for child in self._descendants()
if matches(str(child.relative_to(self)))
)

def rglob(self, pattern):
return self.glob(f'**/{pattern}')

def relative_to(self, other, *extra):
return posixpath.relpath(str(self), str(other.joinpath(*extra)))

def __str__(self):
return posixpath.join(self.root.filename, self.at)

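
Taken together, the _path.py additions give zipfile.Path a pathlib-flavoured query surface (glob, rglob, match, relative_to, is_symlink) plus value-based equality and hashing. A short usage sketch; the archive names are illustrative, and the in-memory archive is given a filename the same way the test fixtures do, since __str__ joins root.filename with the member path:

import io, zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("a.txt", b"content of a")
    zf.writestr("b/c.txt", b"content of c")

zf = zipfile.ZipFile(buf)
zf.filename = "example.zip"     # as in the test fixtures; needed for str(Path) and glob
root = zipfile.Path(zf)

[p.name for p in root.glob("b/*.txt")]                    # ['c.txt']
root.joinpath("b", "c.txt").match("*.txt")                 # True
str(root.joinpath("b", "c.txt").relative_to(root / "b"))   # 'c.txt'
(root / "a.txt") == root.joinpath("a.txt")                 # True, and they hash equally
root.is_symlink()                                           # always False (python/cpython#82102)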