from time import sleep
from typing import Optional
import urllib.request
import json
import logging
import requests
import argparse
import datetime
import tqdm
import csv
from geopy.geocoders import Nominatim

logger = logging.getLogger('wiki.importer')

default_categories = [
    'Person',
    'Institution',
    'Technology',
    'Deployments',
    'Dataset',
    'City',
    'Country',
]

geocoder = Nominatim(user_agent="tutorial")

parser = argparse.ArgumentParser(
    description='Import the Atlas of Surveillance CSV into the wiki.')
parser.add_argument('--categories', metavar='categories',
                    default=default_categories, nargs='+',
                    help='Categories')
parser.add_argument('--url', default="https://www.securityvision.io/wiki/api.php",
                    help='Wiki API URL')
parser.add_argument('--output', default="semantic_data.json",
                    help='Output JSON file')
parser.add_argument('--credentials', default="no_credentials.json",
                    help="JSON file containing the Bot's credentials")
parser.add_argument('--csv',
                    default="Atlas of Surveillance-Gunshot Detection,Face Recognition,Real-Time Crime Center,Video Analytics-20220621.csv",
                    help="CSV file to import")
parser.add_argument('--citeref', default="atlasofsurveillance2022",
                    help="Bibliography key for imported items")
parser.add_argument('--dry-run', '-n', action="store_true",
                    help="Dry run")
parser.add_argument('--skip-geolocation', action="store_true",
                    help="Skip geolocation fetch, for faster dry-run")

args = parser.parse_args()

if args.skip_geolocation and not args.dry_run:
    raise Exception("Cannot do a real run without geolocating cities")

# expects JSON like {"user": "...", "password": "..."}
with open(args.credentials) as fp:
    credentials = json.load(fp)
    username = credentials['user']
    password = credentials['password']


def get_session():
    S = requests.Session()
    URL = args.url

    # Retrieve login token first
    PARAMS_0 = {
        'action': "query",
        'meta': "tokens",
        'type': "login",
        'format': "json"
    }
    R = S.get(url=URL, params=PARAMS_0)
    DATA = R.json()
    logger.debug(DATA)
    LOGIN_TOKEN = DATA['query']['tokens']['logintoken']
    logger.debug(LOGIN_TOKEN)

    # Send a post request to login. Using the main account for login is not
    # supported. Obtain credentials via Special:BotPasswords
    # (https://www.mediawiki.org/wiki/Special:BotPasswords) for lgname & lgpassword
    PARAMS_1 = {
        'action': "login",
        'lgname': username,
        'lgpassword': password,
        'lgtoken': LOGIN_TOKEN,
        'format': "json"
    }
    R = S.post(URL, data=PARAMS_1)
    DATA = R.json()
    logger.debug(DATA)

    if DATA['login']['result'] != 'Success':
        raise Exception("Failed logging in")
    return S


# Map columns
[
    'AOSNUMBER',
    'City',  # City_state
    # 'County',
    'State',
    'Agency',  # Institution
    'Type of LEA',  # Law enforcement agency (Institution type?)
    'Summary',  # body text
    'Type of Juris',  # institution type? (municipal/county/state etc)
    'Technology',  # deployment type? face recognition etc
    'Vendor',  # empty or clearview ai, veritone etc. (Institution)
    'Link 1',
    # 'Link 1 Snapshot',
    'Link 1 Source',
    # 'Link 1 Type',
    'Link 1 Date',
    'Link 2',
    # 'Link 2 Snapshot',
    'Link 2 Source',
    # 'Link 2 Type',
    'Link 2 Date',
    'Link 3',
    # 'Link 3 Snapshot',
    'Link 3 Source',
    # 'Link 3 Type',
    'Link 3 Date',
    # 'Other Links',
    'Statewide Network of Agency Photos (SNAP)',  # single deployment, aggregate Used by
    'Face Analysis Comparison & Examination System (FACES)',  # single deployment, aggregate Used by
    'Maryland Image Repository System',  # single deployment, aggregate Used by
    # 'Clearview AI',  # no aggregation
    # 'BriefCam',  # no aggregation?
    'FACE Services',  # create link: Input for
    'Relevant for the Wiki?',  # FILTER!!
]
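# Illustrative flow for a single (hypothetical) row, given the columns above:
#
#   row = {'AOSNUMBER': '1234', 'City': 'Springfield', 'State': 'IL',
#          'Agency': 'Springfield Police Department', 'Type of LEA': 'Police',
#          'Technology': 'Face Recognition', 'Vendor': 'Clearview AI',
#          'Relevant for the Wiki?': 'Yes', ...}
#
#   mapEntry(row)  # -> a Deployments dict (and, as side effects, City,
#                  #    Institution and Products entries), or None when the
#                  #    row is filtered out or folded into an aggregate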
# title: Use of [VENDOR ]TECHNOLOGY by AGENCY
# City: CITY (STATE) or aggregated
# Country: USA
# Software Used: VENDOR TECHNOLOGY
# Used by: AGENCY
# Information Certainty: Documented
# Input for: [FACE Services]
# body:
#   SUMMARY
#
# Additional properties:
#   Original Sources: url date (link 1, 2, 3)
#   AOSNUMBER
#   CiteRef: args.citeref

# title: AGENCY
# City: CITY (STATE)
# Institution Type: Law Enforcement
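# For reference, renderPage() below turns a deployment mapped with the template
# above into wikitext along these lines (values are hypothetical):
#
#   {{Deployments
#   |Keywords=Face Recognition
#   |used by=Springfield Police Department
#   |Software Deployed=Clearview AI Face Recognition
#   |City=Springfield (IL)
#   |Information Certainty=Documented}}
#
#   <summary text> [[CiteRef::atlasofsurveillance2022]]
#
#   === Additional properties ===
#
#   * URL [[URL::https://atlasofsurveillance.org/es/a/1234]]
#   * CiteRef [[CiteRef::atlasofsurveillance2022]]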
# aggregate agencies when these columns are 'yes'
aggregates = {
    'Statewide Network of Agency Photos (SNAP)': [],
    'Face Analysis Comparison & Examination System (FACES)': [],
    'Maryland Image Repository System': [],
}

institutions = {}
cities = {}
technologies = {}


def mapEntry(entry) -> Optional[dict]:
    if entry['Relevant for the Wiki?'] != 'Yes':
        logger.warning(f'Ignore entry {entry["AOSNUMBER"]}')
        return None
    else:
        hasAggregated = False
        for field in aggregates.keys():
            if entry[field] == 'Yes':
                aggregates[field].append(entry)
                hasAggregated = True
        if hasAggregated:
            return None
    return mapDeployment(entry)


def mapTechnology(entry):
    entry['Vendor'] = entry['Vendor'].strip()
    tech = {
        'title': f"{entry['Vendor'] if entry['Vendor'] else 'Unknown'} {entry['Technology']}",
        "@type": "Products",
        'properties': {
            "Developed by": entry['Vendor'],
        },
        "additionalProperties": {
            "Technology Type": entry['Technology'],
            "Needs processing of the title": "Yes",
            'CiteRef': args.citeref,
        }
    }
    technologies[tech['title']] = tech
    return tech


def mapDeployment(entry):
    global args
    city = mapCity(entry)
    mapInstitution(entry)
    if entry['Vendor'] and len(entry['Vendor'].strip()) > 0:
        mapDeveloperInstitution(entry['Vendor'])
    tech = mapTechnology(entry)
    return {
        # collapse the double space left behind when Vendor is empty
        'title': f"{entry['Vendor']} {entry['Technology']} used by {entry['Agency']}".replace('  ', ' ').strip(),
        '@type': 'Deployments',
        'properties': {
            "Keywords": [entry['Technology']],
            "used by": entry['Agency'],
            "Software Deployed": tech['title'],
            "City": city['title'],
            "Information Certainty": "Documented",
        },
        "additionalProperties": {
            "URL": [
                "https://atlasofsurveillance.org/es/a/" + entry['AOSNUMBER'],
                entry['Link 1'],
                entry['Link 2'],
                entry['Link 3'],
            ],
            'CiteRef': args.citeref,
            "Input for": "FACES (FBI) Dataset" if entry['FACE Services'] == 'Yes' else None,
        },
        "body": [entry['Summary']]
    }


# Type of LEA mapping:
# Court -> Institution Type::Government, Institution Sector::Justice
# Police/Sheriff/State Police/District Attorney/Attorney General/Prosecutor/
#   School Police/Constables/DHS/Fusion Center/Juvenile/Security/Transit Police
#   -> Institution Type::Law Enforcement, Institution Sector::Security
# DMV/Emergency Services/Parks/State Agency/Transit
#   -> Institution Type::Government, Institution Sector::Civil Administration
# Medical Examiner -> Institution Type::Local Government, Institution Sector::Health
# School District -> Institution Type::Local Government, Institution Sector::Education
# State-Local Partnership -> Institution Type::State-Local Partnership, Institution Sector::Security
institution_type_sector = {
    "Court": ("Government", "Justice"),
    "Police": ("Law Enforcement", "Security"),
    "Sheriff": ("Law Enforcement", "Security"),
    "State Police": ("Law Enforcement", "Security"),
    "District Attorney": ("Law Enforcement", "Security"),
    "Attorney General": ("Law Enforcement", "Security"),
    "Prosecutor": ("Law Enforcement", "Security"),
    "School Police": ("Law Enforcement", "Security"),
    "Constables": ("Law Enforcement", "Security"),
    "DHS": ("Law Enforcement", "Security"),
    "Fusion Center": ("Law Enforcement", "Security"),
    "Juvenile": ("Law Enforcement", "Security"),
    "Security": ("Law Enforcement", "Security"),
    "Transit Police": ("Law Enforcement", "Security"),
    "Corrections": ("Law Enforcement", "Security"),
    "Clemis": ("Law Enforcement", "Security"),
    "DMV": ("Government", "Civil Administration"),
    "Emergency Services": ("Government", "Civil Administration"),
    "Parks": ("Government", "Civil Administration"),
    "State Agency": ("Government", "Civil Administration"),
    "Transit": ("Government", "Civil Administration"),
    "Medical Examiner": ("Local Government", "Health"),
    "School District": ("Local Government", "Education"),
    "State-Local Partnership": ("State-Local Partnership", "Security"),
}
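# Note: mapInstitution() below indexes this table directly, so a row whose
# 'Type of LEA' is missing from it raises KeyError and aborts the run. If a
# softer failure mode were wanted, a hedged alternative (hypothetical default,
# not in the original import) could be:
#
#   inst_type, sector = institution_type_sector.get(
#       entry['Type of LEA'], ("Government", "Security"))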
Administration"), "Parks": ("Government", "Civil Administration"), "State Agency": ("Government", "Civil Administration"), "Transit": ("Government", "Civil Administration"), "Medical Examiner": ("Local Government", "Health"), "School District": ("Local Government", "Education"), "State-Local Partnership": ("State-Local Partnership", "Security"), } def mapInstitution(entry): # aggregate agencies as institutions from entries global args type, sector = institution_type_sector[entry['Type of LEA']] info = { 'title': entry['Agency'], '@type': 'Institution', 'properties': { "Institution Type": type, "Institution Sector": sector, 'City': mapCity(entry)['title'], }, "additionalProperties": { "Type of Juris": entry['Type of Juris'], "URL": [ "https://atlasofsurveillance.org/es/a/"+entry['AOSNUMBER'], entry['Link 1'], entry['Link 2'], entry['Link 3'], ], 'CiteRef': args.citeref, }, "body": [entry['Type of LEA']], } if entry['Agency'] in institutions: logger.warning(f'Ignore duplicate {entry["Agency"]}') else: institutions[entry['Agency']] = info def mapDeveloperInstitution(title): if title in institutions: return institutions[title] = { 'title': title, '@type': 'Institution', 'properties': { }, "additionalProperties": { "Needs content": "Yes", 'CiteRef': args.citeref, } } def mapCity(entry): title = f"{entry['City']} ({entry['State']})" if title not in cities: info = { 'title': title, '@type': 'City', 'properties': { "is in Country": "USA", }, "additionalProperties": { 'CiteRef': args.citeref, } } if not args.skip_geolocation: location_response = geocoder.geocode(f"{entry['City']}, {entry['State']}, USA") sleep(1) # free tier of location geocode requires 1 sec delay if location_response: location = location_response.raw info["properties"]["Has Coordinates"] = f"{location['lat']}, {location['lon']}" info["body"] = [location['display_name']] else: logger.warning(f"No location data for {title} USA") cities[title] = info return cities[title] def mapAggregate(title, data): urls = [ "https://atlasofsurveillance.org/es/a/"+entry['AOSNUMBER'] for entry in data] urls.extend([entry['Link 1'] for entry in data]) urls.extend([entry['Link 2'] for entry in data]) urls.extend([entry['Link 3'] for entry in data]) urls = list(dict.fromkeys(urls)) # unique urls = list(filter(lambda url: url and len(url) > 0, urls)) for entry in data: mapInstitution(entry) return { "title": title, '@type': 'Deployments', 'properties': { "Information Certainty": "Documented", "used by": [entry['Agency'] for entry in data] }, "additionalProperties": { "URL": urls, 'CiteRef': args.citeref, } } def renderPage(data): global args page = f"{{{{{data['@type']}" for key, value in data['properties'].items(): page += f"\n|{key}=" + (', '.join(value) if isinstance(value, list) else value) page += "}}\n\n" if 'body' in data: for b in data['body']: if b and len(b): page += f"
{b} [[CiteRef::{args.citeref}]]
\n\n" if len(data['additionalProperties']): page += "=== Additional properties ===\n\n" for key, value in data['additionalProperties'].items(): if not isinstance(value, list): value = [value] for v in value: if v: page += f"* {key} [[{key}::{v}]]\n" return page def saveIfNotExists(data, page, session, token): # https://en.wikipedia.org/w/api.php?action=query&prop=info&titles=New%20York%20Yankeesdfsdf # baseurl = f"{args.url}?action=query&list=categorymembers&cmtitle=Category:{category}&format=json" params = { 'action': 'edit', 'createonly': '1', 'title': data['title'].strip(), 'contentformat': 'text/x-wiki', 'text': page, 'format': 'json', 'token': token, } logger.debug(args.url, params) if not args.dry_run: logger.warning(f"Creating '{data['title'].strip()}' type {data['@type']}") response = session.post(args.url, data=params) resp = response.json() if 'warnings' in resp: logger.warning(resp) logger.debug(resp) else: logger.warning(f"'{data['title'].strip()}' type {data['@type']}") def getEditToken(session): params = { 'action': "query", 'meta': "tokens", 'type': "csrf", 'format': "json" } R = session.get(args.url, params=params) DATA = R.json() logger.debug(DATA) return DATA['query']['tokens']['csrftoken'] if __name__ == "__main__": logger.setLevel(logging.DEBUG) session = get_session() token = getEditToken(session) parsedData=[] with open(args.csv, newline='') as csvfile: csvreader = csv.DictReader(csvfile, delimiter=',') for row in tqdm.tqdm(csvreader): data = mapEntry(row) if data is None: continue parsedData.append(data) parsedData.extend([mapAggregate(title, a) for title, a in aggregates.items()]) parsedData.extend(cities.values()) parsedData.extend(technologies.values()) parsedData.extend(institutions.values()) # print(parsedData) for i, data in enumerate(parsedData): page = renderPage(data) # if data['@type'] == 'City': #only for city as to update coordinates saveIfNotExists(data, page, session, token) # if i > 5: # break print(f"total: {len(parsedData)} items (of which {len(institutions)} institutions, {len(cities)} cities, {len(technologies)} products)") print (len(parsedData) - len(institutions) - len(cities) - len(technologies), "deployments" ) # # Title: vendor/unknown # Postprocessing: make sure unknown are numbered, and multiple related deployments individual unknowns are created.