# exhausting_mturk/exhausting_mturk/api.py
#
# 256 lines
# 9.4 KiB
# Python

import os
import boto3
import xmltodict
import glob
import logging
import json
import datetime
import yaml
# Module-level logger, named as a child ("api") of the "exhausting_mturk" logger.
logger = logging.getLogger("exhausting_mturk.api")
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``datetime.datetime`` values as strings.

    Exists to avoid "Object of type datetime is not JSON serializable" when
    dumping objects that contain ``datetime`` values.
    """

    def default(self, z):
        """Return ``str(z)`` for datetimes; defer to the base class otherwise.

        The base implementation raises ``TypeError`` for unsupported types.
        """
        if isinstance(z, datetime.datetime):
            return str(z)
        return super().default(z)
class Connection:
    """Wrapper around a boto3 MTurk client plus on-disk bookkeeping of HITs.

    HIT records are kept as JSON files in ``batches/hits/`` and worker
    submissions in ``batches/answers/``; both paths are relative to the
    current working directory.
    """

    def __init__(self, config_file, for_real = False):
        """Load the YAML configuration and create the MTurk client.

        Args:
            config_file: Path to a YAML file providing ``amazon.user_id`` and
                ``amazon.user_secret``.
            for_real: When true, talk to the production MTurk endpoint;
                otherwise the requester sandbox is used.
        """
        with open(config_file, 'r', encoding='utf-8') as fp:
            self.config = yaml.safe_load(fp)
        self.config['for_real'] = for_real
        self.frontend_url = "https://worker.mturk.com" if for_real else "https://workersandbox.mturk.com"

        # M-turk connection
        MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com'
        MTURK_REAL = 'https://mturk-requester.us-east-1.amazonaws.com'
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client
        self.mturk = boto3.client(
            'mturk',
            aws_access_key_id=self.config['amazon']['user_id'],
            aws_secret_access_key=self.config['amazon']['user_secret'],
            region_name='us-east-1',
            endpoint_url=MTURK_REAL if self.config['for_real'] else MTURK_SANDBOX,
        )

    def load_new_submissions(self):
        """Refresh open HITs and interactively review newly submitted work.

        Every HIT record in ``batches/hits/`` whose stored status is
        'Assignable' or 'Unassignable' is re-fetched. When the status has
        changed, a 'Reviewable' HIT has its submitted assignments reviewed on
        the console, after which the refreshed record replaces the stored one.
        """
        # NOTE(review): the source this was restored from had lost its
        # indentation. The review and save steps are taken to apply only to
        # HITs whose status changed, since that is the only path on which a
        # fresh API response exists to be saved.
        for hit_file in sorted(glob.glob('batches/hits/*.json')):
            with open(hit_file, 'r') as fp:
                hit = json.load(fp)

            if hit['HIT']['HITStatus'] not in ('Assignable', 'Unassignable'):
                continue

            logger.info(f"Fetch more info for hit {hit['HIT']['HITId']}")
            new_hit = self.mturk.get_hit(HITId=hit['HIT']['HITId'])
            if hit['HIT']['HITStatus'] == new_hit['HIT']['HITStatus']:
                logger.info(f"Status kept for {hit['HIT']['HITId']}")
                continue
            hit = new_hit

            if hit['HIT']['HITStatus'] == 'Reviewable':
                self._review_submissions(hit)

            # Save with the new status only after all processing succeeded, so
            # an interrupted review is retried on the next run.
            with open(hit_file, 'w') as fp:
                json.dump(hit, fp, cls=DateTimeEncoder, indent=4)

    def _review_submissions(self, hit):
        """Fetch the submitted assignments of *hit* and review each of them."""
        logger.debug(f"Fetch results for {hit['HIT']['HITId']}")
        # Only one page of results is requested; HITs created by publish_hit
        # allow a single assignment.
        worker_results = self.mturk.list_assignments_for_hit(
            HITId=hit['HIT']['HITId'],
            AssignmentStatuses=['Submitted']
        )
        for assignment in worker_results['Assignments']:
            self._review_assignment(hit, assignment)

    def _review_assignment(self, hit, assignment):
        """Show one assignment's answers, store them, then approve or reject."""
        logger.info(f"{assignment=}")
        answer_filename = f"batches/answers/{hit['HIT']['HITId']}_{assignment['AssignmentId']}.json"

        xml_doc = xmltodict.parse(assignment['Answer'])
        answers = xml_doc['QuestionFormAnswers']['Answer']
        # xmltodict yields a single mapping instead of a list when the form
        # has only one answer field.
        if not isinstance(answers, list):
            answers = [answers]

        print('Input from worker:')
        assignment['answers'] = {}
        for answer_field in answers:
            print(f"{answer_field['QuestionIdentifier']}: {answer_field['FreeText']}")
            assignment['answers'][answer_field['QuestionIdentifier']] = answer_field['FreeText']

        # Never overwrite an answer file that was stored earlier.
        if not os.path.exists(answer_filename):
            os.makedirs(os.path.dirname(answer_filename), exist_ok=True)
            with open(answer_filename, 'w') as fp:
                logger.debug(f"Save {answer_filename}")
                json.dump(assignment, fp, cls=DateTimeEncoder, indent=4)

        # Rejecting needs two explicit confirmations; anything else approves.
        accepted = confirm("Accept input of worker (no = reject!!)", True)
        if not accepted and confirm("Are you sure you want to reject this user?! (reason in next step)", False):
            # RequesterFeedback is a required parameter of reject_assignment,
            # so keep asking until a reason is given.
            reason = ""
            while not reason:
                reason = input("Reason for rejection (no newlines)").strip()
            self.mturk.reject_assignment(
                AssignmentId=assignment['AssignmentId'],
                RequesterFeedback=reason
            )
        else:
            self.mturk.approve_assignment(
                AssignmentId=assignment['AssignmentId']
            )

    def publish_hit(self, batch) -> str:
        """Create a HIT for *batch*, record it on disk and return its HIT id.

        Args:
            batch: Batch mapping providing 'title', 'summary', 'keywords',
                'reward' and 'file', plus whatever ``batch_to_xml`` reads.

        Returns:
            The ``HITId`` of the newly created HIT.
        """
        xml = batch_to_xml(batch)
        new_hit = self.mturk.create_hit(
            Title=batch['title'],
            Description=batch['summary'],
            Keywords=batch['keywords'],
            Reward=str(batch['reward']),
            MaxAssignments=1,
            LifetimeInSeconds=172800,  # 2 days
            AssignmentDurationInSeconds=600,  # 10 minutes
            AutoApprovalDelayInSeconds=14400,  # 4 hours
            Question=xml,
        )
        logger.info(f"HIT created. Preview: {self.frontend_url}/mturk/preview?groupId={new_hit['HIT']['HITGroupId']}")
        logger.debug(f"{new_hit=}")

        hit_file = f"batches/hits/{new_hit['HIT']['HITId']}.json"
        # The HIT already exists remotely at this point; make sure a missing
        # directory cannot make us lose its local record.
        os.makedirs(os.path.dirname(hit_file), exist_ok=True)
        with open(hit_file, 'w') as fp:
            json.dump(new_hit, fp, cls=DateTimeEncoder, indent=4)
        logger.info(f"wrote to {hit_file}")

        append_link(batch, new_hit)
        return new_hit['HIT']['HITId']
