#!/usr/bin/env python3
# pyright: strict
__license__ = 'MIT'
from typing import * # pyright: reportWildcardImportFromLibrary=false
# Explicitly import these.
from typing import cast, IO
from pathlib import Path
import argparse
import asyncio
import base64
import binascii
import collections
import contextlib
import functools
import hashlib
import json
import os
import re
import shlex
import shutil
import sys
import tempfile
import textwrap
import types
import urllib.parse
import urllib.request
DEFAULT_PART_SIZE = 4096
GIT_SCHEMES: Dict[str, Dict[str, str]] = {
'github': {
'scheme': 'https',
'netloc': 'github.com'
},
'gitlab': {
'scheme': 'https',
'netloc': 'gitlab.com'
},
'bitbucket': {
'scheme': 'https',
'netloc': 'bitbucket.com'
},
'git': {},
'git+http': {
'scheme': 'http'
},
'git+https': {
'scheme': 'https'
},
}
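# For illustration: a lockfile version such as 'github:owner/repo#<commit>' is
# rewritten by LockfileProvider.parse_git_source() below using this table,
# e.g. to 'https://github.com/owner/repo', with the URL fragment used as the
# commit.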
GIT_URL_PATTERNS = [
re.compile(r'^git:'),
re.compile(r'^git\+.+:'),
re.compile(r'^ssh:'),
re.compile(r'^https?:.+\.git$'),
re.compile(r'^https?:.+\.git#.+'),
]
GIT_URL_HOSTS = ['github.com', 'gitlab.com', 'bitbucket.com', 'bitbucket.org']
NPM_MIRROR = 'https://unpkg.com/'
class SemVer(NamedTuple):
# Note that we ignore the metadata part, since all we do is version
# comparisons.
_SEMVER_RE = re.compile(r'(\d+)\.(\d+)\.(\d+)(?:-(?P<prerelease>[^+]+))?')
@functools.total_ordering
class Prerelease:
def __init__(self, parts: Tuple[Union[str, int], ...]) -> None:
self._parts = parts
@staticmethod
def parse(rel: str) -> Optional['SemVer.Prerelease']:
if not rel:
return None
parts: List[Union[str, int]] = []
for part in rel.split('.'):
try:
part = int(part)
except ValueError:
pass
parts.append(part)
return SemVer.Prerelease(tuple(parts))
@property
def parts(self) -> Tuple[Union[str, int], ...]:
return self._parts
def __lt__(self, other: 'SemVer.Prerelease') -> bool:
    for our_part, other_part in zip(self._parts, other._parts):
        if type(our_part) == type(other_part):
            if our_part < other_part:  # type: ignore
                return True
            elif our_part > other_part:  # type: ignore
                return False
        # Number parts are always less than strings, so stop at the first
        # pair whose types differ.
        elif isinstance(our_part, int):
            return True
        else:
            return False
        # Equal parts: keep comparing.
    return len(self._parts) < len(other._parts)
def __repr__(self) -> str:
return f'Prerelease(parts={self.parts})'
major: int
minor: int
patch: int
prerelease: Optional[Prerelease] = None
@staticmethod
def parse(version: str) -> 'SemVer':
match = SemVer._SEMVER_RE.match(version)
if match is None:
raise ValueError(f'Invalid semver version: {version}')
major, minor, patch = map(int, match.groups()[:3])
prerelease = SemVer.Prerelease.parse(match.group('prerelease'))
return SemVer(major, minor, patch, prerelease)
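# For example, SemVer.parse('1.22.0-beta.3') yields
# SemVer(major=1, minor=22, patch=0, prerelease=Prerelease(parts=('beta', 3)));
# any '+metadata' suffix is ignored, as noted above.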
class Cache:
instance: 'Cache'
@classmethod
def get_working_instance_if(cls, condition: bool) -> 'Cache':
return cls.instance if condition else NullCache()
class BucketReader:
def read_parts(self, size: int = DEFAULT_PART_SIZE) -> Iterator[bytes]:
raise NotImplementedError
def read_all(self) -> bytes:
raise NotImplementedError
def close(self) -> None:
raise NotImplementedError
def __enter__(self) -> 'Cache.BucketReader':
return self
def __exit__(self, exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[types.TracebackType]) -> None:
self.close()
class BucketWriter:
def write(self, data: bytes) -> None:
raise NotImplementedError
def cancel(self) -> None:
raise NotImplementedError
def seal(self) -> None:
raise NotImplementedError
def __enter__(self) -> 'Cache.BucketWriter':
return self
def __exit__(self, exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[types.TracebackType]) -> None:
if traceback is None:
self.seal()
else:
self.cancel()
class BucketRef:
def __init__(self, key: str) -> None:
self.key = key
def open_read(self) -> Optional['Cache.BucketReader']:
raise NotImplementedError
def open_write(self) -> 'Cache.BucketWriter':
raise NotImplementedError
def get(self, key: str) -> BucketRef:
raise NotImplementedError
class NullCache(Cache):
class NullBucketWriter(Cache.BucketWriter):
def write(self, data: bytes) -> None:
pass
def cancel(self) -> None:
pass
def seal(self) -> None:
pass
class NullBucketRef(Cache.BucketRef):
def __init__(self, key: str) -> None:
super().__init__(key)
def open_read(self) -> Optional[Cache.BucketReader]:
return None
def open_write(self) -> Cache.BucketWriter:
return NullCache.NullBucketWriter()
def get(self, key: str) -> Cache.BucketRef:
return NullCache.NullBucketRef(key)
class FilesystemBasedCache(Cache):
_SUBDIR = 'flatpak-node-generator'
_KEY_CHAR_ESCAPE_RE = re.compile(r'[^A-Za-z0-9._\-]')
@staticmethod
def _escape_key(key: str) -> str:
return FilesystemBasedCache._KEY_CHAR_ESCAPE_RE.sub(
lambda m: f'_{ord(m.group()):02X}', key)
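# For example, _escape_key('requests:https://example.org/x') turns each
# character outside [A-Za-z0-9._-] into '_XX' (hex of its code point), giving
# 'requests_3Ahttps_3A_2F_2Fexample.org_2Fx', so every cache key maps to a
# single safe filename.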
class FilesystemBucketReader(Cache.BucketReader):
def __init__(self, file: IO[bytes]) -> None:
self.file = file
def close(self) -> None:
self.file.close()
def read_parts(self, size: int = DEFAULT_PART_SIZE) -> Iterator[bytes]:
while True:
data = self.file.read(size)
if not data:
break
yield data
def read_all(self) -> bytes:
return self.file.read()
class FilesystemBucketWriter(Cache.BucketWriter):
def __init__(self, file: IO[bytes], temp: Path, target: Path) -> None:
self.file = file
self.temp = temp
self.target = target
def write(self, data: bytes) -> None:
self.file.write(data)
def cancel(self) -> None:
self.file.close()
self.temp.unlink()
def seal(self) -> None:
self.file.close()
self.temp.rename(self.target)
class FilesystemBucketRef(Cache.BucketRef):
def __init__(self, key: str, cache_root: Path) -> None:
super().__init__(key)
self._cache_root = cache_root
self._cache_path = self._cache_root / \
FilesystemBasedCache._escape_key(key)
def open_read(self) -> Optional[Cache.BucketReader]:
try:
fp = self._cache_path.open('rb')
except FileNotFoundError:
return None
else:
return FilesystemBasedCache.FilesystemBucketReader(fp)
def open_write(self) -> Cache.BucketWriter:
target = self._cache_path
if not target.parent.exists():
target.parent.mkdir(exist_ok=True, parents=True)
fd, temp = tempfile.mkstemp(
dir=self._cache_root, prefix='__temp__')
return FilesystemBasedCache.FilesystemBucketWriter(os.fdopen(fd, 'wb'),
Path(temp), target)
@property
def _cache_root(self) -> Path:
xdg_cache_home = os.environ.get(
'XDG_CACHE_HOME', os.path.expanduser('~/.cache'))
return Path(xdg_cache_home) / self._SUBDIR
def get(self, key: str) -> Cache.BucketRef:
return FilesystemBasedCache.FilesystemBucketRef(key, self._cache_root)
Cache.instance = NullCache()
class Requests:
instance: 'Requests'
DEFAULT_RETRIES = 5
retries: ClassVar[int] = DEFAULT_RETRIES
@property
def is_async(self) -> bool:
raise NotImplementedError
def __get_cache_bucket(self, cachable: bool, url: str) -> Cache.BucketRef:
return Cache.get_working_instance_if(cachable).get(f'requests:{url}')
async def _read_parts(self,
url: str,
size: int = DEFAULT_PART_SIZE) -> AsyncIterator[bytes]:
raise NotImplementedError
yield b'' # Silence mypy.
async def _read_all(self, url: str) -> bytes:
raise NotImplementedError
async def read_parts(self,
url: str,
*,
cachable: bool,
size: int = DEFAULT_PART_SIZE) -> AsyncIterator[bytes]:
bucket = self.__get_cache_bucket(cachable, url)
bucket_reader = bucket.open_read()
if bucket_reader is not None:
for part in bucket_reader.read_parts(size):
yield part
return
for i in range(1, Requests.retries + 1):
try:
with bucket.open_write() as bucket_writer:
async for part in self._read_parts(url, size):
bucket_writer.write(part)
yield part
return
except Exception:
if i == Requests.retries:
raise
async def read_all(self, url: str, *, cachable: bool = False) -> bytes:
bucket = self.__get_cache_bucket(cachable, url)
bucket_reader = bucket.open_read()
if bucket_reader is not None:
return bucket_reader.read_all()
for i in range(1, Requests.retries + 1):
try:
with bucket.open_write() as bucket_writer:
data = await self._read_all(url)
bucket_writer.write(data)
return data
except Exception:
if i == Requests.retries:
raise
assert False
class UrllibRequests(Requests):
@property
def is_async(self) -> bool:
return False
async def _read_parts(self,
url: str,
size: int = DEFAULT_PART_SIZE) -> AsyncIterator[bytes]:
with urllib.request.urlopen(url) as response:
while True:
data = response.read(size)
if not data:
return
yield data
async def _read_all(self, url: str) -> bytes:
with urllib.request.urlopen(url) as response:
return cast(bytes, response.read())
class StubRequests(Requests):
@property
def is_async(self) -> bool:
return True
async def _read_parts(self,
url: str,
size: int = DEFAULT_PART_SIZE) -> AsyncIterator[bytes]:
yield b''
async def _read_all(self, url: str) -> bytes:
return b''
Requests.instance = UrllibRequests()
try:
import aiohttp
class AsyncRequests(Requests):
@property
def is_async(self) -> bool:
return True
@contextlib.asynccontextmanager
async def _open_stream(self, url: str) -> AsyncIterator[aiohttp.StreamReader]:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
yield response.content
async def _read_parts(self,
url: str,
size: int = DEFAULT_PART_SIZE) -> AsyncIterator[bytes]:
async with self._open_stream(url) as stream:
while True:
data = await stream.read(size)
if not data:
return
yield data
async def _read_all(self, url: str) -> bytes:
async with self._open_stream(url) as stream:
return await stream.read()
Requests.instance = AsyncRequests()
except ImportError:
pass
class Integrity(NamedTuple):
algorithm: str
digest: str
@staticmethod
def parse(value: str) -> 'Integrity':
algorithm, encoded_digest = value.split('-', 1)
assert algorithm.startswith('sha'), algorithm
digest = binascii.hexlify(base64.b64decode(encoded_digest)).decode()
return Integrity(algorithm, digest)
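# For example, parsing the SRI string 'sha512-<base64 digest>' yields
# Integrity(algorithm='sha512', digest='<hex digest>'); the base64 payload is
# decoded and re-encoded as hex.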
@staticmethod
def from_sha1(sha1: str) -> 'Integrity':
assert len(sha1) == 40, f'Invalid length of sha1: {sha1}'
return Integrity('sha1', sha1)
@staticmethod
def generate(data: Union[str, bytes], *, algorithm: str = 'sha256') -> 'Integrity':
builder = IntegrityBuilder(algorithm)
builder.update(data)
return builder.build()
@staticmethod
def from_json_object(data: Any) -> 'Integrity':
return Integrity(algorithm=data['algorithm'], digest=data['digest'])
def to_json_object(self) -> Any:
return {'algorithm': self.algorithm, 'digest': self.digest}
def to_base64(self) -> str:
return base64.b64encode(binascii.unhexlify(self.digest)).decode()
class IntegrityBuilder:
def __init__(self, algorithm: str = 'sha256') -> None:
self.algorithm = algorithm
self._hasher = hashlib.new(algorithm)
def update(self, data: Union[str, bytes]) -> None:
data_bytes: bytes
if isinstance(data, str):
data_bytes = data.encode()
else:
data_bytes = data
self._hasher.update(data_bytes)
def build(self) -> Integrity:
return Integrity(algorithm=self.algorithm, digest=self._hasher.hexdigest())
class RemoteUrlMetadata(NamedTuple):
integrity: Integrity
size: int
@staticmethod
def __get_cache_bucket(cachable: bool, kind: str, url: str) -> Cache.BucketRef:
return Cache.get_working_instance_if(cachable).get(
f'remote-url-metadata:{kind}:{url}')
@staticmethod
def from_json_object(data: Any) -> 'RemoteUrlMetadata':
return RemoteUrlMetadata(integrity=Integrity.from_json_object(data['integrity']),
size=data['size'])
@classmethod
async def get(cls,
url: str,
*,
cachable: bool,
integrity_algorithm: str = 'sha256') -> 'RemoteUrlMetadata':
bucket = cls.__get_cache_bucket(cachable, 'full', url)
bucket_reader = bucket.open_read()
if bucket_reader is not None:
data = json.loads(bucket_reader.read_all())
return RemoteUrlMetadata.from_json_object(data)
builder = IntegrityBuilder(integrity_algorithm)
size = 0
async for part in Requests.instance.read_parts(url, cachable=False):
builder.update(part)
size += len(part)
metadata = RemoteUrlMetadata(integrity=builder.build(), size=size)
with bucket.open_write() as bucket_writer:
bucket_writer.write(json.dumps(
metadata.to_json_object()).encode('ascii'))
return metadata
@classmethod
async def get_size(cls, url: str, *, cachable: bool) -> int:
bucket = cls.__get_cache_bucket(cachable, 'size', url)
bucket_reader = bucket.open_read()
if bucket_reader is not None:
return int(bucket_reader.read_all())
size = 0
async for part in Requests.instance.read_parts(url, cachable=False):
size += len(part)
with bucket.open_write() as bucket_writer:
bucket_writer.write(str(size).encode('ascii'))
return size
def to_json_object(self) -> Any:
return {'integrity': self.integrity.to_json_object(), 'size': self.size}
class ResolvedSource(NamedTuple):
resolved: str
integrity: Optional[Integrity]
async def retrieve_integrity(self) -> Integrity:
if self.integrity is not None:
return self.integrity
else:
url = self.resolved
assert url is not None, 'registry source has no resolved URL'
metadata = await RemoteUrlMetadata.get(url, cachable=True)
return metadata.integrity
class UnresolvedRegistrySource:
pass
class GitSource(NamedTuple):
original: str
url: str
commit: str
from_: Optional[str]
PackageSource = Union[ResolvedSource, UnresolvedRegistrySource, GitSource]
class Package(NamedTuple):
name: str
version: str
source: PackageSource
lockfile: Path
class NodeHeaders(NamedTuple):
target: str
runtime: str
disturl: str
@classmethod
def with_defaults(cls,
target: str,
runtime: Optional[str] = None,
disturl: Optional[str] = None):
if runtime is None:
runtime = 'node'
if disturl is None:
if runtime == 'node':
disturl = 'http://nodejs.org/dist'
elif runtime == 'electron':
disturl = 'https://www.electronjs.org/headers'
else:
raise ValueError(
f'Can\'t guess `disturl` for {runtime} version {target}')
return cls(target, runtime, disturl)
@property
def url(self) -> str:
# TODO it may be better to retrieve urls from disturl/index.json
return f'{self.disturl}/v{self.target}/node-v{self.target}-headers.tar.gz'
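# For example, with the defaults for runtime 'node' and target '14.17.0' this
# yields 'http://nodejs.org/dist/v14.17.0/node-v14.17.0-headers.tar.gz'.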
@property
def install_version(self) -> str:
# FIXME not sure if this static value will always work
return "9"
class ManifestGenerator(ContextManager['ManifestGenerator']):
MAX_GITHUB_SIZE = 49 * 1000 * 1000
JSON_INDENT = 4
def __init__(self) -> None:
# Store the dicts as a set of tuples, then rebuild the dict when returning it.
# That way, we ensure uniqueness.
self._sources: Set[Tuple[Tuple[str, Any], ...]] = set()
self._commands: List[str] = []
def __exit__(self, exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
tb: Optional[types.TracebackType]) -> None:
self._finalize()
@property
def data_root(self) -> Path:
return Path('flatpak-node')
@property
def tmp_root(self) -> Path:
return self.data_root / 'tmp'
@property
def source_count(self) -> int:
return len(self._sources)
def ordered_sources(self) -> Iterator[Dict[Any, Any]]:
return map(dict, sorted(self._sources)) # type: ignore
def split_sources(self) -> Iterator[List[Dict[Any, Any]]]:
BASE_CURRENT_SIZE = len('[\n]')
current_size = BASE_CURRENT_SIZE
current: List[Dict[Any, Any]] = []
for source in self.ordered_sources():
# Generate one source by itself, then check the length without the closing and
# opening brackets.
source_json = json.dumps(
[source], indent=ManifestGenerator.JSON_INDENT)
source_json_len = len('\n'.join(source_json.splitlines()[1:-1]))
if current_size + source_json_len >= ManifestGenerator.MAX_GITHUB_SIZE:
yield current
current = []
current_size = BASE_CURRENT_SIZE
current.append(source)
current_size += source_json_len
if current:
yield current
def _add_source(self, source: Dict[str, Any]) -> None:
self._sources.add(tuple(source.items()))
def _add_source_with_destination(self,
source: Dict[str, Any],
destination: Optional[Path],
*,
is_dir: bool,
only_arches: Optional[List[str]] = None) -> None:
if destination is not None:
if is_dir:
source['dest'] = str(destination)
else:
source['dest-filename'] = destination.name
if len(destination.parts) > 1:
source['dest'] = str(destination.parent)
if only_arches:
source['only-arches'] = tuple(only_arches)
self._add_source(source)
def add_url_source(self,
url: str,
integrity: Integrity,
destination: Optional[Path] = None,
*,
only_arches: Optional[List[str]] = None) -> None:
source: Dict[str, Any] = {
'type': 'file',
'url': url,
integrity.algorithm: integrity.digest
}
self._add_source_with_destination(source,
destination,
is_dir=False,
only_arches=only_arches)
def add_archive_source(self,
url: str,
integrity: Integrity,
destination: Optional[Path] = None,
only_arches: Optional[List[str]] = None,
strip_components: int = 1) -> None:
source: Dict[str, Any] = {
'type': 'archive',
'url': url,
'strip-components': strip_components,
integrity.algorithm: integrity.digest
}
self._add_source_with_destination(source,
destination,
is_dir=True,
only_arches=only_arches)
def add_data_source(self, data: Union[str, bytes], destination: Path) -> None:
if isinstance(data, bytes):
source = {
'type': 'inline',
'contents': base64.b64encode(data).decode('ascii'),
'base64': True,
}
else:
assert isinstance(data, str)
source = {
'type': 'inline',
'contents': data,
}
self._add_source_with_destination(source, destination, is_dir=False)
def add_git_source(self,
url: str,
commit: str,
destination: Optional[Path] = None) -> None:
source = {'type': 'git', 'url': url, 'commit': commit}
self._add_source_with_destination(source, destination, is_dir=True)
def add_script_source(self, commands: List[str], destination: Path) -> None:
source = {'type': 'script', 'commands': tuple(commands)}
self._add_source_with_destination(source, destination, is_dir=False)
def add_shell_source(self,
commands: List[str],
destination: Optional[Path] = None,
only_arches: Optional[List[str]] = None) -> None:
"""This might be slow for multiple instances. Use `add_command()` instead."""
source = {'type': 'shell', 'commands': tuple(commands)}
self._add_source_with_destination(source,
destination=destination,
only_arches=only_arches,
is_dir=True)
def add_command(self, command: str) -> None:
self._commands.append(command)
def _finalize(self) -> None:
if self._commands:
self._add_source(
{'type': 'shell', 'commands': tuple(self._commands)})
class LockfileProvider:
def parse_git_source(self, version: str, from_: Optional[str] = None) -> GitSource:
# https://github.com/microsoft/pyright/issues/1589
# pyright: reportPrivateUsage=false
original_url = urllib.parse.urlparse(version)
assert original_url.scheme and original_url.path and original_url.fragment
replacements = GIT_SCHEMES.get(original_url.scheme, {})
new_url = original_url._replace(fragment='', **replacements)
# Replace e.g. git:github.com/owner/repo with git://github.com/owner/repo
if not new_url.netloc:
path = new_url.path.split('/')
new_url = new_url._replace(netloc=path[0], path='/'.join(path[1:]))
return GitSource(original=original_url.geturl(),
url=new_url.geturl(),
commit=original_url.fragment,
from_=from_)
def process_lockfile(self, lockfile: Path) -> Iterator[Package]:
raise NotImplementedError()
class RCFileProvider:
RCFILE_NAME: str
def parse_rcfile(self, rcfile: Path) -> Dict[str, str]:
with open(rcfile, 'r') as r:
rcfile_text = r.read()
parser_re = re.compile(r'^(?!#|;)(\S+)(?:\s+|\s*=\s*)(?:"(.+)"|(\S+))$',
re.MULTILINE)
result: Dict[str, str] = {}
for key, quoted_val, val in parser_re.findall(rcfile_text):
result[key] = quoted_val or val
return result
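# parse_rcfile() accepts the common .npmrc/.yarnrc spellings, e.g.:
#   target=13.1.7
#   runtime "electron"
#   disturl https://www.electronjs.org/headers
# Lines starting with '#' or ';' are treated as comments and skipped.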
def get_node_headers(self, rcfile: Path) -> Optional[NodeHeaders]:
rc_data = self.parse_rcfile(rcfile)
if 'target' not in rc_data:
return None
target = rc_data['target']
runtime = rc_data.get('runtime')
disturl = rc_data.get('disturl')
assert runtime is None or isinstance(runtime, str)
assert disturl is None or isinstance(disturl, str)
return NodeHeaders.with_defaults(target, runtime, disturl)
class ModuleProvider(ContextManager['ModuleProvider']):
async def generate_package(self, package: Package) -> None:
raise NotImplementedError()
class ElectronBinaryManager:
class Arch(NamedTuple):
electron: str
flatpak: str
class Binary(NamedTuple):
filename: str
url: str
integrity: Integrity
arch: Optional['ElectronBinaryManager.Arch'] = None
ELECTRON_ARCHES_TO_FLATPAK = {
'ia32': 'i386',
'x64': 'x86_64',
'armv7l': 'arm',
'arm64': 'aarch64',
}
INTEGRITY_BASE_FILENAME = 'SHASUMS256.txt'
def __init__(self, version: str, base_url: str, integrities: Dict[str,
Integrity]) -> None:
self.version = version
self.base_url = base_url
self.integrities = integrities
def child_url(self, child: str) -> str:
return f'{self.base_url}/{child}'
def find_binaries(self, binary: str) -> Iterator['ElectronBinaryManager.Binary']:
for electron_arch, flatpak_arch in self.ELECTRON_ARCHES_TO_FLATPAK.items():
binary_filename = f'{binary}-v{self.version}-linux-{electron_arch}.zip'
binary_url = self.child_url(binary_filename)
arch = ElectronBinaryManager.Arch(electron=electron_arch,
flatpak=flatpak_arch)
yield ElectronBinaryManager.Binary(
filename=binary_filename,
url=binary_url,
integrity=self.integrities[binary_filename],
arch=arch)
@property
def integrity_file(self) -> 'ElectronBinaryManager.Binary':
return ElectronBinaryManager.Binary(
filename=f'SHASUMS256.txt-{self.version}',
url=self.child_url(self.INTEGRITY_BASE_FILENAME),
integrity=self.integrities[self.INTEGRITY_BASE_FILENAME])
@staticmethod
async def for_version(version: str) -> 'ElectronBinaryManager':
base_url = f'https://github.com/electron/electron/releases/download/v{version}'
integrity_url = f'{base_url}/{ElectronBinaryManager.INTEGRITY_BASE_FILENAME}'
integrity_data = (await Requests.instance.read_all(integrity_url,
cachable=True)).decode()
integrities: Dict[str, Integrity] = {}
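# Each line of SHASUMS256.txt has the form '<hex sha256> *<filename>', which
# is what the split()/strip('*') below relies on.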
for line in integrity_data.splitlines():
digest, star_filename = line.split()
filename = star_filename.strip('*')
integrities[filename] = Integrity(
algorithm='sha256', digest=digest)
integrities[ElectronBinaryManager.INTEGRITY_BASE_FILENAME] = (
Integrity.generate(integrity_data))
return ElectronBinaryManager(version=version,
base_url=base_url,
integrities=integrities)
class SpecialSourceProvider:
class Options(NamedTuple):
node_chromedriver_from_electron: str
electron_ffmpeg: str
electron_node_headers: bool
electron_from_rcfile: bool
nwjs_version: str
nwjs_node_headers: bool
nwjs_ffmpeg: bool
xdg_layout: bool
def __init__(self, gen: ManifestGenerator, options: Options):
self.gen = gen
self.node_chromedriver_from_electron = options.node_chromedriver_from_electron
self.electron_ffmpeg = options.electron_ffmpeg
self.electron_node_headers = options.electron_node_headers
self.electron_bins_for_headers = options.electron_from_rcfile
self.nwjs_version = options.nwjs_version
self.nwjs_node_headers = options.nwjs_node_headers
self.nwjs_ffmpeg = options.nwjs_ffmpeg
self.xdg_layout = options.xdg_layout
@property
def electron_cache_dir(self) -> Path:
if self.xdg_layout:
return self.gen.data_root / 'cache' / 'electron'
return self.gen.data_root / 'electron-cache'
@property
def gyp_dir(self) -> Path:
return self.gen.data_root / 'cache' / 'node-gyp'
def _add_electron_cache_downloads(self,
manager: ElectronBinaryManager,
binary_name: str,
*,
add_integrities: bool = True) -> None:
electron_cache_dir = self.electron_cache_dir
for binary in manager.find_binaries(binary_name):
assert binary.arch is not None
self.gen.add_url_source(binary.url,
binary.integrity,
electron_cache_dir / binary.filename,
only_arches=[binary.arch.flatpak])
# Symlinks for @electron/get, which stores electron zips in a subdir
if self.xdg_layout:
sanitized_url = ''.join(c for c in binary.url if c not in '/:')
# And for @electron/get >= 1.12.4 its sha256 hash of url dirname
url = urllib.parse.urlparse(binary.url)
url_dir = urllib.parse.urlunparse(
url._replace(path=os.path.dirname(url.path)))
url_hash = hashlib.sha256(url_dir.encode()).hexdigest()
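# For illustration: for electron-v13.1.7-linux-x64.zip this creates one
# directory named after the full URL with '/' and ':' removed, and one named
# after the sha256 hex digest of the URL's parent directory, each containing
# a symlink back to the zip downloaded above.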
self.gen.add_shell_source([
f'mkdir -p "{sanitized_url}"',
f'ln -s "../{binary.filename}" "{sanitized_url}/{binary.filename}"',
f'mkdir -p "{url_hash}"',
f'ln -s "../{binary.filename}" "{url_hash}/{binary.filename}"'
],
destination=electron_cache_dir,
only_arches=[binary.arch.flatpak])
if add_integrities:
integrity_file = manager.integrity_file
self.gen.add_url_source(integrity_file.url, integrity_file.integrity,
electron_cache_dir / integrity_file.filename)
async def __add_electron(self, version: str) -> None:
manager = await ElectronBinaryManager.for_version(version)
self._add_electron_cache_downloads(manager, 'electron')
if self.electron_ffmpeg is not None:
if self.electron_ffmpeg == 'archive':
self._add_electron_cache_downloads(manager,
'ffmpeg',
add_integrities=False)
elif self.electron_ffmpeg == 'lib':
for binary in manager.find_binaries('ffmpeg'):
assert binary.arch is not None
self.gen.add_archive_source(binary.url,
binary.integrity,
destination=self.gen.data_root,
only_arches=[binary.arch.flatpak])
else:
assert False, self.electron_ffmpeg
async def _handle_electron(self, package: Package) -> None:
await self.__add_electron(package.version)
def _handle_gulp_atom_electron(self, package: Package) -> None:
# Versions after 1.22.0 use @electron/get and don't need this
if SemVer.parse(package.version) <= SemVer.parse('1.22.0'):
cache_path = self.gen.data_root / 'tmp' / \
'gulp-electron-cache' / 'atom' / 'electron'
self.gen.add_command(f'mkdir -p "{cache_path.parent}"')
self.gen.add_command(
f'ln -sfTr "{self.electron_cache_dir}" "{cache_path}"')
async def _handle_electron_headers(self, package: Package) -> None:
node_headers = NodeHeaders.with_defaults(runtime='electron',
target=package.version)
if self.xdg_layout:
node_gyp_headers_dir = self.gen.data_root / \
'cache' / 'node-gyp' / package.version
else:
node_gyp_headers_dir = self.gen.data_root / 'node-gyp' / 'electron-current'
await self.generate_node_headers(node_headers, dest=node_gyp_headers_dir)
async def _get_chromedriver_binary_version(self, package: Package) -> str:
# Note: node-chromedriver seems to not have tagged all releases on GitHub, so
# just use unpkg instead.
url = urllib.parse.urljoin(NPM_MIRROR,
f'chromedriver@{package.version}/lib/chromedriver')
js = await Requests.instance.read_all(url, cachable=True)
# XXX: a tad ugly
match = re.search(r"exports\.version = '([^']+)'", js.decode())
assert match is not None, f'Failed to get ChromeDriver binary version from {url}'
return match.group(1)
async def _handle_electron_chromedriver(self, package: Package) -> None:
manager = await ElectronBinaryManager.for_version(package.version)
self._add_electron_cache_downloads(manager, 'chromedriver')
async def _handle_node_chromedriver(self, package: Package) -> None:
version = await self._get_chromedriver_binary_version(package)
destination = self.gen.data_root / 'chromedriver'
if self.node_chromedriver_from_electron is not None:
manager = await ElectronBinaryManager.for_version(
self.node_chromedriver_from_electron)
for binary in manager.find_binaries('chromedriver'):
assert binary.arch is not None
self.gen.add_archive_source(binary.url,
binary.integrity,
destination=destination,
only_arches=[binary.arch.flatpak])
else:
url = (f'https://chromedriver.storage.googleapis.com/{version}/'
'chromedriver_linux64.zip')
metadata = await RemoteUrlMetadata.get(url, cachable=True)
self.gen.add_archive_source(url,
metadata.integrity,
destination=destination,
only_arches=['x86_64'])
async def _add_nwjs_cache_downloads(self, version: str, flavor: str = 'normal'):
assert not version.startswith('v')
nwjs_mirror = 'https://dl.nwjs.io'
ffmpeg_dl_base = 'https://github.com/iteufel/nwjs-ffmpeg-prebuilt/releases/download'
if self.nwjs_node_headers:
headers_dl_url = f'{nwjs_mirror}/v{version}/nw-headers-v{version}.tar.gz'
headers_dest = self.gen.data_root / 'node-gyp' / 'nwjs-current'
headers_metadata = await RemoteUrlMetadata.get(headers_dl_url, cachable=True)
self.gen.add_archive_source(headers_dl_url,
headers_metadata.integrity,
destination=headers_dest)
if flavor == 'normal':
filename_base = 'nwjs'
else:
filename_base = f'nwjs-{flavor}'
destdir = self.gen.data_root / 'nwjs-cache'
nwjs_arch_map = [
('x86_64', 'linux-x64', 'linux64'),
('i386', 'linux-ia32', 'linux32'),
]
for flatpak_arch, nwjs_arch, platform in nwjs_arch_map:
filename = f'{filename_base}-v{version}-{nwjs_arch}.tar.gz'
dl_url = f'{nwjs_mirror}/v{version}/{filename}'
metadata = await RemoteUrlMetadata.get(dl_url, cachable=True)
dest = destdir / f'{version}-{flavor}' / platform
self.gen.add_archive_source(dl_url,
metadata.integrity,
destination=dest,
only_arches=[flatpak_arch])
if self.nwjs_ffmpeg:
ffmpeg_dl_url = f'{ffmpeg_dl_base}/{version}/{version}-{nwjs_arch}.zip'
ffmpeg_metadata = await RemoteUrlMetadata.get(ffmpeg_dl_url,
cachable=True)
self.gen.add_archive_source(ffmpeg_dl_url,
ffmpeg_metadata.integrity,
destination=dest,
strip_components=0,
only_arches=[flatpak_arch])
async def _handle_nw_builder(self, package: Package) -> None:
if self.nwjs_version:
version = self.nwjs_version
else:
versions_json = json.loads(await Requests.instance.read_all(
'https://nwjs.io/versions.json', cachable=False))
version = versions_json['latest'].lstrip('v')
await self._add_nwjs_cache_downloads(version)
self.gen.add_data_source(
version, destination=self.gen.data_root / 'nwjs-version')
async def _handle_dugite_native(self, package: Package) -> None:
dl_json_url = urllib.parse.urljoin(
NPM_MIRROR, f'{package.name}@{package.version}/script/embedded-git.json')
dl_json = json.loads(await Requests.instance.read_all(dl_json_url, cachable=True))
dugite_arch_map = {
'x86_64': 'linux-x64',
}
destdir = self.gen.data_root / 'tmp'
for arch, dugite_arch in dugite_arch_map.items():
url = dl_json[dugite_arch]['url']
filename = dl_json[dugite_arch]['name']
integrity = Integrity(algorithm='sha256',
digest=dl_json[dugite_arch]['checksum'])
self.gen.add_url_source(url,
integrity,
destination=destdir / filename,
only_arches=[arch])
async def _handle_ripgrep_prebuilt(self, package: Package) -> None:
async def get_ripgrep_tag(version: str) -> str:
url = f'https://github.com/microsoft/vscode-ripgrep/raw/v{version}/lib/postinstall.js'
tag_re = re.compile(r"VERSION\s+=\s+'(v[\d.-]+)';")
resp = await Requests.instance.read_all(url, cachable=True)
match = tag_re.search(resp.decode())
assert match is not None
return match.group(1)
tag = await get_ripgrep_tag(package.version)
ripgrep_arch_map = {
'x86_64': 'x86_64-unknown-linux-musl',
'i386': 'i686-unknown-linux-musl',
'arm': 'arm-unknown-linux-gnueabihf',
'aarch64': 'aarch64-unknown-linux-gnu'
}
destdir = self.gen.data_root / 'tmp' / \
f'vscode-ripgrep-cache-{package.version}'
for arch, ripgrep_arch in ripgrep_arch_map.items():
filename = f'ripgrep-{tag}-{ripgrep_arch}.tar.gz'
url = f'https://github.com/microsoft/ripgrep-prebuilt/releases/download/{tag}/{filename}'
metadata = await RemoteUrlMetadata.get(url, cachable=True)
self.gen.add_url_source(url,
metadata.integrity,
destination=destdir / filename,
only_arches=[arch])
async def _handle_playwright(self, package: Package) -> None:
base_url = f'https://github.com/microsoft/playwright/raw/v{package.version}/'
if SemVer.parse(package.version) >= SemVer.parse('1.16.0'):
browsers_json_url = base_url + 'packages/playwright-core/browsers.json'
else:
browsers_json_url = base_url + 'browsers.json'
browsers_json = json.loads(await Requests.instance.read_all(browsers_json_url,
cachable=True))
for browser in browsers_json['browsers']:
if not browser.get('installByDefault', True):
continue
name = browser['name']
revision = int(browser['revision'])
if name == 'chromium':
if revision < 792639:
url_tp = 'https://storage.googleapis.com/chromium-browser-snapshots/Linux_x64/%d/%s'
dl_file = 'chrome-linux.zip'
else:
url_tp = 'https://playwright.azureedge.net/builds/chromium/%d/%s'
dl_file = 'chromium-linux.zip'
elif name == 'firefox':
url_tp = 'https://playwright.azureedge.net/builds/firefox/%d/%s'
if revision < 1140:
dl_file = 'firefox-linux.zip'
else:
dl_file = 'firefox-ubuntu-18.04.zip'
elif name == 'webkit':
url_tp = 'https://playwright.azureedge.net/builds/webkit/%d/%s'
if revision < 1317:
dl_file = 'minibrowser-gtk-wpe.zip'
else:
dl_file = 'webkit-ubuntu-20.04.zip'
elif name == 'ffmpeg':
url_tp = 'https://playwright.azureedge.net/builds/ffmpeg/%d/%s'
dl_file = 'ffmpeg-linux.zip'
else:
raise ValueError(f'Unknown playwright browser {name}')
dl_url = url_tp % (revision, dl_file)
metadata = await RemoteUrlMetadata.get(dl_url, cachable=True)
destdir = self.gen.data_root / 'cache' / \
'ms-playwright' / f'{name}-{revision}'
self.gen.add_archive_source(dl_url,
metadata.integrity,
destination=destdir,
strip_components=0)
# Arbitrary string here; flatpak-builder segfaults on empty data: url
self.gen.add_data_source("flatpak-node-cache",
destination=destdir / 'INSTALLATION_COMPLETE')
async def _handle_esbuild(self, package: Package) -> None:
pkg_names = {
'x86_64': 'esbuild-linux-64',
'i386': 'esbuild-linux-32',
'arm': 'esbuild-linux-arm',
'aarch64': 'esbuild-linux-arm64'
}
for flatpak_arch, pkg_name in pkg_names.items():
dl_url = f'https://registry.npmjs.org/{pkg_name}/-/{pkg_name}-{package.version}.tgz'
metadata = await RemoteUrlMetadata.get(dl_url, cachable=True)
cache_dst = self.gen.data_root / 'cache' / 'esbuild'
archive_dst = cache_dst / '.package' / \
f'{pkg_name}@{package.version}'
bin_src = archive_dst / 'bin' / 'esbuild'
bin_dst = cache_dst / 'bin' / f'{pkg_name}@{package.version}'
self.gen.add_archive_source(dl_url,
metadata.integrity,
destination=archive_dst,
only_arches=[flatpak_arch],
strip_components=1)
cmd = [
f'mkdir -p "{bin_dst.parent.relative_to(cache_dst)}"',
f'cp "{bin_src.relative_to(cache_dst)}" "{bin_dst.relative_to(cache_dst)}"',
f'ln -sf "{bin_dst.name}" "bin/esbuild-current"'
]
self.gen.add_shell_source(cmd,
only_arches=[flatpak_arch],
destination=cache_dst)
def _handle_electron_builder(self, package: Package) -> None:
destination = self.gen.data_root / 'electron-builder-arch-args.sh'
script: List[str] = []
script.append('case "$FLATPAK_ARCH" in')
for electron_arch, flatpak_arch in (
ElectronBinaryManager.ELECTRON_ARCHES_TO_FLATPAK.items()):
script.append(f'"{flatpak_arch}")')
script.append(
f' export ELECTRON_BUILDER_ARCH_ARGS="--{electron_arch}"')
script.append(' ;;')
script.append('esac')
self.gen.add_script_source(script, destination)
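# The generated helper is a small shell case statement, e.g.:
#   case "$FLATPAK_ARCH" in
#   "x86_64")
#        export ELECTRON_BUILDER_ARCH_ARGS="--x64"
#        ;;
#   ...
#   esac
# so the build can pass the matching --<arch> flag to electron-builder.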
async def generate_node_headers(self,
node_headers: NodeHeaders,
dest: Optional[Path] = None):
url = node_headers.url
install_version = node_headers.install_version
if dest is None:
dest = self.gyp_dir / node_headers.target
metadata = await RemoteUrlMetadata.get(url, cachable=True)
self.gen.add_archive_source(url, metadata.integrity, destination=dest)
self.gen.add_data_source(
install_version, destination=dest / 'installVersion')
if self.electron_bins_for_headers and node_headers.runtime == "electron":
await self.__add_electron(node_headers.target)
async def generate_special_sources(self, package: Package) -> None:
if isinstance(Requests.instance, StubRequests):
# This is going to crash and burn.
return
if package.name == 'electron':
await self._handle_electron(package)
if self.electron_node_headers:
await self._handle_electron_headers(package)
elif package.name == 'electron-chromedriver':
await self._handle_electron_chromedriver(package)
elif package.name == 'chromedriver':
await self._handle_node_chromedriver(package)
elif package.name == 'electron-builder':
self._handle_electron_builder(package)
elif package.name == 'gulp-atom-electron':
self._handle_gulp_atom_electron(package)
elif package.name == 'nw-builder':
await self._handle_nw_builder(package)
elif package.name in {'dugite', '@shiftkey/dugite'}:
await self._handle_dugite_native(package)
elif package.name in {'vscode-ripgrep', '@vscode/ripgrep'}:
await self._handle_ripgrep_prebuilt(package)
elif package.name == 'playwright':
await self._handle_playwright(package)
elif package.name == 'esbuild':
await self._handle_esbuild(package)
class NpmLockfileProvider(LockfileProvider):
_ALIAS_RE = re.compile(r'^npm:(.[^@]*)@(.*)$')
class Options(NamedTuple):
no_devel: bool
def __init__(self, options: Options):
self.no_devel = options.no_devel
def process_dependencies(
self, lockfile: Path,
dependencies: Dict[str, Dict[Any, Any]]) -> Iterator[Package]:
for name, info in dependencies.items():
if info.get('dev') and self.no_devel:
continue
elif info.get('bundled'):
continue
version: str = info['version']
alias_match = self._ALIAS_RE.match(version)
if alias_match is not None:
name, version = alias_match.groups()
source: PackageSource
if info.get('from'):
git_source = self.parse_git_source(version, info['from'])
source = git_source
else:
# NOTE: npm ignores the resolved field and just uses the provided
# registry instead. We follow the same behavior here.
source = UnresolvedRegistrySource()
yield Package(name=name, version=version, source=source, lockfile=lockfile)
if 'dependencies' in info:
yield from self.process_dependencies(lockfile, info['dependencies'])
def process_lockfile(self, lockfile: Path) -> Iterator[Package]:
with open(lockfile) as fp:
data = json.load(fp)
assert data['lockfileVersion'] <= 2, data['lockfileVersion']
yield from self.process_dependencies(lockfile, data.get('dependencies', {}))
class NpmRCFileProvider(RCFileProvider):
RCFILE_NAME = '.npmrc'
class NpmModuleProvider(ModuleProvider):
class Options(NamedTuple):
registry: str
no_autopatch: bool
no_trim_index: bool
class RegistryPackageIndex(NamedTuple):
url: str
data: Dict[Any, Any]
used_versions: Set[str]
def __init__(self, gen: ManifestGenerator, special: SpecialSourceProvider,
lockfile_root: Path, options: Options) -> None:
self.gen = gen
self.special_source_provider = special
self.lockfile_root = lockfile_root
self.registry = options.registry
self.no_autopatch = options.no_autopatch
self.no_trim_index = options.no_trim_index
self.npm_cache_dir = self.gen.data_root / 'npm-cache'
self.cacache_dir = self.npm_cache_dir / '_cacache'
# Awaitable so multiple tasks can be waiting on the same package info.
self.registry_packages: Dict[
str, asyncio.Future[NpmModuleProvider.RegistryPackageIndex]] = {}
self.index_entries: Dict[Path, str] = {}
self.all_lockfiles: Set[Path] = set()
# Mapping of lockfiles to a dict of the Git source target paths and GitSource objects.
self.git_sources: DefaultDict[Path, Dict[
Path, GitSource]] = collections.defaultdict(lambda: {})
def __exit__(self, exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
tb: Optional[types.TracebackType]) -> None:
# Don't bother finalizing if an exception was thrown.
if exc_type is None:
self._finalize()
def get_cacache_integrity_path(self, integrity: Integrity) -> Path:
digest = integrity.digest
return Path(digest[0:2]) / digest[2:4] / digest[4:]
def get_cacache_index_path(self, integrity: Integrity) -> Path:
return self.cacache_dir / Path('index-v5') / self.get_cacache_integrity_path(
integrity)
def get_cacache_content_path(self, integrity: Integrity) -> Path:
return (self.cacache_dir / Path('content-v2') / integrity.algorithm /
self.get_cacache_integrity_path(integrity))
def add_index_entry(self, url: str, metadata: RemoteUrlMetadata) -> None:
key = f'make-fetch-happen:request-cache:{url}'
index_json = json.dumps({
'key':
key,
'integrity':
f'{metadata.integrity.algorithm}-{metadata.integrity.to_base64()}',
'time':
0,
'size':
metadata.size,
'metadata': {
'url': url,
'reqHeaders': {},
'resHeaders': {},
},
})
content_integrity = Integrity.generate(index_json, algorithm='sha1')
index = '\t'.join((content_integrity.digest, index_json))
key_integrity = Integrity.generate(key)
index_path = self.get_cacache_index_path(key_integrity)
self.index_entries[index_path] = index
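# The resulting npm cache mirrors cacache's on-disk layout: the entry above is
# written to _cacache/index-v5/<aa>/<bb>/<rest-of-sha256-of-key> as
# '<sha1-of-json>\t<json>', and the tarball itself is placed under
# _cacache/content-v2/<algorithm>/... by generate_package() below.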
async def resolve_source(self, package: Package) -> ResolvedSource:
# These results are going to be the same each time.
if package.name not in self.registry_packages:
cache_future = asyncio.get_event_loop().create_future()
self.registry_packages[package.name] = cache_future
data_url = f'{self.registry}/{package.name.replace("/", "%2f")}'
# NOTE: Not cachable, because this is an API call.
raw_data = await Requests.instance.read_all(data_url, cachable=False)
data = json.loads(raw_data)
assert 'versions' in data, f'{data_url} returned an invalid package index'
cache_future.set_result(
NpmModuleProvider.RegistryPackageIndex(url=data_url,
data=data,
used_versions=set()))
if not self.no_trim_index:
for key in list(data):
if key != 'versions':
del data[key]
index = await self.registry_packages[package.name]
versions = index.data['versions']
assert package.version in versions, \
f'{package.name} versions available are {", ".join(versions)}, not {package.version}'
dist = versions[package.version]['dist']
assert 'tarball' in dist, f'{package.name}@{package.version} has no tarball in dist'
index.used_versions.add(package.version)
integrity: Integrity
if 'integrity' in dist:
integrity = Integrity.parse(dist['integrity'])
elif 'shasum' in dist:
integrity = Integrity.from_sha1(dist['shasum'])
else:
assert False, f'{package.name}@{package.version} has no integrity in dist'
return ResolvedSource(resolved=dist['tarball'], integrity=integrity)
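# The registry metadata consulted above is the usual packument shape, roughly:
#   {"versions": {"1.2.3": {"dist": {"tarball": "...", "integrity": "sha512-..."}}}}
# Older entries may only carry a 'shasum' (sha1 hex) instead of 'integrity'.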
async def generate_package(self, package: Package) -> None:
self.all_lockfiles.add(package.lockfile)
source = package.source
assert not isinstance(source, ResolvedSource)
if isinstance(source, UnresolvedRegistrySource):
source = await self.resolve_source(package)
assert source.resolved is not None
assert source.integrity is not None
integrity = await source.retrieve_integrity()
size = await RemoteUrlMetadata.get_size(source.resolved, cachable=True)
metadata = RemoteUrlMetadata(integrity=integrity, size=size)
content_path = self.get_cacache_content_path(integrity)
self.gen.add_url_source(source.resolved, integrity, content_path)
self.add_index_entry(source.resolved, metadata)
await self.special_source_provider.generate_special_sources(package)
# pyright: reportUnnecessaryIsInstance=false
elif isinstance(source, GitSource):
# Get a unique name to use for the Git repository folder.
name = f'{package.name}-{source.commit}'
path = self.gen.data_root / 'git-packages' / name
self.git_sources[package.lockfile][path] = source
self.gen.add_git_source(source.url, source.commit, path)
def relative_lockfile_dir(self, lockfile: Path) -> Path:
return lockfile.parent.relative_to(self.lockfile_root)
def _finalize(self) -> None:
for _, async_index in self.registry_packages.items():
index = async_index.result()
if not self.no_trim_index:
for version in list(index.data['versions'].keys()):
if version not in index.used_versions:
del index.data['versions'][version]
raw_data = json.dumps(index.data).encode()
metadata = RemoteUrlMetadata(integrity=Integrity.generate(raw_data),
size=len(raw_data))
content_path = self.get_cacache_content_path(metadata.integrity)
self.gen.add_data_source(raw_data, content_path)
self.add_index_entry(index.url, metadata)
patch_commands: DefaultDict[Path, List[str]
] = collections.defaultdict(lambda: [])
if self.git_sources:
# Generate jq scripts to patch the package*.json files.
scripts = {
'package.json':
r'''
walk(
if type == "object"
then
to_entries | map(
if (.value | type == "string") and $data[.value]
then .value = "git+file:\($buildroot)/\($data[.value])"
else .
end
) | from_entries
else .
end
)
''',
'package-lock.json':
r'''
walk(
if type == "object" and (.version | type == "string") and $data[.version]
then
.version = "git+file:\($buildroot)/\($data[.version])"
else .
end
)
''',
}
for lockfile, sources in self.git_sources.items():
prefix = self.relative_lockfile_dir(lockfile)
data: Dict[str, Dict[str, str]] = {
'package.json': {},
'package-lock.json': {},
}
for path, source in sources.items():
original_version = f'{source.original}'
new_version = f'{path}#{source.commit}'
assert source.from_ is not None
data['package.json'][source.from_] = new_version
data['package-lock.json'][original_version] = new_version
for filename, script in scripts.items():
target = Path('$FLATPAK_BUILDER_BUILDDIR') / \
prefix / filename
script = textwrap.dedent(script.lstrip('\n')).strip().replace(
'\n', '')
json_data = json.dumps(data[filename])
patch_commands[lockfile].append(
'jq'
' --arg buildroot "$FLATPAK_BUILDER_BUILDDIR"'
f' --argjson data {shlex.quote(json_data)}'
f' {shlex.quote(script)} {target}'
f' > {target}.new')
patch_commands[lockfile].append(f'mv {target}{{.new,}}')
patch_all_commands: List[str] = []
for lockfile in self.all_lockfiles:
patch_dest = self.gen.data_root / 'patch' / self.relative_lockfile_dir(
lockfile)
# Don't use with_suffix to avoid problems if the package has a . in its name.
patch_dest = patch_dest.with_name(patch_dest.name + '.sh')
self.gen.add_script_source(patch_commands[lockfile], patch_dest)
patch_all_commands.append(
f'$FLATPAK_BUILDER_BUILDDIR/{patch_dest}')
patch_all_dest = self.gen.data_root / 'patch-all.sh'
self.gen.add_script_source(patch_all_commands, patch_all_dest)
if not self.no_autopatch:
# FLATPAK_BUILDER_BUILDDIR isn't defined yet for script sources.
self.gen.add_command(
f'FLATPAK_BUILDER_BUILDDIR=$PWD {patch_all_dest}')
if self.index_entries:
for path, entry in self.index_entries.items():
self.gen.add_data_source(entry, path)
class YarnLockfileProvider(LockfileProvider):
@staticmethod
def is_git_version(version: str) -> bool:
for pattern in GIT_URL_PATTERNS:
if pattern.match(version):
return True
url = urllib.parse.urlparse(version)
if url.netloc in GIT_URL_HOSTS:
return len([p for p in url.path.split("/") if p]) == 2
return False
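# For example, 'git+https://github.com/owner/repo.git#<commit>' matches one of
# the GIT_URL_PATTERNS, while a bare 'https://github.com/owner/repo' is caught
# by the host check (exactly two path components).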
def unquote(self, string: str) -> str:
if string.startswith('"'):
assert string.endswith('"')
return string[1:-1]
else:
return string
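# parse_package_section() below consumes one yarn.lock v1 entry, which
# typically looks like:
#   "@scope/name@^1.0.0", "@scope/name@~1.0.1":
#     version "1.0.2"
#     resolved "https://registry.yarnpkg.com/@scope/name/-/name-1.0.2.tgz#<sha1>"
#     integrity sha512-...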
def parse_package_section(self, lockfile: Path, section: List[str]) -> Package:
assert section
name_line = section[0]
assert name_line.endswith(':'), name_line
name_line = name_line[:-1]
name = self.unquote(name_line.split(',', 1)[0])
name, _ = name.rsplit('@', 1)
version: Optional[str] = None
resolved: Optional[str] = None
integrity: Optional[Integrity] = None
section_indent = 0
line = None
for line in section[1:]:
indent = 0
while line[indent].isspace():
indent += 1
assert indent, line
if not section_indent:
section_indent = indent
elif indent > section_indent:
# Inside some nested section.
continue
line = line.strip()
if line.startswith('version'):
version = self.unquote(line.split(' ', 1)[1])
elif line.startswith('resolved'):
resolved = self.unquote(line.split(' ', 1)[1])
elif line.startswith('integrity'):
_, values_str = line.split(' ', 1)
values = self.unquote(values_str).split(' ')
integrity = Integrity.parse(values[0])
assert version and resolved, line
source: PackageSource
if self.is_git_version(resolved):
source = self.parse_git_source(version=resolved)
else:
source = ResolvedSource(resolved=resolved, integrity=integrity)
return Package(name=name, version=version, source=source, lockfile=lockfile)
def process_lockfile(self, lockfile: Path) -> Iterator[Package]:
section: List[str] = []
with open(lockfile) as fp:
for line in map(str.rstrip, fp):
if not line.strip() or line.strip().startswith('#'):
continue
if not line[0].isspace():
if section:
yield self.parse_package_section(lockfile, section)
section = []
section.append(line)
if section:
yield self.parse_package_section(lockfile, section)
class YarnRCFileProvider(RCFileProvider):
RCFILE_NAME = '.yarnrc'
class YarnModuleProvider(ModuleProvider):
# From https://github.com/yarnpkg/yarn/blob/v1.22.4/src/fetchers/tarball-fetcher.js
_PACKAGE_TARBALL_URL_RE = re.compile(
r'(?:(@[^/]+)(?:/|%2f))?[^/]+/(?:-|_attachments)/(?:@[^/]+/)?([^/]+)$')
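# For example, a resolved URL path like '/@types%2fnode/-/node-14.0.0.tgz'
# yields groups ('@types', 'node-14.0.0.tgz'), which generate_package() below
# joins into the mirror filename '@types-node-14.0.0.tgz'.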
def __init__(self, gen: ManifestGenerator, special: SpecialSourceProvider) -> None:
self.gen = gen
self.special_source_provider = special
self.mirror_dir = self.gen.data_root / 'yarn-mirror'
def __exit__(self, exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
tb: Optional[types.TracebackType]) -> None:
pass
async def generate_package(self, package: Package) -> None:
source = package.source
if isinstance(source, ResolvedSource):
integrity = await source.retrieve_integrity()
url_parts = urllib.parse.urlparse(source.resolved)
match = self._PACKAGE_TARBALL_URL_RE.search(url_parts.path)
if match is not None:
scope, filename = match.groups()
if scope:
filename = f'{scope}-{filename}'
else:
filename = os.path.basename(url_parts.path)
self.gen.add_url_source(source.resolved, integrity,
self.mirror_dir / filename)
elif isinstance(source, GitSource):
repo_name = urllib.parse.urlparse(source.url).path.split('/')[-1]
name = f'{repo_name}-{source.commit}'
repo_dir = self.gen.tmp_root / name
target_tar = os.path.relpath(self.mirror_dir / name, repo_dir)
self.gen.add_git_source(source.url, source.commit, repo_dir)
self.gen.add_command(
f'cd {repo_dir}; git archive --format tar -o {target_tar} HEAD')
await self.special_source_provider.generate_special_sources(package)
class ProviderFactory:
def create_lockfile_provider(self) -> LockfileProvider:
raise NotImplementedError()
def create_rcfile_providers(self) -> List[RCFileProvider]:
raise NotImplementedError()
def create_module_provider(self, gen: ManifestGenerator,
special: SpecialSourceProvider) -> ModuleProvider:
raise NotImplementedError()
class NpmProviderFactory(ProviderFactory):
class Options(NamedTuple):
lockfile: NpmLockfileProvider.Options
module: NpmModuleProvider.Options
def __init__(self, lockfile_root: Path, options: Options) -> None:
self.lockfile_root = lockfile_root
self.options = options
def create_lockfile_provider(self) -> NpmLockfileProvider:
return NpmLockfileProvider(self.options.lockfile)
def create_rcfile_providers(self) -> List[RCFileProvider]:
return [NpmRCFileProvider()]
def create_module_provider(self, gen: ManifestGenerator,
special: SpecialSourceProvider) -> NpmModuleProvider:
return NpmModuleProvider(gen, special, self.lockfile_root, self.options.module)
class YarnProviderFactory(ProviderFactory):
def __init__(self) -> None:
pass
def create_lockfile_provider(self) -> YarnLockfileProvider:
return YarnLockfileProvider()
def create_rcfile_providers(self) -> List[RCFileProvider]:
return [YarnRCFileProvider(), NpmRCFileProvider()]
def create_module_provider(self, gen: ManifestGenerator,
special: SpecialSourceProvider) -> YarnModuleProvider:
return YarnModuleProvider(gen, special)
class GeneratorProgress(ContextManager['GeneratorProgress']):
def __init__(self, packages: Collection[Package],
module_provider: ModuleProvider) -> None:
self.finished = 0
self.packages = packages
self.module_provider = module_provider
self.previous_package: Optional[Package] = None
self.current_package: Optional[Package] = None
def __exit__(self, exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
tb: Optional[types.TracebackType]) -> None:
print()
def _format_package(self, package: Package, max_width: int) -> str:
result = f'{package.name} @ {package.version}'
if len(result) > max_width:
result = result[:max_width - 3] + '...'
return result
def _update(self) -> None:
columns, _ = shutil.get_terminal_size()
sys.stdout.write('\r' + ' ' * columns)
prefix_string = f'\rGenerating packages [{self.finished}/{len(self.packages)}] '
sys.stdout.write(prefix_string)
max_package_width = columns - len(prefix_string)
if self.current_package is not None:
sys.stdout.write(self._format_package(self.current_package,
max_package_width))
sys.stdout.flush()
def _update_with_package(self, package: Package) -> None:
self.previous_package, self.current_package = self.current_package, package
self._update()
async def _generate(self, package: Package) -> None:
self._update_with_package(package)
await self.module_provider.generate_package(package)
self.finished += 1
self._update_with_package(package)
async def run(self) -> None:
self._update()
tasks = [asyncio.create_task(self._generate(pkg))
for pkg in self.packages]
for coro in asyncio.as_completed(tasks):
try:
await coro
except:
# If an exception occurred, make sure to cancel all the other
# tasks.
for task in tasks:
task.cancel()
raise
def scan_for_lockfiles(base: Path, patterns: List[str]) -> Iterator[Path]:
for root, _, files in os.walk(base.parent):
if base.name in files:
lockfile = Path(root) / base.name
if not patterns or any(map(lockfile.match, patterns)):
yield lockfile
async def main() -> None:
parser = argparse.ArgumentParser(description='Flatpak Node generator')
parser.add_argument('type', choices=['npm', 'yarn'])
parser.add_argument('lockfile',
help='The lockfile path (package-lock.json or yarn.lock)')
parser.add_argument('-o',
'--output',
help='The output sources file',
default='generated-sources.json')
parser.add_argument(
'-r',
'--recursive',
action='store_true',
help='Recursively process all files under the lockfile directory with '
'the lockfile basename')
parser.add_argument(
'-R',
'--recursive-pattern',
action='append',
help='Given -r, restrict files to those matching the given pattern.')
parser.add_argument('--registry',
help='The registry to use (npm only)',
default='https://registry.npmjs.org')
parser.add_argument('--no-trim-index',
action='store_true',
help="Don't trim npm package metadata (npm only)")
parser.add_argument('--no-devel',
action='store_true',
help="Don't include devel dependencies (npm only)")
parser.add_argument('--no-aiohttp',
action='store_true',
help="Don't use aiohttp, and silence any warnings related to it")
parser.add_argument('--no-requests-cache',
action='store_true',
help='Disable the requests cache')
parser.add_argument('--retries',
type=int,
help='Number of retries of failed requests',
default=Requests.DEFAULT_RETRIES)
parser.add_argument('-P',
'--no-autopatch',
action='store_true',
help="Don't automatically patch Git sources from package*.json")
parser.add_argument('-s',
'--split',
action='store_true',
help='Split the sources file to fit onto GitHub.')
parser.add_argument('--node-chromedriver-from-electron',
help='Use the ChromeDriver version associated with the given '
'Electron version for node-chromedriver')
# Deprecated alternative to --node-chromedriver-from-electron
parser.add_argument('--electron-chromedriver', help=argparse.SUPPRESS)
parser.add_argument('--electron-ffmpeg',
choices=['archive', 'lib'],
help='Download prebuilt ffmpeg for matching electron version')
parser.add_argument('--electron-node-headers',
action='store_true',
help='Download the electron node headers')
parser.add_argument('--electron-from-rcfile',
action='store_true',
help='Download electron version corresponding to '
'the node headers version(s) from .yarnrc/.npmrc')
parser.add_argument('--nwjs-version',
help='Specify NW.js version (will use latest otherwise)')
parser.add_argument('--nwjs-node-headers',
action='store_true',
help='Download the NW.js node headers')
parser.add_argument('--nwjs-ffmpeg',
action='store_true',
help='Download prebuilt ffmpeg for current NW.js version')
parser.add_argument('--xdg-layout',
action='store_true',
help='Use XDG layout for caches')
# Internal option, useful for testing.
parser.add_argument('--stub-requests',
action='store_true', help=argparse.SUPPRESS)
args = parser.parse_args()
Requests.retries = args.retries
if args.type == 'yarn' and (args.no_devel or args.no_autopatch):
sys.exit('--no-devel and --no-autopatch do not apply to Yarn.')
if args.electron_chromedriver:
print('WARNING: --electron-chromedriver is deprecated', file=sys.stderr)
print(' (Use --node-chromedriver-from-electron instead.)', file=sys.stderr)
if args.stub_requests:
Requests.instance = StubRequests()
elif args.no_aiohttp:
if Requests.instance.is_async:
Requests.instance = UrllibRequests()
elif not Requests.instance.is_async:
print('WARNING: aiohttp is not found, performance will suffer.',
file=sys.stderr)
print(' (Pass --no-aiohttp to silence this warning.)', file=sys.stderr)
if not args.no_requests_cache:
Cache.instance = FilesystemBasedCache()
lockfiles: List[Path]
if args.recursive or args.recursive_pattern:
lockfiles = list(scan_for_lockfiles(
Path(args.lockfile), args.recursive_pattern))
if not lockfiles:
sys.exit('No lockfiles found.')
print(f'Found {len(lockfiles)} lockfiles.')
else:
lockfiles = [Path(args.lockfile)]
lockfile_root = Path(args.lockfile).parent
provider_factory: ProviderFactory
if args.type == 'npm':
npm_options = NpmProviderFactory.Options(
NpmLockfileProvider.Options(no_devel=args.no_devel),
NpmModuleProvider.Options(registry=args.registry,
no_autopatch=args.no_autopatch,
no_trim_index=args.no_trim_index))
provider_factory = NpmProviderFactory(lockfile_root, npm_options)
elif args.type == 'yarn':
provider_factory = YarnProviderFactory()
else:
assert False, args.type
print('Reading packages from lockfiles...')
packages: Set[Package] = set()
rcfile_node_headers: Set[NodeHeaders] = set()
for lockfile in lockfiles:
lockfile_provider = provider_factory.create_lockfile_provider()
rcfile_providers = provider_factory.create_rcfile_providers()
packages.update(lockfile_provider.process_lockfile(lockfile))
for rcfile_provider in rcfile_providers:
rcfile = lockfile.parent / rcfile_provider.RCFILE_NAME
if rcfile.is_file():
nh = rcfile_provider.get_node_headers(rcfile)
if nh is not None:
rcfile_node_headers.add(nh)
print(f'{len(packages)} packages read.')
gen = ManifestGenerator()
with gen:
options = SpecialSourceProvider.Options(
node_chromedriver_from_electron=args.node_chromedriver_from_electron
or args.electron_chromedriver,
nwjs_version=args.nwjs_version,
nwjs_node_headers=args.nwjs_node_headers,
nwjs_ffmpeg=args.nwjs_ffmpeg,
xdg_layout=args.xdg_layout,
electron_ffmpeg=args.electron_ffmpeg,
electron_node_headers=args.electron_node_headers,
electron_from_rcfile=args.electron_from_rcfile)
special = SpecialSourceProvider(gen, options)
with provider_factory.create_module_provider(gen, special) as module_provider:
with GeneratorProgress(packages, module_provider) as progress:
await progress.run()
for headers in rcfile_node_headers:
print(f'Generating headers {headers.runtime} @ {headers.target}')
await special.generate_node_headers(headers)
if args.xdg_layout:
script_name = "setup_sdk_node_headers.sh"
node_gyp_dir = gen.data_root / "cache" / "node-gyp"
gen.add_script_source([
'version=$(node --version | sed "s/^v//")',
'nodedir=$(dirname "$(dirname "$(which node)")")',
f'mkdir -p "{node_gyp_dir}/$version"',
f'ln -s "$nodedir/include" "{node_gyp_dir}/$version/include"',
f'echo 9 > "{node_gyp_dir}/$version/installVersion"',
],
destination=gen.data_root / script_name)
gen.add_command(f"bash {gen.data_root / script_name}")
if args.split:
i = 0
for i, part in enumerate(gen.split_sources()):
output = Path(args.output)
output = output.with_suffix(f'.{i}{output.suffix}')
with open(output, 'w') as fp:
json.dump(part, fp, indent=ManifestGenerator.JSON_INDENT)
print(f'Wrote {gen.source_count} source(s) to {i + 1} file(s).')
else:
with open(args.output, 'w') as fp:
json.dump(list(gen.ordered_sources()),
fp,
indent=ManifestGenerator.JSON_INDENT)
if fp.tell() >= ManifestGenerator.MAX_GITHUB_SIZE:
print('WARNING: generated-sources.json is too large for GitHub.',
file=sys.stderr)
print(' (Pass -s to enable splitting.)', file=sys.stderr)
print(f'Wrote {gen.source_count} source(s).')
if __name__ == '__main__':
asyncio.run(main())