# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: 2011-2024 Blender Authors
# <pep8 compliant>

## Utility functions used by all builders.

import argparse
import atexit
import logging
import os
import pathlib
import platform
import re
import shutil
import subprocess
import sys
import time

from collections import OrderedDict
from typing import Any, Callable, Dict, List, Optional, Sequence, Union

# Logging
_error_pattern = re.compile(
    r"(^FATAL:|^ERROR:|^ERROR!|^Unhandled Error|^Traceback| error: | error | Error |FAILED: |ninja: build stopped: subcommand failed|CMake Error|SEGFAULT|Exception: SegFault |The following tests FAILED:|\*\*\*Failed|\*\*\*Exception|\*\*\*Abort|^fatal:)"
)
_warning_pattern = re.compile(
    r"(^WARNING:|^WARNING!|^WARN |Warning: | warning: | warning |warning | nvcc warning :|CMake Warning)"
)
_ignore_pattern = re.compile(
    r"(SignTool Error: CryptSIPRemoveSignedDataMsg returned error: 0x00000057|unknown mimetype for .*doctree)"
)

_errors: List[str] = []
_warnings: List[str] = []


def _print_warning(msg: str) -> None:
    print("\033[33m" + msg + "\033[0m", flush=True)


def _print_error(msg: str) -> None:
    print("\033[31m" + msg + "\033[0m", flush=True)


def _print_cmd(msg: str) -> None:
    print("\033[32m" + msg + "\033[0m", flush=True)


def _exit_handler() -> None:
    if len(_warnings):
        print("")
        print("=" * 80)
        print("WARNING Summary:")
        print("=" * 80)
        for msg in _warnings:
            _print_warning(msg)
    if len(_errors):
        print("")
        print("=" * 80)
        print("ERROR Summary:")
        print("=" * 80)
        for msg in _errors:
            _print_error(msg)


atexit.register(_exit_handler)


def info(msg: str) -> None:
    print("INFO: " + msg, flush=True)


def warning(msg: str) -> None:
    _print_warning("WARN: " + msg)
    global _warnings
    _warnings += [msg]


def error(msg: str) -> None:
    _print_error("ERROR: " + msg)
    global _errors
    _errors += [msg]


def exception(e: BaseException) -> None:
    logging.exception(e)
    global _errors
    _errors += [str(e)]


def _log_cmd(msg: str) -> None:
    if re.search(_error_pattern, msg):
        if not re.search(_ignore_pattern, msg):
            _print_error(msg)
            global _errors
            _errors += [msg]
            return
    elif re.search(_warning_pattern, msg):
        if not re.search(_ignore_pattern, msg):
            _print_warning(msg)
            global _warnings
            _warnings += [msg]
            return

    print(msg.encode("ascii", errors="replace").decode("ascii"), flush=True)


# Command execution
class HiddenArgument:
    def __init__(self, value: Union[str, pathlib.Path]):
        self.value = value


CmdArgument = Union[str, pathlib.Path, HiddenArgument, Any]
CmdList = List[CmdArgument]
CmdSequence = Sequence[CmdArgument]
CmdFilterOutput = Optional[Callable[[str], Optional[str]]]
CmdEnvironment = Optional[Dict[str, str]]


def _prepare_call(
    cmd: CmdSequence, dry_run: bool = False
) -> Sequence[Union[str, pathlib.Path]]:
    real_cmd: List[Union[str, pathlib.Path]] = []
    log_cmd: List[str] = []

    for arg in cmd:
        if isinstance(arg, HiddenArgument):
            real_cmd += [arg.value]
        else:
            log_cmd += [str(arg)]
            real_cmd += [arg]

    if dry_run:
        info(f"Dry run command in path [{os.getcwd()}]")
    else:
        info(f"Run command in path [{os.getcwd()}]")
    _print_cmd(" ".join(log_cmd))

    return real_cmd


