Fix project isolation: Make loadChatHistory respect active project sessions

- Modified loadChatHistory() to check for active project before fetching all sessions
- When active project exists, use project.sessions instead of fetching from API
- Added detailed console logging to debug session filtering
- This prevents ALL sessions from appearing in every project's sidebar

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
uroma
2026-01-22 14:43:05 +00:00
Unverified
parent b82837aa5f
commit 55aafbae9a
6463 changed files with 1115462 additions and 4486 deletions

View File

@@ -0,0 +1,3 @@
from __future__ import annotations

# Version of the wheel package (a PEP 440 version string).
__version__ = "0.46.2"

View File

@@ -0,0 +1,25 @@
"""
Wheel command line tool (enables the ``python -m wheel`` syntax)
"""
from __future__ import annotations
import sys
from typing import NoReturn
def main() -> NoReturn:  # needed for console script
    """Run the wheel command-line interface and exit with its status code."""
    if __package__ == "":
        # Executed as 'python wheel-0.9.whl/wheel': put the archive root on
        # sys.path so the package inside the zip becomes importable.
        import os.path

        archive_root = os.path.dirname(os.path.dirname(__file__))
        sys.path.insert(0, archive_root)

    from ._commands import main as cli_main

    sys.exit(cli_main())


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,616 @@
"""
Create a wheel (.whl) distribution.
A wheel is a built archive format.
"""
from __future__ import annotations
import logging
import os
import re
import shutil
import stat
import struct
import sys
import sysconfig
import warnings
from collections.abc import Iterable, Sequence
from email.generator import BytesGenerator, Generator
from email.policy import EmailPolicy
from glob import iglob
from shutil import rmtree
from typing import TYPE_CHECKING, Callable, Literal, cast
from zipfile import ZIP_DEFLATED, ZIP_STORED
import setuptools
from packaging import tags
from packaging import version as _packaging_version
from setuptools import Command
from . import __version__ as wheel_version
from ._metadata import pkginfo_to_metadata
from .wheelfile import WheelFile
if TYPE_CHECKING:
import types
# Ensure Python logging is configured for setuptools output; fall back to the
# bundled shim when setuptools does not ship a logging module.
try:
    __import__("setuptools.logging")
except ImportError:
    # setuptools too old to provide setuptools.logging (exact cutoff
    # unconfirmed) — configure logging via the vendored helper instead.
    from . import _setuptools_logging

    _setuptools_logging.configure()

# Module-level logger shared by the whole bdist_wheel implementation.
log = logging.getLogger("wheel")
def safe_name(name: str) -> str:
    """Convert an arbitrary string to a standard distribution name.

    Every run of characters other than ASCII letters, digits and '.' is
    collapsed into a single '-'.
    """
    cleaner = re.compile("[^A-Za-z0-9.]+")
    return cleaner.sub("-", name)
def safe_version(version: str) -> str:
    """Convert an arbitrary string to a standard version string.

    Tries PEP 440 normalization first; if the string is not a valid version,
    spaces become dots and any other illegal runs become single dashes.
    """
    try:
        return str(_packaging_version.Version(version))
    except _packaging_version.InvalidVersion:
        fallback = version.replace(" ", ".")
        return re.sub("[^A-Za-z0-9.]+", "-", fallback)
# Major version of the installed setuptools, used for feature detection
# (license-file handling changed at 42 and 57 — see license_paths below).
setuptools_major_version = int(setuptools.__version__.split(".")[0])

# Accepted values for --py-limited-api (e.g. "cp32" .. "cp39").
PY_LIMITED_API_PATTERN = r"cp3\d"
def _is_32bit_interpreter() -> bool:
return struct.calcsize("P") == 4
def python_tag() -> str:
    """Return the generic interpreter tag for this Python, e.g. 'py3'."""
    return "py%d" % sys.version_info.major
def get_platform(archive_root: str | None) -> str:
    """Return our platform name, e.g. 'win32' or 'linux_x86_64'.

    On macOS, when an archive root is given, the tag is refined from the
    binaries found inside it; on 32-bit interpreters running on 64-bit
    kernels the 64-bit machine tag is downgraded to the 32-bit one.
    """
    plat = sysconfig.get_platform()
    if plat.startswith("macosx") and archive_root is not None:
        from .macosx_libfile import calculate_macosx_platform_tag

        plat = calculate_macosx_platform_tag(archive_root, plat)
    elif _is_32bit_interpreter():
        # pip pull request #3497 / packaging pull request #234.
        # TODO armv8l, packaging pull request #690 => this did not land
        # in pip/packaging yet
        downgrades = {
            "linux-x86_64": "linux-i686",
            "linux-aarch64": "linux-armv7l",
        }
        plat = downgrades.get(plat, plat)

    return plat.replace("-", "_")
def get_flag(
    var: str, fallback: bool, expected: bool = True, warn: bool = True
) -> bool:
    """Report whether config variable *var* equals *expected*.

    When the variable is unset or unavailable, return *fallback* instead,
    optionally emitting a RuntimeWarning (the ABI tag may then be wrong).
    """
    value = sysconfig.get_config_var(var)
    if value is not None:
        return value == expected

    if warn:
        warnings.warn(
            f"Config variable '{var}' is unset, Python ABI tag may be incorrect",
            RuntimeWarning,
            stacklevel=2,
        )

    return fallback
def get_abi_tag() -> str | None:
    """Return the ABI tag based on SOABI (if available) or emulate SOABI (PyPy2)."""
    soabi: str = sysconfig.get_config_var("SOABI")
    impl = tags.interpreter_name()

    if not soabi and impl in ("cp", "pp") and hasattr(sys, "maxunicode"):
        # No SOABI available: reconstruct the tag from the build flags.
        debug = (
            "d"
            if get_flag("Py_DEBUG", hasattr(sys, "gettotalrefcount"), warn=(impl == "cp"))
            else ""
        )
        has_pymalloc = get_flag(
            "WITH_PYMALLOC",
            impl == "cp",
            warn=(impl == "cp" and sys.version_info < (3, 8)),
        )
        # The 'm' (pymalloc) suffix only existed before Python 3.8.
        pymalloc = "m" if has_pymalloc and sys.version_info < (3, 8) else ""
        return f"{impl}{tags.interpreter_version()}{debug}{pymalloc}"

    if soabi and impl == "cp" and soabi.startswith("cpython"):
        # non-Windows CPython, e.g. "cpython-312-x86_64-linux-gnu"
        return "cp" + soabi.split("-")[1]

    if soabi and impl == "cp" and soabi.startswith("cp"):
        # Windows CPython, e.g. "cp312-win_amd64"
        return soabi.split("-")[0]

    if soabi and impl == "pp":
        # we want something like pypy36-pp73
        tag = "-".join(soabi.split("-")[:2])
        return tag.replace(".", "_").replace("-", "_")

    if soabi and impl == "graalpy":
        tag = "-".join(soabi.split("-")[:3])
        return tag.replace(".", "_").replace("-", "_")

    if soabi:
        return soabi.replace(".", "_").replace("-", "_")

    return None
def safer_name(name: str) -> str:
    """Like safe_name(), but with '-' replaced by '_' for wheel filenames."""
    cleaned = safe_name(name)
    return cleaned.replace("-", "_")
def safer_version(version: str) -> str:
    """Like safe_version(), but with '-' replaced by '_' for wheel filenames."""
    normalized = safe_version(version)
    return normalized.replace("-", "_")
def remove_readonly(
    func: Callable[..., object],
    path: str,
    excinfo: tuple[type[Exception], Exception, types.TracebackType],
) -> None:
    """shutil.rmtree ``onerror`` callback (pre-3.12 API).

    Delegates to remove_readonly_exc() with the exception instance extracted
    from the (type, value, traceback) triple.
    """
    exc = excinfo[1]
    remove_readonly_exc(func, path, exc)
def remove_readonly_exc(func: Callable[..., object], path: str, exc: Exception) -> None:
    """shutil.rmtree ``onexc`` callback (3.12+ API).

    Clears the read-only attribute on *path* and retries the failed
    operation *func* on it.
    """
    os.chmod(path, stat.S_IWRITE)
    func(path)
