# citation_bot/csv_importer_atlas-of-surve...
import argparse
import csv
import json
import logging
from time import sleep
from typing import Optional

import requests
import tqdm
from geopy.geocoders import Nominatim
logger = logging.getLogger('wiki.importer')
default_categories = [
    'Person',
    'Institution',
    'Technology',
    'Deployments',
    'Dataset',
    'City',
    'Country',
]
geocoder = Nominatim(user_agent="tutorial")
parser = argparse.ArgumentParser(
    description='Import Atlas of Surveillance CSV entries into the wiki.')
parser.add_argument('--categories', metavar='categories', default=default_categories, nargs='+',
                    help='Categories')
parser.add_argument('--url', default="https://www.securityvision.io/wiki/api.php",
                    help='Wiki API URL')
parser.add_argument('--output', default="semantic_data.json",
                    help='Output JSON file')
parser.add_argument('--credentials', default="no_credentials.json",
                    help="JSON file containing the bot's credentials")
parser.add_argument('--csv', default="Atlas of Surveillance-Gunshot Detection,Face Recognition,Real-Time Crime Center,Video Analytics-20220621.csv",
                    help="CSV file to import")
parser.add_argument('--citeref', default="atlasofsurveillance2022",
                    help="Bibliography key for imported items")
parser.add_argument('--dry-run', '-n', action="store_true",
                    help="Dry run")
parser.add_argument('--skip-geolocation', action="store_true",
                    help="Skip geolocation fetch, for a faster dry run")
args = parser.parse_args()

if args.skip_geolocation and not args.dry_run:
    raise Exception("Cannot do a real run without geolocating cities")
with open(args.credentials) as fp:
    credentials = json.load(fp)

username = credentials['user']
password = credentials['password']
def get_session():
    """Log in to the wiki API and return an authenticated requests session."""
    S = requests.Session()
    URL = args.url
    # Retrieve login token first
    PARAMS_0 = {
        'action': "query",
        'meta': "tokens",
        'type': "login",
        'format': "json"
    }
    R = S.get(url=URL, params=PARAMS_0)
    DATA = R.json()
    logger.debug(DATA)
    LOGIN_TOKEN = DATA['query']['tokens']['logintoken']
    logger.debug(LOGIN_TOKEN)
    # Send a post request to log in. Using the main account for login is not
    # supported. Obtain credentials via Special:BotPasswords
    # (https://www.mediawiki.org/wiki/Special:BotPasswords) for lgname & lgpassword
    PARAMS_1 = {
        'action': "login",
        'lgname': username,
        'lgpassword': password,
        'lgtoken': LOGIN_TOKEN,
        'format': "json"
    }
    R = S.post(URL, data=PARAMS_1)
    DATA = R.json()
    logger.debug(DATA)
    if DATA['login']['result'] != 'Success':
        raise Exception("Failed logging in")
    return S
# Map columns (reference list of the CSV's columns; not used by the code)
[
    'AOSNUMBER',
    'City',  # City_state
    # 'County',
    'State',
    'Agency',  # Institution
    'Type of LEA',  # Law enforcement agency (Institution type?)
    'Summary',  # body text
    'Type of Juris',  # institution type? (municipal/county/state etc.)
    'Technology',  # deployment type? face recognition etc.
    'Vendor',  # empty, or Clearview AI, Veritone etc. (Institution)
    'Link 1',
    # 'Link 1 Snapshot',
    'Link 1 Source',
    # 'Link 1 Type',
    'Link 1 Date',
    'Link 2',
    # 'Link 2 Snapshot',
    'Link 2 Source',
    # 'Link 2 Type',
    'Link 2 Date',
    'Link 3',
    # 'Link 3 Snapshot',
    'Link 3 Source',
    # 'Link 3 Type',
    'Link 3 Date',
    # 'Other Links',
    'Statewide Network of Agency Photos (SNAP)',  # single deployment, aggregate Used by
    'Face Analysis Comparison & Examination System (FACES)',  # single deployment, aggregate Used by
    'Maryland Image Repository System',  # single deployment, aggregate Used by
    # 'Clearview AI',  # no aggregation
    # 'BriefCam',  # no aggregation?
    'FACE Services',  # create link: Input for
    'Relevant for the Wiki?',  # FILTER!!
]
# title: Use of [VENDOR ]TECHNOLOGY by AGENCY
# City: CITY (STATE) or aggregated
# Country: USA
# Software Used: VENDOR TECHNOLOGY
# Used by: AGENCY
# Information Certainty: Documented
# Input for: [FACE Services]
# body: <blockquote> SUMMARY</blockquote>
# Additional properties:
# Original Sources: url date (link 1, 2, 3)
# AOSNUMBER
# CiteRef: args.citeref
# title: AGENCY
# City: CITY (STATE)
# Institution Type: Law Enforcement
# aggregate agencies when these columns are 'yes'
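# Rows flagged 'Yes' in one of these columns are collected per system and emitted
# as one aggregated deployment each (see mapAggregate), instead of one page per row.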
aggregates = {
    'Statewide Network of Agency Photos (SNAP)': [],
    'Face Analysis Comparison & Examination System (FACES)': [],
    'Maryland Image Repository System': [],
}
institutions = {}
cities = {}
technologies = {}
def mapEntry(entry) -> Optional[dict]:
    """Map a CSV row to a deployment dict, or None if it is filtered out or aggregated."""
    if entry['Relevant for the Wiki?'] != 'Yes':
        logger.warning(f'Ignoring entry {entry["AOSNUMBER"]}')
        return None
    else:
        hasAggregated = False
        for field in aggregates:
            if entry[field] == 'Yes':
                aggregates[field].append(entry)
                hasAggregated = True
        if hasAggregated:
            return None
    return mapDeployment(entry)
def mapTechnology(entry):
    """Map a CSV row to a product dict and register it in `technologies`."""
    entry['Vendor'] = entry['Vendor'].strip()
    tech = {
        'title': f"{entry['Vendor'] if entry['Vendor'] else 'Unknown'} {entry['Technology']}",
        "@type": "Products",
        'properties': {
            "Developed by": entry['Vendor'],
        },
        "additionalProperties": {
            "Technology Type": entry['Technology'],
            "Needs processing of the title": "Yes",
            'CiteRef': args.citeref,
        }
    }
    technologies[tech['title']] = tech
    return tech
def mapDeployment(entry):
    """Map a CSV row to a deployment dict, creating the related city, institutions and technology."""
    city = mapCity(entry)
    mapInstitution(entry)
    if entry['Vendor'] and len(entry['Vendor'].strip()) > 0:
        mapDeveloperInstitution(entry['Vendor'])
    tech = mapTechnology(entry)
    return {
        # collapse accidental double spaces, e.g. from messy CSV fields
        'title': f"{entry['Vendor']} {entry['Technology']} used by {entry['Agency']}".replace('  ', ' ').strip(),
        '@type': 'Deployments',
        'properties': {
            "Keywords": [entry['Technology']],
            "used by": entry['Agency'],
            "Software Deployed": tech['title'],
            "City": city['title'],
            "Information Certainty": "Documented",
        },
        "additionalProperties": {
            "URL": [
                "https://atlasofsurveillance.org/es/a/" + entry['AOSNUMBER'],
                entry['Link 1'],
                entry['Link 2'],
                entry['Link 3'],
            ],
            'CiteRef': args.citeref,
            "Input for": "FACES (FBI) Dataset" if entry['FACE Services'] == 'Yes' else None,
        },
        "body": [entry['Summary']]
    }
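# e.g. a row with Vendor "Clearview AI", Technology "Face Recognition" and a
# (hypothetical) Agency "Springfield Police Department" yields the title
# "Clearview AI Face Recognition used by Springfield Police Department".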
# Type of LEA mapping:
#   Court: Institution Type::Government, Institution Sector::Justice
#   Police/Sheriff/State Police/District Attorney/Attorney General/Prosecutor/School Police/
#     Constables/DHS/Fusion Center/Juvenile/Security/Transit Police:
#     Institution Type::Law Enforcement, Institution Sector::Security
#   DMV/Emergency Services/Parks/State Agency/Transit: Institution Type::Government, Institution Sector::Civil Administration
#   Medical Examiner: Institution Type::Government, Institution Sector::Health
#   School District: Institution Type::Local Government, Institution Sector::Education
#   State-Local Partnership: Institution Type::State-Local Partnership, Institution Sector::Security
institution_type_sector = {
    "Court": ("Government", "Justice"),
    "Police": ("Law Enforcement", "Security"),
    "Sheriff": ("Law Enforcement", "Security"),
    "State Police": ("Law Enforcement", "Security"),
    "District Attorney": ("Law Enforcement", "Security"),
    "Attorney General": ("Law Enforcement", "Security"),
    "Prosecutor": ("Law Enforcement", "Security"),
    "School Police": ("Law Enforcement", "Security"),
    "Constables": ("Law Enforcement", "Security"),
    "DHS": ("Law Enforcement", "Security"),
    "Fusion Center": ("Law Enforcement", "Security"),
    "Juvenile": ("Law Enforcement", "Security"),
    "Security": ("Law Enforcement", "Security"),
    "Transit Police": ("Law Enforcement", "Security"),
    "Corrections": ("Law Enforcement", "Security"),
    "Clemis": ("Law Enforcement", "Security"),
    "DMV": ("Government", "Civil Administration"),
    "Emergency Services": ("Government", "Civil Administration"),
    "Parks": ("Government", "Civil Administration"),
    "State Agency": ("Government", "Civil Administration"),
    "Transit": ("Government", "Civil Administration"),
    "Medical Examiner": ("Local Government", "Health"),
    "School District": ("Local Government", "Education"),
    "State-Local Partnership": ("State-Local Partnership", "Security"),
}
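# e.g. institution_type_sector['Sheriff'] == ('Law Enforcement', 'Security')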
def mapInstitution(entry):
    """Aggregate agencies as institutions from entries."""
    inst_type, sector = institution_type_sector[entry['Type of LEA']]
    info = {
        'title': entry['Agency'],
        '@type': 'Institution',
        'properties': {
            "Institution Type": inst_type,
            "Institution Sector": sector,
            'City': mapCity(entry)['title'],
        },
        "additionalProperties": {
            "Type of Juris": entry['Type of Juris'],
            "URL": [
                "https://atlasofsurveillance.org/es/a/" + entry['AOSNUMBER'],
                entry['Link 1'],
                entry['Link 2'],
                entry['Link 3'],
            ],
            'CiteRef': args.citeref,
        },
        "body": [entry['Type of LEA']],
    }
    if entry['Agency'] in institutions:
        logger.warning(f'Ignoring duplicate {entry["Agency"]}')
    else:
        institutions[entry['Agency']] = info
def mapDeveloperInstitution(title):
    """Register a vendor as an institution stub, unless it already exists."""
    if title in institutions:
        return
    institutions[title] = {
        'title': title,
        '@type': 'Institution',
        'properties': {},
        "additionalProperties": {
            "Needs content": "Yes",
            'CiteRef': args.citeref,
        }
    }
def mapCity(entry):
    """Map a CSV row to a city dict, geocoded unless --skip-geolocation, memoised in `cities`."""
    title = f"{entry['City']} ({entry['State']})"
    if title not in cities:
        info = {
            'title': title,
            '@type': 'City',
            'properties': {
                "is in Country": "USA",
            },
            "additionalProperties": {
                'CiteRef': args.citeref,
            }
        }
        if not args.skip_geolocation:
            location_response = geocoder.geocode(f"{entry['City']}, {entry['State']}, USA")
            sleep(1)  # free tier of the geocoder requires a 1 sec delay between requests
            if location_response:
                location = location_response.raw
                info["properties"]["Has Coordinates"] = f"{location['lat']}, {location['lon']}"
                info["body"] = [location['display_name']]
            else:
                logger.warning(f"No location data for {title} USA")
        cities[title] = info
    return cities[title]
def mapAggregate(title, data):
    """Map a list of aggregated entries to a single deployment dict."""
    urls = ["https://atlasofsurveillance.org/es/a/" + entry['AOSNUMBER'] for entry in data]
    urls.extend([entry['Link 1'] for entry in data])
    urls.extend([entry['Link 2'] for entry in data])
    urls.extend([entry['Link 3'] for entry in data])
    urls = list(dict.fromkeys(urls))  # deduplicate, preserving order
    urls = list(filter(lambda url: url and len(url) > 0, urls))
    for entry in data:
        mapInstitution(entry)
    return {
        "title": title,
        '@type': 'Deployments',
        'properties': {
            "Information Certainty": "Documented",
            "used by": [entry['Agency'] for entry in data]
        },
        "additionalProperties": {
            "URL": urls,
            'CiteRef': args.citeref,
        }
    }
def renderPage(data):
    """Render a mapped dict as wikitext: template call, body blockquotes, additional properties."""
    page = f"{{{{{data['@type']}"
    for key, value in data['properties'].items():
        page += f"\n|{key}=" + (', '.join(value) if isinstance(value, list) else value)
    page += "}}\n\n"
    if 'body' in data:
        for b in data['body']:
            if b and len(b):
                page += f"<blockquote>{b} [[CiteRef::{args.citeref}]]</blockquote>\n\n"
    if len(data['additionalProperties']):
        page += "=== Additional properties ===\n\n"
        for key, value in data['additionalProperties'].items():
            if not isinstance(value, list):
                value = [value]
            for v in value:
                if v:
                    page += f"* {key} [[{key}::{v}]]\n"
    return page
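# A rendered City page then looks roughly like this (hypothetical values):
#
#   {{City
#   |is in Country=USA
#   |Has Coordinates=39.29, -76.61}}
#
#   <blockquote>Baltimore, Maryland, United States [[CiteRef::atlasofsurveillance2022]]</blockquote>
#
#   === Additional properties ===
#
#   * CiteRef [[CiteRef::atlasofsurveillance2022]]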
def saveIfNotExists(data, page, session, token):
    """Create the wiki page for `data`, unless a page with that title already exists."""
    # https://en.wikipedia.org/w/api.php?action=query&prop=info&titles=New%20York%20Yankeesdfsdf
    # baseurl = f"{args.url}?action=query&list=categorymembers&cmtitle=Category:{category}&format=json"
    params = {
        'action': 'edit',
        'createonly': '1',
        'title': data['title'].strip(),
        'contentformat': 'text/x-wiki',
        'text': page,
        'format': 'json',
        'token': token,
    }
    logger.debug("%s %s", args.url, params)
    if not args.dry_run:
        logger.warning(f"Creating '{data['title'].strip()}' type {data['@type']}")
        response = session.post(args.url, data=params)
        resp = response.json()
        if 'warnings' in resp:
            logger.warning(resp)
        logger.debug(resp)
    else:
        logger.warning(f"'{data['title'].strip()}' type {data['@type']}")
def getEditToken(session):
    """Fetch a CSRF token, which the API requires for edit actions."""
    params = {
        'action': "query",
        'meta': "tokens",
        'type': "csrf",
        'format': "json"
    }
    R = session.get(args.url, params=params)
    DATA = R.json()
    logger.debug(DATA)
    return DATA['query']['tokens']['csrftoken']
if __name__ == "__main__":
    logging.basicConfig()  # attach a handler so the DEBUG level below takes effect
    logger.setLevel(logging.DEBUG)
    session = get_session()
    token = getEditToken(session)
    parsedData = []
    with open(args.csv, newline='') as csvfile:
        csvreader = csv.DictReader(csvfile, delimiter=',')
        for row in tqdm.tqdm(csvreader):
            data = mapEntry(row)
            if data is None:
                continue
            parsedData.append(data)
    parsedData.extend([mapAggregate(title, a) for title, a in aggregates.items()])
    parsedData.extend(cities.values())
    parsedData.extend(technologies.values())
    parsedData.extend(institutions.values())
    # print(parsedData)
    for i, data in enumerate(parsedData):
        page = renderPage(data)
        # if data['@type'] == 'City':  # only for City, to update coordinates
        saveIfNotExists(data, page, session, token)
        # if i > 5:
        #     break
    print(f"total: {len(parsedData)} items (of which {len(institutions)} institutions, {len(cities)} cities, {len(technologies)} products)")
    print(len(parsedData) - len(institutions) - len(cities) - len(technologies), "deployments")
#
# Title: vendor/unknown
# Postprocessing: make sure unknowns are numbered, and that individual unknowns
# are created for multiple related deployments.