# 10AM/server/news_collector.py — news collector service
# (snapshot metadata: 397 lines, 17 KiB, 2025-09-09 17:29:49 +02:00)
import json
import os
import sqlite3
import time
import argparse
from datetime import datetime, timezone, timedelta
from zoneinfo import ZoneInfo
import requests
import ollama
# --- Configuration ---
SCRIPT_DIR = os.path.dirname(__file__)
DB_FILE = os.path.join(SCRIPT_DIR, "news.db")
CAPITALS_FILE = os.environ.get("TENAM_DATA", os.path.join(SCRIPT_DIR, "capitals_tz.json"))
SEARXNG_URL = "http://localhost:8888"
OLLAMA_MODEL = "gpt-oss:20b"
# --- Database Setup ---
def setup_database():
    """
    Create the `news` table if it doesn't exist.

    The schema uses multilingual summaries (summary_en, summary_de,
    summary_jp); all summary columns are nullable to allow incremental
    backfill. Rows are unique per (country_name, news_date).
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS news (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                country_name TEXT NOT NULL,
                news_date DATE NOT NULL,
                summary_en TEXT,
                summary_de TEXT,
                summary_jp TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(country_name, news_date)
            )
            """
        )
        conn.commit()
    finally:
        # FIX: close in a finally block — the original leaked the connection
        # if CREATE TABLE raised (e.g. locked or unwritable database file).
        conn.close()
def run_db_migrations():
    """
    Perform safe, idempotent migrations on the `news` table.

    On old databases the column `summary` is renamed to `summary_en`.
    New columns `summary_de` and `summary_jp` are added if missing.
    Uses ALTER TABLE ... RENAME COLUMN where available and falls back to
    a full table copy when column rename isn't supported (SQLite < 3.25
    raises OperationalError for RENAME COLUMN).
    """
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    try:
        # Inspect existing columns (row[1] of table_info is the column name).
        cursor.execute("PRAGMA table_info(news)")
        cols = [row[1] for row in cursor.fetchall()]
        # Rename summary -> summary_en if needed
        if "summary_en" not in cols and "summary" in cols:
            try:
                cursor.execute("ALTER TABLE news RENAME COLUMN summary TO summary_en")
                conn.commit()
                # Refresh column list after rename
                cursor.execute("PRAGMA table_info(news)")
                cols = [row[1] for row in cursor.fetchall()]
            except sqlite3.OperationalError:
                # Fallback: recreate table with proper schema, copy data,
                # then swap the tables.
                cursor.execute(
                    """
                    CREATE TABLE news_new (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        country_name TEXT NOT NULL,
                        news_date DATE NOT NULL,
                        summary_en TEXT,
                        summary_de TEXT,
                        summary_jp TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        UNIQUE(country_name, news_date)
                    )
                    """
                )
                # Copy old data: map summary -> summary_en; translation
                # columns start out NULL and are backfilled later.
                cursor.execute(
                    "INSERT INTO news_new (id, country_name, news_date, summary_en, created_at) "
                    "SELECT id, country_name, news_date, summary, created_at FROM news"
                )
                cursor.execute("DROP TABLE news")
                cursor.execute("ALTER TABLE news_new RENAME TO news")
                conn.commit()
                cursor.execute("PRAGMA table_info(news)")
                cols = [row[1] for row in cursor.fetchall()]
        # Add missing translation columns (idempotent: only when absent).
        if "summary_de" not in cols:
            cursor.execute("ALTER TABLE news ADD COLUMN summary_de TEXT")
        if "summary_jp" not in cols:
            cursor.execute("ALTER TABLE news ADD COLUMN summary_jp TEXT")
        conn.commit()
    finally:
        conn.close()
# --- Core Logic ---
def get_countries_at_time(target_hour, target_minute, capitals_data):
    """Return the capital entries whose current local wall-clock time is
    exactly target_hour:target_minute. Entries with bad/missing tzids are
    reported and skipped."""
    reference = datetime.now(timezone.utc)
    matches = []
    for record in capitals_data:
        try:
            local = reference.astimezone(ZoneInfo(record["tzid"]))
        except Exception as e:
            print(f"Error processing timezone {record.get('tzid', 'N/A')}: {e}")
            continue
        if (local.hour, local.minute) == (target_hour, target_minute):
            matches.append(record)
    return matches