class bdist_wheel(Command):
    """setuptools/distutils command that builds a PEP 427 wheel (.whl)."""

    description = "create a wheel distribution"

    # Mapping of user-facing --compression names to zipfile constants.
    supported_compressions = {
        "stored": ZIP_STORED,
        "deflated": ZIP_DEFLATED,
    }

    # (long name, short name, help) triples in the distutils option format;
    # a trailing '=' on the long name means the option takes an argument.
    user_options = [
        ("bdist-dir=", "b", "temporary directory for creating the distribution"),
        (
            "plat-name=",
            "p",
            "platform name to embed in generated filenames "
            f"(default: {get_platform(None)})",
        ),
        (
            "keep-temp",
            "k",
            "keep the pseudo-installation tree around after "
            "creating the distribution archive",
        ),
        ("dist-dir=", "d", "directory to put final built distributions in"),
        ("skip-build", None, "skip rebuilding everything (for testing/debugging)"),
        (
            "relative",
            None,
            "build the archive using relative paths (default: false)",
        ),
        (
            "owner=",
            "u",
            "Owner name used when creating a tar file [default: current user]",
        ),
        (
            "group=",
            "g",
            "Group name used when creating a tar file [default: current group]",
        ),
        ("universal", None, "make a universal wheel (default: false)"),
        (
            "compression=",
            None,
            "zipfile compression (one of: {}) (default: 'deflated')".format(
                ", ".join(supported_compressions)
            ),
        ),
        (
            "python-tag=",
            None,
            f"Python implementation compatibility tag (default: '{python_tag()}')",
        ),
        (
            "build-number=",
            None,
            "Build number for this particular version. "
            "As specified in PEP-0427, this must start with a digit. "
            "[default: None]",
        ),
        (
            "py-limited-api=",
            None,
            "Python tag (cp32|cp33|cpNN) for abi3 wheel tag (default: false)",
        ),
    ]

    boolean_options = ["keep-temp", "skip-build", "relative", "universal"]

    def initialize_options(self):
        """Set every option to its pre-parse default (distutils contract)."""
        self.bdist_dir: str = None
        self.data_dir = None
        self.plat_name: str | None = None
        self.plat_tag = None
        self.format = "zip"
        self.keep_temp = False
        self.dist_dir: str | None = None
        self.egginfo_dir = None
        self.root_is_pure: bool | None = None
        self.skip_build = None
        self.relative = False
        self.owner = None
        self.group = None
        self.universal: bool = False
        self.compression: str | int = "deflated"
        self.python_tag: str = python_tag()
        self.build_number: str | None = None
        self.py_limited_api: str | Literal[False] = False
        # Records whether --plat-name was given explicitly (see get_tag()).
        self.plat_name_supplied = False

    def finalize_options(self):
        """Resolve defaults and validate options after parsing (distutils contract)."""
        if self.bdist_dir is None:
            bdist_base = self.get_finalized_command("bdist").bdist_base
            self.bdist_dir = os.path.join(bdist_base, "wheel")

        egg_info = self.distribution.get_command_obj("egg_info")
        egg_info.ensure_finalized()  # needed for correct `wheel_dist_name`

        self.data_dir = self.wheel_dist_name + ".data"
        self.plat_name_supplied = self.plat_name is not None

        # Translate the compression name into its zipfile constant.
        try:
            self.compression = self.supported_compressions[self.compression]
        except KeyError:
            raise ValueError(f"Unsupported compression: {self.compression}") from None

        need_options = ("dist_dir", "plat_name", "skip_build")
        self.set_undefined_options("bdist", *zip(need_options, need_options))

        # Pure unless the distribution builds C extensions or C libraries.
        self.root_is_pure = not (
            self.distribution.has_ext_modules() or self.distribution.has_c_libraries()
        )

        if self.py_limited_api and not re.match(
            PY_LIMITED_API_PATTERN, self.py_limited_api
        ):
            raise ValueError(f"py-limited-api must match '{PY_LIMITED_API_PATTERN}'")

        # Support legacy [wheel] section for setting universal
        wheel = self.distribution.get_option_dict("wheel")
        if "universal" in wheel:
            # please don't define this in your global configs
            log.warning(
                "The [wheel] section is deprecated. Use [bdist_wheel] instead.",
            )
            val = wheel["universal"][1].strip()
            if val.lower() in ("1", "true", "yes"):
                self.universal = True

        if self.build_number is not None and not self.build_number[:1].isdigit():
            raise ValueError("Build tag (build-number) must start with a digit.")

    @property
    def wheel_dist_name(self):
        """Return distribution full name with - replaced with _"""
        components = (
            safer_name(self.distribution.get_name()),
            safer_version(self.distribution.get_version()),
        )
        if self.build_number:
            components += (self.build_number,)

        return "-".join(components)

    def get_tag(self) -> tuple[str, str, str]:
        """Compute the (implementation, abi, platform) tag triple for the wheel."""
        # bdist sets self.plat_name if unset, we should only use it for purepy
        # wheels if the user supplied it.
        if self.plat_name_supplied:
            plat_name = cast(str, self.plat_name)
        elif self.root_is_pure:
            plat_name = "any"
        else:
            # macosx contains system version in platform name so need special handle
            if self.plat_name and not self.plat_name.startswith("macosx"):
                plat_name = self.plat_name
            else:
                # on macosx always limit the platform name to comply with any
                # c-extension modules in bdist_dir, since the user can specify
                # a higher MACOSX_DEPLOYMENT_TARGET via tools like CMake
                # on other platforms, and on macosx if there are no c-extension
                # modules, use the default platform name.
                plat_name = get_platform(self.bdist_dir)

            # 32-bit interpreter on a 64-bit kernel: tag with the 32-bit arch.
            if _is_32bit_interpreter():
                if plat_name in ("linux-x86_64", "linux_x86_64"):
                    plat_name = "linux_i686"
                if plat_name in ("linux-aarch64", "linux_aarch64"):
                    # TODO armv8l, packaging pull request #690 => this did not land
                    # in pip/packaging yet
                    plat_name = "linux_armv7l"

        plat_name = (
            plat_name.lower().replace("-", "_").replace(".", "_").replace(" ", "_")
        )

        if self.root_is_pure:
            if self.universal:
                impl = "py2.py3"
            else:
                impl = self.python_tag
            tag = (impl, "none", plat_name)
        else:
            impl_name = tags.interpreter_name()
            impl_ver = tags.interpreter_version()
            impl = impl_name + impl_ver
            # We don't work on CPython 3.1, 3.0.
            if self.py_limited_api and (impl_name + impl_ver).startswith("cp3"):
                impl = self.py_limited_api
                abi_tag = "abi3"
            else:
                abi_tag = str(get_abi_tag()).lower()

            tag = (impl, abi_tag, plat_name)
            # issue gh-374: allow overriding plat_name
            supported_tags = [
                (t.interpreter, t.abi, plat_name) for t in tags.sys_tags()
            ]
            # NOTE: this sanity check is skipped when Python runs with -O.
            assert tag in supported_tags, (
                f"would build wheel with unsupported tag {tag}"
            )

        return tag

    def run(self):
        """Build the project, install it into a staging tree and zip it up."""
        build_scripts = self.reinitialize_command("build_scripts")
        build_scripts.executable = "python"
        build_scripts.force = True

        build_ext = self.reinitialize_command("build_ext")
        build_ext.inplace = False

        if not self.skip_build:
            self.run_command("build")

        install = self.reinitialize_command("install", reinit_subcommands=True)
        install.root = self.bdist_dir
        install.compile = False
        install.skip_build = self.skip_build
        install.warn_dir = False

        # A wheel without setuptools scripts is more cross-platform.
        # Use the (undocumented) `no_ep` option to setuptools'
        # install_scripts command to avoid creating entry point scripts.
        install_scripts = self.reinitialize_command("install_scripts")
        install_scripts.no_ep = True

        # Use a custom scheme for the archive, because we have to decide
        # at installation time which scheme to use.
        for key in ("headers", "scripts", "data", "purelib", "platlib"):
            setattr(install, "install_" + key, os.path.join(self.data_dir, key))

        basedir_observed = ""

        if os.name == "nt":
            # win32 barfs if any of these are ''; could be '.'?
            # (distutils.command.install:change_roots bug)
            basedir_observed = os.path.normpath(os.path.join(self.data_dir, ".."))
            self.install_libbase = self.install_lib = basedir_observed

        setattr(
            install,
            "install_purelib" if self.root_is_pure else "install_platlib",
            basedir_observed,
        )

        log.info(f"installing to {self.bdist_dir}")

        self.run_command("install")

        impl_tag, abi_tag, plat_tag = self.get_tag()
        archive_basename = f"{self.wheel_dist_name}-{impl_tag}-{abi_tag}-{plat_tag}"
        if not self.relative:
            archive_root = self.bdist_dir
        else:
            archive_root = os.path.join(
                self.bdist_dir, self._ensure_relative(install.install_base)
            )

        self.set_undefined_options("install_egg_info", ("target", "egginfo_dir"))
        distinfo_dirname = (
            f"{safer_name(self.distribution.get_name())}-"
            f"{safer_version(self.distribution.get_version())}.dist-info"
        )
        distinfo_dir = os.path.join(self.bdist_dir, distinfo_dirname)
        self.egg2dist(self.egginfo_dir, distinfo_dir)

        self.write_wheelfile(distinfo_dir)

        # Make the archive
        if not os.path.exists(self.dist_dir):
            os.makedirs(self.dist_dir)

        wheel_path = os.path.join(self.dist_dir, archive_basename + ".whl")
        with WheelFile(wheel_path, "w", self.compression) as wf:
            wf.write_files(archive_root)

        # Add to 'Distribution.dist_files' so that the "upload" command works
        getattr(self.distribution, "dist_files", []).append(
            (
                "bdist_wheel",
                "{}.{}".format(*sys.version_info[:2]),  # like 3.7
                wheel_path,
            )
        )

        if not self.keep_temp:
            log.info(f"removing {self.bdist_dir}")
            if not self.dry_run:
                # rmtree's error-callback API changed in Python 3.12
                # (onerror deprecated in favour of onexc).
                if sys.version_info < (3, 12):
                    rmtree(self.bdist_dir, onerror=remove_readonly)
                else:
                    rmtree(self.bdist_dir, onexc=remove_readonly_exc)

    def write_wheelfile(
        self, wheelfile_base: str, generator: str = f"bdist_wheel ({wheel_version})"
    ):
        """Write the .dist-info/WHEEL metadata file into *wheelfile_base*."""
        from email.message import Message

        msg = Message()
        msg["Wheel-Version"] = "1.0"  # of the spec
        msg["Generator"] = generator
        msg["Root-Is-Purelib"] = str(self.root_is_pure).lower()
        if self.build_number is not None:
            msg["Build"] = self.build_number

        # Doesn't work for bdist_wininst
        impl_tag, abi_tag, plat_tag = self.get_tag()
        # Message.__setitem__ appends, so each tag combination adds another
        # "Tag" header rather than overwriting the previous one.
        for impl in impl_tag.split("."):
            for abi in abi_tag.split("."):
                for plat in plat_tag.split("."):
                    msg["Tag"] = "-".join((impl, abi, plat))

        wheelfile_path = os.path.join(wheelfile_base, "WHEEL")
        log.info(f"creating {wheelfile_path}")
        with open(wheelfile_path, "wb") as f:
            BytesGenerator(f, maxheaderlen=0).flatten(msg)

    def _ensure_relative(self, path: str) -> str:
        """Strip the leading separator (and keep the drive) so *path* is relative."""
        # copied from dir_util, deleted
        drive, path = os.path.splitdrive(path)
        if path[0:1] == os.sep:
            path = drive + path[1:]
        return path

    @property
    def license_paths(self) -> Iterable[str]:
        """License files to copy into .dist-info, resolved per setuptools version."""
        if setuptools_major_version >= 57:
            # Setuptools has resolved any patterns to actual file names
            return self.distribution.metadata.license_files or ()

        files: set[str] = set()
        metadata = self.distribution.get_option_dict("metadata")
        if setuptools_major_version >= 42:
            # Setuptools recognizes the license_files option but does not do globbing
            patterns = cast(Sequence[str], self.distribution.metadata.license_files)
        else:
            # Prior to those, wheel is entirely responsible for handling license files
            if "license_files" in metadata:
                patterns = metadata["license_files"][1].split()
            else:
                patterns = ()

        # Deprecated singular option, still honoured for backwards compatibility.
        if "license_file" in metadata:
            warnings.warn(
                'The "license_file" option is deprecated. Use "license_files" instead.',
                DeprecationWarning,
                stacklevel=2,
            )
            files.add(metadata["license_file"][1])

        # Fall back to the conventional names only when nothing was configured.
        if not files and not patterns and not isinstance(patterns, list):
            patterns = ("LICEN[CS]E*", "COPYING*", "NOTICE*", "AUTHORS*")

        for pattern in patterns:
            for path in iglob(pattern):
                if path.endswith("~"):
                    log.debug(
                        f'ignoring license file "{path}" as it looks like a backup'
                    )
                    continue

                if path not in files and os.path.isfile(path):
                    log.info(
                        f'adding license file "{path}" (matched pattern "{pattern}")'
                    )
                    files.add(path)

        return files

    def egg2dist(self, egginfo_path: str, distinfo_path: str):
        """Convert an .egg-info directory into a .dist-info directory"""

        def adios(p: str) -> None:
            """Appropriately delete directory, file or link."""
            if os.path.exists(p) and not os.path.islink(p) and os.path.isdir(p):
                shutil.rmtree(p)
            elif os.path.exists(p):
                os.unlink(p)

        # Remove any stale .dist-info from a previous build.
        adios(distinfo_path)

        if not os.path.exists(egginfo_path):
            # There is no egg-info. This is probably because the egg-info
            # file/directory is not named matching the distribution name used
            # to name the archive file. Check for this case and report
            # accordingly.
            import glob

            pat = os.path.join(os.path.dirname(egginfo_path), "*.egg-info")
            possible = glob.glob(pat)
            err = f"Egg metadata expected at {egginfo_path} but not found"
            if possible:
                alt = os.path.basename(possible[0])
                err += f" ({alt} found - possible misnamed archive file?)"

            raise ValueError(err)

        if os.path.isfile(egginfo_path):
            # .egg-info is a single file
            pkg_info = pkginfo_to_metadata(egginfo_path, egginfo_path)
            os.mkdir(distinfo_path)
        else:
            # .egg-info is a directory
            pkginfo_path = os.path.join(egginfo_path, "PKG-INFO")
            pkg_info = pkginfo_to_metadata(egginfo_path, pkginfo_path)

            # ignore common egg metadata that is useless to wheel
            shutil.copytree(
                egginfo_path,
                distinfo_path,
                ignore=lambda x, y: {
                    "PKG-INFO",
                    "requires.txt",
                    "SOURCES.txt",
                    "not-zip-safe",
                },
            )

            # delete dependency_links if it is only whitespace
            dependency_links_path = os.path.join(distinfo_path, "dependency_links.txt")
            with open(dependency_links_path, encoding="utf-8") as dependency_links_file:
                dependency_links = dependency_links_file.read().strip()
            if not dependency_links:
                adios(dependency_links_path)

        pkg_info_path = os.path.join(distinfo_path, "METADATA")
        serialization_policy = EmailPolicy(
            utf8=True,
            mangle_from_=False,
            max_line_length=0,
        )
        with open(pkg_info_path, "w", encoding="utf-8") as out:
            Generator(out, policy=serialization_policy).flatten(pkg_info)

        for license_path in self.license_paths:
            filename = os.path.basename(license_path)
            shutil.copy(license_path, os.path.join(distinfo_path, filename))

        adios(egginfo_path)

