397 lines
17 KiB
Python
397 lines
17 KiB
Python
|
|
import json
|
||
|
|
import os
|
||
|
|
import sqlite3
|
||
|
|
import time
|
||
|
|
import argparse
|
||
|
|
from datetime import datetime, timezone, timedelta
|
||
|
|
from zoneinfo import ZoneInfo
|
||
|
|
import requests
|
||
|
|
import ollama
|
||
|
|
|
||
|
|
# --- Configuration ---

# Directory containing this script; data files are resolved relative to it.
SCRIPT_DIR = os.path.dirname(__file__)

# SQLite database holding one row per (country_name, news_date).
DB_FILE = os.path.join(SCRIPT_DIR, "news.db")

# Capitals/timezone table (JSON); path overridable via the TENAM_DATA env var.
CAPITALS_FILE = os.environ.get("TENAM_DATA", os.path.join(SCRIPT_DIR, "capitals_tz.json"))

# Local SearXNG instance queried for headlines.
SEARXNG_URL = "http://localhost:8888"

# Ollama model used for both summarization and translation.
OLLAMA_MODEL = "gpt-oss:20b"
||
|
|
# --- Database Setup ---
|
||
|
|
def setup_database():
    """Create the ``news`` table if it does not exist.

    Schema: one row per (country_name, news_date) with three nullable
    summary columns (summary_en / summary_de / summary_jp) so translations
    can be backfilled incrementally.

    Fix: the connection is now closed in a ``finally`` block so it is not
    leaked if the DDL statement raises.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS news (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                country_name TEXT NOT NULL,
                news_date DATE NOT NULL,
                summary_en TEXT,
                summary_de TEXT,
                summary_jp TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(country_name, news_date)
            )
            """
        )
        conn.commit()
    finally:
        conn.close()
||
|
|
def run_db_migrations():
    """
    Performs safe, idempotent migrations on the `news` table. On old
    databases, the column `summary` is renamed to `summary_en`. New
    columns `summary_de` and `summary_jp` are added if missing. Uses
    ALTER TABLE where available and falls back to table copy when
    column rename isn't supported.
    """
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    try:
        # Inspect existing columns (PRAGMA row layout: index, name, type, ...)
        cursor.execute("PRAGMA table_info(news)")
        cols = [row[1] for row in cursor.fetchall()]
        # Rename summary -> summary_en if needed
        if "summary_en" not in cols and "summary" in cols:
            try:
                # RENAME COLUMN requires SQLite >= 3.25; older versions raise
                # OperationalError and we take the copy-table fallback below.
                cursor.execute("ALTER TABLE news RENAME COLUMN summary TO summary_en")
                conn.commit()
                # Refresh column list after rename
                cursor.execute("PRAGMA table_info(news)")
                cols = [row[1] for row in cursor.fetchall()]
            except sqlite3.OperationalError:
                # Fallback: recreate table with proper schema
                cursor.execute(
                    """
                    CREATE TABLE news_new (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    country_name TEXT NOT NULL,
                    news_date DATE NOT NULL,
                    summary_en TEXT,
                    summary_de TEXT,
                    summary_jp TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(country_name, news_date)
                    )
                    """
                )
                # Copy old data: map summary -> summary_en (translation
                # columns start out NULL and are backfilled later).
                cursor.execute(
                    "INSERT INTO news_new (id, country_name, news_date, summary_en, created_at) "
                    "SELECT id, country_name, news_date, summary, created_at FROM news"
                )
                cursor.execute("DROP TABLE news")
                cursor.execute("ALTER TABLE news_new RENAME TO news")
                conn.commit()
                # Re-read columns so the ADD COLUMN checks below see the new table.
                cursor.execute("PRAGMA table_info(news)")
                cols = [row[1] for row in cursor.fetchall()]
        # Add missing translation columns (idempotent thanks to the checks)
        if "summary_de" not in cols:
            cursor.execute("ALTER TABLE news ADD COLUMN summary_de TEXT")
        if "summary_jp" not in cols:
            cursor.execute("ALTER TABLE news ADD COLUMN summary_jp TEXT")
        conn.commit()
    finally:
        # Always release the connection, even if a migration step raised.
        conn.close()