def get_next_10am_countries(capitals_data):
    """Return the group of capital entries that will next reach 10:00 AM
    local time — i.e. all entries whose upcoming local 10:00 falls on the
    earliest instant among the provided timezones."""
    reference = datetime.now(timezone.utc)
    earliest = None
    group = []
    for record in capitals_data:
        try:
            local = reference.astimezone(ZoneInfo(record["tzid"]))
            # Upcoming 10:00 in this zone: today if still ahead, else tomorrow.
            candidate = local.replace(hour=10, minute=0, second=0, microsecond=0)
            if candidate <= local:
                candidate += timedelta(days=1)
            if earliest is None or candidate < earliest:
                # Strictly earlier: start a fresh group.
                earliest = candidate
                group = [record]
            elif candidate == earliest:
                # Same instant (aware-datetime equality): join the group.
                group.append(record)
        except Exception as e:
            print(f"Error processing timezone {record.get('tzid', 'N/A')} for dev mode: {e}")
    return group
def fetch_searxng_results(country_name):
    """Query the local SearXNG instance for today's top news in a country.

    Returns a single text blob built from the first five results
    ("title: content" pairs joined by spaces), or None when the request
    fails or the response is not valid JSON."""
    search_params = {"q": f"top news in {country_name} today", "format": "json"}
    try:
        resp = requests.get(SEARXNG_URL, params=search_params, timeout=15)
        resp.raise_for_status()
        hits = resp.json().get("results", [])
        # Simple concatenation of titles and content for summary
        snippets = [f"{h.get('title', '')}: {h.get('content', '')}" for h in hits[:5]]
        return " ".join(snippets)
    except requests.RequestException as e:
        print(f"Error fetching from SearXNG for {country_name}: {e}")
        return None
def summarize_with_ollama(content, country):
    """
    Summarize raw news text for *country* into English bullet points via
    the configured Ollama model.

    Returns the model's summary, a placeholder message when *content* is
    empty/whitespace, or an error message when the Ollama call fails.
    """
    if not content or content.isspace():
        return "No content available to summarize."
    # BUGFIX: the intro sentence previously interpolated {content} instead of
    # {country} ("news websites from {content}"), duplicating the entire
    # payload in the prompt and leaving the country parameter unused.
    prompt = (
        f"Here are excerpts of news websites from {country}.\n"
        "Please provide a concise summary of these news in English.\n"
        "List each individual news in bulletin points. DO NOT list by news outlets - list by relevant news topics.\n"
        "Don't intro your response with things like \"..here are the news..\" or anything like that - reply with ONLY the news listed as bulletin points.\n"
        "Keep it concise.\n"
        f"The news:\n\n{content}"
    )
    try:
        response = ollama.generate(
            model=OLLAMA_MODEL,
            prompt=prompt
        )
        return response.get('response', 'Summary generation failed.').strip()
    except Exception as e:
        # The ollama library might raise various exceptions.
        print(f"Error communicating with Ollama using library: {e}")
        return "Summary failed due to a communication error."
