citation_bot/csv_importer_technopolice.py

341 lines
11 KiB
Python
Raw Normal View History

2022-07-11 09:20:17 +00:00
import urllib.request
import json
import logging
import requests
import argparse
import datetime
import tqdm
import csv
logger = logging.getLogger('wiki.importer')
default_categories = [
'Person',
'Institution',
'Technology',
'Deployments',
'Dataset',
'City',
'Country',
]
parser = argparse.ArgumentParser(
description='Turn wiki into nodes & links, usable by d3-force.')
parser.add_argument('--categories', metavar='categories', default=default_categories, nargs='+',
help='Categories')
parser.add_argument('--url', default="https://www.securityvision.io/wiki/api.php",
help='Wiki API URL')
parser.add_argument('--output', default="semantic_data.json",
help='Output JSON file')
parser.add_argument('--credentials', default="no_credentials.json",
help="JSON file containing the Bot's credentials")
parser.add_argument('--csv', default="test.csv",
help="CVS file to import")
parser.add_argument('--citeref', default="technopoliceFrMarch2020",
help="Bibliography key for imported items")
args = parser.parse_args()
with open(args.credentials) as fp:
credentials = json.load(fp)
username = credentials['user']
password = credentials['password']
def get_session():
S = requests.Session()
URL = args.url
# Retrieve login token first
PARAMS_0 = {
'action': "query",
'meta': "tokens",
'type': "login",
'format': "json"
}
R = S.get(url=URL, params=PARAMS_0)
DATA = R.json()
logger.debug(DATA)
LOGIN_TOKEN = DATA['query']['tokens']['logintoken']
logger.debug(LOGIN_TOKEN)
# Send a post request to login. Using the main account for login is not
# supported. Obtain credentials via Special:BotPasswords
# (https://www.mediawiki.org/wiki/Special:BotPasswords) for lgname & lgpassword
PARAMS_1 = {
'action': "login",
'lgname': username,
'lgpassword': password,
'lgtoken': LOGIN_TOKEN,
'format': "json"
}
R = S.post(URL, data=PARAMS_1)
DATA = R.json()
logger.debug(DATA)
if DATA['login']['result'] != 'Success':
raise Exception("Failed logging in")
return S
# Map columns
# split on |
[
'Title' # Title,
'Date added' # -,
'Template' # Category (map name & fields),
'Type de document administratif', # Documents administratifs
'Date', # Lois ou règlements & Documents administratifs
'Produit par', # Documents administratifs & Lois ou règlements
'Titre complet', # Lois ou règlements & Documents administratifs
'Projet(s) lié(s)', # Documents/Lois/Contentieux
# "Financement BPI Safe City|Convention d'expérimentation Safe City Nice|Réponse Etablissement Paris La Déf. Projet SafeCity"
'Document(s) lié(s)',
'Origine du document',
'Description',
'Sujet(s)', # set list of items
'URL',
'Nom complet', # acteurs:
"Type d'acteur", # Acteurs
'Compétences', # Acteurs: "Police et justice|Technologies, innovation, R&D"
'Adresse', # Acteurs
'Fait partie de', # Acteurs: Page
'Image',
'Application(s)', # Projet: Set list of items
'Enjeu(x) prioritaire(s)',
'Durée du projet', # Project: date range, eg me/01/2020~sa/12/2022 -> Jan 1, 2020 ~ Dec 31, 2022
'Description du projet', # Project: (body) text
"Coût du projet (en millions d'euros)", # Projet: number * 1000.000 (project)
'Commanditaire(s)', # Projet: Page
'Financements publics', # Projet: Page
'Entreprise(s) prestataire(s)', # Projet Page
'Type de document',
'Type de loi ou règlement', # Lois ou règlements
"Date d'adoption",
'Domaine(s)',
"Période d'applicabilité", 'Dernière modification',
'Juridiction',
'Geolocation', # geolocation (projet, Acteurs)
'Documents',
'Attachments',
'Published'
]
[
'Title', # Title,
'Date added',# -,
'Template' # Category (map name & fields),
'Application(s)', # Projet: Set list of items
'Enjeu(x) prioritaire(s)', # Projet: Set list of items
'Durée du projet', # Project: date range, eg me/01/2020~sa/12/2022 -> Jan 1, 2020 ~ Dec 31, 2022
'Description du projet', # Project: (body) text
"Coût du projet (en millions d'euros)", # Projet: number * 1000.000 (project)
'Commanditaire(s)', # Projet: Page
'Financements publics', # Project: Page
'Entreprise(s) prestataire(s)', # Project: Page
'Document(s) lié(s)', # -
'URL', # Url
'Nom complet', # -
"Type d'acteur", # Acteur: set list
'Compétences', # Acteurs: "Police et justice|Technologies, innovation, R&D"
'Adresse', # Acteurs text
'Fait partie de', # Acteurs: Page
'Image',
'Description', # Body text
'Geolocation', # geolocation (projet, Acteurs)
'Documents',
'Attachments',
'Published'
]
def mapEntry(entry) -> dict:
# 'URL', # Url (split by |)
# 'Description', # Body text
# 'Geolocation', # convert to City
if entry['Template'] == 'Projets':
return mapDeployment(entry)
elif entry['Template'] == 'Acteurs':
return mapInstitution(entry)
else:
logger.critical(f"Invalid category/Template for entry: {entry['Template']}")
def parseStrings(*input):
items = []
for i in input:
items.extend(i.split('|'))
return items
def parseGeo(loc):
if not len(loc):
return ''
return '°,'.join(loc.split('|')) + '°'
def parseDate(d):
# date is in odd format, so we skip the day (which is DoW instead of DoM)
# me/01/2020~sa/12/2022 -> Jan 1, 2020 ~ Dec 31, 2022
if '/' in d:
parts = d.split('/')
return f"{parts[1]}/{parts[2]}"
return d
def mapDeployment(entry):
global args
return {
'title': entry['Title'],
'@type': 'Deployments',
'properties': {
"Keywords": parseStrings(
entry['Application(s)'],
entry['Enjeu(x) prioritaire(s)'],
),
"Managed by": parseStrings(entry['Commanditaire(s)']),
"Deployment Start Date": parseDate(entry['Durée du projet'].split('~')[0]),
"Deployment End Date": parseDate(entry['Durée du projet'].split('~')[1]) if '~' in entry['Durée du projet'] else '',
},
"additionalProperties": {
"Budget": int(entry["Coût du projet (en millions d'euros)"]) * 1000000 if entry["Coût du projet (en millions d'euros)"] else None,
"Funded by": parseStrings(entry['Financements publics']),
"Provided by": parseStrings(entry['Entreprise(s) prestataire(s)']),
"URL": entry['URL'],
"Geolocation": parseGeo(entry['Geolocation']),
'CiteRef': args.citeref,
},
"body": [entry['Description du projet'], entry['Description']]
}
# Deployments
# 'Application(s)': "Keywords": Capteurs audios, Vidéosurveillance automatisée, Fichiers, Statistiques Big Data,Identification biométrique, profilage,
# 'Enjeu(x) prioritaire(s)', # Projet: Set list of items (Keywords): Technologies, innovation, R&D, Transport, Éducation, Police et justice,
# 'Durée du projet', # Deployment_Start_Date Deployment_End_Date Project: date range, eg me/01/2020~sa/12/2022 -> Jan 1, 2020 ~ Dec 31, 2022
# 'Description du projet', # Project: (body) text
# "Coût du projet (en millions d'euros)", # Projet: number * 1000.000 (project)
# 'Commanditaire(s)', # Projet: Page
# 'Financements publics', # Project: Page
# 'Entreprise(s) prestataire(s)', # Project: Page
pass
def parseType(type):
typemap = {
'Entreprise': 'Company',
'Collectivité territoriale': 'Local Government',
'Association': 'NGO',
'Syndicat': 'Labor union',
'Institution ou organisme public': 'Government',
'Juridiction ou autorité de régulation': 'Government',
'Juridiction': 'Government',
}
return typemap[type]
def mapInstitution(entry):
global args
return {
'title': entry['Title'],
'@type': 'Institution',
'properties': {
"Keywords": parseStrings(
entry['Compétences']
),
"Institution Type": parseType(entry["Type d'acteur"]),
"Deployment Start Date": parseDate(entry['Durée du projet'].split('~')[0]),
"Deployment End Date": parseDate(entry['Durée du projet'].split('~')[1]) if '~' in entry['Durée du projet'] else '',
'URL': entry['URL'],
'Address': entry['Adresse'],
'Related Institutions': parseStrings(entry['Fait partie de'])
},
"additionalProperties": {
"Geolocation": parseGeo(entry['Geolocation']),
'CiteRef': args.citeref,
},
"body": [entry['Description']]
}
# "Type d'acteur", # Institution_Type: set list: Entreprise, Collectivité territoriale, Association, Syndicat, Institution ou organisme public, Juridiction ou autorité de régulation, Juridiction
# 'Compétences', # Keywords: Droits fondamentaux, Éducation, "Police et justice|Technologies, innovation, R&D"
# 'Adresse', # Address text
# 'Fait partie de', # Acteurs: Page link in Body: [[Collaborates With::NAME]]
def renderPage(data):
global args
page = f"{{{{{data['@type']}"
for key, value in data['properties'].items():
page += f"\n|{key}=" + (', '.join(value) if isinstance(value, list) else value)
page += "}}\n\n"
for b in data['body']:
if b and len(b):
page += f"<blockquote>{b} [[CiteRef::{args.citeref}]]</blockquote>\n\n"
if len(data['additionalProperties']):
page += "=== Additional properties ===\n\n"
for key, value in data['additionalProperties'].items():
if not isinstance(value, list):
value = [value]
for v in value:
if v:
page += f"* {key} [[{key}::{v}]]\n"
return page
def saveIfNotExists(data, page, session, token):
# https://en.wikipedia.org/w/api.php?action=query&prop=info&titles=New%20York%20Yankeesdfsdf
# baseurl = f"{args.url}?action=query&list=categorymembers&cmtitle=Category:{category}&format=json"
params = {
'action': 'edit',
'createonly': '1',
'title': data['title'],
'contentformat': 'text/x-wiki',
'text': page,
'format': 'json',
'token': token,
}
logger.debug(args.url, params)
logger.warning(f"Creating {data['title']}")
response = session.post(args.url, data=params)
resp = response.json()
if 'warnings' in resp:
logger.warning(resp)
logger.debug(resp)
# print(responseData)
def getEditToken(session):
params = {
'action': "query",
'meta': "tokens",
'type': "csrf",
'format': "json"
}
R = session.get(args.url, params=params)
DATA = R.json()
logger.debug(DATA)
return DATA['query']['tokens']['csrftoken']
if __name__ == "__main__":
logger.setLevel(logging.DEBUG)
session = get_session()
token = getEditToken(session)
i = 0
with open(args.csv, newline='') as csvfile:
csvreader = csv.DictReader(csvfile, delimiter=',')
for row in csvreader:
data = mapEntry(row)
page = renderPage(data)
saveIfNotExists(data, page, session, token)
i+= 1
# if i > 5:
# break