basic functionality, creating a gazillion json files

This commit is contained in:
Ruben van de Ven 2021-07-07 09:22:38 +02:00
commit c1d454f512
12 changed files with 421 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
config.local.yml

1
.python-version Normal file
View file

@ -0,0 +1 @@
3.9.2

17
.vscode/launch.json vendored Normal file
View file

@ -0,0 +1,17 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: launch",
"type": "python",
"request": "launch",
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/exhaust.py",
"console": "integratedTerminal",
"args": ["-c", "config.local.yml", "-v"]
}
]
}

17
README.md Normal file
View file

@ -0,0 +1,17 @@
# publish hits to mturh and fetch results
## install
`poetry install`
## usage
Create a `config.local.yml` based on `config.example.yml`.
Create a json file in the batches directory based on example.json.
`poetry run python exhaust.py -c config.local.yml --publish batches/batch1.json`
then later, fetch results/submissions using
`poetry run python exhaust.py -c config.local.yml --update`

2
batches/answers/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*
!.gitignore

25
batches/example.json Normal file
View file

@ -0,0 +1,25 @@
{
"title": "Write 5 short texts about working on MTurk",
"summary": "Write 5 short texts where you explain your point of view on each topic.",
"instructions": [
"Please read each topic carefully and then write a short text where you explain <strong>your</strong> point of view.",
"There is not a minimum number of words required but please write the number of words that you think is fair for 2$."
],
"reward": 2,
"keywords": "text",
"questions": [
{
"question": [
"In a recent survey about working on MTurk, a fellow worker mentioned that they at one time had a \"good day\" on MTurk where they earned so much that they threw a party to celebrate.",
"Please explain if you agree that sometimes there are good days on MTurk that are reason to celebrate and why (or why not):"
],
"in_response_to": "answer_id"
},
{
"question": [
"Please expain...."
],
"in_response_to": null
}
]
}

2
batches/hits/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*
!.gitignore

8
config.example.yml Normal file
View file

@ -0,0 +1,8 @@
amazon:
user_id: ...
user_secret: "..."
mturk_sandbox: true
mturk_region: us-east-1
sqs_endpoint_url: "https://sqs.eu-west-3.amazonaws.com/"
sqs_url: "https://sqs.eu-west-3.amazonaws.com/12345/test"
sqs_region_name: "eu-west-3"

76
exhaust.py Normal file
View file

@ -0,0 +1,76 @@
import exhausting_mturk.api
import argparse
import logging
import coloredlogs
if __name__ == "__main__":
argParser = argparse.ArgumentParser(
description='Make and request hits')
argParser.add_argument(
'--batchdir',
'-d',
type=str,
default="batches",
help='directory to read and write from'
)
argParser.add_argument(
'--config',
'-c',
required=True,
type=str,
help='The yaml config file to load'
)
argParser.add_argument(
'--verbose',
'-v',
action='store_true',
help='Increase log level'
)
argParser.add_argument(
'--publish',
'-p',
type=str,
help='Publish a batch to MTurk'
)
argParser.add_argument(
'--update',
'-u',
action='store_true',
help='fetch HIT status and assignments from MTurk'
)
argParser.add_argument(
'--for-real',
action='store_true',
help='run on live MTurk instead of sandbox'
)
args = argParser.parse_args()
loglevel = logging.DEBUG if args.verbose else logging.INFO
coloredlogs.install(
level=loglevel,
fmt="%(asctime)s %(hostname)s %(name)s[%(process)d] %(levelname)s %(message)s"
)
logger = logging.getLogger("exhausting_mturk")
logging.getLogger('botocore').setLevel(logging.INFO)
connection = exhausting_mturk.api.Connection(args.config)
print(f"{connection}")
batch_files = exhausting_mturk.api.get_batch_files(args.batchdir)
logger.info(f"{batch_files=}")
# print(exhausting_mturk.api.batch_to_xml(batches[0]))
if args.update:
connection.load_new_submissions()
elif args.publish:
if args.publish not in batch_files:
raise Exception(f'Not a valid batch. Use one of {batches=}')
batch = exhausting_mturk.api.open_batch(args.publish)
connection.publish_hit(batch)
# connection.load_new_submissions()

View file

@ -0,0 +1 @@
__version__ = '0.1.0'

255
exhausting_mturk/api.py Normal file
View file

