commit c1d454f5124196c66ce0ae8d629c53e17496958f Author: Ruben van de Ven Date: Wed Jul 7 09:22:38 2021 +0200 basic functionality, creating a gazillion json files diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8cf7685 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +config.local.yml diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..2009c7d --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.9.2 diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..dc00337 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,17 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: launch", + "type": "python", + "request": "launch", + "cwd": "${workspaceFolder}", + "program": "${workspaceFolder}/exhaust.py", + "console": "integratedTerminal", + "args": ["-c", "config.local.yml", "-v"] + } + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..10d5e21 --- /dev/null +++ b/README.md @@ -0,0 +1,17 @@ +# publish hits to mturh and fetch results + +## install + +`poetry install` + +## usage + +Create a `config.local.yml` based on `config.example.yml`. + +Create a json file in the batches directory based on example.json. + +`poetry run python exhaust.py -c config.local.yml --publish batches/batch1.json` + +then later, fetch results/submissions using + +`poetry run python exhaust.py -c config.local.yml --update` diff --git a/batches/answers/.gitignore b/batches/answers/.gitignore new file mode 100644 index 0000000..d6b7ef3 --- /dev/null +++ b/batches/answers/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/batches/example.json b/batches/example.json new file mode 100644 index 0000000..f77618a --- /dev/null +++ b/batches/example.json @@ -0,0 +1,25 @@ +{ + "title": "Write 5 short texts about working on MTurk", + "summary": "Write 5 short texts where you explain your point of view on each topic.", + "instructions": [ + "Please read each topic carefully and then write a short text where you explain your point of view.", + "There is not a minimum number of words required but please write the number of words that you think is fair for 2$." + ], + "reward": 2, + "keywords": "text", + "questions": [ + { + "question": [ + "In a recent survey about working on MTurk, a fellow worker mentioned that they at one time had a \"good day\" on MTurk where they earned so much that they threw a party to celebrate.", + "Please explain if you agree that sometimes there are good days on MTurk that are reason to celebrate and why (or why not):" + ], + "in_response_to": "answer_id" + }, + { + "question": [ + "Please expain...." + ], + "in_response_to": null + } + ] +} \ No newline at end of file diff --git a/batches/hits/.gitignore b/batches/hits/.gitignore new file mode 100644 index 0000000..d6b7ef3 --- /dev/null +++ b/batches/hits/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/config.example.yml b/config.example.yml new file mode 100644 index 0000000..2eab974 --- /dev/null +++ b/config.example.yml @@ -0,0 +1,8 @@ +amazon: + user_id: ... + user_secret: "..." + mturk_sandbox: true + mturk_region: us-east-1 + sqs_endpoint_url: "https://sqs.eu-west-3.amazonaws.com/" + sqs_url: "https://sqs.eu-west-3.amazonaws.com/12345/test" + sqs_region_name: "eu-west-3" \ No newline at end of file diff --git a/exhaust.py b/exhaust.py new file mode 100644 index 0000000..70c6c17 --- /dev/null +++ b/exhaust.py @@ -0,0 +1,76 @@ +import exhausting_mturk.api +import argparse +import logging +import coloredlogs + + + +if __name__ == "__main__": + argParser = argparse.ArgumentParser( + description='Make and request hits') + argParser.add_argument( + '--batchdir', + '-d', + type=str, + default="batches", + help='directory to read and write from' + ) + argParser.add_argument( + '--config', + '-c', + required=True, + type=str, + help='The yaml config file to load' + ) + argParser.add_argument( + '--verbose', + '-v', + action='store_true', + help='Increase log level' + ) + argParser.add_argument( + '--publish', + '-p', + type=str, + help='Publish a batch to MTurk' + ) + argParser.add_argument( + '--update', + '-u', + action='store_true', + help='fetch HIT status and assignments from MTurk' + ) + argParser.add_argument( + '--for-real', + action='store_true', + help='run on live MTurk instead of sandbox' + ) + args = argParser.parse_args() + + loglevel = logging.DEBUG if args.verbose else logging.INFO + coloredlogs.install( + level=loglevel, + fmt="%(asctime)s %(hostname)s %(name)s[%(process)d] %(levelname)s %(message)s" + ) + logger = logging.getLogger("exhausting_mturk") + + logging.getLogger('botocore').setLevel(logging.INFO) + + connection = exhausting_mturk.api.Connection(args.config) + + print(f"{connection}") + + batch_files = exhausting_mturk.api.get_batch_files(args.batchdir) + logger.info(f"{batch_files=}") + + # print(exhausting_mturk.api.batch_to_xml(batches[0])) + + if args.update: + connection.load_new_submissions() + elif args.publish: + if args.publish not in batch_files: + raise Exception(f'Not a valid batch. Use one of {batches=}') + batch = exhausting_mturk.api.open_batch(args.publish) + connection.publish_hit(batch) + + # connection.load_new_submissions() \ No newline at end of file diff --git a/exhausting_mturk/__init__.py b/exhausting_mturk/__init__.py new file mode 100644 index 0000000..b794fd4 --- /dev/null +++ b/exhausting_mturk/__init__.py @@ -0,0 +1 @@ +__version__ = '0.1.0' diff --git a/exhausting_mturk/api.py b/exhausting_mturk/api.py new file mode 100644 index 0000000..fd886f9 --- /dev/null +++ b/exhausting_mturk/api.py @@ -0,0 +1,255 @@ +import os +import boto3 +import xmltodict +import glob +import logging +import json +import datetime +import yaml + +logger = logging.getLogger("exhausting_mturk").getChild("api") + + +# To avoid "Object of type datetime is not JSON serializable" +class DateTimeEncoder(json.JSONEncoder): + def default(self, z): + if isinstance(z, datetime.datetime): + return (str(z)) + else: + return super().default(z) + +class Connection(): + def __init__(self, config_file, for_real = False): + with open(config_file, 'r') as fp: + self.config = yaml.safe_load(fp) + + self.config['for_real'] = for_real + + self.frontend_url = "https://worker.mturk.com" if for_real else "https://workersandbox.mturk.com" + + # M-turk connection + MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com' + MTURK_REAL = 'https://mturk-requester.us-east-1.amazonaws.com' + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client + self.mturk = boto3.client('mturk', + aws_access_key_id = self.config['amazon']['user_id'], + aws_secret_access_key = self.config['amazon']['user_secret'], + region_name='us-east-1', + endpoint_url = MTURK_REAL if self.config['for_real'] else MTURK_SANDBOX + ) + + def load_new_submissions(self): + hit_files = glob.glob('batches/hits/*.json') + for hit_file in hit_files: + with open(hit_file, 'r') as fp: + hit = json.load(fp) + if hit['HIT']['HITStatus'] == 'Assignable' or hit['HIT']['HITStatus'] == 'Unassignable': + logger.info(f"Fetch more info for hit {hit['HIT']['HITId']}") + new_hit = self.mturk.get_hit(HITId=hit['HIT']['HITId']) + if hit['HIT']['HITStatus'] == new_hit['HIT']['HITStatus']: + logger.info(f"Status kept for {hit['HIT']['HITId']}") + continue + + hit = new_hit + + if hit['HIT']['HITStatus'] == 'Reviewable': + # fetch results + logger.debug(f"Fetch results for {hit['HIT']['HITId']}") + worker_results = self.mturk.list_assignments_for_hit( + HITId=hit['HIT']['HITId'], + AssignmentStatuses=['Submitted'] + ) + for assignment in worker_results['Assignments']: + logger.info(f"{assignment=}") + answer_filename = f"batches/answers/{hit['HIT']['HITId']}_{assignment['AssignmentId']}.json" + xml_doc = xmltodict.parse(assignment['Answer']) + if not isinstance(xml_doc['QuestionFormAnswers']['Answer'],list): + answers = [xml_doc['QuestionFormAnswers']['Answer']] + else: + answers = xml_doc['QuestionFormAnswers']['Answer'] + + print('Input from worker:') + assignment['answers'] = {} + # Multiple fields in HIT layout + for answer_field in answers: + # logger.debug(f"Fetch results for {hit['HIT']['HITId']}") + print (f"{answer_field['QuestionIdentifier']}: {answer_field['FreeText']}") + #store the dict/json object + assignment['answers'][answer_field['QuestionIdentifier']] = answer_field['FreeText'] + # answer_filename + + if not os.path.exists(answer_filename): + with open(answer_filename, 'w') as fp: + logger.debug(f"Save {answer_filename}") + json.dump(assignment, fp, cls=DateTimeEncoder, indent=4) + + if not confirm("Accept input of worker (no = reject!!)", True) and confirm("Are you sure you want to reject this user?! (reason in next step)", False): + # reject + reason = input("Reason for rejection (no newlines)") + response = self.mturk.reject_assignment( + AssignmentId=assignment['AssignmentId'], + RequesterFeedback=reason + ) + else: + response = self.mturk.approve_assignment( + AssignmentId=assignment['AssignmentId'] + # RequesterFeedback=reason + ) + + + # save with new status after all processing is succesfull + with open(hit_file, 'w') as fp: + json.dump(new_hit, fp, cls=DateTimeEncoder, indent=4) + # print(hits) + + + def publish_hit(self, batch) -> str: + xml = batch_to_xml(batch) + new_hit = self.mturk.create_hit( + Title = batch['title'], + Description = batch['summary'], + Keywords = batch['keywords'], + Reward = str(batch['reward']), + MaxAssignments = 1, + LifetimeInSeconds = 172800, + AssignmentDurationInSeconds = 600, + AutoApprovalDelayInSeconds = 14400, + Question = xml, + ) + # logger.info("HIT created") + logger.info(f"HIT created. Preview: {self.frontend_url}/mturk/preview?groupId={new_hit['HIT']['HITGroupId']}") + logger.debug(f"{new_hit=}") + + hit_file = f"batches/hits/{new_hit['HIT']['HITId']}.json" + with open(hit_file, 'w') as fp: + json.dump(new_hit, fp, cls=DateTimeEncoder, indent=4) + + logger.info(f"wrote to {hit_file}") + + #TODO save hit id to batch!! + append_link(batch, new_hit) + + # new_hit['HIT']['HITId'] + + +def get_batch_files(directory) -> list: + files = glob.glob(os.path.join(directory, "*.json")) + return files + +def open_batch(batch_file) -> dict: + with open(batch_file, 'r') as fp: + batch = json.load(fp) + batch['file'] = batch_file + return batch + +def append_link(batch, hit): + dirname = os.path.dirname(batch['file']) + logfile = os.path.join(dirname, 'batch_hits.json') + if os.path.exists(logfile): + with open(logfile, 'r') as fp: + links = json.load(fp) + else: + links = {} + + if batch['file'] not in links: + links[batch['file']] = [] + + links[batch['file']].append(hit['HIT']['HITId']) + + with open(logfile, 'w') as fp: + logger.info(f"wrote {logfile=}") + json.dump(links, fp, indent=4) + +def batch_to_xml(batch): + logger.debug(f"To xml {batch=}") + + xml = f""" + + + + + + + + + + + + + + {parse_string(batch['summary'])} + + + {parse_string(batch['instructions'])} + + +
+ {parse_string(batch['instructions'])} +
+
+""" + + for i, q in enumerate(batch['questions']): + xml += f""" +
+ {parse_string(q['question'])} + +
+ """ + xml += """
+ + + ]]>
+ 0 +
+ """ + return xml + +def parse_string(string_or_array) -> str: + if not isinstance(string_or_array, list): + string_or_array = [string_or_array] + + return "\n".join([f"

{s}

" for s in string_or_array]) + + +# By Raghuram Devarakonda on https://code.activestate.com/recipes/541096-prompt-the-user-for-confirmation/ +def confirm(prompt=None, resp=False): + """prompts for yes or no response from the user. Returns True for yes and + False for no. + + 'resp' should be set to the default value assumed by the caller when + user simply types ENTER. + + >>> confirm(prompt='Create Directory?', resp=True) + Create Directory? [y]|n: + True + >>> confirm(prompt='Create Directory?', resp=False) + Create Directory? [n]|y: + False + >>> confirm(prompt='Create Directory?', resp=False) + Create Directory? [n]|y: y + True + + """ + + if prompt is None: + prompt = 'Confirm' + + if resp: + prompt = '%s [%s]|%s: ' % (prompt, 'y', 'n') + else: + prompt = '%s [%s]|%s: ' % (prompt, 'n', 'y') + + while True: + ans = input(prompt) + if not ans: + return resp + if ans not in ['y', 'Y', 'n', 'N']: + print ('please enter y or n.') + continue + if ans == 'y' or ans == 'Y': + return True + if ans == 'n' or ans == 'N': + return False diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..2ce9746 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[tool.poetry] +name = "exhausting_mturk" +version = "0.1.0" +description = "" +authors = ["Ruben van de Ven "] + +[tool.poetry.dependencies] +python = "^3.9" +boto3 = "^1.17.105" +xmltodict = "^0.12.0" +coloredlogs = "^15.0.1" +PyYAML = "^5.4.1" + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api"