269 lines
9.1 KiB
Python
Executable file
269 lines
9.1 KiB
Python
Executable file
#!/usr/bin/env -S uv run
|
|
# /// script
|
|
# requires-python = ">=3.15"
|
|
# dependencies = [
|
|
# "python-hcl2==4.*",
|
|
# "requests==2.*",
|
|
# ]
|
|
# ///
|
|
|
|
# See https://discuss.hashicorp.com/t/luks-encryption-key-on-initial-reboot/45459/2 for reference
|
|
# https://pve.proxmox.com/pve-docs/api-viewer/index.html#/nodes/{node}/qemu/{vmid}/sendkey
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import random
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from pathlib import Path
|
|
|
|
import hcl2
|
|
import requests
|
|
|
|
|
|
def load_hcl(path: Path) -> dict:
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
return hcl2.load(handle)
|
|
|
|
|
|
def get_variable_default(hcl_data: dict, name: str) -> str | None:
|
|
for variable_block in hcl_data.get("variable", []):
|
|
if name in variable_block:
|
|
return variable_block[name].get("default")
|
|
return None
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(
|
|
description="Unlock LUKS after install via Proxmox API (setup stage)."
|
|
)
|
|
parser.add_argument(
|
|
"-t",
|
|
"--template",
|
|
required=True,
|
|
help="Path to directory containing variables.pkr.hcl (also passed to mise build).",
|
|
)
|
|
parser.add_argument(
|
|
"--luks-wait-seconds",
|
|
type=int,
|
|
default=45,
|
|
help="Seconds to wait before sending the LUKS password (default: 45).",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
script_root = Path(__file__).resolve().parents[1]
|
|
variables_common_path = script_root / "variables-common.pkr.hcl"
|
|
credentials_path = script_root / "credentials.auto.pkrvars.hcl"
|
|
vars_dir = Path(args.template)
|
|
if not vars_dir.is_absolute():
|
|
vars_dir = script_root / vars_dir
|
|
variables_path = vars_dir / "variables.pkr.hcl"
|
|
|
|
variables_common = load_hcl(variables_common_path)
|
|
credentials = load_hcl(credentials_path)
|
|
variables = load_hcl(variables_path)
|
|
|
|
proxmox_api_url = get_variable_default(variables_common, "proxmox_api_url")
|
|
proxmox_skip_tls_verify = (
|
|
get_variable_default(variables_common, "proxmox_skip_tls_verify") or False
|
|
)
|
|
proxmox_node = get_variable_default(variables, "proxmox_node")
|
|
template_vm_id = get_variable_default(variables, "template_vm_id")
|
|
|
|
_ = proxmox_api_url, proxmox_node, template_vm_id, credentials
|
|
|
|
server_event = threading.Event()
|
|
|
|
class InstallFinishedHandler(BaseHTTPRequestHandler):
|
|
def do_POST(self) -> None: # noqa: N802 - required by BaseHTTPRequestHandler
|
|
if self.path != "/install_finished":
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
self.wfile.write(b"Not Found")
|
|
return
|
|
|
|
_ = self.rfile.read(int(self.headers.get("Content-Length", "0") or "0"))
|
|
server_event.set()
|
|
self.send_response(200)
|
|
self.end_headers()
|
|
self.wfile.write(b"OK")
|
|
|
|
def log_message(self, format: str, *args: object) -> None:
|
|
return
|
|
|
|
def find_random_port() -> tuple[int, HTTPServer]:
|
|
ports = list(range(10000, 11000))
|
|
random.SystemRandom().shuffle(ports)
|
|
for port in ports:
|
|
try:
|
|
server = HTTPServer(("0.0.0.0", port), InstallFinishedHandler)
|
|
except OSError:
|
|
continue
|
|
return port, server
|
|
raise RuntimeError("No free port found in range 10000-10999")
|
|
|
|
port, httpd = find_random_port()
|
|
|
|
def stream_colors(stream: object) -> tuple[str, str]:
|
|
color = ""
|
|
reset = ""
|
|
is_tty = getattr(stream, "isatty", None)
|
|
if callable(is_tty) and is_tty():
|
|
if stream is sys.stderr:
|
|
color = "\033[31m"
|
|
else:
|
|
color = "\033[36m"
|
|
reset = "\033[0m"
|
|
return color, reset
|
|
|
|
def log(message: str, stream: object = sys.stdout) -> None:
|
|
color, reset = stream_colors(stream)
|
|
stream.write(f"{color}[luks-unlock-wrapper] {message}{reset}\n")
|
|
stream.flush()
|
|
|
|
def write_status(message: str, stream: object = sys.stdout, *, newline: bool) -> None:
|
|
color, reset = stream_colors(stream)
|
|
is_tty = getattr(stream, "isatty", None)
|
|
prefix = f"{color}[luks-unlock-wrapper] "
|
|
suffix = f"{reset}\n" if newline else reset
|
|
if callable(is_tty) and is_tty():
|
|
stream.write(f"\r\033[2K{prefix}{message}{suffix}")
|
|
else:
|
|
stream.write(f"{prefix}{message}{suffix}")
|
|
stream.flush()
|
|
|
|
def serve() -> None:
|
|
httpd.serve_forever()
|
|
|
|
server_thread = threading.Thread(target=serve, daemon=True)
|
|
server_thread.start()
|
|
|
|
log(f"Listening for POST on /install_finished at port {port}")
|
|
|
|
build_cmd = ["mise", "build", args.template, "-i", str(port)]
|
|
build_proc = subprocess.Popen(
|
|
build_cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
bufsize=1,
|
|
)
|
|
|
|
def relay_stream(stream: object, prefix: str, target: object) -> None:
|
|
if not stream:
|
|
return
|
|
for line in stream:
|
|
target.write(f"{prefix} {line}")
|
|
target.flush()
|
|
|
|
stdout_thread = threading.Thread(
|
|
target=relay_stream, args=(build_proc.stdout, "[packer]", sys.stdout), daemon=True
|
|
)
|
|
stderr_thread = threading.Thread(
|
|
target=relay_stream, args=(build_proc.stderr, "[packer]", sys.stderr), daemon=True
|
|
)
|
|
stdout_thread.start()
|
|
stderr_thread.start()
|
|
|
|
notified = False
|
|
action_started = False
|
|
|
|
def proxmox_request(method: str, path: str, **kwargs) -> requests.Response:
|
|
if not proxmox_api_url:
|
|
raise RuntimeError("proxmox_api_url not set")
|
|
token_id = credentials.get("proxmox_api_token_id")
|
|
token_secret = credentials.get("proxmox_api_token_secret")
|
|
if not token_id or not token_secret:
|
|
raise RuntimeError("Proxmox API token credentials missing")
|
|
|
|
headers = kwargs.pop("headers", {})
|
|
headers["Authorization"] = f"PVEAPIToken={token_id}={token_secret}"
|
|
url = f"{proxmox_api_url.rstrip('/')}/{path.lstrip('/')}"
|
|
return requests.request(
|
|
method,
|
|
url,
|
|
headers=headers,
|
|
verify=not proxmox_skip_tls_verify,
|
|
timeout=30,
|
|
**kwargs,
|
|
)
|
|
|
|
def send_key(key: str) -> None:
|
|
if not proxmox_node or not template_vm_id:
|
|
raise RuntimeError("proxmox_node or template_vm_id not set")
|
|
path = f"/nodes/{proxmox_node}/qemu/{template_vm_id}/sendkey"
|
|
response = proxmox_request("PUT", path, data={"key": key})
|
|
response.raise_for_status()
|
|
|
|
def handle_install_finished() -> None:
|
|
retry_delay = 1
|
|
while True:
|
|
try:
|
|
response = proxmox_request("GET", "/version")
|
|
response.raise_for_status()
|
|
log("Authenticated to Proxmox VE API.")
|
|
break
|
|
except Exception as exc:
|
|
log(
|
|
f"Proxmox auth failed: {exc}. Retrying in {retry_delay}s.",
|
|
stream=sys.stderr,
|
|
)
|
|
time.sleep(retry_delay)
|
|
retry_delay += 1
|
|
|
|
try:
|
|
countdown_seconds = max(0, args.luks_wait_seconds)
|
|
# Braille spinner: 8 dots, one missing dot rotates clockwise.
|
|
full_mask = 0xFF # dots 1-8
|
|
dot_bits = {
|
|
1: 0x01,
|
|
2: 0x02,
|
|
3: 0x04,
|
|
4: 0x08,
|
|
5: 0x10,
|
|
6: 0x20,
|
|
7: 0x40,
|
|
8: 0x80,
|
|
}
|
|
rotation = [1, 4, 5, 6, 8, 7, 3, 2] # clockwise around the cell
|
|
spinner = [chr(0x2800 + (full_mask - dot_bits[dot])) for dot in rotation]
|
|
for remaining in range(countdown_seconds, -1, -1):
|
|
minutes, seconds = divmod(remaining, 60)
|
|
countdown = f"{minutes:02d}:{seconds:02d}"
|
|
frame = spinner[(countdown_seconds - remaining) % len(spinner)]
|
|
write_status(f"{frame} {countdown}", newline=False)
|
|
if remaining:
|
|
time.sleep(1)
|
|
write_status(f"{spinner[0]} 00:00", newline=True)
|
|
for char in "packer":
|
|
send_key(char)
|
|
time.sleep(0.1)
|
|
send_key("ret")
|
|
log("Unlocking encrypted disk. Entering LUKS password.")
|
|
except Exception as exc:
|
|
log(f"Post-install actions failed after auth: {exc}", stream=sys.stderr)
|
|
|
|
try:
|
|
while True:
|
|
if server_event.is_set() and not notified:
|
|
log("Installation finished.\nRestarting.")
|
|
notified = True
|
|
if server_event.is_set() and not action_started:
|
|
action_started = True
|
|
threading.Thread(target=handle_install_finished, daemon=True).start()
|
|
if build_proc.poll() is not None:
|
|
break
|
|
server_event.wait(0.2)
|
|
finally:
|
|
httpd.shutdown()
|
|
httpd.server_close()
|
|
|
|
return build_proc.returncode or 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|