View File

@@ -0,0 +1,153 @@
"""
Wheel command-line utility.
"""
from __future__ import annotations
import argparse
import os
import sys
from argparse import ArgumentTypeError
from ..wheelfile import WheelError
def unpack_f(args: argparse.Namespace) -> None:
    """Handler for the `wheel unpack` subcommand."""
    from .unpack import unpack

    unpack(args.wheelfile, args.dest)
def pack_f(args: argparse.Namespace) -> None:
    """Handler for the `wheel pack` subcommand."""
    from .pack import pack

    pack(args.directory, args.dest_dir, args.build_number)
def convert_f(args: argparse.Namespace) -> None:
    """Handler for the `wheel convert` subcommand."""
    from .convert import convert

    convert(args.files, args.dest_dir, args.verbose)
def tags_f(args: argparse.Namespace) -> None:
    """Handler for the `wheel tags` subcommand.

    Retags each given wheel and prints the resulting filename, one per line.
    """
    from .tags import tags

    for wheel in args.wheel:
        new_name = tags(
            wheel,
            args.python_tag,
            args.abi_tag,
            args.platform_tag,
            args.build,
            args.remove,
        )
        print(new_name)
def version_f(args: argparse.Namespace) -> None:
    """Handler for the `wheel version` subcommand."""
    from .. import __version__

    print("wheel " + __version__)
def parse_build_tag(build_tag: str) -> str:
    """argparse type-validator for a PEP 427 build tag.

    A non-empty tag must begin with a digit, and no tag may contain '-'
    (it would break the wheel filename format). Returns the tag unchanged.
    """
    if build_tag and not build_tag[0].isdigit():
        raise ArgumentTypeError("build tag must begin with a digit")
    if "-" in build_tag:
        raise ArgumentTypeError("invalid character ('-') in build tag")
    return build_tag
# Long description shown by `wheel tags --help` (passed as the subparser's
# description in parser() below).
TAGS_HELP = """\
Make a new wheel with given tags. Any tags unspecified will remain the same.
Starting the tags with a "+" will append to the existing tags. Starting with a
"-" will remove a tag (use --option=-TAG syntax). Multiple tags can be
separated by ".". The original file will remain unless --remove is given. The
output filename(s) will be displayed on stdout for further processing.
"""
def parser() -> argparse.ArgumentParser:
    """Build the top-level argument parser with one subparser per subcommand.

    Each subparser stores its handler callable in the ``func`` default,
    which main() dispatches on. Subparsers are registered in the order
    they should appear in the --help listing.
    """
    p = argparse.ArgumentParser()
    s = p.add_subparsers(help="commands")

    # wheel unpack
    unpack_parser = s.add_parser("unpack", help="Unpack wheel")
    unpack_parser.add_argument(
        "--dest", "-d", help="Destination directory", default="."
    )
    unpack_parser.add_argument("wheelfile", help="Wheel file")
    unpack_parser.set_defaults(func=unpack_f)

    # wheel pack
    repack_parser = s.add_parser("pack", help="Repack wheel")
    repack_parser.add_argument("directory", help="Root directory of the unpacked wheel")
    repack_parser.add_argument(
        "--dest-dir",
        "-d",
        default=os.path.curdir,
        help="Directory to store the wheel (default %(default)s)",
    )
    repack_parser.add_argument(
        "--build-number", help="Build tag to use in the wheel name"
    )
    repack_parser.set_defaults(func=pack_f)

    # wheel convert
    convert_parser = s.add_parser("convert", help="Convert egg or wininst to wheel")
    convert_parser.add_argument("files", nargs="*", help="Files to convert")
    convert_parser.add_argument(
        "--dest-dir",
        "-d",
        default=os.path.curdir,
        help="Directory to store wheels (default %(default)s)",
    )
    convert_parser.add_argument("--verbose", "-v", action="store_true")
    convert_parser.set_defaults(func=convert_f)

    # wheel tags
    tags_parser = s.add_parser(
        "tags", help="Add or replace the tags on a wheel", description=TAGS_HELP
    )
    tags_parser.add_argument("wheel", nargs="*", help="Existing wheel(s) to retag")
    tags_parser.add_argument(
        "--remove",
        action="store_true",
        help="Remove the original files, keeping only the renamed ones",
    )
    tags_parser.add_argument(
        "--python-tag", metavar="TAG", help="Specify an interpreter tag(s)"
    )
    tags_parser.add_argument("--abi-tag", metavar="TAG", help="Specify an ABI tag(s)")
    tags_parser.add_argument(
        "--platform-tag", metavar="TAG", help="Specify a platform tag(s)"
    )
    tags_parser.add_argument(
        "--build", type=parse_build_tag, metavar="BUILD", help="Specify a build tag"
    )
    tags_parser.set_defaults(func=tags_f)

    # wheel version / wheel help
    version_parser = s.add_parser("version", help="Print version and exit")
    version_parser.set_defaults(func=version_f)

    help_parser = s.add_parser("help", help="Show this help")
    help_parser.set_defaults(func=lambda args: p.print_help())

    return p
def main() -> int:
    """Entry point for the wheel CLI.

    Returns a process exit code: 0 when a subcommand ran successfully,
    1 when no subcommand was given or a WheelError was raised.
    """
    p = parser()
    args = p.parse_args()
    if not hasattr(args, "func"):
        # No subcommand on the command line: show usage and fail.
        p.print_help()
    else:
        try:
            args.func(args)
            return 0
        except WheelError as e:
            print(e, file=sys.stderr)

    # BUGFIX: previously this path fell off the end and returned None,
    # contradicting the declared `-> int` return type; the `return 1` was
    # reachable only from the WheelError handler. Both failure paths now
    # return an explicit nonzero exit code.
    return 1

View File

@@ -0,0 +1,337 @@
from __future__ import annotations
import os.path
import re
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from collections.abc import Iterator
from email.message import Message
from email.parser import Parser
from email.policy import EmailPolicy
from glob import iglob
from pathlib import Path
from textwrap import dedent
from zipfile import ZipFile
from packaging.tags import parse_tag
from .. import __version__
from .._metadata import generate_requirements
from ..wheelfile import WheelFile
egg_filename_re = re.compile(
r"""
(?P<name>.+?)-(?P<ver>.+?)
(-(?P<pyver>py\d\.\d+)
(-(?P<arch>.+?))?
)?.egg$""",
re.VERBOSE,
)
egg_info_re = re.compile(
r"""
^(?P<name>.+?)-(?P<ver>.+?)
(-(?P<pyver>py\d\.\d+)
)?.egg-info/""",
re.VERBOSE,
)
wininst_re = re.compile(
r"\.(?P<platform>win32|win-amd64)(?:-(?P<pyver>py\d\.\d))?\.exe$"
)
pyd_re = re.compile(r"\.(?P<abi>[a-z0-9]+)-(?P<platform>win32|win_amd64)\.pyd$")
serialization_policy = EmailPolicy(
utf8=True,
mangle_from_=False,
max_line_length=0,
)
GENERATOR = f"wheel {__version__}"
def convert_requires(requires: str, metadata: Message) -> None:
    """Translate an egg-style requires.txt body into requirement headers.

    Section markers like ``[extra]`` switch subsequent requirement lines to
    that extra; the resulting mapping is rendered onto *metadata* via
    generate_requirements().
    """
    current_extra: str | None = None
    by_extra: dict[str | None, list[str]] = defaultdict(list)
    for raw_line in requires.splitlines():
        entry = raw_line.strip()
        if not entry:
            continue

        if entry.startswith("[") and entry.endswith("]"):
            # Section header: everything below belongs to this extra.
            current_extra = entry[1:-1]
            continue

        by_extra[current_extra].append(entry)

    for key, value in generate_requirements(by_extra):
        metadata.add_header(key, value)
def convert_pkg_info(pkginfo: str, metadata: Message) -> None:
    """Copy PKG-INFO headers onto *metadata*, upgrading legacy fields.

    ``UNKNOWN`` placeholder values are dropped, the long description moves
    into the message payload, Home-page/Download-URL become Project-URL
    entries, and the Metadata-Version is bumped to 2.4.
    """
    for key, value in Parser().parsestr(pkginfo).items():
        if value == "UNKNOWN":
            # Legacy placeholder for "no value" -- omit the field entirely.
            continue

        field = key.lower()
        if field == "description":
            # The long description moves from a header into the body; the
            # continuation lines were indented by the legacy writer, so
            # dedent everything after the first line.
            desc_lines = value.splitlines()
            if desc_lines:
                payload = "\n".join(
                    (
                        desc_lines[0].lstrip(),
                        dedent("\n".join(desc_lines[1:])),
                        "\n",
                    )
                )
            else:
                payload = "\n"

            metadata.set_payload(payload)
        elif field == "home-page":
            metadata.add_header("Project-URL", f"Homepage, {value}")
        elif field == "download-url":
            metadata.add_header("Project-URL", f"Download, {value}")
        else:
            metadata.add_header(key, value)

    metadata.replace_header("Metadata-Version", "2.4")
def normalize(name: str) -> str:
    """Normalize *name* PEP 503-style, then use '_' as the separator.

    Runs of '-', '_' and '.' collapse to one separator and the result is
    lowercased, matching the wheel filename convention.
    """
    collapsed = re.sub(r"[-_.]+", "-", name).lower()
    return collapsed.replace("-", "_")
class ConvertSource(metaclass=ABCMeta):
    """Abstract source of wheel contents converted from a legacy distribution.

    Concrete subclasses populate the identifying attributes (name, version,
    tags, metadata) and stream the archive members via generate_contents().
    """

    name: str
    version: str
    # Tag defaults describe a pure, universal distribution until a
    # subclass discovers otherwise.
    pyver: str = "py2.py3"
    abi: str = "none"
    platform: str = "any"
    metadata: Message

    @property
    def dist_info_dir(self) -> str:
        """Name of the .dist-info directory inside the converted wheel."""
        return "{}-{}.dist-info".format(self.name, self.version)

    @abstractmethod
    def generate_contents(self) -> Iterator[tuple[str, bytes]]:
        """Yield (archive path, file contents) pairs for the wheel."""
