initial commit
This commit is contained in:
396
server/news_collector.py
Normal file
396
server/news_collector.py
Normal file
@@ -0,0 +1,396 @@
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
import argparse
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from zoneinfo import ZoneInfo
|
||||
import requests
|
||||
import ollama
|
||||
|
||||
# --- Configuration ---
|
||||
# Anchor all paths on this file's absolute location so they do not depend on
# the working directory the service happens to be started from.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# SQLite database holding one row per (country, date).
DB_FILE = os.path.join(SCRIPT_DIR, "news.db")
# The TENAM_DATA environment variable overrides the default capitals file.
CAPITALS_FILE = os.environ.get("TENAM_DATA", os.path.join(SCRIPT_DIR, "capitals_tz.json"))
# Base URL of the SearXNG instance queried for news results.
SEARXNG_URL = "http://localhost:8888"
# Model name passed to ollama for summarization and translation.
OLLAMA_MODEL = "gpt-oss:20b"
|
||||
|
||||
# --- Database Setup ---
|
||||
def setup_database():
    """
    Create the `news` table if it doesn't exist.

    The schema keeps one row per (country_name, news_date) and stores
    multilingual summaries: summary_en, summary_de, summary_jp. All summary
    columns are nullable to allow incremental backfill.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS news (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                country_name TEXT NOT NULL,
                news_date DATE NOT NULL,
                summary_en TEXT,
                summary_de TEXT,
                summary_jp TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(country_name, news_date)
            )
            """
        )
        conn.commit()
    finally:
        # Release the handle even if the DDL raises, so a failed setup does
        # not leave an open connection behind.
        conn.close()
|
||||
|
||||
def _news_columns(cursor):
    """Return the column names currently present on the `news` table."""
    cursor.execute("PRAGMA table_info(news)")
    return [row[1] for row in cursor.fetchall()]


def run_db_migrations():
    """
    Perform safe, idempotent migrations on the `news` table.

    On old databases, the column `summary` is renamed to `summary_en`. New
    columns `summary_de` and `summary_jp` are added if missing. Uses
    ALTER TABLE ... RENAME COLUMN where available and falls back to a table
    copy when column rename isn't supported.

    Raises:
        sqlite3.Error: if a migration statement fails; the connection is
            closed before the error propagates.
    """
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    try:
        cols = _news_columns(cursor)

        # Rename summary -> summary_en if needed
        if "summary_en" not in cols and "summary" in cols:
            try:
                cursor.execute("ALTER TABLE news RENAME COLUMN summary TO summary_en")
                conn.commit()
            except sqlite3.OperationalError:
                # Fallback: recreate the table with the proper schema.
                conn.rollback()
                # A previous interrupted fallback may have left `news_new`
                # behind; drop it so this path can be retried safely.
                cursor.execute("DROP TABLE IF EXISTS news_new")
                cursor.execute(
                    """
                    CREATE TABLE news_new (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        country_name TEXT NOT NULL,
                        news_date DATE NOT NULL,
                        summary_en TEXT,
                        summary_de TEXT,
                        summary_jp TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        UNIQUE(country_name, news_date)
                    )
                    """
                )
                # Copy old data: map summary -> summary_en
                cursor.execute(
                    "INSERT INTO news_new (id, country_name, news_date, summary_en, created_at) "
                    "SELECT id, country_name, news_date, summary, created_at FROM news"
                )
                cursor.execute("DROP TABLE news")
                cursor.execute("ALTER TABLE news_new RENAME TO news")
                conn.commit()
            # Refresh the column list after either rename strategy.
            cols = _news_columns(cursor)

        # Add missing translation columns
        if "summary_de" not in cols:
            cursor.execute("ALTER TABLE news ADD COLUMN summary_de TEXT")
        if "summary_jp" not in cols:
            cursor.execute("ALTER TABLE news ADD COLUMN summary_jp TEXT")
        conn.commit()
    finally:
        conn.close()
