packer/_scripts/unlock-luks-after-install.py

282 lines
9.3 KiB
Python
Executable file

#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.15"
# dependencies = [
# "python-hcl2==4.*",
# "requests==2.*",
# "yaspin==3.*",
# ]
# ///
# See https://discuss.hashicorp.com/t/luks-encryption-key-on-initial-reboot/45459/2 for reference
# https://pve.proxmox.com/pve-docs/api-viewer/index.html#/nodes/{node}/qemu/{vmid}/sendkey
from __future__ import annotations
import argparse
import random
import subprocess
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
import hcl2
import requests
from yaspin import yaspin
def load_hcl(path: Path) -> dict:
with path.open("r", encoding="utf-8") as handle:
return hcl2.load(handle)
def get_variable_default(hcl_data: dict, name: str) -> str | None:
for variable_block in hcl_data.get("variable", []):
if name in variable_block:
return variable_block[name].get("default")
return None
def main() -> int:
parser = argparse.ArgumentParser(
description="Unlock LUKS after install via Proxmox API (setup stage)."
)
parser.add_argument(
"-t",
"--template",
required=True,
help="Path to directory containing variables.pkr.hcl (also passed to mise build).",
)
parser.add_argument(
"--luks-wait-seconds",
type=int,
default=45,
help="Seconds to wait before sending the LUKS password (default: 45).",
)
args = parser.parse_args()
script_root = Path(__file__).resolve().parents[1]
variables_common_path = script_root / "variables-common.pkr.hcl"
credentials_path = script_root / "credentials.auto.pkrvars.hcl"
vars_dir = Path(args.template)
if not vars_dir.is_absolute():
vars_dir = script_root / vars_dir
variables_path = vars_dir / "variables.pkr.hcl"
variables_common = load_hcl(variables_common_path)
credentials = load_hcl(credentials_path)
variables = load_hcl(variables_path)
proxmox_api_url = get_variable_default(variables_common, "proxmox_api_url")
proxmox_skip_tls_verify = (
get_variable_default(variables_common, "proxmox_skip_tls_verify") or False
)
default_luks_passphrase = get_variable_default(
variables_common, "default_luks_passphrase"
)
proxmox_node = get_variable_default(variables, "proxmox_node")
template_vm_id = get_variable_default(variables, "template_vm_id")
_ = (
proxmox_api_url,
proxmox_node,
template_vm_id,
credentials,
)
server_event = threading.Event()
class InstallFinishedHandler(BaseHTTPRequestHandler):
def do_POST(self) -> None: # noqa: N802 - required by BaseHTTPRequestHandler
if self.path != "/install_finished":
self.send_response(404)
self.end_headers()
self.wfile.write(b"Not Found")
return
_ = self.rfile.read(int(self.headers.get("Content-Length", "0") or "0"))
server_event.set()
self.send_response(200)
self.end_headers()
self.wfile.write(b"OK")
def log_message(self, format: str, *args: object) -> None:
return
def find_random_port() -> tuple[int, HTTPServer]:
ports = list(range(10000, 11000))
random.SystemRandom().shuffle(ports)
for port in ports:
try:
server = HTTPServer(("0.0.0.0", port), InstallFinishedHandler)
except OSError:
continue
return port, server
raise RuntimeError("No free port found in range 10000-10999")
port, httpd = find_random_port()
def stream_colors(stream: object) -> tuple[str, str]:
color = ""
reset = ""
is_tty = getattr(stream, "isatty", None)
if callable(is_tty) and is_tty():
if stream is sys.stderr:
color = "\033[31m"
else:
color = "\033[36m"
reset = "\033[0m"
return color, reset
def log(message: str, stream: object = sys.stdout) -> None:
color, reset = stream_colors(stream)
stream.write(f"{color}[luks-unlock-wrapper] {message}{reset}\n")
stream.flush()
def colorize_message(message: str, stream: object = sys.stdout) -> str:
color, reset = stream_colors(stream)
if not color:
return message
return f"{color}{message}{reset}"
def serve() -> None:
httpd.serve_forever()
server_thread = threading.Thread(target=serve, daemon=True)
server_thread.start()
log(f"Listening for POST on /install_finished at port {port}")
build_cmd = ["mise", "build", args.template, "-i", str(port)]
build_proc = subprocess.Popen(
build_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
def relay_stream(stream: object, prefix: str, target: object) -> None:
if not stream:
return
for line in stream:
target.write(f"{prefix} {line}")
target.flush()
stdout_thread = threading.Thread(
target=relay_stream, args=(build_proc.stdout, "[packer]", sys.stdout), daemon=True
)
stderr_thread = threading.Thread(
target=relay_stream, args=(build_proc.stderr, "[packer]", sys.stderr), daemon=True
)
stdout_thread.start()
stderr_thread.start()
notified = False
action_started = False
def proxmox_request(method: str, path: str, **kwargs) -> requests.Response:
if not proxmox_api_url:
raise RuntimeError("proxmox_api_url not set")
token_id = credentials.get("proxmox_api_token_id")
token_secret = credentials.get("proxmox_api_token_secret")
if not token_id or not token_secret:
raise RuntimeError("Proxmox API token credentials missing")
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"PVEAPIToken={token_id}={token_secret}"
url = f"{proxmox_api_url.rstrip('/')}/{path.lstrip('/')}"
return requests.request(
method,
url,
headers=headers,
verify=not proxmox_skip_tls_verify,
timeout=30,
**kwargs,
)
def send_key(key: str) -> None:
if not proxmox_node or not template_vm_id:
raise RuntimeError("proxmox_node or template_vm_id not set")
path = f"/nodes/{proxmox_node}/qemu/{template_vm_id}/sendkey"
response = proxmox_request("PUT", path, data={"key": key})
response.raise_for_status()
def handle_install_finished() -> None:
retry_delay = 1
while True:
try:
response = proxmox_request("GET", "/version")
response.raise_for_status()
log("Authenticated to Proxmox VE API.")
break
except Exception as exc:
log(
f"Proxmox auth failed: {exc}. Retrying in {retry_delay}s.",
stream=sys.stderr,
)
time.sleep(retry_delay)
retry_delay += 1
spinner = None
try:
countdown_seconds = max(0, args.luks_wait_seconds)
spinner = yaspin(
text=colorize_message(
f"[luks-unlock-wrapper] {countdown_seconds // 60:02d}:{countdown_seconds % 60:02d}",
sys.stdout,
),
color="cyan",
stream=sys.stdout,
)
spinner.start()
for remaining in range(countdown_seconds, -1, -1):
minutes, seconds = divmod(remaining, 60)
countdown = f"{minutes:02d}:{seconds:02d}"
spinner.text = colorize_message(
f"[luks-unlock-wrapper] {countdown}", sys.stdout
)
if remaining:
time.sleep(1)
if not default_luks_passphrase:
raise RuntimeError("default_luks_passphrase not set")
for char in default_luks_passphrase:
send_key(char)
time.sleep(0.1)
send_key("ret")
spinner.text = colorize_message(
"[luks-unlock-wrapper] ✔ Unlocking encrypted disk. Entering LUKS password.",
sys.stdout,
)
spinner.ok("")
except Exception as exc:
if spinner:
spinner.text = colorize_message(
"[luks-unlock-wrapper] ✗ Post-install actions failed.",
sys.stdout,
)
spinner.fail("")
log(f"Post-install actions failed after auth: {exc}", stream=sys.stderr)
try:
while True:
if server_event.is_set() and not notified:
log("Installation finished. -- Restarting.")
notified = True
if server_event.is_set() and not action_started:
action_started = True
threading.Thread(target=handle_install_finished, daemon=True).start()
if build_proc.poll() is not None:
break
server_event.wait(0.2)
finally:
httpd.shutdown()
httpd.server_close()
return build_proc.returncode or 0
if __name__ == "__main__":
raise SystemExit(main())