import urllib.request, json import logging import requests import argparse import datetime import tqdm logger = logging.getLogger('wiki') default_categories = [ # 'Person', 'Institution', 'Technology', 'Deployments', 'Dataset', 'City', # 'Country',# for deployments without city we should configure Geolocation ] parser = argparse.ArgumentParser(description='Turn wiki into nodes & links, usable by d3-force.') parser.add_argument('--categories', metavar='categories', default=default_categories, nargs='+', help='Categories') parser.add_argument('--url', default="https://www.securityvision.io/wiki/api.php", help='Wiki API URL') parser.add_argument('--output', default="semantic_data.json", help='Output JSON file') parser.add_argument('--credentials', default="no_credentials.json", help="JSON file containing the Bot's credentials") args = parser.parse_args() with open(args.credentials) as fp: credentials = json.load(fp) username = credentials['user'] password = credentials['password'] def get_session(): S = requests.Session() URL = args.url # Retrieve login token first PARAMS_0 = { 'action':"query", 'meta':"tokens", 'type':"login", 'format':"json" } R = S.get(url=URL, params=PARAMS_0) DATA = R.json() logger.debug(DATA) LOGIN_TOKEN = DATA['query']['tokens']['logintoken'] logger.debug(LOGIN_TOKEN) # Send a post request to login. Using the main account for login is not # supported. Obtain credentials via Special:BotPasswords # (https://www.mediawiki.org/wiki/Special:BotPasswords) for lgname & lgpassword PARAMS_1 = { 'action':"login", 'lgname':username, 'lgpassword': password, 'lgtoken':LOGIN_TOKEN, 'format':"json" } R = S.post(URL, data=PARAMS_1) DATA = R.json() logger.debug(DATA) if DATA['login']['result'] != 'Success': raise Exception("Failed logging in") return S def getPagesForCategory(category, session): logging.info(f"Get pages in category: {category}") pages = [] baseurl = f"{args.url}?action=query&list=categorymembers&cmtitle=Category:{category}&format=json" params = { 'action': 'query', 'list': 'categorymembers', 'cmtitle': f'Category:{category}', 'format': 'json' } while True: logger.debug(args.url, params) response = session.post(args.url, data=params) data = response.json() try: logger.debug(f"Fetched {len(data['query']['categorymembers'])} of category {category}") pages.extend(data['query']['categorymembers']) except Exception as e: logger.error(data) raise e if 'continue' not in data: break params['cmcontinue'] = data['continue']['cmcontinue'] return pages def getPropertiesForPages(pages, session, collection): for page in tqdm.tqdm(pages): links = getPropertiesForPage(page, session, collection) def getPropertiesForPage(page, session, collection): links = [] params = { 'action': 'smwbrowse', 'browse': 'subject', 'format': 'json', 'params': json.dumps({ 'subject': page['title'], 'ns': page['ns'], "iw": "" }) } response = session.post(args.url, data=params) data = response.json() # subject: # data: # sobj: subobjects # print(data['query']['data']) # Types: # - 2: Text/String # - 6: Date # - 9: Page subjectId = data['query']['subject'] for rel in data['query']['data']: addToCollection(subjectId, rel, collection) if 'sobj' not in data['query']: return for sub_obj in data['query']['sobj']: subSubjectId = sub_obj['subject'] if '#0##_QUERY' in subSubjectId: logger.info(f"Skip query subobj {subSubjectId}") continue if '#0##_ERR' in subSubjectId: logger.info(f"Skip error subobj {subSubjectId}") continue for rel in sub_obj['data']: addToCollection(subSubjectId, rel, collection, subjectId) def addToCollection(subjectId, rel, collection, isSubObjOf = None): if rel['property'] in ["_SKEY", "_MDAT", "_ASKDE", "_ASKSI"]: logger.debug(f"Skipping {rel['property']} for {subjectId}") return if subjectId not in collection['nodes']: collection['nodes'][subjectId] = getObjForSubject(subjectId) if isSubObjOf: collection['nodes'][subjectId]['parent'] = isSubObjOf for data in rel['dataitem']: addDataitemToCollection(subjectId, rel['property'], data, collection) def addDataitemToCollection(subjectId, prop, data, collection): # 2: Number (float or int) - keep string # 2: string - keep string # 5: url - keep string # 6: date(time) : various resolutions 1/2021/3/1/21/54/54/0 or 1/2020 if data['type'] == 1 or data['type'] == 2 or data['type'] == 5 or data['type'] == 6: if prop not in collection['nodes'][subjectId]: collection['nodes'][subjectId][prop] = [] value = data['item'] if data['type'] == 6: parts = value.split("/") if parts[0] == "2": logger.warning(f"Date string seems to be Julian Calendar, not supported but ignored for '{subjectId}'? {parts}") elif parts[0] != "1": logger.error(f"Date seems invallid for '{subjectId}'? {parts}") del parts[0] value = "/".join(parts) # parts = [int(p) for p in parts] # value = datetime.datetime(*parts).isoformat() collection['nodes'][subjectId][prop].append(value) # page (thus: a link/relationship) elif data['type'] == 9: if prop == '_INST': # Category shouldn't be mapped as link for us if prop not in collection['nodes'][subjectId]: collection['nodes'][subjectId][prop] = [] collection['nodes'][subjectId][prop].append(data['item']) elif prop in ['_ERRC', '_ERRP', '_ERRC']: logger.warning(f"Error on page {subjectId}: {data}") if prop not in collection['nodes'][subjectId]: collection['nodes'][subjectId][prop] = [] collection['nodes'][subjectId][prop].append(json.dumps(data)) else: # TODO: map as properties on link! if '#0##_QUERY' in data['item']: logger.warning(f"Skip query for {subjectId}: {data}") else: if data['item'] not in collection['nodes']: collection['nodes'][data['item']] = getObjForSubject(data['item']) collection['links'].append({ 'source': subjectId, 'target': data['item'], 'name': prop }) elif data['type'] == 7: # Geolocation lat, lon = data['item'].split(',') collection['nodes'][subjectId]['lat'] = lat collection['nodes'][subjectId]['lon'] = lon else: logger.error(f"Unknown type: {data['type']}: {prop} : {data}") def getObjForSubject(sub): obj = { '@id': sub, } return obj if __name__ == "__main__": logger.setLevel(logging.INFO) session = get_session() collection = {'nodes': {}, 'links': []} for category in args.categories: logger.info(f"Fetch pages for category '{category}'") pages = getPagesForCategory(category, session) logger.info(f"Pages in category '{category}': {len(pages)}") getPropertiesForPages(pages, session, collection) custompage = { 'title': 'Resources', 'ns': 0 } getPropertiesForPages([custompage], session, collection) # [{'property': 'Based_in', 'dataitem': [{'type': 9, 'item': 'Berlin#0##'}]}, {'property': 'WikidataID', 'dataitem': [{'type': 2, 'item': 'Q57168389'}]}, {'property': '_INST', 'dataitem': [{'type': 9, 'item': 'Person#14##'}]}, {'property': '_MDAT', 'dataitem': [{'type': 6, 'item': '1/2021/3/1/21/13/7/0'}]}, {'property': '_SKEY', 'dataitem': [{'type': 2, 'item': 'Adam Harvey'}]}] logger.info(f"Nodes: {len(collection['nodes'])} Links: {len(collection['links'])}") # convert to list collection['nodes'] = list(collection['nodes'].values()) logger.info(f"Write to {args.output}") with open(args.output, 'w') as fp: json.dump(collection, fp)