|
||||
|
||||
# --- Core Logic ---
|
||||
def get_countries_at_time(target_hour, target_minute, capitals_data, now_utc=None):
    """
    Find countries where the local time matches the target hour and minute.

    Args:
        target_hour: Local hour (0-23) to match.
        target_minute: Local minute (0-59) to match.
        capitals_data: Iterable of dict entries, each carrying a "tzid" key
            with an IANA timezone name.
        now_utc: Optional timezone-aware datetime used as "now". Defaults to
            the current UTC time; pass a fixed value for deterministic checks.

    Returns:
        The entries (in input order) whose local time equals the target.
        Entries whose timezone cannot be resolved are reported and skipped.
    """
    if now_utc is None:
        now_utc = datetime.now(timezone.utc)
    hits = []
    for entry in capitals_data:
        try:
            local_time = now_utc.astimezone(ZoneInfo(entry["tzid"]))
        except Exception as e:
            # Best effort: one bad entry must not stop the whole scan.
            print(f"Error processing timezone {entry.get('tzid', 'N/A')}: {e}")
            continue
        if local_time.hour == target_hour and local_time.minute == target_minute:
            hits.append(entry)
    return hits
|
||||
|
||||
def get_next_10am_countries(capitals_data, now_utc=None):
    """
    Find the next group of countries that will reach 10:00 AM local time.

    Args:
        capitals_data: Iterable of dict entries, each carrying a "tzid" key
            with an IANA timezone name.
        now_utc: Optional timezone-aware datetime used as "now". Defaults to
            the current UTC time; pass a fixed value for deterministic checks.

    Returns:
        All entries sharing the earliest upcoming 10:00 AM instant, in input
        order. A zone that is exactly at 10:00 counts towards the next day.
        Entries whose timezone cannot be resolved are reported and skipped.
    """
    if now_utc is None:
        now_utc = datetime.now(timezone.utc)
    next_event_time = None
    countries_for_next_event = []

    for entry in capitals_data:
        try:
            local_time = now_utc.astimezone(ZoneInfo(entry["tzid"]))
            # Calculate next 10:00 AM in this timezone
            next_10am = local_time.replace(hour=10, minute=0, second=0, microsecond=0)
            if local_time >= next_10am:
                next_10am += timedelta(days=1)
        except Exception as e:
            print(f"Error processing timezone {entry.get('tzid', 'N/A')} for dev mode: {e}")
            continue

        # Aware datetimes compare by absolute instant, so candidates from
        # different zones can be compared directly.
        if next_event_time is None or next_10am < next_event_time:
            next_event_time = next_10am
            countries_for_next_event = [entry]
        elif next_10am == next_event_time:
            countries_for_next_event.append(entry)

    return countries_for_next_event
|
||||
|
||||
def fetch_searxng_results(country_name):
    """
    Fetch news snippets for a country from SearXNG.

    Args:
        country_name: Name of the country to search news for.

    Returns:
        A single string joining "title: content" of the first five results
        (empty string when there are no results), or None when the request
        fails or the response is not valid JSON.
    """
    query = f"top news in {country_name} today"
    params = {"q": query, "format": "json"}
    try:
        response = requests.get(SEARXNG_URL, params=params, timeout=15)
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers invalid JSON on requests versions whose JSON
        # error is not a RequestException subclass.
        print(f"Error fetching from SearXNG for {country_name}: {e}")
        return None

    results = payload.get("results") if isinstance(payload, dict) else None
    # Simple concatenation of titles and content for summary. `or ''` keeps
    # explicit nulls from being rendered as the literal text "None".
    return " ".join(
        f"{r.get('title') or ''}: {r.get('content') or ''}" for r in (results or [])[:5]
    )
