482 lines
17 KiB
Python
482 lines
17 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Beautify Audio CLI.
|
||
|
|
|
||
|
|
Pipeline:
|
||
|
|
- Convert any supported audio file to a 320 kbps CBR MP3 (optional LUFS loudness correction).
|
||
|
|
- Strip existing metadata.
|
||
|
|
- Tag the file with OneTagger using the bundled AutoTagger profile (tokens/creds can be passed in).
|
||
|
|
- Rename the output using tagged artist/title (with fallbacks).
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import re
|
||
|
|
import shutil
|
||
|
|
import subprocess
|
||
|
|
import sys
|
||
|
|
import tempfile
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
# Keep this script self-contained except for the OneTagger integration helper.
|
||
|
|
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||
|
|
if str(REPO_ROOT) not in sys.path:
|
||
|
|
sys.path.insert(0, str(REPO_ROOT))
|
||
|
|
|
||
|
|
try:
|
||
|
|
import mutagen # type: ignore
|
||
|
|
except ImportError:
|
||
|
|
mutagen = None
|
||
|
|
|
||
|
|
from onetagger_integration import ( # type: ignore
|
||
|
|
OneTaggerCliNotFoundError,
|
||
|
|
tag_audio_with_onetagger,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def parse_args() -> argparse.Namespace:
|
||
|
|
parser = argparse.ArgumentParser(
|
||
|
|
description="Beautify Audio: convert, clean, tag, and rename a single audio file."
|
||
|
|
)
|
||
|
|
parser.add_argument("input_file", type=Path, help="Path to the source audio file")
|
||
|
|
parser.add_argument(
|
||
|
|
"--lufs",
|
||
|
|
type=float,
|
||
|
|
default=-11.5,
|
||
|
|
help="Integrated loudness target (LUFS). Valid range: -16 to 0. Default: -11.5",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--no-loudness",
|
||
|
|
action="store_true",
|
||
|
|
help="Disable loudness correction (conversion still runs at 320 kbps CBR).",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--output-dir",
|
||
|
|
type=Path,
|
||
|
|
help="Output directory (defaults to the input file's directory).",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--onetagger-cli",
|
||
|
|
type=Path,
|
||
|
|
help="Path to onetagger-cli. Defaults to PATH lookup or the script folder copy.",
|
||
|
|
)
|
||
|
|
parser.add_argument("--artist", help="Optional artist name to help with renaming.")
|
||
|
|
parser.add_argument("--title", help="Optional title to help with renaming.")
|
||
|
|
parser.add_argument("--album", help="Optional album name (passed to OneTagger config copy).")
|
||
|
|
parser.add_argument(
|
||
|
|
"--discogs-token",
|
||
|
|
default=None,
|
||
|
|
help="Discogs token for OneTagger (CLI overrides auth file/env).",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--spotify-id",
|
||
|
|
default=None,
|
||
|
|
help="Spotify client ID for OneTagger (CLI overrides auth file/env).",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--spotify-secret",
|
||
|
|
default=None,
|
||
|
|
help="Spotify client secret for OneTagger (CLI overrides auth file/env).",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--auth-file",
|
||
|
|
type=Path,
|
||
|
|
help="Optional JSON file with discogs_token/spotify_id/spotify_secret. Defaults to auth.json next to the script if present.",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--skip-onetagger",
|
||
|
|
action="store_true",
|
||
|
|
help="Skip the OneTagger tagging step (conversion + cleanup still happen).",
|
||
|
|
)
|
||
|
|
return parser.parse_args()
|
||
|
|
|
||
|
|
|
||
|
|
def validate_inputs(args: argparse.Namespace) -> tuple[Path, Path]:
|
||
|
|
src = args.input_file.expanduser().resolve()
|
||
|
|
if not src.exists() or not src.is_file():
|
||
|
|
raise FileNotFoundError(f"Input file not found: {src}")
|
||
|
|
if not (-16.0 <= args.lufs <= 0.0):
|
||
|
|
raise ValueError("LUFS must be between -16.0 and 0.0")
|
||
|
|
out_dir = args.output_dir.expanduser().resolve() if args.output_dir else src.parent
|
||
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
||
|
|
return src, out_dir
|
||
|
|
|
||
|
|
|
||
|
|
def next_available_path(path: Path) -> Path:
|
||
|
|
"""Return a unique path by appending '-<n>' before the suffix if needed."""
|
||
|
|
if not path.exists():
|
||
|
|
return path
|
||
|
|
stem = path.stem
|
||
|
|
suffix = path.suffix
|
||
|
|
counter = 1
|
||
|
|
while True:
|
||
|
|
candidate = path.with_name(f"{stem}-{counter}{suffix}")
|
||
|
|
if not candidate.exists():
|
||
|
|
return candidate
|
||
|
|
counter += 1
|
||
|
|
|
||
|
|
|
||
|
|
def find_onetagger_cli(user_supplied: Path | None) -> Path | None:
|
||
|
|
"""Resolve onetagger-cli from user input, script folder, or PATH."""
|
||
|
|
candidates: list[Path] = []
|
||
|
|
if user_supplied:
|
||
|
|
candidates.append(user_supplied)
|
||
|
|
script_dir = Path(__file__).resolve().parent
|
||
|
|
candidates.append(script_dir / "onetagger-cli")
|
||
|
|
found_in_path = shutil.which("onetagger-cli")
|
||
|
|
if found_in_path:
|
||
|
|
candidates.append(Path(found_in_path))
|
||
|
|
|
||
|
|
for cand in candidates:
|
||
|
|
if cand and cand.exists():
|
||
|
|
if os.access(cand, os.X_OK):
|
||
|
|
return cand.resolve()
|
||
|
|
raise OneTaggerCliNotFoundError(f"onetagger-cli is not executable: {cand}")
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def convert_to_mp3(
|
||
|
|
src: Path,
|
||
|
|
dest: Path,
|
||
|
|
loudness_enabled: bool,
|
||
|
|
target_lufs: float,
|
||
|
|
) -> Path:
|
||
|
|
"""Convert to 320 kbps CBR MP3 with optional loudness correction."""
|
||
|
|
if not shutil.which("ffmpeg"):
|
||
|
|
raise RuntimeError("ffmpeg not found in PATH. Please install ffmpeg.")
|
||
|
|
cmd = ["ffmpeg", "-y", "-i", str(src), "-vn"]
|
||
|
|
if loudness_enabled:
|
||
|
|
cmd += ["-af", f"loudnorm=I={target_lufs}:TP=-1.5:LRA=11"]
|
||
|
|
cmd += ["-c:a", "libmp3lame", "-b:a", "320k", "-ar", "44100", "-ac", "2", str(dest)]
|
||
|
|
print(f"🔊 Converting → {dest}")
|
||
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
||
|
|
if result.returncode != 0:
|
||
|
|
raise RuntimeError(
|
||
|
|
f"ffmpeg failed (exit {result.returncode}).\nSTDERR:\n{result.stderr or result.stdout}"
|
||
|
|
)
|
||
|
|
return dest
|
||
|
|
|
||
|
|
|
||
|
|
def read_simple_tags(path: Path) -> tuple[str | None, str | None, str | None]:
|
||
|
|
"""Read (title, artist, album) tags using mutagen if available."""
|
||
|
|
if mutagen is None:
|
||
|
|
return None, None, None
|
||
|
|
try:
|
||
|
|
audio = mutagen.File(str(path), easy=True)
|
||
|
|
if not audio:
|
||
|
|
return None, None, None
|
||
|
|
title = audio.get("title", [None])[0]
|
||
|
|
artist = audio.get("artist", [None])[0]
|
||
|
|
album = audio.get("album", [None])[0]
|
||
|
|
return title, artist, album
|
||
|
|
except Exception:
|
||
|
|
return None, None, None
|
||
|
|
|
||
|
|
|
||
|
|
def parse_filename_artist_title(path: Path) -> tuple[str | None, str | None]:
|
||
|
|
"""If filename looks like 'Artist - Title', split and return parts."""
|
||
|
|
stem = path.stem
|
||
|
|
if stem.count("-") != 1:
|
||
|
|
return None, None
|
||
|
|
artist, title = [part.strip() for part in stem.split("-", 1)]
|
||
|
|
return (artist or None, title or None)
|
||
|
|
|
||
|
|
|
||
|
|
def collect_seed_metadata(
|
||
|
|
path: Path, cli_artist: str | None, cli_title: str | None, cli_album: str | None
|
||
|
|
) -> dict[str, str | None]:
|
||
|
|
"""Choose the best available artist/title/album for tagging and renaming."""
|
||
|
|
tag_title, tag_artist, tag_album = read_simple_tags(path)
|
||
|
|
fname_artist, fname_title = parse_filename_artist_title(path)
|
||
|
|
|
||
|
|
artist = cli_artist or tag_artist or fname_artist
|
||
|
|
title = cli_title or tag_title or fname_title
|
||
|
|
album = cli_album or tag_album
|
||
|
|
|
||
|
|
return {
|
||
|
|
"artist": artist,
|
||
|
|
"title": title,
|
||
|
|
"album": album,
|
||
|
|
"fname_artist": fname_artist,
|
||
|
|
"fname_title": fname_title,
|
||
|
|
"tag_artist": tag_artist,
|
||
|
|
"tag_title": tag_title,
|
||
|
|
"tag_album": tag_album,
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
def load_auth_credentials(auth_file: Path | None) -> dict[str, str]:
|
||
|
|
"""Load auth data from JSON if present; keys: discogs_token, spotify_id, spotify_secret."""
|
||
|
|
data = {"discogs_token": "", "spotify_id": "", "spotify_secret": ""}
|
||
|
|
candidate = auth_file
|
||
|
|
if candidate and candidate.exists():
|
||
|
|
try:
|
||
|
|
with candidate.open("r", encoding="utf-8") as f:
|
||
|
|
parsed = json.load(f)
|
||
|
|
data["discogs_token"] = parsed.get("discogs_token", "") or ""
|
||
|
|
data["spotify_id"] = parsed.get("spotify_id", "") or ""
|
||
|
|
data["spotify_secret"] = parsed.get("spotify_secret", "") or ""
|
||
|
|
print(f"🔑 Loaded auth from {candidate}")
|
||
|
|
except Exception as exc:
|
||
|
|
print(f"⚠️ Failed to read auth file {candidate}: {exc}")
|
||
|
|
return data
|
||
|
|
|
||
|
|
|
||
|
|
def strip_all_metadata(path: Path) -> None:
|
||
|
|
"""Remove all tags from the file in-place."""
|
||
|
|
if mutagen is None:
|
||
|
|
print("⚠️ mutagen not installed; skipping metadata strip.")
|
||
|
|
return
|
||
|
|
try:
|
||
|
|
audio = mutagen.File(str(path))
|
||
|
|
if audio is None:
|
||
|
|
return
|
||
|
|
if hasattr(audio, "delete"):
|
||
|
|
audio.delete()
|
||
|
|
elif audio.tags is not None:
|
||
|
|
audio.tags.clear()
|
||
|
|
audio.save()
|
||
|
|
print("🧹 Stripped existing metadata.")
|
||
|
|
except Exception as exc: # pragma: no cover - defensive
|
||
|
|
print(f"⚠️ Failed to strip metadata: {exc}")
|
||
|
|
|
||
|
|
|
||
|
|
def prepare_onetagger_config(
|
||
|
|
base_config: Path,
|
||
|
|
discogs_token: str,
|
||
|
|
spotify_id: str,
|
||
|
|
spotify_secret: str,
|
||
|
|
album: str | None = None,
|
||
|
|
seed_artist: str | None = None,
|
||
|
|
seed_title: str | None = None,
|
||
|
|
) -> Path:
|
||
|
|
"""Copy the base config and inject credentials when provided."""
|
||
|
|
with base_config.open("r", encoding="utf-8") as f:
|
||
|
|
cfg = json.load(f)
|
||
|
|
|
||
|
|
cfg.setdefault("custom", {})
|
||
|
|
cfg.setdefault("platforms", cfg.get("platforms") or [])
|
||
|
|
|
||
|
|
if discogs_token:
|
||
|
|
cfg["custom"].setdefault("discogs", {})["token"] = discogs_token
|
||
|
|
if "discogs" not in cfg["platforms"]:
|
||
|
|
cfg["platforms"].append("discogs")
|
||
|
|
if spotify_id and spotify_secret:
|
||
|
|
cfg["spotify"] = {"clientId": spotify_id, "clientSecret": spotify_secret}
|
||
|
|
if "spotify" not in cfg["platforms"]:
|
||
|
|
cfg["platforms"].append("spotify")
|
||
|
|
|
||
|
|
defaults = cfg.setdefault("custom", {}).setdefault("defaults", {})
|
||
|
|
if album:
|
||
|
|
defaults["album"] = album
|
||
|
|
if seed_artist:
|
||
|
|
defaults["artist"] = seed_artist
|
||
|
|
if seed_title:
|
||
|
|
defaults["title"] = seed_title
|
||
|
|
|
||
|
|
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".json", prefix="onetagger_cfg_")
|
||
|
|
with open(tmp.name, "w", encoding="utf-8") as f:
|
||
|
|
json.dump(cfg, f, indent=2)
|
||
|
|
return Path(tmp.name)
|
||
|
|
|
||
|
|
|
||
|
|
def run_onetagger(
|
||
|
|
audio_path: Path,
|
||
|
|
cli_path: Path,
|
||
|
|
base_config: Path,
|
||
|
|
discogs_token: str,
|
||
|
|
spotify_id: str,
|
||
|
|
spotify_secret: str,
|
||
|
|
album: str | None,
|
||
|
|
seed_artist: str | None,
|
||
|
|
seed_title: str | None,
|
||
|
|
) -> bool:
|
||
|
|
cfg_copy = prepare_onetagger_config(
|
||
|
|
base_config,
|
||
|
|
discogs_token,
|
||
|
|
spotify_id,
|
||
|
|
spotify_secret,
|
||
|
|
album,
|
||
|
|
seed_artist,
|
||
|
|
seed_title,
|
||
|
|
)
|
||
|
|
try:
|
||
|
|
res = tag_audio_with_onetagger(audio_file=audio_path, onetagger_cli_path=cli_path, config_path=cfg_copy)
|
||
|
|
if res.success:
|
||
|
|
print("🪪 OneTagger tagging succeeded.")
|
||
|
|
return True
|
||
|
|
print(f"⚠️ OneTagger tagging failed: {res.stderr or 'unknown error'}")
|
||
|
|
return False
|
||
|
|
except OneTaggerCliNotFoundError as exc:
|
||
|
|
print(f"⚠️ {exc}")
|
||
|
|
return False
|
||
|
|
except Exception as exc: # pragma: no cover - defensive
|
||
|
|
print(f"⚠️ OneTagger error: {exc}")
|
||
|
|
return False
|
||
|
|
finally:
|
||
|
|
try:
|
||
|
|
cfg_copy.unlink()
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
|
||
|
|
def build_safe_basename(path: Path, fallback_artist: str | None, fallback_title: str | None) -> str:
|
||
|
|
"""Derive a sanitized base filename using tags when available."""
|
||
|
|
tag_title, tag_artist, _ = read_simple_tags(path)
|
||
|
|
artist = tag_artist or fallback_artist
|
||
|
|
title = tag_title or fallback_title
|
||
|
|
if artist and title:
|
||
|
|
base = f"{artist} - {title}"
|
||
|
|
elif artist:
|
||
|
|
base = f"{artist} - {path.stem}"
|
||
|
|
elif title:
|
||
|
|
base = f"{path.stem} - {title}"
|
||
|
|
else:
|
||
|
|
base = path.stem
|
||
|
|
return re.sub(r"[\\/:*?\"<>|]", "_", base).strip("_ ").strip()
|
||
|
|
|
||
|
|
|
||
|
|
def rename_output(
|
||
|
|
path: Path,
|
||
|
|
output_dir: Path,
|
||
|
|
fallback_artist: str | None,
|
||
|
|
fallback_title: str | None,
|
||
|
|
) -> Path:
|
||
|
|
base = build_safe_basename(path, fallback_artist, fallback_title)
|
||
|
|
desired = output_dir / f"{base}{path.suffix}"
|
||
|
|
if desired.resolve() == path.resolve():
|
||
|
|
return path
|
||
|
|
dest = next_available_path(desired)
|
||
|
|
if dest.resolve() == path.resolve():
|
||
|
|
return path
|
||
|
|
dest.parent.mkdir(parents=True, exist_ok=True)
|
||
|
|
path.rename(dest)
|
||
|
|
print(f"📝 Renamed to {dest.name}")
|
||
|
|
return dest
|
||
|
|
|
||
|
|
|
||
|
|
def rename_for_tagging(path: Path, output_dir: Path, artist: str | None, title: str | None) -> Path:
|
||
|
|
"""Rename the working file so OneTagger sees a helpful filename."""
|
||
|
|
if not artist or not title:
|
||
|
|
return path
|
||
|
|
desired = output_dir / f"{artist} - {title}{path.suffix}"
|
||
|
|
if desired.resolve() == path.resolve():
|
||
|
|
return path
|
||
|
|
target = next_available_path(desired)
|
||
|
|
if target.resolve() == path.resolve():
|
||
|
|
return path
|
||
|
|
try:
|
||
|
|
path.rename(target)
|
||
|
|
print(f"🏷️ Using filename hint for tagging: {target.name}")
|
||
|
|
return target
|
||
|
|
except Exception:
|
||
|
|
return path
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> None:
|
||
|
|
args = parse_args()
|
||
|
|
src, out_dir = validate_inputs(args)
|
||
|
|
seed_info = collect_seed_metadata(src, args.artist, args.title, args.album)
|
||
|
|
album = args.album or seed_info.get("tag_album")
|
||
|
|
|
||
|
|
primary_artist = args.artist or seed_info.get("tag_artist") or seed_info.get("fname_artist")
|
||
|
|
primary_title = args.title or seed_info.get("tag_title") or seed_info.get("fname_title")
|
||
|
|
|
||
|
|
secondary_artist = None
|
||
|
|
secondary_title = None
|
||
|
|
if seed_info.get("fname_artist") and seed_info.get("fname_title"):
|
||
|
|
if (seed_info.get("fname_artist") != primary_artist) or (seed_info.get("fname_title") != primary_title):
|
||
|
|
secondary_artist = seed_info.get("fname_artist")
|
||
|
|
secondary_title = seed_info.get("fname_title")
|
||
|
|
|
||
|
|
# Derive the initial output path with collision avoidance
|
||
|
|
base_target = out_dir / f"{src.stem}.mp3"
|
||
|
|
target_mp3 = next_available_path(base_target)
|
||
|
|
|
||
|
|
loudness_on = not args.no_loudness
|
||
|
|
print(
|
||
|
|
f"🚀 Beautifying '{src.name}' → '{target_mp3.name}' "
|
||
|
|
f"(loudness {'on' if loudness_on else 'off'}, target {args.lufs} LUFS)"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Auth resolution precedence:
|
||
|
|
# 1) CLI flags
|
||
|
|
# 2) Explicit auth file (--auth-file)
|
||
|
|
# 3) Environment variables
|
||
|
|
# 4) Default auth.json next to the script (only if nothing else provided)
|
||
|
|
explicit_auth = load_auth_credentials(args.auth_file)
|
||
|
|
discogs_token = args.discogs_token or explicit_auth["discogs_token"] or os.environ.get("DISCOGS_TOKEN", "")
|
||
|
|
spotify_id = args.spotify_id or explicit_auth["spotify_id"] or os.environ.get("SPOTIFY_CLIENT_ID", "")
|
||
|
|
spotify_secret = args.spotify_secret or explicit_auth["spotify_secret"] or os.environ.get("SPOTIFY_CLIENT_SECRET", "")
|
||
|
|
|
||
|
|
if (
|
||
|
|
not args.auth_file
|
||
|
|
and not discogs_token
|
||
|
|
and not spotify_id
|
||
|
|
and not spotify_secret
|
||
|
|
):
|
||
|
|
default_auth_path = Path(__file__).resolve().parent / "auth.json"
|
||
|
|
default_auth = load_auth_credentials(default_auth_path)
|
||
|
|
discogs_token = default_auth["discogs_token"]
|
||
|
|
spotify_id = default_auth["spotify_id"]
|
||
|
|
spotify_secret = default_auth["spotify_secret"]
|
||
|
|
|
||
|
|
converted = convert_to_mp3(src, target_mp3, loudness_on, args.lufs)
|
||
|
|
strip_all_metadata(converted)
|
||
|
|
|
||
|
|
working_file = rename_for_tagging(converted, out_dir, primary_artist, primary_title)
|
||
|
|
tagged = working_file
|
||
|
|
tag_success = False
|
||
|
|
cli_path = None
|
||
|
|
if not args.skip_onetagger:
|
||
|
|
cli_path = find_onetagger_cli(args.onetagger_cli)
|
||
|
|
cfg_path = Path(__file__).resolve().parent / "onetagger-autotagger.json"
|
||
|
|
if cli_path and cfg_path.exists():
|
||
|
|
tag_success = run_onetagger(
|
||
|
|
audio_path=working_file,
|
||
|
|
cli_path=cli_path,
|
||
|
|
base_config=cfg_path,
|
||
|
|
discogs_token=discogs_token,
|
||
|
|
spotify_id=spotify_id,
|
||
|
|
spotify_secret=spotify_secret,
|
||
|
|
album=album,
|
||
|
|
seed_artist=primary_artist,
|
||
|
|
seed_title=primary_title,
|
||
|
|
)
|
||
|
|
if not tag_success and secondary_artist and secondary_title:
|
||
|
|
working_file = rename_for_tagging(working_file, out_dir, secondary_artist, secondary_title)
|
||
|
|
tagged = working_file
|
||
|
|
tag_success = run_onetagger(
|
||
|
|
audio_path=working_file,
|
||
|
|
cli_path=cli_path,
|
||
|
|
base_config=cfg_path,
|
||
|
|
discogs_token=discogs_token,
|
||
|
|
spotify_id=spotify_id,
|
||
|
|
spotify_secret=spotify_secret,
|
||
|
|
album=album,
|
||
|
|
seed_artist=secondary_artist,
|
||
|
|
seed_title=secondary_title,
|
||
|
|
)
|
||
|
|
else:
|
||
|
|
print("⚠️ OneTagger skipped (binary or config not found).")
|
||
|
|
else:
|
||
|
|
print("⏭️ Skipping OneTagger tagging step as requested.")
|
||
|
|
|
||
|
|
fallback_artist = primary_artist or secondary_artist
|
||
|
|
fallback_title = primary_title or secondary_title
|
||
|
|
final_path = rename_output(tagged, out_dir, fallback_artist, fallback_title)
|
||
|
|
print(f"✅ Done: {final_path}")
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
try:
|
||
|
|
main()
|
||
|
|
except KeyboardInterrupt:
|
||
|
|
print("\nAborted by user.")
|
||
|
|
sys.exit(1)
|
||
|
|
except Exception as exc:
|
||
|
|
print(f"❌ Error: {exc}")
|
||
|
|
sys.exit(1)
|