||
|
|
# --- Core Logic ---
|
||
|
|
def get_countries_at_time(target_hour, target_minute, capitals_data):
    """Return the capital entries whose local wall clock currently reads
    exactly target_hour:target_minute.

    Entries with unknown/broken timezone ids are reported and skipped.
    """
    reference = datetime.now(timezone.utc)
    matches = []
    for record in capitals_data:
        try:
            zone = ZoneInfo(record["tzid"])
            wall_clock = reference.astimezone(zone)
        except Exception as e:
            print(f"Error processing timezone {record.get('tzid', 'N/A')}: {e}")
            continue
        if (wall_clock.hour, wall_clock.minute) == (target_hour, target_minute):
            matches.append(record)
    return matches
||
|
|
def get_next_10am_countries(capitals_data):
    """Return the group of entries whose capitals will be the next (anywhere
    in the world) to reach 10:00 AM local time.

    All entries sharing the earliest upcoming 10:00 AM instant are returned
    together; unparseable timezone entries are reported and skipped.
    """
    reference = datetime.now(timezone.utc)
    soonest = None
    winners = []

    for record in capitals_data:
        try:
            zone = ZoneInfo(record["tzid"])
            local_now = reference.astimezone(zone)

            # Upcoming 10:00 AM in this zone: today, or tomorrow if passed.
            upcoming = local_now.replace(hour=10, minute=0, second=0, microsecond=0)
            if local_now >= upcoming:
                upcoming += timedelta(days=1)

            if soonest is None or upcoming < soonest:
                soonest = upcoming
                winners = [record]
            elif upcoming == soonest:
                winners.append(record)
        except Exception as e:
            print(f"Error processing timezone {record.get('tzid', 'N/A')} for dev mode: {e}")

    return winners
||
|
|
def fetch_searxng_results(country_name):
    """Query the local SearXNG instance and return concatenated headline text.

    Issues a "top news in <country> today" search, takes the first five hits,
    and joins their titles and snippets into a single string suitable for
    summarization. Returns None when the HTTP request/decoding fails.

    NOTE(review): this queries the instance base URL rather than /search —
    confirm the configured instance serves JSON results there.
    """
    search_params = {
        "q": f"top news in {country_name} today",
        "format": "json",
    }
    try:
        resp = requests.get(SEARXNG_URL, params=search_params, timeout=15)
        resp.raise_for_status()
        hits = resp.json().get("results", [])
    except requests.RequestException as e:
        print(f"Error fetching from SearXNG for {country_name}: {e}")
        return None
    # Title + snippet of the top five results, space-joined.
    snippets = [f"{hit.get('title', '')}: {hit.get('content', '')}" for hit in hits[:5]]
    return " ".join(snippets)
||
|
|
def summarize_with_ollama(content, country):
    """Summarize fetched news text into English bullet points via Ollama.

    Args:
        content: Concatenated titles/snippets from the search results.
        country: Country name, used to contextualize the prompt.

    Returns:
        The model's summary string, or a short fallback message when there is
        no content to summarize or the model call fails.

    Fix: the prompt's intro sentence previously interpolated ``{content}``
    ("news websites from {content}"), duplicating the article text and
    leaving the ``country`` parameter unused; it now names the country.
    """
    if not content or content.isspace():
        return "No content available to summarize."

    prompt = (
        f"Here are excerpts of news websites from {country}.\n"
        "Please provide a concise summary of these news in English.\n"
        "List each individual news in bulletin points. DO NOT list by news outlets - list by relevant news topics.\n"
        "Don't intro your response with things like \"..here are the news..\" or anything like that - reply with ONLY the news listed as bulletin points.\n"
        "Keep it concise.\n"
        f"The news:\n\n{content}"
    )
    try:
        response = ollama.generate(
            model=OLLAMA_MODEL,
            prompt=prompt
        )
        return response.get('response', 'Summary generation failed.').strip()
    except Exception as e:
        # The ollama library might raise various exceptions.
        print(f"Error communicating with Ollama using library: {e}")
        return "Summary failed due to a communication error."