|
||||
|
||||
def summarize_with_ollama(content, country):
    """
    Summarize news content in English using the ollama library.

    Args:
        content: Concatenated news excerpts to summarize.
        country: Name of the country the excerpts are about; used in the
            prompt to give the model context.

    Returns:
        The model's summary text. Placeholder sentences are returned instead
        when there is no content or when the model call fails.
    """
    if not content or content.isspace():
        return "No content available to summarize."

    # The first sentence names the country; the excerpts themselves are
    # appended once at the end.
    prompt = (
        f"Here are excerpts of news websites from {country}.\n"
        "Please provide a concise summary of these news in English.\n"
        "List each individual news in bulletin points. "
        "DO NOT list by news outlets - list by relevant news topics.\n"
        "Don't intro your response with things like \"..here are the news..\" "
        "or anything like that - reply with ONLY the news listed as bulletin points.\n"
        "Keep it concise.\n"
        "The news:\n\n"
        f"{content}"
    )
    try:
        response = ollama.generate(
            model=OLLAMA_MODEL,
            prompt=prompt
        )
        return response.get('response', 'Summary generation failed.').strip()
    except Exception as e:
        # The ollama library might raise various exceptions.
        print(f"Error communicating with Ollama using library: {e}")
        return "Summary failed due to a communication error."
|
||||
|
||||
def translate_summary(summary_en: str, target_lang: str) -> str | None:
|
||||
"""
|
||||
Translate an English news summary into the specified language using the
|
||||
ollama model. Supported languages: 'de' for German and 'jp' for Japanese.
|
||||
Returns None on failure.
|
||||
"""
|
||||
if not summary_en or summary_en.isspace():
|
||||
return None
|
||||
if target_lang == 'de':
|
||||
prompt = (
|
||||
"Translate the following news summary into natural, accurate German. "
|
||||
"Keep the format.\nKeep it concise; do not add new facts.\n\n"
|
||||
"Summary (English):\n" + summary_en
|
||||
)
|
||||
elif target_lang == 'jp':
|
||||
prompt = (
|
||||
"Translate the following news summary into natural, accurate Japanese. "
|
||||
"Keep the format.\nKeep it concise; do not add new facts.\n\n"
|
||||
"Summary (English):\n" + summary_en
|
||||
)
|
||||
else:
|
||||
# Unsupported language
|
||||
return None
|
||||
try:
|
||||
response = ollama.generate(model=OLLAMA_MODEL, prompt=prompt)
|
||||
return response.get('response', '').strip()
|
||||
except Exception as e:
|
||||
print(f"Error translating summary to {target_lang}: {e}")
|
||||
return None
|
||||
|
||||
def store_news(country_name: str, summary_en: str, summary_de: str | None = None, summary_jp: str | None = None, overwrite: bool = False) -> None:
|
||||
"""
|
||||
Insert/update with verification. After commit, we re-read the row to confirm
|
||||
presence, avoiding misleading “success” logs if an insert was ignored or
|
||||
overwritten later.
|
||||
"""
|
||||
conn = sqlite3.connect(DB_FILE)
|
||||
cursor = conn.cursor()
|
||||
news_date = datetime.now(timezone.utc).date()
|
||||
try:
|
||||
if overwrite:
|
||||
sql = (
|
||||
"INSERT INTO news (country_name, news_date, summary_en, summary_de, summary_jp) "
|
||||
"VALUES (?, ?, ?, ?, ?) "
|
||||
"ON CONFLICT(country_name, news_date) DO UPDATE SET "
|
||||
"summary_en = excluded.summary_en, "
|
||||
"summary_de = COALESCE(excluded.summary_de, news.summary_de), "
|
||||
"summary_jp = COALESCE(excluded.summary_jp, news.summary_jp), "
|
||||
"created_at = CURRENT_TIMESTAMP"
|
||||
)
|
||||
params = (country_name, news_date, summary_en, summary_de, summary_jp)
|
||||
verb = "upserted"
|
||||
else:
|
||||
sql = (
|
||||
"INSERT OR IGNORE INTO news (country_name, news_date, summary_en, summary_de, summary_jp) "
|
||||
"VALUES (?, ?, ?, ?, ?)"
|
||||
)
|
||||
params = (country_name, news_date, summary_en, summary_de, summary_jp)
|
||||
verb = "inserted"
|
||||
|
||||
cursor.execute(sql, params)
|
||||
conn.commit()
|
||||
|
||||
# Verify by reading back the row we *expect* to exist
|
||||
cursor.execute(
|
||||
"SELECT id FROM news WHERE country_name = ? AND news_date = ? LIMIT 1",
|
||||
(country_name, news_date),
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
if row:
|
||||
print(f"Successfully {verb} news for {country_name}. (id={row[0]})")
|
||||
else:
|
||||
# This should not happen; make it explicit in logs
|
||||
print(f"[warn] Post-commit verification failed for {country_name} ({news_date}).")
|
||||
except Exception as e:
|
||||
print(f"Error storing news for {country_name}: {e}")
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def update_translations(country_name: str, news_date, summary_de: str | None = None, summary_jp: str | None = None) -> None:
|
||||
"""
|
||||
Update translation fields for a given country and date. Only updates
|
||||
provided languages; leaving a language as None will not change that
|
||||
column. Updates `created_at` timestamp to reflect the change.
|
||||
"""
|
||||
conn = sqlite3.connect(DB_FILE)
|
||||
cursor = conn.cursor()
|
||||
try:
|
||||
updates = []
|
||||
params: list = []
|
||||
if summary_de is not None:
|
||||
updates.append("summary_de = ?")
|
||||
params.append(summary_de)
|
||||
if summary_jp is not None:
|
||||
updates.append("summary_jp = ?")
|
||||
params.append(summary_jp)
|
||||
if not updates:
|
||||
return
|
||||
# Append timestamp update
|
||||
updates.append("created_at = CURRENT_TIMESTAMP")
|
||||
params.append(country_name)
|
||||
params.append(news_date)
|
||||
sql = f"UPDATE news SET {', '.join(updates)} WHERE country_name = ? AND news_date = ?"
|
||||
cursor.execute(sql, params)
|
||||
conn.commit()
|
||||
except Exception as e:
|
||||
print(f"Error updating translations for {country_name}: {e}")
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
# Placeholder sentences the summarizer returns instead of a real summary.
_FAILED_SUMMARY_MARKERS = (
    "No content available to summarize.",
    "Summary generation failed.",
    "Summary failed due to a communication error.",
)


def _load_existing_summaries(country_name, news_date):
    """Return (summary_en, summary_de, summary_jp) for the row, or None if absent."""
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT summary_en, summary_de, summary_jp FROM news WHERE country_name = ? AND news_date = ?",
            (country_name, news_date),
        )
        return cursor.fetchone()
    finally:
        # Close even if the query raises, so the handle is never leaked.
        conn.close()