def get_batch_files(directory) -> list:
    """Return the sorted paths of the batch JSON files in *directory*.

    'batch_hits.json' is skipped: ``append_link`` writes that log file into
    the same directory as the batch files, and it is not a batch itself.
    """
    candidates = glob.glob(os.path.join(directory, "*.json"))
    return sorted(
        path for path in candidates
        if os.path.basename(path) != 'batch_hits.json'
    )
def open_batch(batch_file) -> dict:
    """Load a batch from the JSON file *batch_file*.

    The path it was loaded from is stored under the 'file' key, replacing any
    'file' entry present in the JSON itself.
    """
    # JSON is read as UTF-8 rather than with the platform default encoding.
    with open(batch_file, 'r', encoding='utf-8') as fp:
        batch = json.load(fp)
    batch['file'] = batch_file
    return batch
def append_link(batch, hit):
    """Record the id of *hit* under the batch file it was published from.

    The record lives in 'batch_hits.json' in the same directory as the batch
    file. It maps each batch file path to the list of HIT ids created for it.

    Args:
        batch: Batch mapping; only its 'file' entry is read.
        hit: HIT mapping; only ``hit['HIT']['HITId']`` is read.
    """
    dirname = os.path.dirname(batch['file'])
    logfile = os.path.join(dirname, 'batch_hits.json')

    try:
        with open(logfile, 'r', encoding='utf-8') as fp:
            links = json.load(fp)
    except FileNotFoundError:
        # First HIT published from this directory.
        links = {}

    links.setdefault(batch['file'], []).append(hit['HIT']['HITId'])

    with open(logfile, 'w', encoding='utf-8') as fp:
        json.dump(links, fp, indent=4)
    logger.info(f"wrote {logfile=}")