||
|
|
def translate_summary(summary_en: str, target_lang: str) -> str | None:
|
||
|
|
"""
|
||
|
|
Translate an English news summary into the specified language using the
|
||
|
|
ollama model. Supported languages: 'de' for German and 'jp' for Japanese.
|
||
|
|
Returns None on failure.
|
||
|
|
"""
|
||
|
|
if not summary_en or summary_en.isspace():
|
||
|
|
return None
|
||
|
|
if target_lang == 'de':
|
||
|
|
prompt = (
|
||
|
|
"Translate the following news summary into natural, accurate German. "
|
||
|
|
"Keep the format.\nKeep it concise; do not add new facts.\n\n"
|
||
|
|
"Summary (English):\n" + summary_en
|
||
|
|
)
|
||
|
|
elif target_lang == 'jp':
|
||
|
|
prompt = (
|
||
|
|
"Translate the following news summary into natural, accurate Japanese. "
|
||
|
|
"Keep the format.\nKeep it concise; do not add new facts.\n\n"
|
||
|
|
"Summary (English):\n" + summary_en
|
||
|
|
)
|
||
|
|
else:
|
||
|
|
# Unsupported language
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
response = ollama.generate(model=OLLAMA_MODEL, prompt=prompt)
|
||
|
|
return response.get('response', '').strip()
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Error translating summary to {target_lang}: {e}")
|
||
|
|
return None
|
||
|
|
|
||
|
|
def store_news(country_name: str, summary_en: str, summary_de: str | None = None, summary_jp: str | None = None, overwrite: bool = False) -> None:
    """
    Insert (or, with overwrite=True, upsert) today's summaries for a country,
    then verify the row exists.

    With overwrite=True an existing row's summary_en is replaced, while
    translations passed as None keep the previously stored value (COALESCE).
    With overwrite=False an existing row is left untouched (INSERT OR IGNORE).
    After commit, the row is re-read to confirm presence, avoiding misleading
    "success" logs if an insert was ignored or overwritten later.

    Fix: the date is stored as an explicit ISO-8601 string; this matches what
    the implicit sqlite3 date adapter produced, but that adapter is deprecated
    since Python 3.12.
    """
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    # Same text the deprecated default date adapter would have written.
    news_date = datetime.now(timezone.utc).date().isoformat()
    try:
        params = (country_name, news_date, summary_en, summary_de, summary_jp)
        if overwrite:
            sql = (
                "INSERT INTO news (country_name, news_date, summary_en, summary_de, summary_jp) "
                "VALUES (?, ?, ?, ?, ?) "
                "ON CONFLICT(country_name, news_date) DO UPDATE SET "
                "summary_en = excluded.summary_en, "
                "summary_de = COALESCE(excluded.summary_de, news.summary_de), "
                "summary_jp = COALESCE(excluded.summary_jp, news.summary_jp), "
                "created_at = CURRENT_TIMESTAMP"
            )
            verb = "upserted"
        else:
            sql = (
                "INSERT OR IGNORE INTO news (country_name, news_date, summary_en, summary_de, summary_jp) "
                "VALUES (?, ?, ?, ?, ?)"
            )
            verb = "inserted"

        cursor.execute(sql, params)
        conn.commit()

        # Verify by reading back the row we *expect* to exist
        cursor.execute(
            "SELECT id FROM news WHERE country_name = ? AND news_date = ? LIMIT 1",
            (country_name, news_date),
        )
        row = cursor.fetchone()
        if row:
            print(f"Successfully {verb} news for {country_name}. (id={row[0]})")
        else:
            # This should not happen; make it explicit in logs
            print(f"[warn] Post-commit verification failed for {country_name} ({news_date}).")
    except Exception as e:
        print(f"Error storing news for {country_name}: {e}")
    finally:
        conn.close()
|
||
|
|
def update_translations(country_name: str, news_date, summary_de: str | None = None, summary_jp: str | None = None) -> None:
    """Fill in translated summaries for an existing (country, date) row.

    Only the languages actually supplied are written; passing None for a
    language leaves that column untouched. When at least one translation is
    written, `created_at` is refreshed to record the modification.
    """
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    try:
        assignments = []
        values: list = []
        for column, text in (("summary_de", summary_de), ("summary_jp", summary_jp)):
            if text is not None:
                assignments.append(f"{column} = ?")
                values.append(text)
        if not assignments:
            # Nothing to write for either language.
            return
        # Touch the timestamp so the row reflects its latest change.
        assignments.append("created_at = CURRENT_TIMESTAMP")
        values.extend([country_name, news_date])
        cursor.execute(
            f"UPDATE news SET {', '.join(assignments)} WHERE country_name = ? AND news_date = ?",
            values,
        )
        conn.commit()
    except Exception as e:
        print(f"Error updating translations for {country_name}: {e}")
    finally:
        conn.close()