def _backfill_translations(country_name, news_date, existing_row):
    """Translate and store whichever translations are still missing on a row."""
    summary_en_db, summary_de_db, summary_jp_db = existing_row
    # Only attempt translation if English summary is present
    if not summary_en_db:
        print(f"Existing row for {country_name} lacks English summary; cannot translate.")
        return

    need_de = summary_de_db is None
    need_jp = summary_jp_db is None
    if not (need_de or need_jp):
        return

    print(f"Existing summary found for {country_name}; translating missing languages...")
    summary_de_new = translate_summary(summary_en_db, 'de') if need_de else None
    summary_jp_new = translate_summary(summary_en_db, 'jp') if need_jp else None
    # Log translation success/failure
    if need_de and summary_de_new:
        print(f"Filled German translation for {country_name}.")
    if need_jp and summary_jp_new:
        print(f"Filled Japanese translation for {country_name}.")
    # Update only provided translations
    update_translations(country_name, news_date, summary_de_new, summary_jp_new)


def process_country_queue(queue, overwrite=False):
    """
    Process the queue of countries sequentially.

    For each country an English summary and its translations are generated
    and written to the database. When overwrite=False, existing rows are not
    replaced; missing translations for existing rows are filled in if
    possible. Countries for which no content could be fetched, or for which
    summarization failed, are skipped without storing anything.

    Args:
        queue: Sequence of dict entries, each carrying a "country" key.
        overwrite: Whether existing rows for today (UTC) may be replaced.
    """
    print(f"Starting to process a queue of {len(queue)} countries. Overwrite: {overwrite}")
    for country_entry in queue:
        country_name = country_entry["country"]
        print(f"\n--- Processing: {country_name} ---")
        # Bound as ISO text explicitly; sqlite3's implicit date adapter is
        # deprecated since Python 3.12.
        news_date = datetime.now(timezone.utc).date().isoformat()

        # Determine if a record exists for this country/date
        existing_row = _load_existing_summaries(country_name, news_date)
        if existing_row and not overwrite:
            # Existing row: fill missing translations if possible, and skip
            # summarization when not overwriting.
            _backfill_translations(country_name, news_date, existing_row)
            continue

        # Fetch news content
        print(f"Fetching news for {country_name}...")
        news_content = fetch_searxng_results(country_name)
        if not news_content:
            print(f"No content fetched for {country_name}. Skipping summary.")
            continue

        # Summarize in English
        print(f"Summarizing news for {country_name}...")
        summary_en = summarize_with_ollama(news_content, country_name)
        if not summary_en or summary_en in _FAILED_SUMMARY_MARKERS:
            # Never persist (or translate) a failure placeholder as news; it
            # would also block later retries when overwrite is off.
            print(f"Summarization failed for {country_name}; nothing stored.")
            continue

        # Translate to German and Japanese
        summary_de = translate_summary(summary_en, 'de')
        summary_jp = translate_summary(summary_en, 'jp')

        # Store the summaries
        store_news(country_name, summary_en, summary_de, summary_jp, overwrite=overwrite)

        # Small delay to avoid overwhelming services
        time.sleep(1)