class EggFileSource(ConvertSource):
    """Wheel-content source backed by a zipped .egg file."""

    def __init__(self, path: Path):
        # Parse name/version/pyver/arch out of the egg file name itself.
        if not (match := egg_filename_re.match(path.name)):
            raise ValueError(f"Invalid egg file name: {path.name}")

        # Binary wheels are assumed to be for CPython
        self.path = path
        self.name = normalize(match.group("name"))
        self.version = match.group("ver")
        if pyver := match.group("pyver"):
            self.pyver = pyver.replace(".", "")

        if arch := match.group("arch"):
            # Platform-specific egg: derive the CPython ABI tag from pyver
            # (e.g. "py39" -> "cp39") and normalize the architecture.
            self.abi = self.pyver.replace("py", "cp")
            self.platform = normalize(arch)

        self.metadata = Message()

    def generate_contents(self) -> Iterator[tuple[str, bytes]]:
        """Yield wheel archive members converted from the egg's contents."""
        with ZipFile(self.path, "r") as zip_file:
            for filename in sorted(zip_file.namelist()):
                # Skip pure directory entries
                if filename.endswith("/"):
                    continue

                # Handle files in the egg-info directory specially, selectively moving
                # them to the dist-info directory while converting as needed
                if filename.startswith("EGG-INFO/"):
                    if filename == "EGG-INFO/requires.txt":
                        requires = zip_file.read(filename).decode("utf-8")
                        convert_requires(requires, self.metadata)
                    elif filename == "EGG-INFO/PKG-INFO":
                        pkginfo = zip_file.read(filename).decode("utf-8")
                        convert_pkg_info(pkginfo, self.metadata)
                    elif filename == "EGG-INFO/entry_points.txt":
                        yield (
                            f"{self.dist_info_dir}/entry_points.txt",
                            zip_file.read(filename),
                        )

                    # NOTE: every other EGG-INFO/* entry is deliberately
                    # dropped here (the `continue` covers all branches).
                    continue

                # For any other file, just pass it through
                yield filename, zip_file.read(filename)
class EggDirectorySource(EggFileSource):
    """Wheel-content source backed by an unpacked .egg directory tree."""

    def generate_contents(self) -> Iterator[tuple[str, bytes]]:
        """Yield wheel archive members converted from the egg directory."""
        for dirpath, _, filenames in os.walk(self.path):
            for filename in sorted(filenames):
                path = Path(dirpath, filename)
                if path.parent.name == "EGG-INFO":
                    if path.name == "requires.txt":
                        requires = path.read_text("utf-8")
                        convert_requires(requires, self.metadata)
                    elif path.name == "PKG-INFO":
                        pkginfo = path.read_text("utf-8")
                        convert_pkg_info(pkginfo, self.metadata)
                        # Prefer the authoritative PKG-INFO values over the
                        # name/version parsed from the directory name.
                        # NOTE(review): this mutates name/version while
                        # iterating, so entries yielded before PKG-INFO was
                        # seen used the old dist_info_dir — verify upstream.
                        if name := self.metadata.get("Name"):
                            self.name = normalize(name)

                        if version := self.metadata.get("Version"):
                            self.version = version
                    elif path.name == "entry_points.txt":
                        yield (
                            f"{self.dist_info_dir}/entry_points.txt",
                            path.read_bytes(),
                        )

                    # Any other EGG-INFO file is deliberately dropped.
                    continue

                # For any other file, just pass it through
                yield str(path.relative_to(self.path)), path.read_bytes()
class WininstFileSource(ConvertSource):
    """
    Handles distributions created with ``bdist_wininst``.

    The egginfo filename has the format::

        name-ver(-pyver)(-arch).egg-info

    The installer filename has the format::

        name-ver.arch(-pyver).exe

    Some things to note:

    1. The installer filename is not definitive. An installer can be renamed
       and work perfectly well as an installer. So more reliable data should
       be used whenever possible.
    2. The egg-info data should be preferred for the name and version, because
       these come straight from the distutils metadata, and are mandatory.
    3. The pyver from the egg-info data should be ignored, as it is
       constructed from the version of Python used to build the installer,
       which is irrelevant - the installer filename is correct here (even to
       the point that when it's not there, any version is implied).
    4. The architecture must be taken from the installer filename, as it is
       not included in the egg-info data.
    5. Architecture-neutral installers still have an architecture because the
       installer format itself (being executable) is architecture-specific. We
       should therefore ignore the architecture if the content is pure-python.
    """

    def __init__(self, path: Path):
        self.path = path
        self.metadata = Message()

        # Determine the initial architecture and Python version from the file name
        # (if possible)
        if match := wininst_re.search(path.name):
            self.platform = normalize(match.group("platform"))
            if pyver := match.group("pyver"):
                self.pyver = pyver.replace(".", "")

        # Look for an .egg-info directory and any .pyd files for more precise info
        egg_info_found = pyd_found = False
        with ZipFile(self.path) as zip_file:
            for filename in zip_file.namelist():
                # Installer archives nest everything one directory deep;
                # strip that prefix before matching.
                prefix, filename = filename.split("/", 1)
                if not egg_info_found and (match := egg_info_re.match(filename)):
                    egg_info_found = True
                    self.name = normalize(match.group("name"))
                    self.version = match.group("ver")
                    if pyver := match.group("pyver"):
                        self.pyver = pyver.replace(".", "")
                elif not pyd_found and (match := pyd_re.search(filename)):
                    # A tagged .pyd pins the ABI and platform precisely.
                    pyd_found = True
                    self.abi = match.group("abi")
                    self.platform = match.group("platform")

                if egg_info_found and pyd_found:
                    break

    def generate_contents(self) -> Iterator[tuple[str, bytes]]:
        """Yield wheel archive members converted from the installer's contents."""
        dist_info_dir = f"{self.name}-{self.version}.dist-info"
        data_dir = f"{self.name}-{self.version}.data"
        with ZipFile(self.path, "r") as zip_file:
            for filename in sorted(zip_file.namelist()):
                # Skip pure directory entries
                if filename.endswith("/"):
                    continue

                # Handle files in the egg-info directory specially, selectively moving
                # them to the dist-info directory while converting as needed
                prefix, target_filename = filename.split("/", 1)
                if egg_info_re.search(target_filename):
                    basename = target_filename.rsplit("/", 1)[-1]
                    if basename == "requires.txt":
                        requires = zip_file.read(filename).decode("utf-8")
                        convert_requires(requires, self.metadata)
                    elif basename == "PKG-INFO":
                        pkginfo = zip_file.read(filename).decode("utf-8")
                        convert_pkg_info(pkginfo, self.metadata)
                    elif basename == "entry_points.txt":
                        yield (
                            f"{dist_info_dir}/entry_points.txt",
                            zip_file.read(filename),
                        )

                    # Any other egg-info file is deliberately dropped.
                    continue
                elif prefix == "SCRIPTS":
                    # Installer scripts move into the wheel's data directory.
                    target_filename = f"{data_dir}/scripts/{target_filename}"

                # For any other file, just pass it through
                yield target_filename, zip_file.read(filename)
def convert(files: list[str], dest_dir: str, verbose: bool) -> None:
    """Convert .egg files/directories or bdist_wininst installers to wheels.

    :param files: glob patterns naming the archives to convert
    :param dest_dir: directory in which to write the resulting .whl files
    :param verbose: print per-archive progress to stdout
    """
    for pat in files:
        for archive in iglob(pat):
            path = Path(archive)
            # Pick a source implementation based on the archive kind.
            if path.suffix == ".egg":
                if path.is_dir():
                    source: ConvertSource = EggDirectorySource(path)
                else:
                    source = EggFileSource(path)
            else:
                source = WininstFileSource(path)

            if verbose:
                print(f"{archive}...", flush=True, end="")

            # NOTE(review): the destination filename is fixed before
            # generate_contents() runs, but EggDirectorySource may refine
            # name/version from PKG-INFO during iteration — confirm upstream.
            dest_path = Path(dest_dir) / (
                f"{source.name}-{source.version}-{source.pyver}-{source.abi}"
                f"-{source.platform}.whl"
            )
            with WheelFile(dest_path, "w") as wheelfile:
                for name_or_zinfo, contents in source.generate_contents():
                    wheelfile.writestr(name_or_zinfo, contents)

                # Write the METADATA file
                wheelfile.writestr(
                    f"{source.dist_info_dir}/METADATA",
                    source.metadata.as_string(policy=serialization_policy).encode(
                        "utf-8"
                    ),
                )

                # Write the WHEEL file
                wheel_message = Message()
                wheel_message.add_header("Wheel-Version", "1.0")
                wheel_message.add_header("Generator", GENERATOR)
                wheel_message.add_header(
                    "Root-Is-Purelib", str(source.platform == "any").lower()
                )
                # One Tag header per expanded (pyver, abi, platform) combination.
                tags = parse_tag(f"{source.pyver}-{source.abi}-{source.platform}")
                for tag in sorted(tags, key=lambda tag: tag.interpreter):
                    wheel_message.add_header("Tag", str(tag))

                wheelfile.writestr(
                    f"{source.dist_info_dir}/WHEEL",
                    wheel_message.as_string(policy=serialization_policy).encode(
                        "utf-8"
                    ),
                )

            if verbose:
                print("OK")

View File

@@ -0,0 +1,84 @@
from __future__ import annotations
import email.policy
import os.path
import re
from email.generator import BytesGenerator
from email.parser import BytesParser
from ..wheelfile import WheelError, WheelFile
# Matches a .dist-info directory name, capturing "name-version" as 'namever'
# (the version must start with a digit).
DIST_INFO_RE = re.compile(r"^(?P<namever>(?P<name>.+?)-(?P<ver>\d.*?))\.dist-info$")
def pack(directory: str, dest_dir: str, build_number: str | None) -> None:
    """Repack a previously unpacked wheel directory into a new wheel file.

    The .dist-info/WHEEL file must contain one or more tags so that the target
    wheel file name can be determined.

    :param directory: The unpacked wheel directory
    :param dest_dir: Destination directory (defaults to the current directory)
    :param build_number: New build tag; ``None`` keeps the existing one, an
        empty string removes it
    :raises WheelError: if zero or multiple .dist-info directories are found,
        or if WHEEL carries no Tag headers
    """
    # Find the .dist-info directory
    dist_info_dirs = [
        fn
        for fn in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, fn)) and DIST_INFO_RE.match(fn)
    ]
    if len(dist_info_dirs) > 1:
        raise WheelError(f"Multiple .dist-info directories found in {directory}")
    elif not dist_info_dirs:
        raise WheelError(f"No .dist-info directories found in {directory}")

    # Determine the target wheel filename
    dist_info_dir = dist_info_dirs[0]
    name_version = DIST_INFO_RE.match(dist_info_dir).group("namever")

    # Read the tags and the existing build number from .dist-info/WHEEL
    wheel_file_path = os.path.join(directory, dist_info_dir, "WHEEL")
    with open(wheel_file_path, "rb") as f:
        info = BytesParser(policy=email.policy.compat32).parse(f)
        tags: list[str] = info.get_all("Tag", [])
        existing_build_number = info.get("Build")

        if not tags:
            raise WheelError(
                f"No tags present in {dist_info_dir}/WHEEL; cannot determine target "
                f"wheel filename"
            )

    # Set the wheel file name and add/replace/remove the Build tag in .dist-info/WHEEL
    build_number = build_number if build_number is not None else existing_build_number
    if build_number is not None:
        del info["Build"]
        if build_number:
            info["Build"] = build_number
            name_version += "-" + build_number

        # Rewrite WHEEL on disk only when the build number actually changed
        if build_number != existing_build_number:
            with open(wheel_file_path, "wb") as f:
                BytesGenerator(f, maxheaderlen=0).flatten(info)

    # Reassemble the tags for the wheel file
    tagline = compute_tagline(tags)

    # Repack the wheel
    wheel_path = os.path.join(dest_dir, f"{name_version}-{tagline}.whl")
    with WheelFile(wheel_path, "w") as wf:
        print(f"Repacking wheel as {wheel_path}...", end="", flush=True)
        wf.write_files(directory)

    print("OK")