def call(
    cmd: CmdSequence,
    env: CmdEnvironment = None,
    exit_on_error: bool = True,
    filter_output: CmdFilterOutput = None,
    retry_count: int = 0,
    retry_wait_time: float = 1.0,
    dry_run: bool = False,
) -> int:
    cmd = _prepare_call(cmd, dry_run)
    if dry_run:
        return 0

    for try_count in range(0, retry_count + 1):
        # Flush to ensure correct order output on Windows.
        sys.stdout.flush()
        sys.stderr.flush()

        proc = subprocess.Popen(
            cmd,
            env=env,
            bufsize=1,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            encoding="utf-8",
            errors="ignore",
        )
        while True:
            if not proc.stdout:
                break

            line = proc.stdout.readline()
            if line:
                line_str = line.strip("\n\r")
                if filter_output:
                    filter_output(line_str)
                else:
                    pass
                if line_str:
                    _log_cmd(line_str)
            else:
                break

        proc.communicate()

        if proc.returncode == 0:
            return 0

        if try_count == retry_count:
            if exit_on_error:
                sys.exit(proc.returncode)
            return proc.returncode
        else:
            warning("Command failed, retrying")
            time.sleep(retry_wait_time)

    return -1


def check_output(cmd: CmdSequence, exit_on_error: bool = True) -> str:
    cmd = _prepare_call(cmd)

    # Flush to ensure correct order output on Windows.
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        output = subprocess.check_output(
            cmd, stderr=subprocess.STDOUT, universal_newlines=True
        )
    except subprocess.CalledProcessError as e:
        if exit_on_error:
            sys.exit(e.returncode)
        output = ""

    return output.strip()


def call_pipenv(
    cmd: CmdSequence, filter_output: CmdFilterOutput = None, dry_run: bool = False
) -> int:
    cmd_prefix: CmdList = ["pipenv"]
    return call(cmd_prefix + list(cmd), filter_output=filter_output, dry_run=dry_run)


def call_ssh(connect_id: str, cmd: CmdSequence, dry_run: bool = False) -> int:
    ssh_cmd = [
        "ssh",
        "-o",
        "ConnectTimeout=20",
        HiddenArgument(connect_id),
        " ".join([str(arg) for arg in cmd]),
    ]
    return call(ssh_cmd, retry_count=3, dry_run=dry_run)


def rsync(
    source_path: Union[pathlib.Path, str],
    dest_path: Union[pathlib.Path, str],
    exclude_paths: Sequence[str] = [],
    include_paths: Sequence[str] = [],
    change_modes: Sequence[str] = [],
    change_owner: Optional[str] = None,
    show_names: bool = False,
    delete: bool = False,
    delete_path_check: Optional[str] = None,
    dry_run: bool = False,
    port: int = 22,
    retry_count: int = 3,
) -> int:
    # Extra check on path, because delete is risky if pointed at a
    # root folder that contains other data.
    if delete:
        if not delete_path_check:
            raise Exception("Rsync: delete requires delete_path_check")
        if str(dest_path).find(delete_path_check) == -1:
            raise Exception("Rsync: remote path must contain '{delete_path_check}'")

    if show_names:
        pass

    cmd: List[Union[str, pathlib.Path, HiddenArgument]] = [
        "rsync",
        # SSH options
        "-e",
        f"ssh -o ConnectTimeout=20 -p {port}",
        # The -rlpgoDv options below are equivalent to --archive apart from updating
        # the timestamp of the files on the receiving side. This should prevent them
        # from getting marked for zfs-snapshots.
        "--timeout=60",
        "--checksum",
        "-rlpgoDv",
        "--partial",
    ]
    if change_owner:
        cmd += [f"--chown={change_owner}"]
    if delete:
        cmd += ["--delete"]
    # cmd += [f"--info={info_options}"]
    cmd += [f"--include={item}" for item in include_paths]
    cmd += [f"--exclude={item}" for item in exclude_paths]
    cmd += [f"--chmod={item}" for item in change_modes]

    cmd += [source_path]
    cmd += [HiddenArgument(dest_path)]

    return call(cmd, retry_count=retry_count, dry_run=dry_run)


def move(path_from: pathlib.Path, path_to: pathlib.Path, dry_run: bool = False) -> None:
    if dry_run:
        return
    # str() works around typing bug in Python 3.6.
    shutil.move(str(path_from), path_to)


def copy_dir(
    path_from: pathlib.Path, path_to: pathlib.Path, dry_run: bool = False
) -> None:
    if dry_run:
        return
    shutil.copytree(path_from, path_to)


def copy_file(
    path_from: pathlib.Path, path_to: pathlib.Path, dry_run: bool = False
) -> None:
    if dry_run:
        return
    shutil.copy2(path_from, path_to)


