import datetime
import glob
import json
import logging
import os

import boto3
import xmltodict
import yaml

logger = logging.getLogger("exhausting_mturk").getChild("api")


class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``datetime.datetime`` values via ``str()``.

    Used when dumping MTurk API responses, to avoid
    "Object of type datetime is not JSON serializable".
    """

    def default(self, z):
        if isinstance(z, datetime.datetime):
            return str(z)
        return super().default(z)


class Connection:
    """MTurk requester client plus local JSON bookkeeping.

    Published HITs are cached in ``batches/hits/<HITId>.json`` and worker
    answers in ``batches/answers/<HITId>_<AssignmentId>.json`` (both relative
    to the current working directory).
    """

    def __init__(self, config_file, for_real=False):
        """Load the YAML config and create the boto3 MTurk client.

        :param config_file: path to a YAML file; ``config['amazon']['user_id']``
            and ``config['amazon']['user_secret']`` are used as the AWS access
            key id and secret access key.
        :param for_real: ``True`` targets the production marketplace,
            ``False`` (default) the sandbox.
        """
        with open(config_file, 'r') as fp:
            self.config = yaml.safe_load(fp)
        self.config['for_real'] = for_real
        self.frontend_url = "https://worker.mturk.com" if for_real else "https://workersandbox.mturk.com"

        # M-turk connection
        MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com'
        MTURK_REAL = 'https://mturk-requester.us-east-1.amazonaws.com'
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client
        self.mturk = boto3.client(
            'mturk',
            aws_access_key_id=self.config['amazon']['user_id'],
            aws_secret_access_key=self.config['amazon']['user_secret'],
            region_name='us-east-1',
            endpoint_url=MTURK_REAL if self.config['for_real'] else MTURK_SANDBOX,
        )

    def load_new_submissions(self):
        """Refresh cached HITs and interactively review their submitted work.

        For every ``batches/hits/*.json`` file:

        * A HIT cached as ``Assignable``/``Unassignable`` is re-fetched with
          ``get_hit``; if its status did not change, the file is skipped.
        * For a ``Reviewable`` HIT, every ``Submitted`` assignment is shown,
          archived under ``batches/answers/`` and approved or rejected after
          prompting on stdin.
        * The (possibly refreshed) HIT is then written back to its cache file.
          This happens only after all of the above succeeded, so an
          interrupted review is retried on the next run.
        """
        hit_files = glob.glob('batches/hits/*.json')
        for hit_file in hit_files:
            with open(hit_file, 'r') as fp:
                hit = json.load(fp)
            hit_id = hit['HIT']['HITId']

            if hit['HIT']['HITStatus'] in ('Assignable', 'Unassignable'):
                logger.info(f"Fetch more info for hit {hit_id}")
                new_hit = self.mturk.get_hit(HITId=hit_id)
                if hit['HIT']['HITStatus'] == new_hit['HIT']['HITStatus']:
                    logger.info(f"Status kept for {hit_id}")
                    continue
                hit = new_hit

            if hit['HIT']['HITStatus'] == 'Reviewable':
                # fetch results
                logger.debug(f"Fetch results for {hit_id}")
                worker_results = self.mturk.list_assignments_for_hit(
                    HITId=hit_id,
                    AssignmentStatuses=['Submitted'],
                )
                for assignment in worker_results['Assignments']:
                    self._review_assignment(hit_id, assignment)

            # Save with the new status after all processing is successful.
            # Write ``hit``, not ``new_hit``: when this HIT was not re-fetched,
            # ``new_hit`` is unbound or left over from a previous file.
            with open(hit_file, 'w') as fp:
                json.dump(hit, fp, cls=DateTimeEncoder, indent=4)

    @staticmethod
    def _answer_fields(answer_xml):
        """Parse an assignment's ``Answer`` XML into a list of answer dicts.

        xmltodict yields a single dict for one ``<Answer>`` element and a list
        for several, so the result is normalised to a list.
        """
        xml_doc = xmltodict.parse(answer_xml)
        answers = xml_doc['QuestionFormAnswers']['Answer']
        if not isinstance(answers, list):
            answers = [answers]
        return answers

    def _review_assignment(self, hit_id, assignment):
        """Show one submitted assignment, archive it, then approve or reject it.

        Adds an ``answers`` mapping (QuestionIdentifier -> FreeText) to
        *assignment* before saving it. An existing answer file is never
        overwritten.
        """
        logger.info(f"{assignment=}")
        answer_filename = f"batches/answers/{hit_id}_{assignment['AssignmentId']}.json"
        fields = self._answer_fields(assignment['Answer'])

        print('Input from worker:')
        assignment['answers'] = {}
        # Multiple fields in HIT layout
        for answer_field in fields:
            print(f"{answer_field['QuestionIdentifier']}: {answer_field['FreeText']}")
            assignment['answers'][answer_field['QuestionIdentifier']] = answer_field['FreeText']

        if not os.path.exists(answer_filename):
            os.makedirs(os.path.dirname(answer_filename), exist_ok=True)
            with open(answer_filename, 'w') as fp:
                logger.debug(f"Save {answer_filename}")
                json.dump(assignment, fp, cls=DateTimeEncoder, indent=4)

        # Rejecting needs two explicit answers (decline, then confirm); any
        # other combination falls through to approval.
        wants_reject = (
            not confirm("Accept input of worker (no = reject!!)", True)
            and confirm("Are you sure you want to reject this user?! (reason in next step)", False)
        )
        if wants_reject:
            reason = input("Reason for rejection (no newlines)")
            self.mturk.reject_assignment(
                AssignmentId=assignment['AssignmentId'],
                RequesterFeedback=reason,
            )
        else:
            self.mturk.approve_assignment(AssignmentId=assignment['AssignmentId'])

    def publish_hit(self, batch) -> str:
        """Create a single-assignment HIT for *batch* and return its HITId.

        The ``create_hit`` response is cached in ``batches/hits/<HITId>.json``
        and the HITId is recorded next to the batch file via ``append_link``.

        :param batch: dict with ``title``, ``summary``, ``keywords``, ``reward``
            and the keys ``batch_to_xml``/``append_link`` read
            (``instructions``, ``questions``, ``file``).
        :returns: the new HIT's ``HITId``.
        """
        xml = batch_to_xml(batch)
        new_hit = self.mturk.create_hit(
            Title=batch['title'],
            Description=batch['summary'],
            Keywords=batch['keywords'],
            Reward=str(batch['reward']),
            MaxAssignments=1,
            LifetimeInSeconds=172800,          # 2 days
            AssignmentDurationInSeconds=600,   # 10 minutes
            AutoApprovalDelayInSeconds=14400,  # 4 hours
            Question=xml,
        )
        logger.info(f"HIT created. Preview: {self.frontend_url}/mturk/preview?groupId={new_hit['HIT']['HITGroupId']}")
        logger.debug(f"{new_hit=}")

        hit_file = f"batches/hits/{new_hit['HIT']['HITId']}.json"
        os.makedirs(os.path.dirname(hit_file), exist_ok=True)
        with open(hit_file, 'w') as fp:
            json.dump(new_hit, fp, cls=DateTimeEncoder, indent=4)
        logger.info(f"wrote to {hit_file}")

        # TODO save hit id to batch!!
        append_link(batch, new_hit)
        return new_hit['HIT']['HITId']


def get_batch_files(directory) -> list:
    """Return the paths of the ``*.json`` files directly inside *directory*."""
    return glob.glob(os.path.join(directory, "*.json"))


def open_batch(batch_file) -> dict:
    """Load a batch JSON file and remember its path under ``batch['file']``."""
    with open(batch_file, 'r') as fp:
        batch = json.load(fp)
    batch['file'] = batch_file
    return batch


def append_link(batch, hit):
    """Append *hit*'s HITId to the ``batch_hits.json`` log next to the batch file.

    The log maps each batch file path to the list of HITIds published for it;
    it is created if missing.
    """
    logfile = os.path.join(os.path.dirname(batch['file']), 'batch_hits.json')
    if os.path.exists(logfile):
        with open(logfile, 'r') as fp:
            links = json.load(fp)
    else:
        links = {}
    links.setdefault(batch['file'], []).append(hit['HIT']['HITId'])
    with open(logfile, 'w') as fp:
        json.dump(links, fp, indent=4)
    logger.info(f"wrote {logfile=}")


def batch_to_xml(batch):
    """Render *batch* as the ``Question`` payload passed to ``create_hit``.

    NOTE(review): the markup of this template (and of ``parse_string``)
    appears to be missing. Only the interpolated fields, ``]]>`` and ``0``
    remain, which look like the tail of an HTMLQuestion
    (``]]></HTMLContent><FrameHeight>0</FrameHeight>``). Restore the markup
    from version control before relying on this output.
    """
    logger.debug(f"To xml {batch=}")
    xml = f""" {parse_string(batch['summary'])} {parse_string(batch['instructions'])}
{parse_string(batch['instructions'])}

"""
    # NOTE(review): ``i`` is unused in the visible template; presumably the
    # missing markup used it (e.g. to number answer fields) -- confirm.
    for i, q in enumerate(batch['questions']):
        xml += f"""
{parse_string(q['question'])}
"""
    xml += """
]]>
0
"""
    return xml


def parse_string(string_or_array) -> str:
    """Render a string, or a list of strings, as newline-joined fragments.

    A lone string is treated as a one-element list. NOTE(review): the
    per-item wrapper markup appears to be missing; verify it.
    """
    if not isinstance(string_or_array, list):
        string_or_array = [string_or_array]
    return "\n".join([f"\n\n{s}\n\n" for s in string_or_array])


# By Raghuram Devarakonda on https://code.activestate.com/recipes/541096-prompt-the-user-for-confirmation/
def confirm(prompt=None, resp=False):
    """prompts for yes or no response from the user. Returns True for yes and
    False for no.

    'resp' should be set to the default value assumed by the caller when
    user simply types ENTER.

    >>> confirm(prompt='Create Directory?', resp=True)
    Create Directory? [y]|n:
    True
    >>> confirm(prompt='Create Directory?', resp=False)
    Create Directory? [n]|y:
    False
    >>> confirm(prompt='Create Directory?', resp=False)
    Create Directory? [n]|y: y
    True
    """
    if prompt is None:
        prompt = 'Confirm'

    if resp:
        prompt = '%s [%s]|%s: ' % (prompt, 'y', 'n')
    else:
        prompt = '%s [%s]|%s: ' % (prompt, 'n', 'y')

    while True:
        ans = input(prompt)
        if not ans:
            return resp
        if ans in ('y', 'Y'):
            return True
        if ans in ('n', 'N'):
            return False
        print('please enter y or n.')