def compute_tagline(tags: list[str]) -> str:
    """Collapse a list of ``impl-abi-platform`` tags into a compressed tagline.

    Each of the three components is de-duplicated and sorted, joined with '.',
    and the components are then joined with '-'.

    :param tags: A list of tags
    :return: A tagline
    """
    components = []
    for position in range(3):
        unique_values = sorted({tag.split("-")[position] for tag in tags})
        components.append(".".join(unique_values))
    return "-".join(components)

View File

@@ -0,0 +1,140 @@
from __future__ import annotations
import email.policy
import itertools
import os
from collections.abc import Iterable
from email.parser import BytesParser
from ..wheelfile import WheelFile
def _compute_tags(original_tags: Iterable[str], new_tags: str | None) -> set[str]:
"""Add or replace tags. Supports dot-separated tags"""
if new_tags is None:
return set(original_tags)
if new_tags.startswith("+"):
return {*original_tags, *new_tags[1:].split(".")}
if new_tags.startswith("-"):
return set(original_tags) - set(new_tags[1:].split("."))
return set(new_tags.split("."))
def tags(
    wheel: str,
    python_tags: str | None = None,
    abi_tags: str | None = None,
    platform_tags: str | None = None,
    build_tag: str | None = None,
    remove: bool = False,
) -> str:
    """Change the tags on a wheel file.

    The tags are left unchanged if they are not specified. To specify "none",
    use ["none"]. To append to the previous tags, a tag should start with a
    "+". If a tag starts with "-", it will be removed from existing tags.
    Processing is done left to right.

    :param wheel: The paths to the wheels
    :param python_tags: The Python tags to set
    :param abi_tags: The ABI tags to set
    :param platform_tags: The platform tags to set
    :param build_tag: The build tag to set
    :param remove: Remove the original wheel
    :return: the filename of the (possibly new) wheel
    :raises AssertionError: when the tags embedded in .dist-info/WHEEL do not
        match the tags encoded in the wheel's filename
    """
    with WheelFile(wheel, "r") as f:
        assert f.filename, f"{f.filename} must be available"

        # Parse the embedded WHEEL metadata and the filename-encoded tags
        wheel_info = f.read(f.dist_info_path + "/WHEEL")
        info = BytesParser(policy=email.policy.compat32).parsebytes(wheel_info)

        original_wheel_name = os.path.basename(f.filename)
        namever = f.parsed_filename.group("namever")
        build = f.parsed_filename.group("build")
        original_python_tags = f.parsed_filename.group("pyver").split(".")
        original_abi_tags = f.parsed_filename.group("abi").split(".")
        original_plat_tags = f.parsed_filename.group("plat").split(".")

        tags: list[str] = info.get_all("Tag", [])
        existing_build_tag = info.get("Build")

        # Cross-check WHEEL's Tag headers against the filename components
        impls = {tag.split("-")[0] for tag in tags}
        abivers = {tag.split("-")[1] for tag in tags}
        platforms = {tag.split("-")[2] for tag in tags}

        if impls != set(original_python_tags):
            msg = f"Wheel internal tags {impls!r} != filename tags {original_python_tags!r}"
            raise AssertionError(msg)

        if abivers != set(original_abi_tags):
            msg = f"Wheel internal tags {abivers!r} != filename tags {original_abi_tags!r}"
            raise AssertionError(msg)

        if platforms != set(original_plat_tags):
            msg = (
                f"Wheel internal tags {platforms!r} != filename tags {original_plat_tags!r}"
            )
            raise AssertionError(msg)

        if existing_build_tag != build:
            msg = (
                f"Incorrect filename '{build}' "
                f"& *.dist-info/WHEEL '{existing_build_tag}' build numbers"
            )
            raise AssertionError(msg)

        # Start changing as needed
        if build_tag is not None:
            build = build_tag

        final_python_tags = sorted(_compute_tags(original_python_tags, python_tags))
        final_abi_tags = sorted(_compute_tags(original_abi_tags, abi_tags))
        final_plat_tags = sorted(_compute_tags(original_plat_tags, platform_tags))

    # Assemble the new filename from the (possibly updated) tag components
    final_tags = [
        namever,
        ".".join(final_python_tags),
        ".".join(final_abi_tags),
        ".".join(final_plat_tags),
    ]
    if build:
        final_tags.insert(1, build)

    final_wheel_name = "-".join(final_tags) + ".whl"
    if original_wheel_name != final_wheel_name:
        # Regenerate the Tag/Build headers to match the new filename
        del info["Tag"], info["Build"]
        for a, b, c in itertools.product(
            final_python_tags, final_abi_tags, final_plat_tags
        ):
            info["Tag"] = f"{a}-{b}-{c}"
        if build:
            info["Build"] = build

        original_wheel_path = os.path.join(
            os.path.dirname(f.filename), original_wheel_name
        )
        final_wheel_path = os.path.join(os.path.dirname(f.filename), final_wheel_name)

        # Copy every member except RECORD (regenerated by WheelFile) and
        # WHEEL (replaced with the updated headers)
        with (
            WheelFile(original_wheel_path, "r") as fin,
            WheelFile(final_wheel_path, "w") as fout,
        ):
            fout.comment = fin.comment  # preserve the comment
            for item in fin.infolist():
                if item.is_dir():
                    continue
                if item.filename == f.dist_info_path + "/RECORD":
                    continue
                if item.filename == f.dist_info_path + "/WHEEL":
                    fout.writestr(item, info.as_bytes())
                else:
                    fout.writestr(item, fin.read(item))

        if remove:
            os.remove(original_wheel_path)

    return final_wheel_name

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from pathlib import Path
from ..wheelfile import WheelFile
def unpack(path: str, dest: str = ".") -> None:
    """Unpack a wheel.

    Wheel content will be unpacked to {dest}/{name}-{ver}, where {name}
    is the package name and {ver} its version.

    :param path: The path to the wheel.
    :param dest: Destination directory (default to current directory).
    """
    with WheelFile(path) as wheel:
        name_version = wheel.parsed_filename.group("namever")
        destination = Path(dest) / name_version
        print(f"Unpacking to: {destination}...", end="", flush=True)
        for member in wheel.filelist:
            extracted = Path(wheel.extract(member, destination))
            # zipfile.extract drops the mode bits stored in the archive
            # (https://github.com/python/cpython/issues/59999), so restore
            # them from external_attr by hand.
            mode = member.external_attr >> 16 & 0o777
            extracted.chmod(mode)

    print("OK")

View File

@@ -0,0 +1,184 @@
"""
Tools for converting old- to new-style metadata.
"""
from __future__ import annotations
import functools
import itertools
import os.path
import re
import textwrap
from collections.abc import Generator, Iterable, Iterator
from email.message import Message
from email.parser import Parser
from typing import Literal
from packaging.requirements import Requirement
def _nonblank(str: str) -> bool | Literal[""]:
return str and not str.startswith("#")
@functools.singledispatch
def yield_lines(iterable: Iterable[str]) -> Iterator[str]:
    r"""
    Yield valid lines of a string or iterable.

    >>> list(yield_lines(''))
    []
    >>> list(yield_lines(['foo', 'bar']))
    ['foo', 'bar']
    >>> list(yield_lines('foo\nbar'))
    ['foo', 'bar']
    >>> list(yield_lines('\nfoo\n#bar\nbaz #comment'))
    ['foo', 'baz #comment']
    >>> list(yield_lines(['foo\nbar', 'baz', 'bing\n\n\n']))
    ['foo', 'bar', 'baz', 'bing']
    """
    # Recursively dispatch each element; strings hit the overload below.
    nested = (yield_lines(item) for item in iterable)
    return itertools.chain.from_iterable(nested)


@yield_lines.register(str)
def _(text: str) -> Iterator[str]:
    # Strip every physical line, then drop blanks and '#' comments.
    stripped = (line.strip() for line in text.splitlines())
    return filter(_nonblank, stripped)
def split_sections(
    s: str | Iterator[str],
) -> Generator[tuple[str | None, list[str]], None, None]:
    """Split a string or iterable thereof into (section, content) pairs

    Each ``section`` is a stripped version of the section header ("[section]")
    and each ``content`` is a list of stripped lines excluding blank lines and
    comment-only lines. If there are any such lines before the first section
    header, they're returned in a first ``section`` of ``None``.
    """
    current_name: str | None = None
    current_lines: list[str] = []
    for line in yield_lines(s):
        if not line.startswith("["):
            current_lines.append(line)
            continue
        if not line.endswith("]"):
            raise ValueError("Invalid section heading", line)
        # Flush the previous segment, unless we are at the very start
        # with nothing accumulated yet.
        if current_name or current_lines:
            yield current_name, current_lines
        current_name = line[1:-1].strip()
        current_lines = []

    # wrap up last segment
    yield current_name, current_lines
def safe_extra(extra: str) -> str:
    """Convert an arbitrary string to a standard 'extra' name.

    Runs of characters outside ``[A-Za-z0-9.-]`` collapse to a single '_',
    and the result is lowercased.
    """
    normalized = re.sub("[^A-Za-z0-9.-]+", "_", extra)
    return normalized.lower()
def safe_name(name: str) -> str:
    """Convert an arbitrary string to a standard distribution name.

    Runs of characters outside ``[A-Za-z0-9.]`` collapse to a single '-'.
    """
    normalized = re.sub("[^A-Za-z0-9.]+", "-", name)
    return normalized
def requires_to_requires_dist(requirement: Requirement) -> str:
    """Return the version specifier for a requirement in PEP 345/566 fashion.

    A URL requirement becomes ``" @ <url>"``; otherwise the sorted
    operator+version pairs are joined with commas (empty string when the
    requirement has no specifier).
    """
    if requirement.url:
        return " @ " + requirement.url

    specifiers = sorted(spec.operator + spec.version for spec in requirement.specifier)
    if not specifiers:
        return ""
    return " " + ",".join(specifiers)
def convert_requirements(requirements: list[str]) -> Iterator[str]:
    """Yield Requires-Dist: strings for parsed requirements strings."""
    for raw in requirements:
        parsed = Requirement(raw)
        version_spec = requires_to_requires_dist(parsed)
        extra_names = sorted(safe_extra(e) for e in parsed.extras)
        extra_suffix = f"[{','.join(extra_names)}]" if extra_names else ""
        yield safe_name(parsed.name) + extra_suffix + version_spec
