Files
beautify-audio/beautify-audio.py

482 lines
17 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Beautify Audio CLI.
Pipeline:
- Convert any supported audio file to a 320 kbps CBR MP3 (optional LUFS loudness correction).
- Strip existing metadata.
- Tag the file with OneTagger using the bundled AutoTagger profile (tokens/creds can be passed in).
- Rename the output using tagged artist/title (with fallbacks).
"""
from __future__ import annotations
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
# Keep this script self-contained except for the OneTagger integration helper.
REPO_ROOT = Path(__file__).resolve().parent.parent
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
try:
import mutagen # type: ignore
except ImportError:
mutagen = None
from onetagger_integration import ( # type: ignore
OneTaggerCliNotFoundError,
tag_audio_with_onetagger,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Beautify Audio: convert, clean, tag, and rename a single audio file."
)
parser.add_argument("input_file", type=Path, help="Path to the source audio file")
parser.add_argument(
"--lufs",
type=float,
default=-11.5,
help="Integrated loudness target (LUFS). Valid range: -16 to 0. Default: -11.5",
)
parser.add_argument(
"--no-loudness",
action="store_true",
help="Disable loudness correction (conversion still runs at 320 kbps CBR).",
)
parser.add_argument(
"--output-dir",
type=Path,
help="Output directory (defaults to the input file's directory).",
)
parser.add_argument(
"--onetagger-cli",
type=Path,
help="Path to onetagger-cli. Defaults to PATH lookup or the script folder copy.",
)
parser.add_argument("--artist", help="Optional artist name to help with renaming.")
parser.add_argument("--title", help="Optional title to help with renaming.")
parser.add_argument("--album", help="Optional album name (passed to OneTagger config copy).")
parser.add_argument(
"--discogs-token",
default=None,
help="Discogs token for OneTagger (CLI overrides auth file/env).",
)
parser.add_argument(
"--spotify-id",
default=None,
help="Spotify client ID for OneTagger (CLI overrides auth file/env).",
)
parser.add_argument(
"--spotify-secret",
default=None,
help="Spotify client secret for OneTagger (CLI overrides auth file/env).",
)
parser.add_argument(
"--auth-file",
type=Path,
help="Optional JSON file with discogs_token/spotify_id/spotify_secret. Defaults to auth.json next to the script if present.",
)
parser.add_argument(
"--skip-onetagger",
action="store_true",
help="Skip the OneTagger tagging step (conversion + cleanup still happen).",
)
return parser.parse_args()
def validate_inputs(args: argparse.Namespace) -> tuple[Path, Path]:
src = args.input_file.expanduser().resolve()
if not src.exists() or not src.is_file():
raise FileNotFoundError(f"Input file not found: {src}")
if not (-16.0 <= args.lufs <= 0.0):
raise ValueError("LUFS must be between -16.0 and 0.0")
out_dir = args.output_dir.expanduser().resolve() if args.output_dir else src.parent
out_dir.mkdir(parents=True, exist_ok=True)
return src, out_dir
def next_available_path(path: Path) -> Path:
"""Return a unique path by appending '-<n>' before the suffix if needed."""
if not path.exists():
return path
stem = path.stem
suffix = path.suffix
counter = 1
while True:
candidate = path.with_name(f"{stem}-{counter}{suffix}")
if not candidate.exists():
return candidate
counter += 1
def find_onetagger_cli(user_supplied: Path | None) -> Path | None:
"""Resolve onetagger-cli from user input, script folder, or PATH."""
candidates: list[Path] = []
if user_supplied:
candidates.append(user_supplied)
script_dir = Path(__file__).resolve().parent
candidates.append(script_dir / "onetagger-cli")
found_in_path = shutil.which("onetagger-cli")
if found_in_path:
candidates.append(Path(found_in_path))
for cand in candidates:
if cand and cand.exists():
if os.access(cand, os.X_OK):
return cand.resolve()
raise OneTaggerCliNotFoundError(f"onetagger-cli is not executable: {cand}")
return None
def convert_to_mp3(
src: Path,
dest: Path,
loudness_enabled: bool,
target_lufs: float,
) -> Path:
"""Convert to 320 kbps CBR MP3 with optional loudness correction."""
if not shutil.which("ffmpeg"):
raise RuntimeError("ffmpeg not found in PATH. Please install ffmpeg.")
cmd = ["ffmpeg", "-y", "-i", str(src), "-vn"]
if loudness_enabled:
cmd += ["-af", f"loudnorm=I={target_lufs}:TP=-1.5:LRA=11"]
cmd += ["-c:a", "libmp3lame", "-b:a", "320k", "-ar", "44100", "-ac", "2", str(dest)]
print(f"🔊 Converting → {dest}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(
f"ffmpeg failed (exit {result.returncode}).\nSTDERR:\n{result.stderr or result.stdout}"
)
return dest
def read_simple_tags(path: Path) -> tuple[str | None, str | None, str | None]:
"""Read (title, artist, album) tags using mutagen if available."""
if mutagen is None:
return None, None, None
try:
audio = mutagen.File(str(path), easy=True)
if not audio:
return None, None, None
title = audio.get("title", [None])[0]
artist = audio.get("artist", [None])[0]
album = audio.get("album", [None])[0]
return title, artist, album
except Exception:
return None, None, None
def parse_filename_artist_title(path: Path) -> tuple[str | None, str | None]:
"""If filename looks like 'Artist - Title', split and return parts."""
stem = path.stem
if stem.count("-") != 1:
return None, None
artist, title = [part.strip() for part in stem.split("-", 1)]
return (artist or None, title or None)
def collect_seed_metadata(
path: Path, cli_artist: str | None, cli_title: str | None, cli_album: str | None
) -> dict[str, str | None]:
"""Choose the best available artist/title/album for tagging and renaming."""
tag_title, tag_artist, tag_album = read_simple_tags(path)
fname_artist, fname_title = parse_filename_artist_title(path)
artist = cli_artist or tag_artist or fname_artist
title = cli_title or tag_title or fname_title
album = cli_album or tag_album
return {
"artist": artist,
"title": title,
"album": album,
"fname_artist": fname_artist,
"fname_title": fname_title,
"tag_artist": tag_artist,
"tag_title": tag_title,
"tag_album": tag_album,
}
def load_auth_credentials(auth_file: Path | None) -> dict[str, str]:
"""Load auth data from JSON if present; keys: discogs_token, spotify_id, spotify_secret."""
data = {"discogs_token": "", "spotify_id": "", "spotify_secret": ""}
candidate = auth_file
if candidate and candidate.exists():
try:
with candidate.open("r", encoding="utf-8") as f:
parsed = json.load(f)
data["discogs_token"] = parsed.get("discogs_token", "") or ""
data["spotify_id"] = parsed.get("spotify_id", "") or ""
data["spotify_secret"] = parsed.get("spotify_secret", "") or ""
print(f"🔑 Loaded auth from {candidate}")
except Exception as exc:
print(f"⚠️ Failed to read auth file {candidate}: {exc}")
return data
def strip_all_metadata(path: Path) -> None:
"""Remove all tags from the file in-place."""
if mutagen is None:
print("⚠️ mutagen not installed; skipping metadata strip.")
return
try:
audio = mutagen.File(str(path))
if audio is None:
return
if hasattr(audio, "delete"):
audio.delete()
elif audio.tags is not None:
audio.tags.clear()
audio.save()
print("🧹 Stripped existing metadata.")
except Exception as exc: # pragma: no cover - defensive
print(f"⚠️ Failed to strip metadata: {exc}")
def prepare_onetagger_config(
base_config: Path,
discogs_token: str,
spotify_id: str,
spotify_secret: str,
album: str | None = None,
seed_artist: str | None = None,
seed_title: str | None = None,
) -> Path:
"""Copy the base config and inject credentials when provided."""
with base_config.open("r", encoding="utf-8") as f:
cfg = json.load(f)
cfg.setdefault("custom", {})
cfg.setdefault("platforms", cfg.get("platforms") or [])
if discogs_token:
cfg["custom"].setdefault("discogs", {})["token"] = discogs_token
if "discogs" not in cfg["platforms"]:
cfg["platforms"].append("discogs")
if spotify_id and spotify_secret:
cfg["spotify"] = {"clientId": spotify_id, "clientSecret": spotify_secret}
if "spotify" not in cfg["platforms"]:
cfg["platforms"].append("spotify")
defaults = cfg.setdefault("custom", {}).setdefault("defaults", {})
if album:
defaults["album"] = album
if seed_artist:
defaults["artist"] = seed_artist
if seed_title:
defaults["title"] = seed_title
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".json", prefix="onetagger_cfg_")
with open(tmp.name, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2)
return Path(tmp.name)
def run_onetagger(
audio_path: Path,
cli_path: Path,
base_config: Path,
discogs_token: str,
spotify_id: str,
spotify_secret: str,
album: str | None,
seed_artist: str | None,
seed_title: str | None,
) -> bool:
cfg_copy = prepare_onetagger_config(
base_config,
discogs_token,
spotify_id,
spotify_secret,
album,
seed_artist,
seed_title,
)
try:
res = tag_audio_with_onetagger(audio_file=audio_path, onetagger_cli_path=cli_path, config_path=cfg_copy)
if res.success:
print("🪪 OneTagger tagging succeeded.")
return True
print(f"⚠️ OneTagger tagging failed: {res.stderr or 'unknown error'}")
return False
except OneTaggerCliNotFoundError as exc:
print(f"⚠️ {exc}")
return False
except Exception as exc: # pragma: no cover - defensive
print(f"⚠️ OneTagger error: {exc}")
return False
finally:
try:
cfg_copy.unlink()
except Exception:
pass
def build_safe_basename(path: Path, fallback_artist: str | None, fallback_title: str | None) -> str:
"""Derive a sanitized base filename using tags when available."""
tag_title, tag_artist, _ = read_simple_tags(path)
artist = tag_artist or fallback_artist
title = tag_title or fallback_title
if artist and title:
base = f"{artist} - {title}"
elif artist:
base = f"{artist} - {path.stem}"
elif title:
base = f"{path.stem} - {title}"
else:
base = path.stem
return re.sub(r"[\\/:*?\"<>|]", "_", base).strip("_ ").strip()
def rename_output(
path: Path,
output_dir: Path,
fallback_artist: str | None,
fallback_title: str | None,
) -> Path:
base = build_safe_basename(path, fallback_artist, fallback_title)
desired = output_dir / f"{base}{path.suffix}"
if desired.resolve() == path.resolve():
return path
dest = next_available_path(desired)
if dest.resolve() == path.resolve():
return path
dest.parent.mkdir(parents=True, exist_ok=True)
path.rename(dest)
print(f"📝 Renamed to {dest.name}")
return dest
def rename_for_tagging(path: Path, output_dir: Path, artist: str | None, title: str | None) -> Path:
"""Rename the working file so OneTagger sees a helpful filename."""
if not artist or not title:
return path
desired = output_dir / f"{artist} - {title}{path.suffix}"
if desired.resolve() == path.resolve():
return path
target = next_available_path(desired)
if target.resolve() == path.resolve():
return path
try:
path.rename(target)
print(f"🏷️ Using filename hint for tagging: {target.name}")
return target
except Exception:
return path
def main() -> None:
args = parse_args()
src, out_dir = validate_inputs(args)
seed_info = collect_seed_metadata(src, args.artist, args.title, args.album)
album = args.album or seed_info.get("tag_album")
primary_artist = args.artist or seed_info.get("tag_artist") or seed_info.get("fname_artist")
primary_title = args.title or seed_info.get("tag_title") or seed_info.get("fname_title")
secondary_artist = None
secondary_title = None
if seed_info.get("fname_artist") and seed_info.get("fname_title"):
if (seed_info.get("fname_artist") != primary_artist) or (seed_info.get("fname_title") != primary_title):
secondary_artist = seed_info.get("fname_artist")
secondary_title = seed_info.get("fname_title")
# Derive the initial output path with collision avoidance
base_target = out_dir / f"{src.stem}.mp3"
target_mp3 = next_available_path(base_target)
loudness_on = not args.no_loudness
print(
f"🚀 Beautifying '{src.name}''{target_mp3.name}' "
f"(loudness {'on' if loudness_on else 'off'}, target {args.lufs} LUFS)"
)
# Auth resolution precedence:
# 1) CLI flags
# 2) Explicit auth file (--auth-file)
# 3) Environment variables
# 4) Default auth.json next to the script (only if nothing else provided)
explicit_auth = load_auth_credentials(args.auth_file)
discogs_token = args.discogs_token or explicit_auth["discogs_token"] or os.environ.get("DISCOGS_TOKEN", "")
spotify_id = args.spotify_id or explicit_auth["spotify_id"] or os.environ.get("SPOTIFY_CLIENT_ID", "")
spotify_secret = args.spotify_secret or explicit_auth["spotify_secret"] or os.environ.get("SPOTIFY_CLIENT_SECRET", "")
if (
not args.auth_file
and not discogs_token
and not spotify_id
and not spotify_secret
):
default_auth_path = Path(__file__).resolve().parent / "auth.json"
default_auth = load_auth_credentials(default_auth_path)
discogs_token = default_auth["discogs_token"]
spotify_id = default_auth["spotify_id"]
spotify_secret = default_auth["spotify_secret"]
converted = convert_to_mp3(src, target_mp3, loudness_on, args.lufs)
strip_all_metadata(converted)
working_file = rename_for_tagging(converted, out_dir, primary_artist, primary_title)
tagged = working_file
tag_success = False
cli_path = None
if not args.skip_onetagger:
cli_path = find_onetagger_cli(args.onetagger_cli)
cfg_path = Path(__file__).resolve().parent / "onetagger-autotagger.json"
if cli_path and cfg_path.exists():
tag_success = run_onetagger(
audio_path=working_file,
cli_path=cli_path,
base_config=cfg_path,
discogs_token=discogs_token,
spotify_id=spotify_id,
spotify_secret=spotify_secret,
album=album,
seed_artist=primary_artist,
seed_title=primary_title,
)
if not tag_success and secondary_artist and secondary_title:
working_file = rename_for_tagging(working_file, out_dir, secondary_artist, secondary_title)
tagged = working_file
tag_success = run_onetagger(
audio_path=working_file,
cli_path=cli_path,
base_config=cfg_path,
discogs_token=discogs_token,
spotify_id=spotify_id,
spotify_secret=spotify_secret,
album=album,
seed_artist=secondary_artist,
seed_title=secondary_title,
)
else:
print("⚠️ OneTagger skipped (binary or config not found).")
else:
print("⏭️ Skipping OneTagger tagging step as requested.")
fallback_artist = primary_artist or secondary_artist
fallback_title = primary_title or secondary_title
final_path = rename_output(tagged, out_dir, fallback_artist, fallback_title)
print(f"✅ Done: {final_path}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nAborted by user.")
sys.exit(1)
except Exception as exc:
print(f"❌ Error: {exc}")
sys.exit(1)