244 lines
7.8 KiB
Python
244 lines
7.8 KiB
Python
import urllib.request, json
|
|
import logging
|
|
import requests
|
|
import argparse
|
|
import datetime
|
|
import tqdm
|
|
|
|
|
|
|
|
logger = logging.getLogger('wiki')
|
|
|
|
default_categories = [
|
|
'Person',
|
|
'Institution',
|
|
'Technology',
|
|
'Deployments',
|
|
'Dataset',
|
|
# 'City',
|
|
# 'Country',
|
|
]
|
|
|
|
parser = argparse.ArgumentParser(description='Turn wiki into nodes & links, usable by d3-force.')
|
|
parser.add_argument('--categories', metavar='categories', default=default_categories, nargs='+',
|
|
help='Categories')
|
|
parser.add_argument('--url', default="https://www.securityvision.io/wiki/api.php",
|
|
help='Wiki API URL')
|
|
parser.add_argument('--output', default="semantic_data.json",
|
|
help='Output JSON file')
|
|
parser.add_argument('--credentials', default="no_credentials.json",
|
|
help="JSON file containing the Bot's credentials")
|
|
|
|
args = parser.parse_args()
|
|
|
|
with open(args.credentials) as fp:
|
|
credentials = json.load(fp)
|
|
username = credentials['user']
|
|
password = credentials['password']
|
|
|
|
|
|
def get_session():
|
|
S = requests.Session()
|
|
|
|
URL = args.url
|
|
|
|
# Retrieve login token first
|
|
PARAMS_0 = {
|
|
'action':"query",
|
|
'meta':"tokens",
|
|
'type':"login",
|
|
'format':"json"
|
|
}
|
|
|
|
R = S.get(url=URL, params=PARAMS_0)
|
|
DATA = R.json()
|
|
logger.debug(DATA)
|
|
LOGIN_TOKEN = DATA['query']['tokens']['logintoken']
|
|
|
|
logger.debug(LOGIN_TOKEN)
|
|
|
|
# Send a post request to login. Using the main account for login is not
|
|
# supported. Obtain credentials via Special:BotPasswords
|
|
# (https://www.mediawiki.org/wiki/Special:BotPasswords) for lgname & lgpassword
|
|
|
|
PARAMS_1 = {
|
|
'action':"login",
|
|
'lgname':username,
|
|
'lgpassword': password,
|
|
'lgtoken':LOGIN_TOKEN,
|
|
'format':"json"
|
|
}
|
|
|
|
R = S.post(URL, data=PARAMS_1)
|
|
DATA = R.json()
|
|
|
|
logger.debug(DATA)
|
|
if DATA['login']['result'] != 'Success':
|
|
raise Exception("Failed logging in")
|
|
|
|
return S
|
|
|
|
def getPagesForCategory(category, session):
|
|
logging.info(f"Get pages in category: {category}")
|
|
pages = []
|
|
|
|
baseurl = f"{args.url}?action=query&list=categorymembers&cmtitle=Category:{category}&format=json"
|
|
params = {
|
|
'action': 'query',
|
|
'list': 'categorymembers',
|
|
'cmtitle': f'Category:{category}',
|
|
'format': 'json'
|
|
}
|
|
while True:
|
|
logger.debug(args.url, params)
|
|
response = session.post(args.url, data=params)
|
|
data = response.json()
|
|
try:
|
|
logger.debug(f"Fetched {len(data['query']['categorymembers'])} of category {category}")
|
|
pages.extend(data['query']['categorymembers'])
|
|
except Exception as e:
|
|
logger.error(data)
|
|
raise e
|
|
if 'continue' not in data:
|
|
break
|
|
params['cmcontinue'] = data['continue']['cmcontinue']
|
|
|
|
return pages
|
|
|
|
def getPropertiesForPages(pages, session, collection):
|
|
for page in tqdm.tqdm(pages):
|
|
links = getPropertiesForPage(page, session, collection)
|
|
|
|
def getPropertiesForPage(page, session, collection):
|
|
links = []
|
|
params = {
|
|
'action': 'smwbrowse',
|
|
'browse': 'subject',
|
|
'format': 'json',
|
|
'params': json.dumps({
|
|
'subject': page['title'],
|
|
'ns': page['ns'],
|
|
"iw": ""
|
|
})
|
|
}
|
|
response = session.post(args.url, data=params)
|
|
data = response.json()
|
|
|
|
# subject:
|
|
# data:
|
|
# sobj: subobjects
|
|
# print(data['query']['data'])
|
|
# Types:
|
|
# - 2: Text/String
|
|
# - 6: Date
|
|
# - 9: Page
|
|
|
|
subjectId = data['query']['subject']
|
|
for rel in data['query']['data']:
|
|
addToCollection(subjectId, rel, collection)
|
|
|
|
if 'sobj' not in data['query']:
|
|
return
|
|
|
|
for sub_obj in data['query']['sobj']:
|
|
subSubjectId = sub_obj['subject']
|
|
for rel in sub_obj['data']:
|
|
addToCollection(subSubjectId, rel, collection, subjectId)
|
|
|
|
|
|
def addToCollection(subjectId, rel, collection, isSubObjOf = None):
|
|
if rel['property'] in ["_SKEY", "_MDAT", "_ASKDE", "_ASKSI"]:
|
|
logger.debug(f"Skipping {rel['property']} for {subjectId}")
|
|
return
|
|
|
|
if subjectId not in collection['nodes']:
|
|
collection['nodes'][subjectId] = getObjForSubject(subjectId)
|
|
|
|
if isSubObjOf:
|
|
collection['nodes'][subjectId]['parent'] = isSubObjOf
|
|
|
|
for data in rel['dataitem']:
|
|
addDataitemToCollection(subjectId, rel['property'], data, collection)
|
|
|
|
def addDataitemToCollection(subjectId, prop, data, collection):
|
|
# 2: Number (float or int) - keep string
|
|
# 2: string - keep string
|
|
# 5: url - keep string
|
|
# 6: date(time) : various resolutions 1/2021/3/1/21/54/54/0 or 1/2020
|
|
if data['type'] == 1 or data['type'] == 2 or data['type'] == 5 or data['type'] == 6:
|
|
if prop not in collection['nodes'][subjectId]:
|
|
collection['nodes'][subjectId][prop] = []
|
|
value = data['item']
|
|
if data['type'] == 6:
|
|
parts = value.split("/")
|
|
if parts[0] == "2":
|
|
logger.warning(f"Date string seems to be Julian Calendar, not supported but ignored for '{subjectId}'? {parts}")
|
|
elif parts[0] != "1":
|
|
logger.error(f"Date seems invallid for '{subjectId}'? {parts}")
|
|
del parts[0]
|
|
value = "/".join(parts)
|
|
# parts = [int(p) for p in parts]
|
|
# value = datetime.datetime(*parts).isoformat()
|
|
|
|
collection['nodes'][subjectId][prop].append(value)
|
|
# page (thus: a link/relationship)
|
|
elif data['type'] == 9:
|
|
if prop == '_INST':
|
|
# Category shouldn't be mapped as link for us
|
|
if prop not in collection['nodes'][subjectId]:
|
|
collection['nodes'][subjectId][prop] = []
|
|
collection['nodes'][subjectId][prop].append(data['item'])
|
|
elif prop in ['_ERRC', '_ERRP', '_ERRC']:
|
|
logger.warning(f"Error on page {subjectId}: {data}")
|
|
if prop not in collection['nodes'][subjectId]:
|
|
collection['nodes'][subjectId][prop] = []
|
|
collection['nodes'][subjectId][prop].append(json.dumps(data))
|
|
else:
|
|
if data['item'] not in collection['nodes']:
|
|
collection['nodes'][data['item']] = getObjForSubject(data['item'])
|
|
collection['links'].append({
|
|
'source': subjectId,
|
|
'target': data['item'],
|
|
'name': prop
|
|
})
|
|
else:
|
|
logger.error(f"Unknown type: {data['type']}: {prop} : {data}")
|
|
|
|
def getObjForSubject(sub):
|
|
obj = {
|
|
'@id': sub,
|
|
}
|
|
|
|
return obj
|
|
|
|
if __name__ == "__main__":
|
|
logger.setLevel(logging.INFO)
|
|
session = get_session()
|
|
collection = {'nodes': {}, 'links': []}
|
|
|
|
for category in args.categories:
|
|
logger.info(f"Fetch pages for category '{category}'")
|
|
pages = getPagesForCategory(category, session)
|
|
logger.info(f"Pages in category '{category}': {len(pages)}")
|
|
getPropertiesForPages(pages, session, collection)
|
|
|
|
custompage = {
|
|
'title': 'Resources',
|
|
'ns': 0
|
|
}
|
|
getPropertiesForPages([custompage], session, collection)
|
|
|
|
|
|
# [{'property': 'Based_in', 'dataitem': [{'type': 9, 'item': 'Berlin#0##'}]}, {'property': 'WikidataID', 'dataitem': [{'type': 2, 'item': 'Q57168389'}]}, {'property': '_INST', 'dataitem': [{'type': 9, 'item': 'Person#14##'}]}, {'property': '_MDAT', 'dataitem': [{'type': 6, 'item': '1/2021/3/1/21/13/7/0'}]}, {'property': '_SKEY', 'dataitem': [{'type': 2, 'item': 'Adam Harvey'}]}]
|
|
|
|
|
|
logger.info(f"Nodes: {len(collection['nodes'])} Links: {len(collection['links'])}")
|
|
|
|
|
|
# convert to list
|
|
collection['nodes'] = list(collection['nodes'].values())
|
|
|
|
logger.info(f"Write to {args.output}")
|
|
with open(args.output, 'w') as fp:
|
|
json.dump(collection, fp)
|
|
|