def generate_requirements(
    extras_require: dict[str | None, list[str]],
) -> Iterator[tuple[str, str]]:
    """
    Convert requirements from a setup()-style dictionary to
    ('Requires-Dist', 'requirement') and ('Provides-Extra', 'extra') tuples.

    extras_require is a dictionary of {extra: [requirements]} as passed to setup(),
    using the empty extra {'': [requirements]} to hold install_requires.
    """
    for raw_extra, depends in extras_require.items():
        condition = ""
        extra = raw_extra or ""
        if ":" in extra:  # setuptools extra:condition syntax
            extra, condition = extra.split(":", 1)

        extra = safe_extra(extra)
        if extra:
            yield "Provides-Extra", extra
            # Combine an explicit condition with the extra marker.
            if condition:
                condition = "(" + condition + ") and "
            condition += f"extra == '{extra}'"

        if condition:
            condition = " ; " + condition

        for new_req in convert_requirements(depends):
            # Round-trip through Requirement to canonicalize the marker.
            yield "Requires-Dist", str(Requirement(new_req + condition))
def pkginfo_to_metadata(egg_info_path: str, pkginfo_path: str) -> Message:
    """
    Convert .egg-info directory with PKG-INFO to the Metadata 2.1 format

    :param egg_info_path: path to the .egg-info directory (read for requires.txt)
    :param pkginfo_path: path to the PKG-INFO file to convert
    :return: the converted metadata as an email.message.Message
    """
    with open(pkginfo_path, encoding="utf-8") as headers:
        pkg_info = Parser().parse(headers)

    pkg_info.replace_header("Metadata-Version", "2.1")
    # Those will be regenerated from `requires.txt`.
    del pkg_info["Provides-Extra"]
    del pkg_info["Requires-Dist"]
    requires_path = os.path.join(egg_info_path, "requires.txt")
    if os.path.exists(requires_path):
        with open(requires_path, encoding="utf-8") as requires_file:
            requires = requires_file.read()

        # Sort sections so the base requirements (section None) come first
        parsed_requirements = sorted(split_sections(requires), key=lambda x: x[0] or "")
        for extra, reqs in parsed_requirements:
            for key, value in generate_requirements({extra: reqs}):
                # Skip duplicates already present in PKG-INFO
                if (key, value) not in pkg_info.items():
                    pkg_info[key] = value

    # Move the Description header into the message payload, dedenting the
    # continuation lines that RFC 822 folding introduced.
    description = pkg_info["Description"]
    if description:
        description_lines = pkg_info["Description"].splitlines()
        dedented_description = "\n".join(
            # if the first line of long_description is blank,
            # the first line here will be indented.
            (
                description_lines[0].lstrip(),
                textwrap.dedent("\n".join(description_lines[1:])),
                "\n",
            )
        )
        pkg_info.set_payload(dedented_description)
        del pkg_info["Description"]

    return pkg_info

View File

@@ -0,0 +1,26 @@
from typing import TYPE_CHECKING

from warnings import warn

# Deprecation shim: `wheel.bdist_wheel` re-exports the setuptools-integrated
# command when available, falling back to the bundled private implementation.
warn(
    "The 'wheel' package is no longer the canonical location of the 'bdist_wheel' "
    "command, and will be removed in a future release. Please update to setuptools "
    "v70.1 or later which contains an integrated version of this command.",
    FutureWarning,
    stacklevel=1,
)

if TYPE_CHECKING:
    from ._bdist_wheel import bdist_wheel as bdist_wheel
else:
    try:
        # Better integration/compatibility with setuptools:
        # in the case new fixes or PEPs are implemented in setuptools
        # there is no need to backport them to the deprecated code base.
        # This is useful in the case of old packages in the ecosystem
        # that are still used but have low maintenance.
        from setuptools.command.bdist_wheel import bdist_wheel
    except ImportError:
        # Only used in the case of old setuptools versions.
        # If the user wants to get the latest fixes/PEPs,
        # they are encouraged to address the deprecation warning.
        from ._bdist_wheel import bdist_wheel as bdist_wheel

View File

@@ -0,0 +1,486 @@
"""
IMPORTANT: DO NOT IMPORT THIS MODULE DIRECTLY.
THIS IS ONLY KEPT IN PLACE FOR BACKWARDS COMPATIBILITY WITH
setuptools.command.bdist_wheel.
This module contains function to analyse dynamic library
headers to extract system information
Currently only for MacOSX
Library files on a macOS system start with either a Mach-O or a Fat header.
The two can be distinguished by the first 32 bits, called the magic number.
The canonical magic values carry the suffix _MAGIC; the suffix _CIGAM denotes
the same value with reversed byte order.
Both header types come in two variants: 32-bit and 64-bit.
FAT field inform that this library contains few version of library
(typically for different types version). It contains
information where Mach-O headers starts.
Each section started with Mach-O header contains one library
(So if file starts with this field it contains only one version).
After filed Mach-O there are section fields.
Each of them starts with two fields:
cmd - magic number for this command
cmdsize - total size occupied by this section information.
In this case only sections LC_VERSION_MIN_MACOSX (for macosx 10.13 and earlier)
and LC_BUILD_VERSION (for macosx 10.14 and newer) are interesting,
because them contains information about minimal system version.
Important remarks:
- For fat files this implementation looks for the maximum version number.
  It does not check whether a slice is 32- or 64-bit, nor compare it with the
  currently built package, so it may falsely report a higher version than needed.
- All structure signatures are taken from the macOS header files.
- The binary format should be more stable than `otool` output;
  if Apple introduces changes, both implementations would need to be updated.
- The system compile will set the deployment target no lower than
11.0 for arm64 builds. For "Universal 2" builds use the x86_64 deployment
target when the arm64 target is 11.0.
"""
from __future__ import annotations
import ctypes
import os
import sys
from io import BufferedIOBase
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Union
StrPath = Union[str, os.PathLike[str]]
"""here the needed const and struct from mach-o header files"""
# Fat (multi-architecture) container magic numbers; the *_CIGAM variants are
# the byte-swapped (opposite-endian) forms of the *_MAGIC values.
FAT_MAGIC = 0xCAFEBABE
FAT_CIGAM = 0xBEBAFECA
FAT_MAGIC_64 = 0xCAFEBABF
FAT_CIGAM_64 = 0xBFBAFECA
# Mach-O (single-architecture) magic numbers, 32- and 64-bit.
MH_MAGIC = 0xFEEDFACE
MH_CIGAM = 0xCEFAEDFE
MH_MAGIC_64 = 0xFEEDFACF
MH_CIGAM_64 = 0xCFFAEDFE
# Load-command identifiers that carry minimum-OS-version information.
LC_VERSION_MIN_MACOSX = 0x24
LC_BUILD_VERSION = 0x32
# CPU type of arm64 slices in fat binaries.
CPU_TYPE_ARM64 = 0x0100000C
mach_header_fields = [
("magic", ctypes.c_uint32),
("cputype", ctypes.c_int),
("cpusubtype", ctypes.c_int),
("filetype", ctypes.c_uint32),
("ncmds", ctypes.c_uint32),
("sizeofcmds", ctypes.c_uint32),
("flags", ctypes.c_uint32),
]
"""
struct mach_header {
uint32_t magic; /* mach magic number identifier */
cpu_type_t cputype; /* cpu specifier */
cpu_subtype_t cpusubtype; /* machine specifier */
uint32_t filetype; /* type of file */
uint32_t ncmds; /* number of load commands */
uint32_t sizeofcmds; /* the size of all the load commands */
uint32_t flags; /* flags */
};
typedef integer_t cpu_type_t;
typedef integer_t cpu_subtype_t;
"""
mach_header_fields_64 = mach_header_fields + [("reserved", ctypes.c_uint32)]
"""
struct mach_header_64 {
uint32_t magic; /* mach magic number identifier */
cpu_type_t cputype; /* cpu specifier */
cpu_subtype_t cpusubtype; /* machine specifier */
uint32_t filetype; /* type of file */
uint32_t ncmds; /* number of load commands */
uint32_t sizeofcmds; /* the size of all the load commands */
uint32_t flags; /* flags */
uint32_t reserved; /* reserved */
};
"""
fat_header_fields = [("magic", ctypes.c_uint32), ("nfat_arch", ctypes.c_uint32)]
"""
struct fat_header {
uint32_t magic; /* FAT_MAGIC or FAT_MAGIC_64 */
uint32_t nfat_arch; /* number of structs that follow */
};
"""
fat_arch_fields = [
("cputype", ctypes.c_int),
("cpusubtype", ctypes.c_int),
("offset", ctypes.c_uint32),
("size", ctypes.c_uint32),
("align", ctypes.c_uint32),
]
"""
struct fat_arch {
cpu_type_t cputype; /* cpu specifier (int) */
cpu_subtype_t cpusubtype; /* machine specifier (int) */
uint32_t offset; /* file offset to this object file */
uint32_t size; /* size of this object file */
uint32_t align; /* alignment as a power of 2 */
};
"""
fat_arch_64_fields = [
("cputype", ctypes.c_int),
("cpusubtype", ctypes.c_int),
("offset", ctypes.c_uint64),
("size", ctypes.c_uint64),
("align", ctypes.c_uint32),
("reserved", ctypes.c_uint32),
]
"""
struct fat_arch_64 {
cpu_type_t cputype; /* cpu specifier (int) */
cpu_subtype_t cpusubtype; /* machine specifier (int) */
uint64_t offset; /* file offset to this object file */
uint64_t size; /* size of this object file */
uint32_t align; /* alignment as a power of 2 */
uint32_t reserved; /* reserved */
};
"""
segment_base_fields = [("cmd", ctypes.c_uint32), ("cmdsize", ctypes.c_uint32)]
"""base for reading segment info"""
segment_command_fields = [
("cmd", ctypes.c_uint32),
("cmdsize", ctypes.c_uint32),
("segname", ctypes.c_char * 16),
("vmaddr", ctypes.c_uint32),
("vmsize", ctypes.c_uint32),
("fileoff", ctypes.c_uint32),
("filesize", ctypes.c_uint32),
("maxprot", ctypes.c_int),
("initprot", ctypes.c_int),
("nsects", ctypes.c_uint32),
("flags", ctypes.c_uint32),
]
"""
struct segment_command { /* for 32-bit architectures */
uint32_t cmd; /* LC_SEGMENT */
uint32_t cmdsize; /* includes sizeof section structs */
char segname[16]; /* segment name */
uint32_t vmaddr; /* memory address of this segment */
uint32_t vmsize; /* memory size of this segment */
uint32_t fileoff; /* file offset of this segment */
uint32_t filesize; /* amount to map from the file */
vm_prot_t maxprot; /* maximum VM protection */
vm_prot_t initprot; /* initial VM protection */
uint32_t nsects; /* number of sections in segment */
uint32_t flags; /* flags */
};
typedef int vm_prot_t;
"""
segment_command_fields_64 = [
("cmd", ctypes.c_uint32),
("cmdsize", ctypes.c_uint32),
("segname", ctypes.c_char * 16),
("vmaddr", ctypes.c_uint64),
("vmsize", ctypes.c_uint64),
("fileoff", ctypes.c_uint64),
("filesize", ctypes.c_uint64),
("maxprot", ctypes.c_int),
("initprot", ctypes.c_int),
("nsects", ctypes.c_uint32),
("flags", ctypes.c_uint32),
]
"""
struct segment_command_64 { /* for 64-bit architectures */
uint32_t cmd; /* LC_SEGMENT_64 */
uint32_t cmdsize; /* includes sizeof section_64 structs */
char segname[16]; /* segment name */
uint64_t vmaddr; /* memory address of this segment */
uint64_t vmsize; /* memory size of this segment */
uint64_t fileoff; /* file offset of this segment */
uint64_t filesize; /* amount to map from the file */
vm_prot_t maxprot; /* maximum VM protection */
vm_prot_t initprot; /* initial VM protection */
uint32_t nsects; /* number of sections in segment */
uint32_t flags; /* flags */
};
"""
version_min_command_fields = segment_base_fields + [
("version", ctypes.c_uint32),
("sdk", ctypes.c_uint32),
]
"""
struct version_min_command {
uint32_t cmd; /* LC_VERSION_MIN_MACOSX or
LC_VERSION_MIN_IPHONEOS or
LC_VERSION_MIN_WATCHOS or
LC_VERSION_MIN_TVOS */
uint32_t cmdsize; /* sizeof(struct min_version_command) */
uint32_t version; /* X.Y.Z is encoded in nibbles xxxx.yy.zz */
uint32_t sdk; /* X.Y.Z is encoded in nibbles xxxx.yy.zz */
};
"""
build_version_command_fields = segment_base_fields + [
("platform", ctypes.c_uint32),
("minos", ctypes.c_uint32),
("sdk", ctypes.c_uint32),
("ntools", ctypes.c_uint32),
]
"""
struct build_version_command {
uint32_t cmd; /* LC_BUILD_VERSION */
uint32_t cmdsize; /* sizeof(struct build_version_command) plus */
/* ntools * sizeof(struct build_tool_version) */
uint32_t platform; /* platform */
uint32_t minos; /* X.Y.Z is encoded in nibbles xxxx.yy.zz */
uint32_t sdk; /* X.Y.Z is encoded in nibbles xxxx.yy.zz */
uint32_t ntools; /* number of tool entries following this */
};
"""
def swap32(x: int) -> int:
    """Reverse the byte order of a 32-bit unsigned integer."""
    byte0 = (x >> 24) & 0xFF
    byte1 = (x >> 16) & 0xFF
    byte2 = (x >> 8) & 0xFF
    byte3 = x & 0xFF
    return (byte3 << 24) | (byte2 << 16) | (byte1 << 8) | byte0