def translate_summary(summary_en: str, target_lang: str) -> str | None:
"""
Translate an English news summary into the specified language using the
ollama model. Supported languages: 'de' for German and 'jp' for Japanese.
Returns None on failure.
"""
if not summary_en or summary_en.isspace():
return None
if target_lang == 'de':
prompt = (
"Translate the following news summary into natural, accurate German. "
"Keep the format.\nKeep it concise; do not add new facts.\n\n"
"Summary (English):\n" + summary_en
)
elif target_lang == 'jp':
prompt = (
"Translate the following news summary into natural, accurate Japanese. "
"Keep the format.\nKeep it concise; do not add new facts.\n\n"
"Summary (English):\n" + summary_en
)
else:
# Unsupported language
return None
try:
response = ollama.generate(model=OLLAMA_MODEL, prompt=prompt)
return response.get('response', '').strip()
except Exception as e:
print(f"Error translating summary to {target_lang}: {e}")
return None
def store_news(country_name: str, summary_en: str, summary_de: str | None = None, summary_jp: str | None = None, overwrite: bool = False) -> None:
    """
    Insert or upsert today's summaries for a country, then verify.

    After committing, the row is read back to confirm it exists, so we
    never log success when an INSERT OR IGNORE was skipped or the write
    silently failed.
    """
    connection = sqlite3.connect(DB_FILE)
    cur = connection.cursor()
    today = datetime.now(timezone.utc).date()
    # Upsert: replaces the English summary, keeps existing translations
    # unless new ones are supplied (COALESCE), and refreshes created_at.
    upsert_sql = (
        "INSERT INTO news (country_name, news_date, summary_en, summary_de, summary_jp) "
        "VALUES (?, ?, ?, ?, ?) "
        "ON CONFLICT(country_name, news_date) DO UPDATE SET "
        "summary_en = excluded.summary_en, "
        "summary_de = COALESCE(excluded.summary_de, news.summary_de), "
        "summary_jp = COALESCE(excluded.summary_jp, news.summary_jp), "
        "created_at = CURRENT_TIMESTAMP"
    )
    # Plain insert: leaves an existing row for this (country, date) untouched.
    insert_sql = (
        "INSERT OR IGNORE INTO news (country_name, news_date, summary_en, summary_de, summary_jp) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    try:
        verb = "upserted" if overwrite else "inserted"
        cur.execute(
            upsert_sql if overwrite else insert_sql,
            (country_name, today, summary_en, summary_de, summary_jp),
        )
        connection.commit()
        # Verify by reading back the row we *expect* to exist
        cur.execute(
            "SELECT id FROM news WHERE country_name = ? AND news_date = ? LIMIT 1",
            (country_name, today),
        )
        row = cur.fetchone()
        if row:
            print(f"Successfully {verb} news for {country_name}. (id={row[0]})")
        else:
            # This should not happen; make it explicit in logs
            print(f"[warn] Post-commit verification failed for {country_name} ({today}).")
    except Exception as e:
        print(f"Error storing news for {country_name}: {e}")
    finally:
        connection.close()
def update_translations(country_name: str, news_date, summary_de: str | None = None, summary_jp: str | None = None) -> None:
    """
    Fill in translation columns for an existing (country, date) row.

    Languages passed as None are left untouched; `created_at` is bumped
    to record the change. Does nothing (beyond opening/closing the DB)
    when neither translation is provided.
    """
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    try:
        candidates = [("summary_de = ?", summary_de), ("summary_jp = ?", summary_jp)]
        assignments = [clause for clause, value in candidates if value is not None]
        values = [value for _, value in candidates if value is not None]
        if not assignments:
            return
        # Always refresh the timestamp alongside any translation change.
        assignments.append("created_at = CURRENT_TIMESTAMP")
        query = f"UPDATE news SET {', '.join(assignments)} WHERE country_name = ? AND news_date = ?"
        cursor.execute(query, values + [country_name, news_date])
        conn.commit()
    except Exception as e:
        print(f"Error updating translations for {country_name}: {e}")
    finally:
        conn.close()
def process_country_queue(queue, overwrite=False):
    """
    Process a queue of country entries sequentially.

    For each country: fetch news via SearXNG, summarize in English with
    Ollama, translate to German and Japanese, and persist the result.
    When overwrite=False, an existing row for today is kept; only its
    missing translations are filled in (no re-fetch / re-summarize).

    Args:
        queue: list of capital entries; each must have a "country" key.
        overwrite: when True, existing rows are upserted with fresh data.
    """
    print(f"Starting to process a queue of {len(queue)} countries. Overwrite: {overwrite}")
    for country_entry in queue:
        country_name = country_entry["country"]
        print(f"\n--- Processing: {country_name} ---")
        # Recomputed per country: a long-running queue may cross midnight UTC.
        news_date = datetime.now(timezone.utc).date()
        # Determine if a record exists for this country/date
        conn = sqlite3.connect(DB_FILE)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT summary_en, summary_de, summary_jp FROM news WHERE country_name = ? AND news_date = ?",
            (country_name, news_date),
        )
        existing_row = cursor.fetchone()
        conn.close()
        if existing_row and not overwrite:
            # Existing row: fill missing translations if possible
            summary_en_db, summary_de_db, summary_jp_db = existing_row
            # Only attempt translation if English summary is present
            if summary_en_db:
                need_de = summary_de_db is None
                need_jp = summary_jp_db is None
                if need_de or need_jp:
                    print(f"Existing summary found for {country_name}; translating missing languages...")
                    # translate_summary returns None on failure, so a failed
                    # translation leaves the column NULL for a later retry.
                    summary_de_new = translate_summary(summary_en_db, 'de') if need_de else None
                    summary_jp_new = translate_summary(summary_en_db, 'jp') if need_jp else None
                    # Log translation success/failure
                    if need_de and summary_de_new:
                        print(f"Filled German translation for {country_name}.")
                    if need_jp and summary_jp_new:
                        print(f"Filled Japanese translation for {country_name}.")
                    # Update only provided translations
                    update_translations(country_name, news_date, summary_de_new, summary_jp_new)
            else:
                print(f"Existing row for {country_name} lacks English summary; cannot translate.")
            # Skip summarization for existing rows when not overwriting
            continue
        # Fetch news content
        print(f"Fetching news for {country_name}...")
        news_content = fetch_searxng_results(country_name)
        if not news_content:
            print(f"No content fetched for {country_name}. Skipping summary.")
            continue
        # Summarize in English
        print(f"Summarizing news for {country_name}...")
        summary_en = summarize_with_ollama(news_content, country_name)
        # Translate to German and Japanese
        summary_de = translate_summary(summary_en, 'de')
        summary_jp = translate_summary(summary_en, 'jp')
        # Store the summaries
        store_news(country_name, summary_en, summary_de, summary_jp, overwrite=overwrite)
        # Brief pause between requests
        time.sleep(1)  # Small delay to avoid overwhelming services