def batch_to_xml(batch):
    """Render *batch* as an MTurk ``HTMLQuestion`` XML document.

    Args:
        batch: Mapping with 'summary' and 'instructions' (a string or a list
            of strings each) and 'questions', a sequence of mappings that each
            have a 'question' entry. Question ``i`` gets a text area named
            ``q{i}``.

    Returns:
        The XML document as a string.
    """
    logger.debug(f"To xml {batch=}")

    def cdata_safe(value) -> str:
        # The page is embedded in a CDATA section, which ends at the first
        # "]]>". Split any such sequence in the batch text over two sections.
        return parse_string(value).replace("]]>", "]]]]><![CDATA[>")

    # The instructions are shown twice: behind the button and on the page.
    instructions = cdata_safe(batch['instructions'])

    parts = [f"""
<HTMLQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2011-11-11/HTMLQuestion.xsd">
<HTMLContent><![CDATA[
<!DOCTYPE html>
<body>
<!-- You must include this JavaScript file -->
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
<!-- For the full list of available Crowd HTML Elements and their input/output documentation,
please refer to https://docs.aws.amazon.com/sagemaker/latest/dg/sms-ui-template-reference.html -->
<!-- You must include crowd-form so that your task submits answers to MTurk -->
<crowd-form answer-format="flatten-objects">
<crowd-instructions link-text="View instructions" link-type="button">
<short-summary>
{cdata_safe(batch['summary'])}
</short-summary>
<detailed-instructions>
{instructions}
</detailed-instructions>
</crowd-instructions>
<div>
{instructions}
</div>
<hr />
"""]

    for i, q in enumerate(batch['questions']):
        parts.append(f"""
<div>
{cdata_safe(q['question'])}
<crowd-text-area name="q{i}" rows="4" placeholder="Please write your explanation here" required></crowd-text-area>
</div>
""")

    parts.append("""</crowd-form>
</body>
</html>
]]></HTMLContent>
<FrameHeight>0</FrameHeight>
</HTMLQuestion>
""")
    return "".join(parts)
def parse_string(string_or_array) -> str:
    """Wrap text in ``<p>`` elements, one per item, joined by newlines.

    A list or tuple yields one paragraph per element; any other value is
    treated as a single paragraph. The text is inserted as-is, without HTML
    escaping.
    """
    if not isinstance(string_or_array, (list, tuple)):
        string_or_array = [string_or_array]
    return "\n".join(f"<p>{s}</p>" for s in string_or_array)
# By Raghuram Devarakonda on https://code.activestate.com/recipes/541096-prompt-the-user-for-confirmation/
def confirm(prompt=None, resp=False):
    """Prompt for a yes or no response from the user.

    Returns True for yes and False for no. 'resp' is the default value,
    returned when the user simply types ENTER. Answers are case-insensitive,
    may be padded with whitespace, and may be given as 'y'/'yes' or 'n'/'no';
    anything else asks again.

    >>> confirm(prompt='Create Directory?', resp=True)
    Create Directory? [y]|n:
    True
    >>> confirm(prompt='Create Directory?', resp=False)
    Create Directory? [n]|y:
    False
    >>> confirm(prompt='Create Directory?', resp=False)
    Create Directory? [n]|y: y
    True
    """
    if prompt is None:
        prompt = 'Confirm'

    # The default choice is shown first, in brackets.
    if resp:
        prompt = '%s [%s]|%s: ' % (prompt, 'y', 'n')
    else:
        prompt = '%s [%s]|%s: ' % (prompt, 'n', 'y')

    while True:
        raw = input(prompt)
        # Only a bare ENTER selects the default; stray whitespace does not.
        if not raw:
            return resp
        ans = raw.strip().lower()
        if ans in ('y', 'yes'):
            return True
        if ans in ('n', 'no'):
            return False
        print('please enter y or n.')