def get_base_class_and_magic_number(
    lib_file: BufferedIOBase,
    seek: int | None = None,
) -> tuple[type[ctypes.Structure], int]:
    """Read the 4-byte magic number at *seek* and pick a ctypes base class.

    Returns the ctypes Structure base class matching the file's byte order
    (the opposite-endian class is chosen for *_CIGAM magics) together with
    the magic number normalized to its *_MAGIC form. The file position is
    restored to *seek* before returning.

    :param lib_file: open binary stream positioned anywhere
    :param seek: offset of the header; ``None`` uses the current position
    """
    if seek is None:
        seek = lib_file.tell()
    else:
        lib_file.seek(seek)
    # Interpret the raw bytes with the host's native endianness first.
    magic_number = ctypes.c_uint32.from_buffer_copy(
        lib_file.read(ctypes.sizeof(ctypes.c_uint32))
    ).value

    # Handle wrong byte order
    if magic_number in [FAT_CIGAM, FAT_CIGAM_64, MH_CIGAM, MH_CIGAM_64]:
        if sys.byteorder == "little":
            BaseClass = ctypes.BigEndianStructure
        else:
            BaseClass = ctypes.LittleEndianStructure

        magic_number = swap32(magic_number)
    else:
        BaseClass = ctypes.Structure

    # Rewind so the caller can re-read the full header through BaseClass.
    lib_file.seek(seek)
    return BaseClass, magic_number
def read_data(struct_class: type[ctypes.Structure], lib_file: BufferedIOBase):
    """Read exactly one *struct_class* worth of bytes and decode them."""
    size = ctypes.sizeof(struct_class)
    return struct_class.from_buffer_copy(lib_file.read(size))
def extract_macosx_min_system_version(path_to_lib: str):
    """Return the minimum macOS version (x, y, z) required by a library file.

    Returns ``None`` when the file is not a Mach-O/fat binary or when no
    version load command could be read. For fat binaries the maximum version
    across slices is reported.

    :param path_to_lib: path to the .dylib/.so file to inspect
    """
    with open(path_to_lib, "rb") as lib_file:
        BaseClass, magic_number = get_base_class_and_magic_number(lib_file, 0)
        if magic_number not in [FAT_MAGIC, FAT_MAGIC_64, MH_MAGIC, MH_MAGIC_64]:
            return

        # BUG FIX: this branch previously tested FAT_CIGAM_64, which can never
        # occur here because get_base_class_and_magic_number() already
        # byte-swaps *_CIGAM magics into their *_MAGIC forms; 64-bit fat
        # binaries were therefore misparsed as plain Mach-O files.
        if magic_number in [FAT_MAGIC, FAT_MAGIC_64]:

            class FatHeader(BaseClass):
                _fields_ = fat_header_fields

            fat_header = read_data(FatHeader, lib_file)
            if magic_number == FAT_MAGIC:

                class FatArch(BaseClass):
                    _fields_ = fat_arch_fields

            else:

                class FatArch(BaseClass):
                    _fields_ = fat_arch_64_fields

            fat_arch_list = [
                read_data(FatArch, lib_file) for _ in range(fat_header.nfat_arch)
            ]

            versions_list: list[tuple[int, int, int]] = []
            for el in fat_arch_list:
                try:
                    version = read_mach_header(lib_file, el.offset)
                    if version is not None:
                        if el.cputype == CPU_TYPE_ARM64 and len(fat_arch_list) != 1:
                            # Xcode will not set the deployment target below 11.0.0
                            # for the arm64 architecture. Ignore the arm64 deployment
                            # in fat binaries when the target is 11.0.0, that way
                            # the other architectures can select a lower deployment
                            # target.
                            # This is safe because there is no arm64 variant for
                            # macOS 10.15 or earlier.
                            if version == (11, 0, 0):
                                continue

                        versions_list.append(version)
                except ValueError:
                    pass

            if len(versions_list) > 0:
                return max(versions_list)
            else:
                return None

        else:
            try:
                return read_mach_header(lib_file, 0)
            except ValueError:
                # Error while reading the library file: report "unknown".
                return None
def read_mach_header(
    lib_file: BufferedIOBase,
    seek: int | None = None,
) -> tuple[int, int, int] | None:
    """
    This function parses a Mach-O header and extracts
    information about the minimal macOS version.

    Scans the load commands for LC_VERSION_MIN_MACOSX or LC_BUILD_VERSION and
    decodes the packed version; returns ``None`` when neither command exists.

    :param lib_file: reference to opened library file with pointer
    :param seek: offset of the Mach-O header; ``None`` uses the current position
    """
    base_class, magic_number = get_base_class_and_magic_number(lib_file, seek)
    # Only MH_MAGIC is 32-bit; everything else that reaches here is 64-bit.
    arch = "32" if magic_number == MH_MAGIC else "64"

    class SegmentBase(base_class):
        _fields_ = segment_base_fields

    if arch == "32":

        class MachHeader(base_class):
            _fields_ = mach_header_fields

    else:

        class MachHeader(base_class):
            _fields_ = mach_header_fields_64

    mach_header = read_data(MachHeader, lib_file)
    # Walk the load commands; each starts with (cmd, cmdsize).
    for _i in range(mach_header.ncmds):
        pos = lib_file.tell()
        segment_base = read_data(SegmentBase, lib_file)
        # Rewind so the full command struct can be re-read from its start.
        lib_file.seek(pos)
        if segment_base.cmd == LC_VERSION_MIN_MACOSX:

            class VersionMinCommand(base_class):
                _fields_ = version_min_command_fields

            version_info = read_data(VersionMinCommand, lib_file)
            return parse_version(version_info.version)
        elif segment_base.cmd == LC_BUILD_VERSION:

            class VersionBuild(base_class):
                _fields_ = build_version_command_fields

            version_info = read_data(VersionBuild, lib_file)
            return parse_version(version_info.minos)
        else:
            # Skip over this command to the next one.
            lib_file.seek(pos + segment_base.cmdsize)
            continue
def parse_version(version: int) -> tuple[int, int, int]:
    """Decode a Mach-O packed version (nibble layout xxxx.yy.zz) into (x, y, z)."""
    major = (version >> 16) & 0xFFFF
    minor = (version >> 8) & 0xFF
    patch = version & 0xFF
    return major, minor, patch
def calculate_macosx_platform_tag(archive_root: StrPath, platform_tag: str) -> str:
    """
    Calculate proper macosx platform tag basing on files which are included to wheel

    Example platform tag `macosx-10.14-x86_64`

    The base version comes from *platform_tag*, may be raised by the
    ``MACOSX_DEPLOYMENT_TARGET`` environment variable, and is raised further if
    any bundled .dylib/.so in *archive_root* requires a newer macOS.

    :param archive_root: directory tree scanned for native libraries
    :param platform_tag: sysconfig-style tag, e.g. ``macosx-10.14-x86_64``
    :return: normalized tag such as ``macosx_10_14_x86_64``
    """
    prefix, base_version, suffix = platform_tag.split("-")
    base_version = tuple(int(x) for x in base_version.split("."))
    base_version = base_version[:2]
    if base_version[0] > 10:
        # macOS 11+ tags only carry the major version.
        base_version = (base_version[0], 0)

    assert len(base_version) == 2
    if "MACOSX_DEPLOYMENT_TARGET" in os.environ:
        deploy_target = tuple(
            int(x) for x in os.environ["MACOSX_DEPLOYMENT_TARGET"].split(".")
        )
        deploy_target = deploy_target[:2]
        if deploy_target[0] > 10:
            deploy_target = (deploy_target[0], 0)
        if deploy_target < base_version:
            # A target below the interpreter's own build target is unusable.
            sys.stderr.write(
                "[WARNING] MACOSX_DEPLOYMENT_TARGET is set to a lower value ({}) than "
                "the version on which the Python interpreter was compiled ({}), and "
                "will be ignored.\n".format(
                    ".".join(str(x) for x in deploy_target),
                    ".".join(str(x) for x in base_version),
                )
            )
        else:
            base_version = deploy_target

    assert len(base_version) == 2
    start_version = base_version
    versions_dict: dict[str, tuple[int, int]] = {}
    # Collect the minimum macOS version required by every native library.
    for dirpath, _dirnames, filenames in os.walk(archive_root):
        for filename in filenames:
            if filename.endswith(".dylib") or filename.endswith(".so"):
                lib_path = os.path.join(dirpath, filename)
                min_ver = extract_macosx_min_system_version(lib_path)
                if min_ver is not None:
                    min_ver = min_ver[0:2]
                    if min_ver[0] > 10:
                        min_ver = (min_ver[0], 0)
                    versions_dict[lib_path] = min_ver

    if len(versions_dict) > 0:
        base_version = max(base_version, max(versions_dict.values()))

    # macosx platform tag do not support minor bugfix release
    fin_base_version = "_".join([str(x) for x in base_version])
    if start_version < base_version:
        problematic_files = [k for k, v in versions_dict.items() if v > start_version]
        # BUG FIX: the singular/plural choice previously tested the length of
        # the *joined string*, so "this file" was only used for a
        # one-character path. Count the files before joining instead.
        files_form = "this file" if len(problematic_files) == 1 else "these files"
        problematic_files = "\n".join(problematic_files)
        error_message = (
            "[WARNING] This wheel needs a higher macOS version than {} "
            "To silence this warning, set MACOSX_DEPLOYMENT_TARGET to at least "
            + fin_base_version
            + " or recreate "
            + files_form
            + " with lower "
            "MACOSX_DEPLOYMENT_TARGET: \n" + problematic_files
        )
        if "MACOSX_DEPLOYMENT_TARGET" in os.environ:
            error_message = error_message.format(
                "is set in MACOSX_DEPLOYMENT_TARGET variable."
            )
        else:
            error_message = error_message.format(
                "the version your Python interpreter is compiled against."
            )

        sys.stderr.write(error_message)

    platform_tag = prefix + "_" + fin_base_version + "_" + suffix
    return platform_tag

