semantic_graph/wiki_relations.py

277 lines
9.3 KiB
Python

import urllib.request, json
import logging
import requests
import argparse
import datetime
import tqdm
import csv
logger = logging.getLogger('wiki')
default_categories = [
# 'Person',
'Institution',
'Products',
'Deployments',
'Dataset',
'City',
#'Country',# for deployments without city we should configure Geolocation
'Technology Type',
]
parser = argparse.ArgumentParser(description='Turn wiki into nodes & links, usable by d3-force.')
parser.add_argument('--categories', metavar='categories', default=default_categories, nargs='+',
help='Categories')
parser.add_argument('--url', default="https://www.securityvision.io/wiki/api.php",
help='Wiki API URL')
parser.add_argument('--output', default="semantic_data.json",
help='Output JSON file')
parser.add_argument('--credentials', default="no_credentials.json",
help="JSON file containing the Bot's credentials")
parser.add_argument('--generate-csv', action='store_true',
help="generate edge.csv & nodes.csv")
args = parser.parse_args()
with open(args.credentials) as fp:
credentials = json.load(fp)
username = credentials['user']
password = credentials['password']
def get_session():
S = requests.Session()
URL = args.url
# Retrieve login token first
PARAMS_0 = {
'action':"query",
'meta':"tokens",
'type':"login",
'format':"json"
}
R = S.get(url=URL, params=PARAMS_0)
DATA = R.json()
logger.debug(DATA)
LOGIN_TOKEN = DATA['query']['tokens']['logintoken']
logger.debug(LOGIN_TOKEN)
# Send a post request to login. Using the main account for login is not
# supported. Obtain credentials via Special:BotPasswords
# (https://www.mediawiki.org/wiki/Special:BotPasswords) for lgname & lgpassword
PARAMS_1 = {
'action':"login",
'lgname':username,
'lgpassword': password,
'lgtoken':LOGIN_TOKEN,
'format':"json"
}
R = S.post(URL, data=PARAMS_1)
DATA = R.json()
logger.debug(DATA)
if DATA['login']['result'] != 'Success':
raise Exception("Failed logging in")
return S
def getPagesForCategory(category, session):
logging.info(f"Get pages in category: {category}")
pages = []
baseurl = f"{args.url}?action=query&list=categorymembers&cmtitle=Category:{category}&format=json"
params = {
'action': 'query',
'list': 'categorymembers',
'cmtitle': f'Category:{category}',
'format': 'json'
}
while True:
logger.debug(args.url, params)
response = session.post(args.url, data=params)
data = response.json()
try:
logger.debug(f"Fetched {len(data['query']['categorymembers'])} of category {category}")
pages.extend(data['query']['categorymembers'])
except Exception as e:
logger.error(data)
raise e
if 'continue' not in data:
break
params['cmcontinue'] = data['continue']['cmcontinue']
return pages
def getPropertiesForPages(pages, session, collection):
for page in tqdm.tqdm(pages):
links = getPropertiesForPage(page, session, collection)
def getPropertiesForPage(page, session, collection):
links = []
params = {
'action': 'smwbrowse',
'browse': 'subject',
'format': 'json',
'params': json.dumps({
'subject': page['title'],
'ns': page['ns'],
"iw": ""
})
}
response = session.post(args.url, data=params)
data = response.json()
# subject:
# data:
# sobj: subobjects
# print(data['query']['data'])
# Types:
# - 2: Text/String
# - 6: Date
# - 9: Page
subjectId = data['query']['subject']
for rel in data['query']['data']:
addToCollection(subjectId, rel, collection)
if 'sobj' not in data['query']:
return
for sub_obj in data['query']['sobj']:
subSubjectId = sub_obj['subject']
if '#0##_QUERY' in subSubjectId:
logger.info(f"Skip query subobj {subSubjectId}")
continue
if '#0##_ERR' in subSubjectId:
logger.info(f"Skip error subobj {subSubjectId}")
continue
for rel in sub_obj['data']:
addToCollection(subSubjectId, rel, collection, subjectId)
def addToCollection(subjectId, rel, collection, isSubObjOf = None):
if rel['property'] in ["_SKEY", "_MDAT", "_ASKDE", "_ASKSI"]:
logger.debug(f"Skipping {rel['property']} for {subjectId}")
return
if subjectId not in collection['nodes']:
collection['nodes'][subjectId] = getObjForSubject(subjectId)
if isSubObjOf:
collection['nodes'][subjectId]['parent'] = isSubObjOf
for data in rel['dataitem']:
addDataitemToCollection(subjectId, rel['property'], data, collection)
def addDataitemToCollection(subjectId, prop, data, collection):
# 2: Number (float or int) - keep string
# 2: string - keep string
# 5: url - keep string
# 6: date(time) : various resolutions 1/2021/3/1/21/54/54/0 or 1/2020
if data['type'] == 1 or data['type'] == 2 or data['type'] == 5 or data['type'] == 6:
if prop not in collection['nodes'][subjectId]:
collection['nodes'][subjectId][prop] = []
value = data['item']
if data['type'] == 6:
parts = value.split("/")
if parts[0] == "2":
logger.warning(f"Date string seems to be Julian Calendar, not supported but ignored for '{subjectId}'? {parts}")
elif parts[0] != "1":
logger.error(f"Date seems invallid for '{subjectId}'? {parts}")
del parts[0]
value = "/".join(parts)
# parts = [int(p) for p in parts]
# value = datetime.datetime(*parts).isoformat()
collection['nodes'][subjectId][prop].append(value)
# page (thus: a link/relationship)
elif data['type'] == 9:
if prop == '_INST':
# Category shouldn't be mapped as link for us
if prop not in collection['nodes'][subjectId]:
collection['nodes'][subjectId][prop] = []
collection['nodes'][subjectId][prop].append(data['item'])
elif prop in ['_ERRC', '_ERRP', '_ERRC']:
logger.warning(f"Error on page {subjectId}: {data}")
if prop not in collection['nodes'][subjectId]:
collection['nodes'][subjectId][prop] = []
collection['nodes'][subjectId][prop].append(json.dumps(data))
else:
# TODO: map as properties on link!
if '#0##_QUERY' in data['item']:
logger.warning(f"Skip query for {subjectId}: {data}")
else:
if data['item'] not in collection['nodes']:
collection['nodes'][data['item']] = getObjForSubject(data['item'])
collection['links'].append({
'source': subjectId,
'target': data['item'],
'name': prop
})
elif data['type'] == 7:
# Geolocation
lat, lon = data['item'].split(',')
collection['nodes'][subjectId]['lat'] = lat
collection['nodes'][subjectId]['lon'] = lon
else:
logger.error(f"Unknown type: {data['type']}: {prop} : {data}")
def getObjForSubject(sub):
obj = {
'@id': sub,
}
return obj
if __name__ == "__main__":
logger.setLevel(logging.INFO)
session = get_session()
collection = {'nodes': {}, 'links': []}
for category in args.categories:
logger.info(f"Fetch pages for category '{category}'")
pages = getPagesForCategory(category, session)
logger.info(f"Pages in category '{category}': {len(pages)}")
getPropertiesForPages(pages, session, collection)
custompage = {
'title': 'Resources',
'ns': 0
}
getPropertiesForPages([custompage], session, collection)
# [{'property': 'Based_in', 'dataitem': [{'type': 9, 'item': 'Berlin#0##'}]}, {'property': 'WikidataID', 'dataitem': [{'type': 2, 'item': 'Q57168389'}]}, {'property': '_INST', 'dataitem': [{'type': 9, 'item': 'Person#14##'}]}, {'property': '_MDAT', 'dataitem': [{'type': 6, 'item': '1/2021/3/1/21/13/7/0'}]}, {'property': '_SKEY', 'dataitem': [{'type': 2, 'item': 'Adam Harvey'}]}]
logger.info(f"Nodes: {len(collection['nodes'])} Links: {len(collection['links'])}")
# convert to list
collection['nodes'] = list(collection['nodes'].values())
logger.info(f"Write to {args.output}")
with open(args.output, 'w') as fp:
json.dump(collection, fp)
if args.generate_csv:
with open('nodes.csv', 'w') as csvfile:
all_keys = set().union(*(d.keys() for d in collection['nodes']))
# all_keys = ['@id']
dict_writer = csv.DictWriter(csvfile, fieldnames=all_keys, extrasaction='ignore', restval='')
dict_writer.writeheader()
dict_writer.writerows(collection['nodes'])
with open('edges.csv', 'w') as csvfile:
all_keys = set().union(*(d.keys() for d in collection['links']))
dict_writer = csv.DictWriter(csvfile, fieldnames=all_keys, extrasaction='ignore', restval='')
dict_writer.writeheader()
dict_writer.writerows(collection['links'])