"""Look up birth-place coordinates for a list of people via Wikidata.

Names are read from the first column of a tab-separated file.  For each
name that is not yet stored, the script fetches the matching Wikidata
entity, follows its ``P19`` claim to a city entity, reads that city's
English label and ``P625`` latitude/longitude, and stores the result in a
SQLite database.
"""

import argparse
import csv
import logging
import sqlite3
import urllib.parse

import requests

# NOTE(review): the endpoint prefix was missing from both URL f-strings in the
# reviewed source.  It is reconstructed here as the Wikidata ``wbgetentities``
# API, inferred from the response shape the code reads (``entities`` ->
# ``claims`` -> ``mainsnak``) and the ``-1`` id handling -- please confirm.
WIKIDATA_API = "https://www.wikidata.org/w/api.php"

# Seconds to wait for Wikidata before giving up; without a timeout a stalled
# connection would block the script forever.
REQUEST_TIMEOUT = 30

HEADERS = {"Accept": "application/json"}


class LocationStore:
    """SQLite-backed store mapping a person's name to a city and coordinates.

    Can be used as a context manager; the connection is closed on exit.
    """

    def __init__(self, db_filename):
        """Open (or create) the database and make sure the schema exists.

        Args:
            db_filename: Path handed to ``sqlite3.connect``.
        """
        self.conn = sqlite3.connect(db_filename)

        # make sure the table exists.
        createSqls = [
            """
            CREATE TABLE IF NOT EXISTS `people` (
                `name` VARCHAR(255),
                `city_id` INTEGER,
                `city_name` VARCHAR(255),
                `latitude` VARCHAR(255),
                `longitude` VARCHAR(255),
                `wd_person_id` VARCHAR ( 20 ) UNIQUE
            );
            """,
            """
            CREATE UNIQUE INDEX IF NOT EXISTS `unique_name` ON `people` (
                `name`
            );
            """,
        ]
        cur = self.conn.cursor()
        try:
            for sql in createSqls:
                cur.execute(sql)
            self.conn.commit()
        finally:
            cur.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()

    def addVariable(self, name, wd_id, city_name, city_id, latitude, longitude):
        """Insert or replace the row for ``name`` and commit immediately.

        A ``wd_id`` of ``-1`` (int or string) is stored as NULL.

        Args:
            name: Person name; unique key of the ``people`` table.
            wd_id: Wikidata id of the person, or ``-1`` when there is none.
            city_name: Label of the city, or ``''`` when unknown.
            city_id: Wikidata id of the city, or ``''`` when unknown.
            latitude: Latitude of the city, or ``''`` when unknown.
            longitude: Longitude of the city, or ``''`` when unknown.
        """
        # NOTE(review): the logging call around this message was garbled in
        # the reviewed source; INFO level is an assumption.
        logger.info(f"Queing storing of {name} ({wd_id}) in {city_name} ({city_id}) on {latitude}/{longitude}")
        if wd_id == -1 or wd_id == '-1':
            wd_id = None
        c = self.conn.cursor()
        try:
            c.execute(
                "INSERT OR REPLACE INTO people (name, wd_person_id, city_name, city_id, latitude, longitude) VALUES (?,?,?,?, ?, ?)",
                (name, wd_id, city_name, city_id, latitude, longitude),
            )
            self.conn.commit()
        finally:
            c.close()

    def contains(self, name):
        """Return True when a row for ``name`` already exists."""
        cur = self.conn.cursor()
        try:
            cur.execute("SELECT name FROM people WHERE name = ?", (name,))
            return cur.fetchone() is not None
        finally:
            cur.close()


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('cities')

argParser = argparse.ArgumentParser(description='Get coordinates for the birth places of the LFW people')
argParser.add_argument(
    '--db',
    type=str,
    required=True,
    help='Path of the SQLite database file to store results in'
)
argParser.add_argument(
    '--csv',
    type=str,
    required=True,
    help='Tab-separated file whose first column holds the names to look up'
)
# argParser.add_argument(
#     '--limit',
#     type=int,
#     default=1000,
#     help='Limit of new messages to parse'
# )
argParser.add_argument(
    '--verbose',
    '-v',
    action='store_true',
    help='Debug logging'
)

# problem how to search by title?
# query = """
# SELECT ?city ?geoloc where {
#
#   # wd:Q47526 wdt:P27 ?geoloc .
#   wd:Q47526 wdt:P19 ?city .
#   ?city wdt:P17 ?country .
#   ?city wdt:P625 ?geoloc .
#   SERVICE wikibase:label {
#     bd:serviceParam wikibase:language "en" .
#   }
# }
# """


def _get_json(url):
    """GET ``url`` and return the decoded JSON body; raise on HTTP errors."""
    response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


def _lookup_and_store(storage, name):
    """Resolve ``name`` to a city with coordinates and record it in ``storage``.

    Every returned entity without a usable ``P19`` claim is stored with empty
    location fields; the first entity that has one ends the search, whether
    or not the follow-up city query succeeds.
    """
    urlName = urllib.parse.quote(name)
    # NOTE(review): ``sites=enwiki`` is part of the reconstructed URL -- confirm.
    searchUrl = f"{WIKIDATA_API}?action=wbgetentities&sites=enwiki&titles={urlName}&format=json"
    try:
        data = _get_json(searchUrl)
        entities = data['entities']
    except (requests.RequestException, ValueError, LookupError, TypeError):
        # Nothing is stored, so the name is retried on the next run.
        logger.exception(f"Search query failed for '{name}'")
        return

    for wdId in entities:
        try:
            city_id = entities[wdId]['claims']['P19'][0]['mainsnak']['datavalue']['value']['id']
        except (LookupError, TypeError):
            logger.warning(f"No city found for '{name}'")
            logger.debug(data)
            storage.addVariable(name, wdId, '', '', '', '')
            continue

        try:
            geolocUrl = f"{WIKIDATA_API}?action=wbgetentities&ids={city_id}&format=json"
            city = _get_json(geolocUrl)['entities'][city_id]
            city_name = city['labels']['en']['value']
            coords = city['claims']['P625'][0]['mainsnak']['datavalue']['value']
            latitude = coords['latitude']
            longitude = coords['longitude']
        except (requests.RequestException, ValueError, LookupError, TypeError) as e:
            logger.warning(f"Error when doing followup query to {city_id} for {name}")
            logger.exception(e)
            # Keep the city id so the row records what is known.
            storage.addVariable(name, wdId, '', city_id, '', '')
        else:
            storage.addVariable(name, wdId, city_name, city_id, latitude, longitude)
        break


def main():
    """Parse the command line and process every name from the CSV file."""
    args = argParser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)

    # newline='' is what the csv module expects; blank rows are skipped
    # because they have no first column.
    with open(args.csv, 'r', newline='') as fp:
        reader = csv.reader(fp, delimiter='\t')
        names = [r[0] for r in reader if r]

    with LocationStore(args.db) as storage:
        for name in names:
            logger.debug(f"Name: {name}")
            if storage.contains(name):
                # NOTE(review): the logging call around this message was
                # garbled in the reviewed source; INFO level is an assumption.
                logger.info(f"Skip {name} - exists already")
                continue
            _lookup_and_store(storage, name)


if __name__ == "__main__":
    main()