builder.braak.pro/buildbot/config/worker/utils.py
2024-11-20 16:13:44 +01:00

583 lines
18 KiB
Python

# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: 2011-2024 Blender Authors
# <pep8 compliant>
## Utility functions used by all builders.
import argparse
import atexit
import logging
import os
import pathlib
import platform
import re
import shutil
import subprocess
import sys
import time
from collections import OrderedDict
from typing import Any, Callable, Dict, List, Optional, Sequence, Union
# Logging
_error_pattern = re.compile(
r"(^FATAL:|^ERROR:|^ERROR!|^Unhandled Error|^Traceback| error: | error | Error |FAILED: |ninja: build stopped: subcommand failed|CMake Error|SEGFAULT|Exception: SegFault |The following tests FAILED:|\*\*\*Failed|\*\*\*Exception|\*\*\*Abort|^fatal:)"
)
_warning_pattern = re.compile(
r"(^WARNING:|^WARNING!|^WARN |Warning: | warning: | warning |warning | nvcc warning :|CMake Warning)"
)
_ignore_pattern = re.compile(
r"(SignTool Error: CryptSIPRemoveSignedDataMsg returned error: 0x00000057|unknown mimetype for .*doctree)"
)
_errors: List[str] = []
_warnings: List[str] = []
def _print_warning(msg: str) -> None:
print("\033[33m" + msg + "\033[0m", flush=True)
def _print_error(msg: str) -> None:
print("\033[31m" + msg + "\033[0m", flush=True)
def _print_cmd(msg: str) -> None:
print("\033[32m" + msg + "\033[0m", flush=True)
def _exit_handler() -> None:
if len(_warnings):
print("")
print("=" * 80)
print("WARNING Summary:")
print("=" * 80)
for msg in _warnings:
_print_warning(msg)
if len(_errors):
print("")
print("=" * 80)
print("ERROR Summary:")
print("=" * 80)
for msg in _errors:
_print_error(msg)
atexit.register(_exit_handler)
def info(msg: str) -> None:
print("INFO: " + msg, flush=True)
def warning(msg: str) -> None:
_print_warning("WARN: " + msg)
global _warnings
_warnings += [msg]
def error(msg: str) -> None:
_print_error("ERROR: " + msg)
global _errors
_errors += [msg]
def exception(e: BaseException) -> None:
logging.exception(e)
global _errors
_errors += [str(e)]
def _log_cmd(msg: str) -> None:
if re.search(_error_pattern, msg):
if not re.search(_ignore_pattern, msg):
_print_error(msg)
global _errors
_errors += [msg]
return
elif re.search(_warning_pattern, msg):
if not re.search(_ignore_pattern, msg):
_print_warning(msg)
global _warnings
_warnings += [msg]
return
print(msg.encode("ascii", errors="replace").decode("ascii"), flush=True)
# Command execution
class HiddenArgument:
def __init__(self, value: Union[str, pathlib.Path]):
self.value = value
CmdArgument = Union[str, pathlib.Path, HiddenArgument, Any]
CmdList = List[CmdArgument]
CmdSequence = Sequence[CmdArgument]
CmdFilterOutput = Optional[Callable[[str], Optional[str]]]
CmdEnvironment = Optional[Dict[str, str]]
def _prepare_call(
cmd: CmdSequence, dry_run: bool = False
) -> Sequence[Union[str, pathlib.Path]]:
real_cmd: List[Union[str, pathlib.Path]] = []
log_cmd: List[str] = []
for arg in cmd:
if isinstance(arg, HiddenArgument):
real_cmd += [arg.value]
else:
log_cmd += [str(arg)]
real_cmd += [arg]
if dry_run:
info(f"Dry run command in path [{os.getcwd()}]")
else:
info(f"Run command in path [{os.getcwd()}]")
_print_cmd(" ".join(log_cmd))
return real_cmd
def call(
cmd: CmdSequence,
env: CmdEnvironment = None,
exit_on_error: bool = True,
filter_output: CmdFilterOutput = None,
retry_count: int = 0,
retry_wait_time: float = 1.0,
dry_run: bool = False,
) -> int:
cmd = _prepare_call(cmd, dry_run)
if dry_run:
return 0
for try_count in range(0, retry_count + 1):
# Flush to ensure correct order output on Windows.
sys.stdout.flush()
sys.stderr.flush()
proc = subprocess.Popen(
cmd,
env=env,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
encoding="utf-8",
errors="ignore",
)
while True:
if not proc.stdout:
break
line = proc.stdout.readline()
if line:
line_str = line.strip("\n\r")
if filter_output:
filter_output(line_str)
else:
pass
if line_str:
_log_cmd(line_str)
else:
break
proc.communicate()
if proc.returncode == 0:
return 0
if try_count == retry_count:
if exit_on_error:
sys.exit(proc.returncode)
return proc.returncode
else:
warning("Command failed, retrying")
time.sleep(retry_wait_time)
return -1
def check_output(cmd: CmdSequence, exit_on_error: bool = True) -> str:
cmd = _prepare_call(cmd)
# Flush to ensure correct order output on Windows.
sys.stdout.flush()
sys.stderr.flush()
try:
output = subprocess.check_output(
cmd, stderr=subprocess.STDOUT, universal_newlines=True
)
except subprocess.CalledProcessError as e:
if exit_on_error:
sys.exit(e.returncode)
output = ""
return output.strip()
def call_pipenv(
cmd: CmdSequence, filter_output: CmdFilterOutput = None, dry_run: bool = False
) -> int:
cmd_prefix: CmdList = ["pipenv"]
return call(cmd_prefix + list(cmd), filter_output=filter_output, dry_run=dry_run)
def call_ssh(connect_id: str, cmd: CmdSequence, dry_run: bool = False) -> int:
ssh_cmd = [
"ssh",
"-o",
"ConnectTimeout=20",
HiddenArgument(connect_id),
" ".join([str(arg) for arg in cmd]),
]
return call(ssh_cmd, retry_count=3, dry_run=dry_run)
def rsync(
source_path: Union[pathlib.Path, str],
dest_path: Union[pathlib.Path, str],
exclude_paths: Sequence[str] = [],
include_paths: Sequence[str] = [],
change_modes: Sequence[str] = [],
change_owner: Optional[str] = None,
show_names: bool = False,
delete: bool = False,
delete_path_check: Optional[str] = None,
dry_run: bool = False,
port: int = 22,
retry_count: int = 3,
) -> int:
# Extra check on path, because delete is risky if pointed at a
# root folder that contains other data.
if delete:
if not delete_path_check:
raise Exception("Rsync: delete requires delete_path_check")
if str(dest_path).find(delete_path_check) == -1:
raise Exception("Rsync: remote path must contain '{delete_path_check}'")
if show_names:
pass
cmd: List[Union[str, pathlib.Path, HiddenArgument]] = [
"rsync",
# SSH options
"-e",
f"ssh -o ConnectTimeout=20 -p {port}",
# The -rlpgoDv options below are equivalent to --archive apart from updating
# the timestamp of the files on the receiving side. This should prevent them
# from getting marked for zfs-snapshots.
"--timeout=60",
"--checksum",
"-rlpgoDv",
"--partial",
]
if change_owner:
cmd += [f"--chown={change_owner}"]
if delete:
cmd += ["--delete"]
# cmd += [f"--info={info_options}"]
cmd += [f"--include={item}" for item in include_paths]
cmd += [f"--exclude={item}" for item in exclude_paths]
cmd += [f"--chmod={item}" for item in change_modes]
cmd += [source_path]
cmd += [HiddenArgument(dest_path)]
return call(cmd, retry_count=retry_count, dry_run=dry_run)
def move(path_from: pathlib.Path, path_to: pathlib.Path, dry_run: bool = False) -> None:
if dry_run:
return
# str() works around typing bug in Python 3.6.
shutil.move(str(path_from), path_to)
def copy_dir(
path_from: pathlib.Path, path_to: pathlib.Path, dry_run: bool = False
) -> None:
if dry_run:
return
shutil.copytree(path_from, path_to)
def copy_file(
path_from: pathlib.Path, path_to: pathlib.Path, dry_run: bool = False
) -> None:
if dry_run:
return
shutil.copy2(path_from, path_to)
def remove_file(
path: pathlib.Path,
retry_count: int = 3,
retry_wait_time: float = 5.0,
dry_run: bool = False,
) -> None:
if not path.exists():
return
if dry_run:
info(f"Removing {path} (dry run)")
return
info(f"Removing {path}")
for try_count in range(retry_count):
try:
if path.exists():
path.unlink()
return
except FileNotFoundError:
# File was already removed by another process.
return
except PermissionError as e:
warning(f"Permission error when removing {path}: {e}")
time.sleep(retry_wait_time)
except OSError as e:
warning(f"OS error when removing {path}: {e}")
time.sleep(retry_wait_time)
# Final attempt outside the retry loop
try:
if path.exists():
path.unlink()
except FileNotFoundError:
pass
except PermissionError as e:
error(f"Failed to remove {path} due to permission issues: {e}")
except OSError as e:
error(f"Failed to remove {path} after retries due to OS error: {e}")
# Retry several times by default, giving it a chance for possible antivirus to release
# a lock on files in the build folder. Happened for example with MSI files on Windows.
def remove_dir(
path: pathlib.Path, retry_count: int = 3, retry_wait_time: float = 5.0
) -> None:
for try_count in range(retry_count):
try:
if path.exists():
shutil.rmtree(path)
return # Successfully removed, no need to retry
except PermissionError as e:
if platform.system().lower() == "windows":
# Debugging access denied errors on Windows
if path.name == "build_package":
info("Removal of package artifacts folder failed. Investigating...")
msi_path = (
path
/ "_CPack_Packages"
/ "Windows"
/ "WIX"
/ "blender-windows64.msi"
)
if msi_path.exists():
info(f"Information about [{msi_path}]")
call(["handle64", msi_path], exit_on_error=False)
call(
["pwsh", "-command", f"Get-Item {msi_path} | Format-List"],
exit_on_error=False,
)
call(
["pwsh", "-command", f"Get-Acl {msi_path} | Format-List"],
exit_on_error=False,
)
else:
info(f"MSI package file [{msi_path}] does not exist")
warning(f"Permission error when removing {path}: {e}")
time.sleep(retry_wait_time)
except FileNotFoundError:
# The directory is already gone; no action needed.
return
except OSError as e:
warning(f"OS error when attempting to remove {path}: {e}")
time.sleep(retry_wait_time)
# Final attempt outside of retries
if path.exists():
try:
shutil.rmtree(path)
except PermissionError as e:
error(f"Failed to remove {path} due to permission issues: {e}")
except OSError as e:
error(f"Failed to remove {path} after retries due to OS error: {e}")
def is_tool(name: Union[str, pathlib.Path]) -> bool:
"""Check whether `name` is on PATH and marked as executable."""
return shutil.which(name) is not None
# Update source code from git repository.
def update_source(
app_org: str,
app_id: str,
code_path: pathlib.Path,
branch_id: str = "main",
patch_id: Optional[str] = None,
commit_id: Optional[str] = None,
update_submodules: bool = False,
) -> None:
repo_url = f"https://projects.blender.org/{app_org}/{app_id}.git"
if not code_path.exists():
# Clone new
info(f"Cloning {repo_url}")
call(["git", "clone", "--progress", repo_url, code_path])
else:
for index_lock_path in code_path.rglob(".git/index.lock"):
warning("Removing git lock, probably left behind by killed git process")
remove_file(index_lock_path)
for index_lock_path in (code_path / ".git" / "modules").rglob("index.lock"):
warning(
"Removing submodule git lock, probably left behind by killed git process"
)
remove_file(index_lock_path)
os.chdir(code_path)
# Fix error: "fatal: bad object refs/remotes/origin/HEAD"
call(["git", "remote", "set-head", "origin", "--auto"])
# Change to new Gitea URL.
call(["git", "remote", "set-url", "origin", repo_url])
call(["git", "submodule", "sync"])
# Fetch and clean
call(["git", "fetch", "origin", "--prune"])
call(["git", "clean", "-f", "-d"])
call(["git", "reset", "--hard"])
rebase_merge_path = code_path / ".git" / "rebase-merge"
if rebase_merge_path.exists():
info(f"Path {rebase_merge_path} exists, removing !")
shutil.rmtree(rebase_merge_path)
if patch_id:
# Pull request.
pull_request_id = patch_id
branch_name = f"PR{pull_request_id}"
# Checkout pull request into PR123 branch.
call(["git", "checkout", "main"])
call(
[
"git",
"fetch",
"-f",
"origin",
f"pull/{pull_request_id}/head:{branch_name}",
]
)
call(["git", "checkout", branch_name])
if commit_id and (commit_id != "HEAD"):
call(["git", "reset", "--hard", commit_id])
else:
# Branch.
call(["git", "checkout", branch_id])
if commit_id and (commit_id != "HEAD"):
call(["git", "reset", "--hard", commit_id])
else:
call(["git", "reset", "--hard", "origin/" + branch_id])
if update_submodules:
call(["git", "submodule", "init"])
# Resolve potential issues with submodules even if other code
# is responsible for updating them.
call(["git", "submodule", "foreach", "git", "clean", "-f", "-d"])
call(["git", "submodule", "foreach", "git", "reset", "--hard"])
if update_submodules:
call(["git", "submodule", "update"])
# Workaroud missing type info in 3.8.
if sys.version_info >= (3, 9):
BuilderSteps = OrderedDict[str, Callable[[Any], None]]
else:
BuilderSteps = Any
class Builder:
def __init__(self, args: argparse.Namespace, app_org: str, app_id: str):
self.service_env_id = args.service_env_id
self.track_id = args.track_id
self.branch_id = args.branch_id
self.patch_id = args.patch_id
self.commit_id = args.commit_id
self.platform = platform.system().lower()
self.architecture = platform.machine().lower()
self.app_org = app_org
self.app_id = app_id
if not self.branch_id:
branches_config = self.get_branches_config()
self.branch_id = branches_config.track_code_branches[self.track_id]
self.tracks_root_path = self.get_worker_config().tracks_root_path
self.track_path = self.tracks_root_path / (self.app_id + "-" + self.track_id)
self.code_path = self.track_path / (self.app_id + ".git")
info(f"Setting up builder paths from [{self.track_path}]")
def setup_track_path(self) -> None:
# Create track directory if doesn't exist already.
os.makedirs(self.track_path, exist_ok=True)
os.chdir(self.track_path)
# Clean up any existing pipenv files.
remove_file(self.track_path / "Pipfile")
remove_file(self.track_path / "Pipfile.lock")
remove_file(self.code_path / "Pipfile")
remove_file(self.code_path / "Pipfile.lock")
def update_source(self, update_submodules: bool = False) -> None:
update_source(
self.app_org,
self.app_id,
self.code_path,
branch_id=self.branch_id,
patch_id=self.patch_id,
commit_id=self.commit_id,
update_submodules=update_submodules,
)
def run(self, step: str, steps: BuilderSteps) -> None:
try:
if step == "all":
for func in steps.values():
func(self)
else:
steps[step](self)
except Exception as e:
exception(e)
sys.exit(1)
def get_worker_config(self) -> Any:
import conf.worker
return conf.worker.get_config(self.service_env_id)
def get_branches_config(self) -> Any:
import conf.branches
return conf.branches
def create_argument_parser(steps: BuilderSteps) -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("--service-env-id", type=str, required=False, default="LOCAL")
parser.add_argument("--track-id", default="vdev", type=str, required=False)
parser.add_argument("--branch-id", default="", type=str, required=False)
parser.add_argument("--patch-id", default="", type=str, required=False)
parser.add_argument("--commit-id", default="", type=str, required=False)
all_steps = list(steps.keys()) + ["all"]
parser.add_argument("step", choices=all_steps)
return parser