"""10AM News Collector: fetch, summarize, and translate daily news per country.

For each country whose capital reaches the target local time, news is fetched
via a SearXNG instance, summarized in English with an Ollama model, translated
to German and Japanese, and stored in a local SQLite database.
"""

import argparse
import json
import os
import sqlite3
import time
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

import requests
import ollama

# --- Configuration ---
SCRIPT_DIR = os.path.dirname(__file__)
DB_FILE = os.path.join(SCRIPT_DIR, "news.db")
CAPITALS_FILE = os.environ.get("TENAM_DATA", os.path.join(SCRIPT_DIR, "capitals_tz.json"))
SEARXNG_URL = "http://localhost:8888"
OLLAMA_MODEL = "gpt-oss:20b"


# --- Database Setup ---
def setup_database():
    """Create the ``news`` table if it doesn't exist.

    The schema uses multilingual summaries (``summary_en``, ``summary_de``,
    ``summary_jp``); all summary columns are nullable so translations can be
    backfilled incrementally.
    """
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS news (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            country_name TEXT NOT NULL,
            news_date DATE NOT NULL,
            summary_en TEXT,
            summary_de TEXT,
            summary_jp TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(country_name, news_date)
        )
        """
    )
    conn.commit()
    conn.close()


def run_db_migrations():
    """Perform safe, idempotent migrations on the ``news`` table.

    On old databases the column ``summary`` is renamed to ``summary_en``, and
    the translation columns ``summary_de`` / ``summary_jp`` are added if
    missing. Uses ``ALTER TABLE ... RENAME COLUMN`` where supported (SQLite
    >= 3.25) and falls back to a table copy otherwise.
    """
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    try:
        # Inspect existing columns (row[1] is the column name in PRAGMA output).
        cursor.execute("PRAGMA table_info(news)")
        cols = [row[1] for row in cursor.fetchall()]

        # Rename summary -> summary_en if needed.
        if "summary_en" not in cols and "summary" in cols:
            try:
                cursor.execute("ALTER TABLE news RENAME COLUMN summary TO summary_en")
                conn.commit()
                # Refresh column list after rename.
                cursor.execute("PRAGMA table_info(news)")
                cols = [row[1] for row in cursor.fetchall()]
            except sqlite3.OperationalError:
                # Fallback for old SQLite: recreate the table with the new
                # schema and copy data across, mapping summary -> summary_en.
                cursor.execute(
                    """
                    CREATE TABLE news_new (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        country_name TEXT NOT NULL,
                        news_date DATE NOT NULL,
                        summary_en TEXT,
                        summary_de TEXT,
                        summary_jp TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        UNIQUE(country_name, news_date)
                    )
                    """
                )
                cursor.execute(
                    "INSERT INTO news_new (id, country_name, news_date, summary_en, created_at) "
                    "SELECT id, country_name, news_date, summary, created_at FROM news"
                )
                cursor.execute("DROP TABLE news")
                cursor.execute("ALTER TABLE news_new RENAME TO news")
                conn.commit()
                cursor.execute("PRAGMA table_info(news)")
                cols = [row[1] for row in cursor.fetchall()]

        # Add missing translation columns (idempotent on re-run).
        if "summary_de" not in cols:
            cursor.execute("ALTER TABLE news ADD COLUMN summary_de TEXT")
        if "summary_jp" not in cols:
            cursor.execute("ALTER TABLE news ADD COLUMN summary_jp TEXT")
        conn.commit()
    finally:
        conn.close()


# --- Core Logic ---
def get_countries_at_time(target_hour, target_minute, capitals_data):
    """Return entries from capitals_data whose local time is exactly target_hour:target_minute."""
    now_utc = datetime.now(timezone.utc)
    hits = []
    for entry in capitals_data:
        try:
            tz = ZoneInfo(entry["tzid"])
            local_time = now_utc.astimezone(tz)
            if local_time.hour == target_hour and local_time.minute == target_minute:
                hits.append(entry)
        except Exception as e:
            # A bad tzid in the data file shouldn't abort the whole scan.
            print(f"Error processing timezone {entry.get('tzid', 'N/A')}: {e}")
    return hits


def get_next_10am_countries(capitals_data):
    """Return the group of countries that will next reach 10:00 AM local time.

    Aware datetimes compare by instant, so entries in different zones that hit
    10:00 AM at the same UTC moment are grouped together.
    """
    now_utc = datetime.now(timezone.utc)
    next_event_time = None
    countries_for_next_event = []
    for entry in capitals_data:
        try:
            tz = ZoneInfo(entry["tzid"])
            local_time = now_utc.astimezone(tz)
            # Next 10:00 AM in this timezone; if already past, use tomorrow's.
            next_10am = local_time.replace(hour=10, minute=0, second=0, microsecond=0)
            if local_time >= next_10am:
                next_10am += timedelta(days=1)
            if next_event_time is None or next_10am < next_event_time:
                next_event_time = next_10am
                countries_for_next_event = [entry]
            elif next_10am == next_event_time:
                countries_for_next_event.append(entry)
        except Exception as e:
            print(f"Error processing timezone {entry.get('tzid', 'N/A')} for dev mode: {e}")
    return countries_for_next_event


def fetch_searxng_results(country_name):
    """Fetch news snippets from SearXNG; return concatenated text or None on error.

    NOTE(review): this queries the SearXNG root URL; many deployments expose
    the JSON API under ``/search`` — confirm against the local instance.
    """
    query = f"top news in {country_name} today"
    params = {"q": query, "format": "json"}
    try:
        response = requests.get(SEARXNG_URL, params=params, timeout=15)
        response.raise_for_status()
        results = response.json().get("results", [])
        # Simple concatenation of titles and content for the summarizer;
        # only the top 5 results are used to keep the prompt small.
        content_to_summarize = " ".join(
            f"{r.get('title', '')}: {r.get('content', '')}" for r in results[:5]
        )
        return content_to_summarize
    except requests.RequestException as e:
        print(f"Error fetching from SearXNG for {country_name}: {e}")
        return None


def summarize_with_ollama(content, country):
    """Summarize fetched news content for *country* in English via Ollama.

    Returns a fallback message string (not None) when content is empty or the
    model call fails, so callers always receive text.
    """
    if not content or content.isspace():
        return "No content available to summarize."
    # BUGFIX: the intro previously interpolated {content} instead of {country},
    # duplicating the entire article text and leaving the country unnamed.
    prompt = f"Here are excerpts of news websites from {country}.\nPlease provide a concise summary of these news in English.\nList each individual news in bulletin points. DO NOT list by news outlets - list by relevant news topics.\nDon't intro your response with things like \"..here are the news..\" or anything like that - reply with ONLY the news listed as bulletin points.\nKeep it concise.\nThe news:\n\n{content}"
    try:
        response = ollama.generate(model=OLLAMA_MODEL, prompt=prompt)
        return response.get('response', 'Summary generation failed.').strip()
    except Exception as e:
        # The ollama library might raise various exceptions.
        print(f"Error communicating with Ollama using library: {e}")
        return "Summary failed due to a communication error."


def translate_summary(summary_en: str, target_lang: str) -> str | None:
    """Translate an English summary into 'de' (German) or 'jp' (Japanese).

    Returns None for empty input, unsupported languages, or model failure.
    """
    if not summary_en or summary_en.isspace():
        return None
    if target_lang == 'de':
        prompt = (
            "Translate the following news summary into natural, accurate German. "
            "Keep the format.\nKeep it concise; do not add new facts.\n\n"
            "Summary (English):\n" + summary_en
        )
    elif target_lang == 'jp':
        prompt = (
            "Translate the following news summary into natural, accurate Japanese. "
            "Keep the format.\nKeep it concise; do not add new facts.\n\n"
            "Summary (English):\n" + summary_en
        )
    else:
        # Unsupported language
        return None
    try:
        response = ollama.generate(model=OLLAMA_MODEL, prompt=prompt)
        return response.get('response', '').strip()
    except Exception as e:
        print(f"Error translating summary to {target_lang}: {e}")
        return None


def store_news(country_name: str, summary_en: str, summary_de: str | None = None,
               summary_jp: str | None = None, overwrite: bool = False) -> None:
    """Insert (or upsert, when overwrite=True) today's summaries, then verify.

    After commit the row is re-read so the log doesn't claim success when an
    ``INSERT OR IGNORE`` was silently skipped.
    """
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    # Store the date as an ISO string: sqlite3's implicit date adapter is
    # deprecated in Python 3.12, and isoformat() matches its on-disk format.
    news_date = datetime.now(timezone.utc).date().isoformat()
    try:
        if overwrite:
            sql = (
                "INSERT INTO news (country_name, news_date, summary_en, summary_de, summary_jp) "
                "VALUES (?, ?, ?, ?, ?) "
                "ON CONFLICT(country_name, news_date) DO UPDATE SET "
                "summary_en = excluded.summary_en, "
                "summary_de = COALESCE(excluded.summary_de, news.summary_de), "
                "summary_jp = COALESCE(excluded.summary_jp, news.summary_jp), "
                "created_at = CURRENT_TIMESTAMP"
            )
            params = (country_name, news_date, summary_en, summary_de, summary_jp)
            verb = "upserted"
        else:
            sql = (
                "INSERT OR IGNORE INTO news (country_name, news_date, summary_en, summary_de, summary_jp) "
                "VALUES (?, ?, ?, ?, ?)"
            )
            params = (country_name, news_date, summary_en, summary_de, summary_jp)
            verb = "inserted"
        cursor.execute(sql, params)
        conn.commit()
        # Verify by reading back the row we *expect* to exist.
        cursor.execute(
            "SELECT id FROM news WHERE country_name = ? AND news_date = ? LIMIT 1",
            (country_name, news_date),
        )
        row = cursor.fetchone()
        if row:
            print(f"Successfully {verb} news for {country_name}. (id={row[0]})")
        else:
            # This should not happen; make it explicit in logs.
            print(f"[warn] Post-commit verification failed for {country_name} ({news_date}).")
    except Exception as e:
        print(f"Error storing news for {country_name}: {e}")
    finally:
        conn.close()


def update_translations(country_name: str, news_date,
                        summary_de: str | None = None,
                        summary_jp: str | None = None) -> None:
    """Update translation columns for an existing (country, date) row.

    Only the languages that are provided (non-None) are written; ``created_at``
    is bumped to reflect the change. No-op when both translations are None.
    """
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    try:
        updates = []
        params: list = []
        if summary_de is not None:
            updates.append("summary_de = ?")
            params.append(summary_de)
        if summary_jp is not None:
            updates.append("summary_jp = ?")
            params.append(summary_jp)
        if not updates:
            return
        updates.append("created_at = CURRENT_TIMESTAMP")
        params.append(country_name)
        params.append(news_date)
        sql = f"UPDATE news SET {', '.join(updates)} WHERE country_name = ? AND news_date = ?"
        cursor.execute(sql, params)
        conn.commit()
    except Exception as e:
        print(f"Error updating translations for {country_name}: {e}")
    finally:
        conn.close()


def process_country_queue(queue, overwrite=False):
    """Process countries sequentially: summarize, translate, and store.

    When overwrite=False, existing rows are kept; missing translations for an
    existing row are filled in from its stored English summary if possible.
    """
    print(f"Starting to process a queue of {len(queue)} countries. Overwrite: {overwrite}")
    for country_entry in queue:
        country_name = country_entry["country"]
        print(f"\n--- Processing: {country_name} ---")
        news_date = datetime.now(timezone.utc).date().isoformat()

        # Determine if a record exists for this country/date.
        # try/finally so a query error can't leak the connection.
        conn = sqlite3.connect(DB_FILE)
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT summary_en, summary_de, summary_jp FROM news WHERE country_name = ? AND news_date = ?",
                (country_name, news_date),
            )
            existing_row = cursor.fetchone()
        finally:
            conn.close()

        if existing_row and not overwrite:
            # Existing row: fill missing translations if possible.
            summary_en_db, summary_de_db, summary_jp_db = existing_row
            if summary_en_db:
                need_de = summary_de_db is None
                need_jp = summary_jp_db is None
                if need_de or need_jp:
                    print(f"Existing summary found for {country_name}; translating missing languages...")
                    summary_de_new = translate_summary(summary_en_db, 'de') if need_de else None
                    summary_jp_new = translate_summary(summary_en_db, 'jp') if need_jp else None
                    if need_de and summary_de_new:
                        print(f"Filled German translation for {country_name}.")
                    if need_jp and summary_jp_new:
                        print(f"Filled Japanese translation for {country_name}.")
                    # Update only provided translations.
                    update_translations(country_name, news_date, summary_de_new, summary_jp_new)
            else:
                print(f"Existing row for {country_name} lacks English summary; cannot translate.")
            # Skip summarization for existing rows when not overwriting.
            continue

        # Fetch news content.
        print(f"Fetching news for {country_name}...")
        news_content = fetch_searxng_results(country_name)
        if not news_content:
            print(f"No content fetched for {country_name}. Skipping summary.")
            continue

        # Summarize in English, then translate.
        print(f"Summarizing news for {country_name}...")
        summary_en = summarize_with_ollama(news_content, country_name)
        summary_de = translate_summary(summary_en, 'de')
        summary_jp = translate_summary(summary_en, 'jp')

        # Store the summaries.
        store_news(country_name, summary_en, summary_de, summary_jp, overwrite=overwrite)

        time.sleep(1)  # Small delay to avoid overwhelming services


# --- Main Loop ---
def main():
    """Entry point: parse args, prepare the DB, and run dev or production mode."""
    parser = argparse.ArgumentParser(description="10AM News Collector Service")
    parser.add_argument("--dev", action="store_true",
                        help="Run in development mode: process next 10am countries once and exit.")
    parser.add_argument("--overwrite", action="store_true",
                        help="Overwrite existing news summaries in the database.")
    args = parser.parse_args()

    print("Starting News Collector Service...")
    setup_database()
    # Perform schema migrations (safe, idempotent).
    try:
        run_db_migrations()
    except Exception as e:
        print(f"Migration error: {e}")

    try:
        with open(CAPITALS_FILE, "r", encoding="utf-8") as f:
            capitals_data = json.load(f)
    except FileNotFoundError:
        print(f"Error: Capitals data file not found at {CAPITALS_FILE}")
        return

    if args.dev:
        print(f"Running in DEV mode... Overwrite: {args.overwrite}")
        country_queue = get_next_10am_countries(capitals_data)
        if country_queue:
            process_country_queue(country_queue, overwrite=args.overwrite)
        else:
            print("Could not determine the next set of countries for 10 AM.")
        print("DEV mode run complete.")
        return

    print("Running in PRODUCTION mode. Checking UTC minute windows (:00/:15/:30/:45) every minute...")
    while True:
        now_utc = datetime.now(timezone.utc)
        # Fire at every 15-minute UTC slot to cover quarter-hour timezones.
        # NOTE(review): dev mode targets local 10:00 but production checks
        # local 09:30 — confirm the 30-minute lead is intentional.
        if (now_utc.minute % 15) == 0:
            print(f"UTC {now_utc.strftime('%H:%M')} → checking for countries at local 09:30.")
            country_queue = get_countries_at_time(9, 30, capitals_data)
            if country_queue:
                process_country_queue(country_queue, overwrite=args.overwrite)
            else:
                print("No countries are at 09:30 local time right now.")
        # BUGFIX: recompute the clock before sleeping — processing a queue can
        # take minutes, and sleeping from the stale pre-processing timestamp
        # would drift off the minute boundary and skip 15-minute slots.
        now_utc = datetime.now(timezone.utc)
        time.sleep(60 - now_utc.second)


if __name__ == "__main__":
    main()