guest_worker/sorteerhoed/central_management.py

336 lines
16 KiB
Python
Raw Normal View History

2019-10-23 08:56:28 +00:00
import logging
import yaml
from sorteerhoed import HITStore
import os
import subprocess
import boto3
import threading
from queue import Queue
from sorteerhoed.plotter import Plotter
import queue
from sorteerhoed.sqs import SqsListener
from sorteerhoed.webserver import Server
import time
2019-10-23 20:33:37 +00:00
from sorteerhoed.Signal import Signal
import io
from PIL import Image
import datetime
2019-10-23 08:56:28 +00:00
class CentralManagement():
    """
    Central management reads config and controls process flow.

    - The HIT Store is the archive of hits.
    - mturk: boto3 client that communicates with Mechanical Turk.
    - server thread is tornado communicating with the turkers and with the
      status interface on the installation.
    - Plotter thread reads the plotter queue and sends it to the plotter.
    - Scanner is for now a simple scanimage command.
    - SQS: thread that listens for updates from Amazon.
    - dispatcher thread (eventListener) handles every Signal put on eventQueue.

    loadConfig() must be called before start(): start() reads self.config and
    self.store, which loadConfig() sets.
    """

    # Seconds to wait for one scanimage invocation before giving up on it.
    SCANNER_TIMEOUT = 80

    def __init__(self, debug_mode):
        self.logger = logging.getLogger("sorteerhoed").getChild('management')
        self.debug = debug_mode
        self.currentHit = None
        self.eventQueue = Queue()
        self.isRunning = threading.Event()
        self.isScanning = threading.Event()
        # Created in start(); set here so start()'s cleanup can tell whether it exists.
        self.server = None

    def loadConfig(self, filename):
        """Load the YAML configuration and open the HIT store.

        Sets self.config (parsed with yaml.safe_load) and self.store, a
        HITStore.Store on 'hit_store.db' relative to the working directory.
        """
        with open(filename, 'r') as fp:
            self.logger.debug('Load config from {}'.format(filename))
            self.config = yaml.safe_load(fp)
        varDb = os.path.join(
            # self.config['storage_dir'],
            'hit_store.db'
        )
        self.store = HITStore.Store(varDb, logLevel=logging.DEBUG if self.debug else logging.INFO)
        self.logger.debug(f"Loaded configuration: {self._loggableConfig()}")

    def _loggableConfig(self):
        """Return self.config with amazon.user_secret masked, so it can be logged."""
        amazon = self.config.get('amazon') if isinstance(self.config, dict) else None
        if isinstance(amazon, dict) and 'user_secret' in amazon:
            return {**self.config, 'amazon': {**amazon, 'user_secret': '***'}}
        return self.config

    def start(self):
        """Connect to MTurk, start all worker threads and block until stopped.

        Starts the SQS listener, plotter, webserver and dispatcher threads,
        queues the initial 'start' signal and then polls isRunning every 0.5s.
        However this method exits (normally or via an exception such as
        KeyboardInterrupt), isRunning is cleared and the webserver, if it was
        created, is stopped.
        """
        self.isRunning.set()
        try:
            # M-turk connection
            MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com'
            # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client
            self.mturk = boto3.client(
                'mturk',
                aws_access_key_id=self.config['amazon']['user_id'],
                aws_secret_access_key=self.config['amazon']['user_secret'],
                region_name='us-east-1',
                endpoint_url=MTURK_SANDBOX,
            )
            self.logger.info(f"Mechanical turk: {self.mturk.get_account_balance()}")

            self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning)
            sqsThread = threading.Thread(target=self.sqs.start, name='sqs')
            sqsThread.start()

            # the plotter itself
            self.plotter = Plotter(self.config, self.eventQueue, self.isRunning)
            plotterThread = threading.Thread(target=self.plotter.start, name='plotter')
            plotterThread.start()

            # webserver for turks and status
            self.server = Server(self.config, self.eventQueue, self.isRunning, self.plotter.q, self.store)
            serverThread = threading.Thread(target=self.server.start, name='server')
            serverThread.start()

            # event listener:
            dispatcherThread = threading.Thread(target=self.eventListener, name='dispatcher')
            dispatcherThread.start()

            self.eventQueue.put(Signal('start', {'ding': 'test'}))

            while self.isRunning.is_set():
                time.sleep(.5)
        finally:
            self.isRunning.clear()
            # If startup failed before the webserver existed there is nothing to stop;
            # don't mask the original exception with an AttributeError.
            if self.server is not None:
                self.server.stop()

    def eventListener(self):
        """Dispatcher loop: handle signals from eventQueue while isRunning is set.

        The queue is polled with a 1 second timeout so a cleared isRunning is
        noticed. A failure while handling one signal is logged and the loop
        continues; otherwise this thread would die and no further events would
        be processed while start() keeps running.
        """
        while self.isRunning.is_set():
            try:
                signal = self.eventQueue.get(True, 1)
            except queue.Empty:
                continue

            #TODO: make level debug()
            self.logger.warning(f"SIGNAL: {signal}")
            try:
                self._handleSignal(signal)
            except Exception:
                self.logger.exception(f"Error while handling signal {signal}")

    def _handleSignal(self, signal):
        """Act on a single Signal.

        Handled names: 'start', 'hit.scanned', 'scan.start', 'scan.finished',
        'hit.info', 'server.open', 'server.submit', 'sqs.*' (MTurk assignment
        events, see _handleSqsSignal) and 'plotter.finished'. Anything else is
        logged as unknown.
        """
        if signal.name == 'start':
            self.makeHit()
        elif signal.name == 'hit.scanned':
            # TODO: wrap up hit & make new HIT
            self.currentHit.scanned_at = datetime.datetime.utcnow()
            # Persist now: makeHit() replaces self.currentHit right after this.
            self.store.saveHIT(self.currentHit)
            self.server.statusPage.set('state', self.currentHit.getStatus())
            self.makeHit()
        elif signal.name == 'scan.start':
            self.isScanning.set()
        elif signal.name == 'scan.finished':
            self.isScanning.clear()
        elif signal.name == 'hit.info':
            if signal.params['hit_id'] != self.currentHit.id:
                self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}")
                return
            for name, value in signal.params.items():
                if name == 'hit_id':
                    continue
                if name == 'ip':
                    self.currentHit.turk_ip = value
                elif name == 'location':
                    self.currentHit.turk_country = value
                self.logger.debug(f'Set status: {name} to {value}')
                self.server.statusPage.set(name, value)
        elif signal.name == 'server.open':
            self.currentHit.open_page_at = datetime.datetime.utcnow()
            self.store.saveHIT(self.currentHit)
            self.setLight(True)
        elif signal.name == 'server.submit':
            self.currentHit.submit_page_at = datetime.datetime.utcnow()
            self.store.saveHIT(self.currentHit)
            self.plotter.park()
            self.setLight(False)
            # park always triggers a plotter.finished after being processed
        elif signal.name.startswith('sqs.'):
            self._handleSqsSignal(signal)
        elif signal.name == 'plotter.finished':
            if self.currentHit.submit_page_at:
                scan = threading.Thread(target=self.scanImage, name='scan')
                scan.start()
        else:
            self.logger.critical(f"Unknown signal: {signal.name}")

    def _handleSqsSignal(self, signal):
        """Apply an MTurk assignment event (an 'sqs.*' signal) to its HIT.

        signal.params['event'] is one MTurk notification event, e.g.
        {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted',
         'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F',
         'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5',
         'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}
        AssignmentSubmitted events also carry 'Answer' (QuestionFormAnswers XML).

        The status page, light and drawing reset are only touched when the
        event is about the current HIT. Other HITs are only updated in the store.
        """
        event = signal.params['event']
        # 'HITId' is the remote MTurk id, which makeHit() stores as hit_id
        # (the local store id is .id).
        if self.currentHit is not None and event['HITId'] == self.currentHit.hit_id:
            sqsHit = self.currentHit
            updateStatus = True
        else:
            self.logger.warning(f"SQS hit.info hit_id != currenthit.id: {signal}, update status for older HIT")
            sqsHit = self.store.getHitByRemoteId(event['HITId'])
            updateStatus = False
            if sqsHit is None:
                self.logger.warning(f"No HIT found for remote id {event['HITId']}, ignoring {signal.name}")
                return

        if signal.name == 'sqs.AssignmentAccepted':
            self.logger.info('Set status progress to accepted')
            sqsHit.accept_time = datetime.datetime.strptime(event['EventTimestamp'], "%Y-%m-%dT%H:%M:%SZ")
            sqsHit.worker_id = event['WorkerId']
            self.store.saveHIT(sqsHit)
            if updateStatus:
                self.server.statusPage.set('worker_id', sqsHit.worker_id)
        elif signal.name in ('sqs.AssignmentAbandoned', 'sqs.AssignmentReturned'):
            if signal.name == 'sqs.AssignmentAbandoned':
                self.logger.info('Set status progress to abandoned')
            else:
                self.logger.info('Set status progress to returned')
            sqsHit.accept_time = None
            sqsHit.open_page_at = None
            self.store.saveHIT(sqsHit)
            if updateStatus:
                # Only the current HIT's drawing is on the machine; cleaning it for an
                # older HIT would wipe the current worker's drawing.
                self.reset()
                self.setLight(False)
        elif signal.name == 'sqs.AssignmentSubmitted':
            self.logger.info('Set status progress to submitted')
            # TODO: validate the content of the submission by parsing signal.params['event']['Answer'] and comparing it with sqsHit.uuid
            sqsHit.answer = event['Answer']
            if sqsHit.uuid not in sqsHit.answer:
                self.logger.critical(f"Not a valid answer given?! {sqsHit.answer}")
            sqsHit.submit_hit_at = datetime.datetime.strptime(event['EventTimestamp'], "%Y-%m-%dT%H:%M:%SZ")
            self.store.saveHIT(sqsHit)
            if updateStatus:
                # TODO: have HITStore/HIT take care of this by emitting a signal
                # only update status if it is the currentHit
                self.server.statusPage.set('state', sqsHit.getStatus())
        else:
            self.logger.warning(f"Unhandled SQS event: {signal.name}")

    def makeHit(self):
        """Create a new HIT in the store and publish it on Mechanical Turk.

        Steps:
        - Reset the status page.
        - Create a record via self.store.createHIT(); it becomes self.currentHit.
        - Fill the task XML template (config amazon.task_xml, with '{HIT_NR}'
          replaced by the local HIT id).
        - Create the MTurk HIT. The reward is derived from config
          hour_rate_aim and the average duration of the previous 5 HITs.
        - Store the MTurk HITId as currentHit.hit_id.
        - If amazon.sqs_url is set, route the HIT type's assignment events to
          that SQS queue.
        """
        self.server.statusPage.reset()
        self.currentHit = self.store.createHIT()
        self.logger.info(f"Make HIT {self.currentHit.id}")

        with open(self.config['amazon']['task_xml'], mode='r') as fp:
            question = fp.read().replace("{HIT_NR}", str(self.currentHit.id))

        # hour_rate_aim is divided by 3600, so the average duration is presumably
        # in seconds — TODO confirm against HITStore.
        estimatedHitDuration = self.store.getAvgDurationOfPreviousNHits(5)
        fee = (self.config['hour_rate_aim'] / 3600.) * estimatedHitDuration
        self.logger.debug(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}")

        new_hit = self.mturk.create_hit(
            Title='Trace the drawn line',
            Description='Draw a line over the sketched line in the image',
            Keywords='polygons, trace, draw',
            Reward="{:.2f}".format(fee),
            MaxAssignments=1,
            LifetimeInSeconds=self.config['hit_lifetime'],
            AssignmentDurationInSeconds=self.config['hit_assignment_duration'],
            AutoApprovalDelayInSeconds=self.config['hit_autoapprove_delay'],
            Question=question,
        )
        self.logger.info(f"Created hit: {new_hit}")
        self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])
        self.currentHit.hit_id = new_hit['HIT']['HITId']
        self.store.saveHIT(self.currentHit)

        # TODO: have HITStore/HIT take care of this by emitting a signal
        self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])

        # mturk.send_test_event_notification()
        if self.config['amazon']['sqs_url']:
            notification_info = self.mturk.update_notification_settings(
                HITTypeId=new_hit['HIT']['HITTypeId'],
                Notification={
                    'Destination': self.config['amazon']['sqs_url'],
                    'Transport': 'SQS',
                    'Version': '2014-08-15',
                    'EventTypes': [
                        'AssignmentAccepted',
                        'AssignmentAbandoned',
                        'AssignmentReturned',
                        'AssignmentSubmitted',
                        # 'AssignmentRejected',
                        # 'AssignmentApproved',
                        # 'HITCreated',
                        # 'HITExpired',
                        # 'HITReviewable',
                        # 'HITExtended',
                        # 'HITDisposed',
                        # 'Ping',
                    ]
                },
                Active=True
            )
            self.logger.debug(notification_info)

    def _runScanner(self, cmd, stdout):
        """Run a scanimage command and wait for it to finish.

        stdout is passed to Popen (subprocess.PIPE to capture the image,
        subprocess.DEVNULL to discard it); stderr is always captured.
        Returns the (stdout, stderr) tuple from Popen.communicate().
        Waits at most SCANNER_TIMEOUT seconds. On timeout the process is
        stopped on a best-effort basis and subprocess.TimeoutExpired is re-raised.
        """
        proc = subprocess.Popen(cmd, stdout=stdout, stderr=subprocess.PIPE)
        try:
            # opens connection to scanner, but only starts scanning when output becomes ready:
            return proc.communicate(timeout=self.SCANNER_TIMEOUT)
        except subprocess.TimeoutExpired:
            self.logger.critical(f"Scanner did not finish within {self.SCANNER_TIMEOUT}s: {cmd}")
            try:
                proc.terminate()
                proc.wait(timeout=5)
            except (OSError, subprocess.TimeoutExpired):
                # NOTE(review): cmd runs via sudo, so this process may not be allowed
                # to signal it; nothing more can be done from here.
                self.logger.critical(f"Could not stop scanner process {proc.pid}")
            raise

    def cleanDrawing(self):
        """Run a scanimage pass (its output is discarded) to reset the drawing.

        Emits 'scan.start' first and always 'scan.finished' last. If the
        scanner writes anything to stderr, it is logged and 'system.reset' is
        emitted. Meant to run in its own thread (see reset()).
        """
        self.eventQueue.put(Signal('scan.start'))
        try:
            # Scan to reset
            cmd = [
                'sudo', 'scanimage', '-d', 'epkowa'
            ]
            _, e = self._runScanner(cmd, subprocess.DEVNULL)
            if e:
                self.logger.critical(f"Scanner caused: {e.decode()}")
                # NOTE(review): _handleSignal has no 'system.reset' branch, so this
                # signal currently ends up logged as an unknown signal.
                self.eventQueue.put(Signal('system.reset'))
        except Exception:
            self.logger.exception("Scanning to reset the drawing failed")
        finally:
            self.eventQueue.put(Signal('scan.finished'))

    def reset(self) -> None:
        """Clean the drawing in a background thread named 'reset' (see cleanDrawing)."""
        # TODO: for returns & abandons
        scan = threading.Thread(target=self.cleanDrawing, name='reset')
        scan.start()

    def scanImage(self) -> None:
        """Scan the drawing and save it as the current HIT's image.

        Runs scanimage (JPEG, 100 dpi), rotates the result by 90 degrees and
        saves it to currentHit.getImagePath(). Emits 'scan.start' first and
        always 'scan.finished' last. 'hit.scanned' (with the local HIT id) is
        emitted in between only when the image was saved. Failures are logged.
        Meant to run in its own thread (see the 'plotter.finished' handler).
        """
        self.eventQueue.put(Signal('scan.start'))
        try:
            hit = self.currentHit
            cmd = [
                'sudo', 'scanimage', '-d', 'epkowa', '--format', 'jpeg',
                '--resolution=100', '-l', '20', '-t', '30',
                # Popen arguments must be strings. Height/width are presumably swapped
                # because the scan is rotated by 90 degrees below.
                '-x', str(self.config['scanner']['height']),
                '-y', str(self.config['scanner']['width']),
            ]
            filename = hit.getImagePath()
            o, e = self._runScanner(cmd, subprocess.PIPE)
            if e:
                self.logger.critical(f"Scanner caused: {e.decode()}")
                #TODO: should clear self.isRunning.clear() ?
            img = Image.open(io.BytesIO(o))
            img = img.transpose(Image.ROTATE_90)
            img.save(filename)
            self.eventQueue.put(Signal('hit.scanned', {'hit_id': hit.id}))
        except Exception:
            self.logger.exception("Scanning the drawing failed")
        finally:
            self.eventQueue.put(Signal('scan.finished'))

    def setLight(self, on):
        """Switch the installation light via `usbrelay HURTM_1=<0|1>` (best effort).

        A non-zero exit code or a failure to run usbrelay is logged as a
        warning, not raised.
        """
        value = 1 if on else 0
        cmd = [
            'usbrelay', f'HURTM_1={value}'
        ]
        self.logger.info(f"Trigger light {cmd}")
        try:
            code = subprocess.call(cmd)
        except OSError as e:
            self.logger.warning(f"Error on light change: {e}")
            return
        # negative codes mean the process was killed by a signal
        if code != 0:
            self.logger.warning(f"Error on light change: {code}")