@ -0,0 +1,255 @@
import os
import boto3
import xmltodict
import glob
import logging
import json
import datetime
import yaml
logger = logging.getLogger("exhausting_mturk").getChild("api")
# To avoid "Object of type datetime is not JSON serializable"
class DateTimeEncoder(json.JSONEncoder):
def default(self, z):
if isinstance(z, datetime.datetime):
return (str(z))
else:
return super().default(z)
class Connection():
def __init__(self, config_file, for_real = False):
with open(config_file, 'r') as fp:
self.config = yaml.safe_load(fp)
self.config['for_real'] = for_real
self.frontend_url = "https://worker.mturk.com" if for_real else "https://workersandbox.mturk.com"
# M-turk connection
MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com'
MTURK_REAL = 'https://mturk-requester.us-east-1.amazonaws.com'
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client
self.mturk = boto3.client('mturk',
aws_access_key_id = self.config['amazon']['user_id'],
aws_secret_access_key = self.config['amazon']['user_secret'],
region_name='us-east-1',
endpoint_url = MTURK_REAL if self.config['for_real'] else MTURK_SANDBOX
)
def load_new_submissions(self):
hit_files = glob.glob('batches/hits/*.json')
for hit_file in hit_files:
with open(hit_file, 'r') as fp:
hit = json.load(fp)
if hit['HIT']['HITStatus'] == 'Assignable' or hit['HIT']['HITStatus'] == 'Unassignable':
logger.info(f"Fetch more info for hit {hit['HIT']['HITId']}")
new_hit = self.mturk.get_hit(HITId=hit['HIT']['HITId'])
if hit['HIT']['HITStatus'] == new_hit['HIT']['HITStatus']:
logger.info(f"Status kept for {hit['HIT']['HITId']}")
continue
hit = new_hit
if hit['HIT']['HITStatus'] == 'Reviewable':
# fetch results
logger.debug(f"Fetch results for {hit['HIT']['HITId']}")
worker_results = self.mturk.list_assignments_for_hit(
HITId=hit['HIT']['HITId'],
AssignmentStatuses=['Submitted']
)
for assignment in worker_results['Assignments']:
logger.info(f"{assignment=}")
answer_filename = f"batches/answers/{hit['HIT']['HITId']}_{assignment['AssignmentId']}.json"
xml_doc = xmltodict.parse(assignment['Answer'])
if not isinstance(xml_doc['QuestionFormAnswers']['Answer'],list):
answers = [xml_doc['QuestionFormAnswers']['Answer']]
else:
answers = xml_doc['QuestionFormAnswers']['Answer']
print('Input from worker:')
assignment['answers'] = {}
# Multiple fields in HIT layout
for answer_field in answers:
# logger.debug(f"Fetch results for {hit['HIT']['HITId']}")
print (f"{answer_field['QuestionIdentifier']}: {answer_field['FreeText']}")
#store the dict/json object
assignment['answers'][answer_field['QuestionIdentifier']] = answer_field['FreeText']
# answer_filename
if not os.path.exists(answer_filename):
with open(answer_filename, 'w') as fp:
logger.debug(f"Save {answer_filename}")
json.dump(assignment, fp, cls=DateTimeEncoder, indent=4)
if not confirm("Accept input of worker (no = reject!!)", True) and confirm("Are you sure you want to reject this user?! (reason in next step)", False):
# reject
reason = input("Reason for rejection (no newlines)")
response = self.mturk.reject_assignment(
AssignmentId=assignment['AssignmentId'],
RequesterFeedback=reason
)
else:
response = self.mturk.approve_assignment(
AssignmentId=assignment['AssignmentId']
# RequesterFeedback=reason
)
# save with new status after all processing is succesfull
with open(hit_file, 'w') as fp:
json.dump(new_hit, fp, cls=DateTimeEncoder, indent=4)
# print(hits)
def publish_hit(self, batch) -> str:
xml = batch_to_xml(batch)
new_hit = self.mturk.create_hit(
Title = batch['title'],
Description = batch['summary'],
Keywords = batch['keywords'],
Reward = str(batch['reward']),
MaxAssignments = 1,
LifetimeInSeconds = 172800,
AssignmentDurationInSeconds = 600,
AutoApprovalDelayInSeconds = 14400,
Question = xml,
)
# logger.info("HIT created")
logger.info(f"HIT created. Preview: {self.frontend_url}/mturk/preview?groupId={new_hit['HIT']['HITGroupId']}")
logger.debug(f"{new_hit=}")
hit_file = f"batches/hits/{new_hit['HIT']['HITId']}.json"
with open(hit_file, 'w') as fp:
json.dump(new_hit, fp, cls=DateTimeEncoder, indent=4)
logger.info(f"wrote to {hit_file}")
#TODO save hit id to batch!!
append_link(batch, new_hit)
# new_hit['HIT']['HITId']
def get_batch_files(directory) -> list:
files = glob.glob(os.path.join(directory, "*.json"))
return files
def open_batch(batch_file) -> dict:
with open(batch_file, 'r') as fp:
batch = json.load(fp)
batch['file'] = batch_file
return batch
def append_link(batch, hit):
dirname = os.path.dirname(batch['file'])
logfile = os.path.join(dirname, 'batch_hits.json')
if os.path.exists(logfile):
with open(logfile, 'r') as fp:
links = json.load(fp)
else:
links = {}
if batch['file'] not in links:
links[batch['file']] = []
links[batch['file']].append(hit['HIT']['HITId'])
with open(logfile, 'w') as fp:
logger.info(f"wrote {logfile=}")
json.dump(links, fp, indent=4)
def batch_to_xml(batch):
logger.debug(f"To xml {batch=}")
xml = f"""
<HTMLQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2011-11-11/HTMLQuestion.xsd">
<HTMLContent><![CDATA[
<!DOCTYPE html>
<body>
<!-- You must include this JavaScript file -->
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
<!-- For the full list of available Crowd HTML Elements and their input/output documentation,
please refer to https://docs.aws.amazon.com/sagemaker/latest/dg/sms-ui-template-reference.html -->
<!-- You must include crowd-form so that your task submits answers to MTurk -->
<crowd-form answer-format="flatten-objects">
<crowd-instructions link-text="View instructions" link-type="button">
<short-summary>
{parse_string(batch['summary'])}
</short-summary>
<detailed-instructions>
{parse_string(batch['instructions'])}
</detailed-instructions>
</crowd-instructions>
<div>
{parse_string(batch['instructions'])}
</div>
<hr />
"""
for i, q in enumerate(batch['questions']):
xml += f"""
<div>
{parse_string(q['question'])}
<crowd-text-area name="q{i}" rows="4" placeholder="Please write your explanation here" required></crowd-text-area>
</div>
"""
xml += """</crowd-form>
</body>
</html>
]]></HTMLContent>
<FrameHeight>0</FrameHeight>
</HTMLQuestion>
"""
return xml
def parse_string(string_or_array) -> str:
if not isinstance(string_or_array, list):
string_or_array = [string_or_array]
return "\n".join([f"<p>{s}</p>" for s in string_or_array])
# By Raghuram Devarakonda on https://code.activestate.com/recipes/541096-prompt-the-user-for-confirmation/
def confirm(prompt=None, resp=False):
"""prompts for yes or no response from the user. Returns True for yes and
False for no.
'resp' should be set to the default value assumed by the caller when
user simply types ENTER.
>>> confirm(prompt='Create Directory?', resp=True)
Create Directory? [y]|n:
True
>>> confirm(prompt='Create Directory?', resp=False)
Create Directory? [n]|y:
False
>>> confirm(prompt='Create Directory?', resp=False)
Create Directory? [n]|y: y
True
"""
if prompt is None:
prompt = 'Confirm'
if resp:
prompt = '%s [%s]|%s: ' % (prompt, 'y', 'n')
else:
prompt = '%s [%s]|%s: ' % (prompt, 'n', 'y')
while True:
ans = input(prompt)
if not ans:
return resp
if ans not in ['y', 'Y', 'n', 'N']:
print ('please enter y or n.')
continue
if ans == 'y' or ans == 'Y':
return True
if ans == 'n' or ans == 'N':
return False

16
pyproject.toml Normal file
View file

@ -0,0 +1,16 @@
[tool.poetry]
name = "exhausting_mturk"
version = "0.1.0"
description = ""
authors = ["Ruben van de Ven <git@rubenvandeven.com>"]
[tool.poetry.dependencies]
python = "^3.9"
boto3 = "^1.17.105"
xmltodict = "^0.12.0"
coloredlogs = "^15.0.1"
PyYAML = "^5.4.1"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"