def remove_file(
    path: pathlib.Path,
    retry_count: int = 3,
    retry_wait_time: float = 5.0,
    dry_run: bool = False,
) -> None:
    if not path.exists():
        return
    if dry_run:
        info(f"Removing {path} (dry run)")
        return

    info(f"Removing {path}")
    for try_count in range(retry_count):
        try:
            if path.exists():
                path.unlink()
            return
        except FileNotFoundError:
            # File was already removed by another process.
            return
        except PermissionError as e:
            warning(f"Permission error when removing {path}: {e}")
            time.sleep(retry_wait_time)
        except OSError as e:
            warning(f"OS error when removing {path}: {e}")
            time.sleep(retry_wait_time)

    # Final attempt outside the retry loop
    try:
        if path.exists():
            path.unlink()
    except FileNotFoundError:
        pass
    except PermissionError as e:
        error(f"Failed to remove {path} due to permission issues: {e}")
    except OSError as e:
        error(f"Failed to remove {path} after retries due to OS error: {e}")


# Retry several times by default, giving it a chance for possible antivirus to release
# a lock on files in the build folder. Happened for example with MSI files on Windows.
def remove_dir(
    path: pathlib.Path, retry_count: int = 3, retry_wait_time: float = 5.0
) -> None:
    for try_count in range(retry_count):
        try:
            if path.exists():
                shutil.rmtree(path)
            return  # Successfully removed, no need to retry
        except PermissionError as e:
            if platform.system().lower() == "windows":
                # Debugging access denied errors on Windows
                if path.name == "build_package":
                    info("Removal of package artifacts folder failed. Investigating...")
                    msi_path = (
                        path
                        / "_CPack_Packages"
                        / "Windows"
                        / "WIX"
                        / "blender-windows64.msi"
                    )
                    if msi_path.exists():
                        info(f"Information about [{msi_path}]")
                        call(["handle64", msi_path], exit_on_error=False)
                        call(
                            ["pwsh", "-command", f"Get-Item {msi_path} | Format-List"],
                            exit_on_error=False,
                        )
                        call(
                            ["pwsh", "-command", f"Get-Acl {msi_path} | Format-List"],
                            exit_on_error=False,
                        )
                    else:
                        info(f"MSI package file [{msi_path}] does not exist")
            warning(f"Permission error when removing {path}: {e}")
            time.sleep(retry_wait_time)
        except FileNotFoundError:
            # The directory is already gone; no action needed.
            return
        except OSError as e:
            warning(f"OS error when attempting to remove {path}: {e}")
            time.sleep(retry_wait_time)

    # Final attempt outside of retries
    if path.exists():
        try:
            shutil.rmtree(path)
        except PermissionError as e:
            error(f"Failed to remove {path} due to permission issues: {e}")
        except OSError as e:
            error(f"Failed to remove {path} after retries due to OS error: {e}")


def is_tool(name: Union[str, pathlib.Path]) -> bool:
    """Check whether `name` is on PATH and marked as executable."""
    return shutil.which(name) is not None