View File

@@ -0,0 +1,17 @@
from warnings import warn

# Deprecation shim: re-export the private `_metadata` implementation under the
# old public module path so existing imports keep working, with a warning.
from ._metadata import convert_requirements as convert_requirements
from ._metadata import generate_requirements as generate_requirements
from ._metadata import pkginfo_to_metadata as pkginfo_to_metadata
from ._metadata import requires_to_requires_dist as requires_to_requires_dist
from ._metadata import safe_extra as safe_extra
from ._metadata import safe_name as safe_name
from ._metadata import split_sections as split_sections

warn(
    f"The {__name__!r} package has been made private and should no longer be imported. "
    f"Please either copy the code or find an alternative library to import it from, as "
    f"this warning will be removed in a future version of 'wheel'.",
    DeprecationWarning,
    stacklevel=2,
)

View File

@@ -0,0 +1,241 @@
from __future__ import annotations
__all__ = ["WHEEL_INFO_RE", "WheelFile", "WheelError"]
import base64
import csv
import hashlib
import logging
import os.path
import re
import stat
import time
from io import StringIO, TextIOWrapper
from typing import IO, TYPE_CHECKING, Literal
from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo
if TYPE_CHECKING:
from _typeshed import SizedBuffer, StrPath
# Non-greedy matching of an optional build number may be too clever (more
# invalid wheel filenames will match). Separate regex for .dist-info?
WHEEL_INFO_RE = re.compile(
r"""^(?P<namever>(?P<name>[^\s-]+?)-(?P<ver>[^\s-]+?))(-(?P<build>\d[^\s-]*))?
-(?P<pyver>[^\s-]+?)-(?P<abi>[^\s-]+?)-(?P<plat>\S+)\.whl$""",
re.VERBOSE,
)
MINIMUM_TIMESTAMP = 315532800 # 1980-01-01 00:00:00 UTC
log = logging.getLogger("wheel")
class WheelError(Exception):
    """Raised for any error while reading, validating, or writing a wheel."""
def urlsafe_b64encode(data: bytes) -> bytes:
    """Encode *data* as URL-safe base64 with the trailing '=' padding stripped."""
    padded = base64.urlsafe_b64encode(data)
    return padded.rstrip(b"=")
def urlsafe_b64decode(data: bytes) -> bytes:
    """Decode unpadded URL-safe base64 (the '=' padding is restored first)."""
    padding = b"=" * (4 - (len(data) & 3))
    return base64.urlsafe_b64decode(data + padding)
def get_zipinfo_datetime(
    timestamp: float | None = None,
) -> tuple[int, int, int, int, int, int]:
    """Return a ZIP-compatible UTC (year, month, day, hour, minute, second) tuple.

    :param timestamp: POSIX timestamp to convert; defaults to the current time.
        The ``SOURCE_DATE_EPOCH`` environment variable overrides it so that
        reproducible wheels can be built.
    """
    # Some applications need reproducible .whl files, but they can't do this without
    # forcing the timestamp of the individual ZipInfo objects. See issue #143.
    timestamp = int(os.environ.get("SOURCE_DATE_EPOCH", timestamp or time.time()))
    # ZIP cannot represent dates before 1980-01-01, so clamp to that.
    timestamp = max(timestamp, MINIMUM_TIMESTAMP)
    # gmtime()[:6] yields the six fields ZipInfo.date_time expects; the
    # previous 5-int return annotation did not match the actual 6-tuple.
    return time.gmtime(timestamp)[0:6]
class WheelFile(ZipFile):
"""A ZipFile derivative class that also reads SHA-256 hashes from
.dist-info/RECORD and checks any read files against those.
"""
_default_algorithm = hashlib.sha256
    def __init__(
        self,
        file: StrPath,
        mode: Literal["r", "w", "x", "a"] = "r",
        compression: int = ZIP_DEFLATED,
    ):
        """Open a wheel file, validating its filename and (in read mode) RECORD.

        :param file: path to the .whl file; the basename must parse as a valid
            wheel filename
        :param mode: zipfile mode; in "r" mode the expected hashes are loaded
            from RECORD so that subsequent reads can be verified
        :param compression: zipfile compression constant used when writing
        :raises WheelError: on a bad filename, a missing RECORD, or a RECORD
            entry using an unsupported or weak hash algorithm
        """
        basename = os.path.basename(file)
        self.parsed_filename = WHEEL_INFO_RE.match(basename)
        if not basename.endswith(".whl") or self.parsed_filename is None:
            raise WheelError(f"Bad wheel filename {basename!r}")

        ZipFile.__init__(self, file, mode, compression=compression, allowZip64=True)
        self.dist_info_path = "{}.dist-info".format(
            self.parsed_filename.group("namever")
        )
        self.record_path = self.dist_info_path + "/RECORD"
        # Maps archive path -> (algorithm, digest); (None, None) disables checks.
        self._file_hashes: dict[str, tuple[None, None] | tuple[int, bytes]] = {}
        self._file_sizes = {}
        if mode == "r":
            # Ignore RECORD and any embedded wheel signatures
            self._file_hashes[self.record_path] = None, None
            self._file_hashes[self.record_path + ".jws"] = None, None
            self._file_hashes[self.record_path + ".p7s"] = None, None

            # Fill in the expected hashes by reading them from RECORD
            try:
                record = self.open(self.record_path)
            except KeyError:
                raise WheelError(f"Missing {self.record_path} file") from None

            with record:
                for line in csv.reader(
                    TextIOWrapper(record, newline="", encoding="utf-8")
                ):
                    # RECORD rows are: path, "algorithm=b64digest", size
                    path, hash_sum, size = line
                    if not hash_sum:
                        continue

                    algorithm, hash_sum = hash_sum.split("=")
                    try:
                        hashlib.new(algorithm)
                    except ValueError:
                        raise WheelError(
                            f"Unsupported hash algorithm: {algorithm}"
                        ) from None

                    if algorithm.lower() in {"md5", "sha1"}:
                        raise WheelError(
                            f"Weak hash algorithm ({algorithm}) is not permitted by "
                            f"PEP 427"
                        )

                    self._file_hashes[path] = (
                        algorithm,
                        urlsafe_b64decode(hash_sum.encode("ascii")),
                    )
    def open(
        self,
        name_or_info: str | ZipInfo,
        mode: Literal["r", "w"] = "r",
        pwd: bytes | None = None,
    ) -> IO[bytes]:
        """Open an archive member, verifying its RECORD hash while reading.

        :param name_or_info: archive path or ZipInfo of the member
        :param mode: "r" to read (hash-checked) or "w" to write
        :param pwd: optional archive password, passed through to ZipFile
        :raises WheelError: if a member being read has no RECORD hash, or its
            content does not match the recorded digest
        """
        def _update_crc(newdata: bytes) -> None:
            # Wraps zipfile's internal CRC updater to also feed the running
            # hash, and compares against RECORD once the stream is exhausted.
            eof = ef._eof
            update_crc_orig(newdata)
            running_hash.update(newdata)
            if eof and running_hash.digest() != expected_hash:
                raise WheelError(f"Hash mismatch for file '{ef_name}'")

        ef_name = (
            name_or_info.filename if isinstance(name_or_info, ZipInfo) else name_or_info
        )
        if (
            mode == "r"
            and not ef_name.endswith("/")
            and ef_name not in self._file_hashes
        ):
            raise WheelError(f"No hash found for file '{ef_name}'")

        ef = ZipFile.open(self, name_or_info, mode, pwd)
        if mode == "r" and not ef_name.endswith("/"):
            algorithm, expected_hash = self._file_hashes[ef_name]
            if expected_hash is not None:
                # Monkey patch the _update_crc method to also check for the hash from
                # RECORD
                running_hash = hashlib.new(algorithm)
                update_crc_orig, ef._update_crc = ef._update_crc, _update_crc

        return ef
def write_files(self, base_dir: str) -> None:
log.info("creating %r and adding %r to it", self.filename, base_dir)
deferred: list[tuple[str, str]] = []
for root, dirnames, filenames in os.walk(base_dir):
# Sort the directory names so that `os.walk` will walk them in a
# defined order on the next iteration.
dirnames.sort()
for name in sorted(filenames):
path = os.path.normpath(os.path.join(root, name))
if os.path.isfile(path):
arcname = os.path.relpath(path, base_dir).replace(os.path.sep, "/")
if arcname == self.record_path:
pass
elif root.endswith(".dist-info"):
deferred.append((path, arcname))
else:
self.write(path, arcname)
deferred.sort()
for path, arcname in deferred:
self.write(path, arcname)
def write(
self,
filename: str,
arcname: str | None = None,
compress_type: int | None = None,
) -> None:
with open(filename, "rb") as f:
st = os.fstat(f.fileno())
data = f.read()
zinfo = ZipInfo(
arcname or filename, date_time=get_zipinfo_datetime(st.st_mtime)
)
zinfo.external_attr = (stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode)) << 16
zinfo.compress_type = compress_type or self.compression
self.writestr(zinfo, data, compress_type)
def writestr(
self,
zinfo_or_arcname: str | ZipInfo,
data: SizedBuffer | str,
compress_type: int | None = None,
) -> None:
if isinstance(zinfo_or_arcname, str):
zinfo_or_arcname = ZipInfo(
zinfo_or_arcname, date_time=get_zipinfo_datetime()
)
zinfo_or_arcname.compress_type = self.compression
zinfo_or_arcname.external_attr = (0o664 | stat.S_IFREG) << 16
if isinstance(data, str):
data = data.encode("utf-8")
ZipFile.writestr(self, zinfo_or_arcname, data, compress_type)
fname = (
zinfo_or_arcname.filename
if isinstance(zinfo_or_arcname, ZipInfo)
else zinfo_or_arcname
)
log.info("adding %r", fname)
if fname != self.record_path:
hash_ = self._default_algorithm(data)
self._file_hashes[fname] = (
hash_.name,
urlsafe_b64encode(hash_.digest()).decode("ascii"),
)
self._file_sizes[fname] = len(data)
def close(self) -> None:
# Write RECORD
if self.fp is not None and self.mode == "w" and self._file_hashes:
data = StringIO()
writer = csv.writer(data, delimiter=",", quotechar='"', lineterminator="\n")
writer.writerows(
(
(fname, algorithm + "=" + hash_, self._file_sizes[fname])
for fname, (algorithm, hash_) in self._file_hashes.items()
)
)
writer.writerow((format(self.record_path), "", ""))
self.writestr(self.record_path, data.getvalue())
ZipFile.close(self)