# --- Main Loop ---
def main():
    """
    Entry point for the 10AM News Collector Service.

    Flags:
        --dev: process the next 10 AM country group once, then exit.
        --overwrite: upsert existing summaries instead of keeping them.

    In production mode, polls once per minute and fires at every
    quarter-hour UTC slot, collecting news for countries whose local
    time is 09:30.
    """
    parser = argparse.ArgumentParser(description="10AM News Collector Service")
    parser.add_argument("--dev", action="store_true", help="Run in development mode: process next 10am countries once and exit.")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing news summaries in the database.")
    args = parser.parse_args()
    print("Starting News Collector Service...")
    setup_database()
    # Perform schema migrations (safe, idempotent)
    try:
        run_db_migrations()
    except Exception as e:
        print(f"Migration error: {e}")
    try:
        with open(CAPITALS_FILE, "r", encoding="utf-8") as f:
            capitals_data = json.load(f)
    except FileNotFoundError:
        print(f"Error: Capitals data file not found at {CAPITALS_FILE}")
        return
    if args.dev:
        print(f"Running in DEV mode... Overwrite: {args.overwrite}")
        country_queue = get_next_10am_countries(capitals_data)
        if country_queue:
            process_country_queue(country_queue, overwrite=args.overwrite)
        else:
            print("Could not determine the next set of countries for 10 AM.")
        print("DEV mode run complete.")
        return
    print("Running in PRODUCTION mode. Checking UTC minute windows (:00/:15/:30/:45) every minute...")
    while True:
        now_utc = datetime.now(timezone.utc)
        # Fire at every 15-minute UTC slot to cover quarter-hour timezones
        if (now_utc.minute % 15) == 0:
            print(f"UTC {now_utc.strftime('%H:%M')} → checking for countries at local 09:30.")
            country_queue = get_countries_at_time(9, 30, capitals_data)
            if country_queue:
                process_country_queue(country_queue, overwrite=args.overwrite)
            else:
                print("No countries are at 09:30 local time right now.")
        # BUGFIX: re-read the clock before computing the sleep. Queue
        # processing can take minutes (LLM summarization/translation), so
        # sleeping off the pre-processing timestamp drifted away from the
        # minute boundary and could re-fire or skip a slot check.
        now_utc = datetime.now(timezone.utc)
        # Sleep until the next minute boundary (UTC)
        time.sleep(60 - now_utc.second)
if __name__ == "__main__":
    main()