# Update source code from git repository.
def update_source(
    app_org: str,
    app_id: str,
    code_path: pathlib.Path,
    branch_id: str = "main",
    patch_id: Optional[str] = None,
    commit_id: Optional[str] = None,
    update_submodules: bool = False,
) -> None:
    repo_url = f"https://projects.blender.org/{app_org}/{app_id}.git"

    if not code_path.exists():
        # Clone new
        info(f"Cloning {repo_url}")
        call(["git", "clone", "--progress", repo_url, code_path])
    else:
        for index_lock_path in code_path.rglob(".git/index.lock"):
            warning("Removing git lock, probably left behind by killed git process")
            remove_file(index_lock_path)
        for index_lock_path in (code_path / ".git" / "modules").rglob("index.lock"):
            warning(
                "Removing submodule git lock, probably left behind by killed git process"
            )
            remove_file(index_lock_path)

    os.chdir(code_path)

    # Fix error: "fatal: bad object refs/remotes/origin/HEAD"
    call(["git", "remote", "set-head", "origin", "--auto"])

    # Change to new Gitea URL.
    call(["git", "remote", "set-url", "origin", repo_url])
    call(["git", "submodule", "sync"])

    # Fetch and clean
    call(["git", "fetch", "origin", "--prune"])
    call(["git", "clean", "-f", "-d"])
    call(["git", "reset", "--hard"])

    rebase_merge_path = code_path / ".git" / "rebase-merge"
    if rebase_merge_path.exists():
        info(f"Path {rebase_merge_path} exists, removing !")
        shutil.rmtree(rebase_merge_path)

    if patch_id:
        # Pull request.
        pull_request_id = patch_id
        branch_name = f"PR{pull_request_id}"

        # Checkout pull request into PR123 branch.
        call(["git", "checkout", "main"])
        call(
            [
                "git",
                "fetch",
                "-f",
                "origin",
                f"pull/{pull_request_id}/head:{branch_name}",
            ]
        )
        call(["git", "checkout", branch_name])

        if commit_id and (commit_id != "HEAD"):
            call(["git", "reset", "--hard", commit_id])
    else:
        # Branch.
        call(["git", "checkout", branch_id])

        if commit_id and (commit_id != "HEAD"):
            call(["git", "reset", "--hard", commit_id])
        else:
            call(["git", "reset", "--hard", "origin/" + branch_id])

    if update_submodules:
        call(["git", "submodule", "init"])

    # Resolve potential issues with submodules even if other code
    # is responsible for updating them.
    call(["git", "submodule", "foreach", "git", "clean", "-f", "-d"])
    call(["git", "submodule", "foreach", "git", "reset", "--hard"])

    if update_submodules:
        call(["git", "submodule", "update"])


# Workaroud missing type info in 3.8.
if sys.version_info >= (3, 9):
    BuilderSteps = OrderedDict[str, Callable[[Any], None]]
else:
    BuilderSteps = Any


class Builder:
    def __init__(self, args: argparse.Namespace, app_org: str, app_id: str):
        self.service_env_id = args.service_env_id
        self.track_id = args.track_id
        self.branch_id = args.branch_id
        self.patch_id = args.patch_id
        self.commit_id = args.commit_id
        self.platform = platform.system().lower()
        self.architecture = platform.machine().lower()
        self.app_org = app_org
        self.app_id = app_id

        if not self.branch_id:
            branches_config = self.get_branches_config()
            self.branch_id = branches_config.track_code_branches[self.track_id]

        self.tracks_root_path = self.get_worker_config().tracks_root_path
        self.track_path = self.tracks_root_path / (self.app_id + "-" + self.track_id)
        self.code_path = self.track_path / (self.app_id + ".git")

        info(f"Setting up builder paths from [{self.track_path}]")

    def setup_track_path(self) -> None:
        # Create track directory if doesn't exist already.
        os.makedirs(self.track_path, exist_ok=True)
        os.chdir(self.track_path)

        # Clean up any existing pipenv files.
        remove_file(self.track_path / "Pipfile")
        remove_file(self.track_path / "Pipfile.lock")
        remove_file(self.code_path / "Pipfile")
        remove_file(self.code_path / "Pipfile.lock")

    def update_source(self, update_submodules: bool = False) -> None:
        update_source(
            self.app_org,
            self.app_id,
            self.code_path,
            branch_id=self.branch_id,
            patch_id=self.patch_id,
            commit_id=self.commit_id,
            update_submodules=update_submodules,
        )

    def run(self, step: str, steps: BuilderSteps) -> None:
        try:
            if step == "all":
                for func in steps.values():
                    func(self)
            else:
                steps[step](self)
        except Exception as e:
            exception(e)
            sys.exit(1)

    def get_worker_config(self) -> Any:
        import conf.worker

        return conf.worker.get_config(self.service_env_id)

    def get_branches_config(self) -> Any:
        import conf.branches

        return conf.branches


def create_argument_parser(steps: BuilderSteps) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    parser.add_argument("--service-env-id", type=str, required=False, default="LOCAL")
    parser.add_argument("--track-id", default="vdev", type=str, required=False)
    parser.add_argument("--branch-id", default="", type=str, required=False)
    parser.add_argument("--patch-id", default="", type=str, required=False)
    parser.add_argument("--commit-id", default="", type=str, required=False)
    all_steps = list(steps.keys()) + ["all"]
    parser.add_argument("step", choices=all_steps)
    return parser