|
||
|
|
def process_country_queue(queue, overwrite=False):
    """
    Processes the queue of countries sequentially. Generates English summaries and
    translations, then inserts or updates rows in the database accordingly.
    When overwrite=False, existing rows are not replaced; missing translations
    for existing rows are filled in if possible.
    """
    print(f"Starting to process a queue of {len(queue)} countries. Overwrite: {overwrite}")
    for country_entry in queue:
        # Each entry is a capitals_tz record; "country" is the display name
        # used both for searching and as the DB key.
        country_name = country_entry["country"]
        print(f"\n--- Processing: {country_name} ---")
        news_date = datetime.now(timezone.utc).date()
        # Determine if a record exists for this country/date
        conn = sqlite3.connect(DB_FILE)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT summary_en, summary_de, summary_jp FROM news WHERE country_name = ? AND news_date = ?",
            (country_name, news_date),
        )
        existing_row = cursor.fetchone()
        conn.close()

        if existing_row and not overwrite:
            # Existing row: fill missing translations if possible
            summary_en_db, summary_de_db, summary_jp_db = existing_row
            # Only attempt translation if English summary is present
            if summary_en_db:
                need_de = summary_de_db is None
                need_jp = summary_jp_db is None
                if need_de or need_jp:
                    print(f"Existing summary found for {country_name}; translating missing languages...")
                    # translate_summary returns None on failure, so a failed
                    # translation leaves the column untouched below.
                    summary_de_new = translate_summary(summary_en_db, 'de') if need_de else None
                    summary_jp_new = translate_summary(summary_en_db, 'jp') if need_jp else None
                    # Log translation success/failure
                    if need_de and summary_de_new:
                        print(f"Filled German translation for {country_name}.")
                    if need_jp and summary_jp_new:
                        print(f"Filled Japanese translation for {country_name}.")
                    # Update only provided translations
                    update_translations(country_name, news_date, summary_de_new, summary_jp_new)
            else:
                print(f"Existing row for {country_name} lacks English summary; cannot translate.")
            # Skip summarization for existing rows when not overwriting
            continue
        # Fetch news content
        print(f"Fetching news for {country_name}...")
        news_content = fetch_searxng_results(country_name)
        if not news_content:
            print(f"No content fetched for {country_name}. Skipping summary.")
            continue
        # Summarize in English
        print(f"Summarizing news for {country_name}...")
        summary_en = summarize_with_ollama(news_content, country_name)
        # Translate to German and Japanese
        summary_de = translate_summary(summary_en, 'de')
        summary_jp = translate_summary(summary_en, 'jp')
        # Store the summaries
        store_news(country_name, summary_en, summary_de, summary_jp, overwrite=overwrite)
        # Brief pause between requests
        time.sleep(1)  # Small delay to avoid overwhelming services
|
||
|
|
# --- Main Loop ---
def main():
    """Entry point: parse CLI flags, prepare the DB, then run once (--dev)
    or loop forever checking timezone windows (production)."""
    parser = argparse.ArgumentParser(description="10AM News Collector Service")
    parser.add_argument("--dev", action="store_true", help="Run in development mode: process next 10am countries once and exit.")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing news summaries in the database.")
    args = parser.parse_args()

    print("Starting News Collector Service...")
    setup_database()
    # Perform schema migrations (safe, idempotent)
    try:
        run_db_migrations()
    except Exception as e:
        print(f"Migration error: {e}")

    try:
        with open(CAPITALS_FILE, "r", encoding="utf-8") as f:
            capitals_data = json.load(f)
    except FileNotFoundError:
        # Without the capitals/timezone table there is nothing to schedule.
        print(f"Error: Capitals data file not found at {CAPITALS_FILE}")
        return

    if args.dev:
        print(f"Running in DEV mode... Overwrite: {args.overwrite}")
        country_queue = get_next_10am_countries(capitals_data)
        if country_queue:
            process_country_queue(country_queue, overwrite=args.overwrite)
        else:
            print("Could not determine the next set of countries for 10 AM.")
        print("DEV mode run complete.")
        return

    # NOTE(review): dev mode targets local 10:00 while production polls for
    # local 09:30 — confirm this half-hour offset is intentional.
    print("Running in PRODUCTION mode. Checking UTC minute windows (:00/:15/:30/:45) every minute...")
    while True:
        now_utc = datetime.now(timezone.utc)
        # Fire at every 15-minute UTC slot to cover quarter-hour timezones
        if (now_utc.minute % 15) == 0:
            print(f"UTC {now_utc.strftime('%H:%M')} → checking for countries at local 09:30.")
            country_queue = get_countries_at_time(9, 30, capitals_data)
            if country_queue:
                # NOTE(review): processing is synchronous; a long run can
                # overshoot the next slot and skip a check window.
                process_country_queue(country_queue, overwrite=args.overwrite)
            else:
                print("No countries are at 09:30 local time right now.")
        # Sleep until next minute boundary (UTC)
        time.sleep(60 - now_utc.second)


if __name__ == "__main__":
    main()
|