|
||||
|
||||
# --- Main Loop ---
|
||||
def main():
    """
    Entry point of the news collector service.

    Prepares the database, loads the capitals/timezone data and then either
    processes the next 10 AM group once (--dev) or loops forever, checking
    at every quarter-hour UTC slot for countries whose local time is 09:30.
    """
    parser = argparse.ArgumentParser(description="10AM News Collector Service")
    parser.add_argument("--dev", action="store_true", help="Run in development mode: process next 10am countries once and exit.")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing news summaries in the database.")
    args = parser.parse_args()

    print("Starting News Collector Service...")
    setup_database()
    # Perform schema migrations (safe, idempotent)
    try:
        run_db_migrations()
    except Exception as e:
        print(f"Migration error: {e}")

    try:
        with open(CAPITALS_FILE, "r", encoding="utf-8") as f:
            capitals_data = json.load(f)
    except FileNotFoundError:
        print(f"Error: Capitals data file not found at {CAPITALS_FILE}")
        return
    except json.JSONDecodeError as e:
        # A malformed data file is as fatal as a missing one; report it
        # the same way instead of crashing with a traceback.
        print(f"Error: Capitals data file at {CAPITALS_FILE} is not valid JSON: {e}")
        return

    if args.dev:
        print(f"Running in DEV mode... Overwrite: {args.overwrite}")
        country_queue = get_next_10am_countries(capitals_data)
        if country_queue:
            process_country_queue(country_queue, overwrite=args.overwrite)
        else:
            print("Could not determine the next set of countries for 10 AM.")
        print("DEV mode run complete.")
        return

    print("Running in PRODUCTION mode. Checking UTC minute windows (:00/:15/:30/:45) every minute...")
    while True:
        now_utc = datetime.now(timezone.utc)
        # Fire at every 15-minute UTC slot to cover quarter-hour timezones
        if (now_utc.minute % 15) == 0:
            print(f"UTC {now_utc.strftime('%H:%M')} → checking for countries at local 09:30.")
            country_queue = get_countries_at_time(9, 30, capitals_data)
            if country_queue:
                process_country_queue(country_queue, overwrite=args.overwrite)
            else:
                print("No countries are at 09:30 local time right now.")
        # Sleep until next minute boundary (UTC). Read the clock again:
        # processing a queue can take minutes, so the timestamp taken at the
        # top of the loop may be stale by now.
        time.sleep(60 - datetime.now(timezone.utc).second)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user