339 lines
17 KiB
Python
339 lines
17 KiB
Python
import logging
|
|
import yaml
|
|
from sorteerhoed import HITStore
|
|
import os
|
|
import subprocess
|
|
import boto3
|
|
import threading
|
|
from queue import Queue
|
|
from sorteerhoed.plotter import Plotter
|
|
import queue
|
|
from sorteerhoed.sqs import SqsListener
|
|
from sorteerhoed.webserver import Server
|
|
import time
|
|
from sorteerhoed.Signal import Signal
|
|
import io
|
|
from PIL import Image
|
|
import datetime
|
|
|
|
|
|
class CentralManagement():
    """
    Central management reads config and controls process flow.

    Components tied together here:

    - The HIT Store is the archive of HITs.
    - The mturk client communicates with Amazon Mechanical Turk.
    - The server thread is tornado, communicating with the turkers and with
      the status interface on the installation.
    - The plotter thread reads the plotter queue and sends it to the plotter.
    - The scanner is, for now, a simple scanimage command.
    - SQS: thread that listens for updates from Amazon.

    Components report back by putting Signal objects on ``eventQueue``;
    ``eventListener`` consumes that queue and drives the process flow.
    """

    # Seconds to wait for a scanimage run before giving up on it.
    SCAN_TIMEOUT = 80
    # Format of 'EventTimestamp' in SQS notifications, e.g. '2019-10-23T20:16:10Z'.
    SQS_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    def __init__(self, debug_mode):
        """
        Set up logger, event queue and state flags; no I/O happens here.

        debug_mode: when truthy the HIT store logs at DEBUG instead of INFO
        (see loadConfig).
        """
        self.logger = logging.getLogger("sorteerhoed").getChild('management')
        self.debug = debug_mode
        # The HIT currently being worked on; created by makeHit()
        self.currentHit = None

        self.eventQueue = Queue()
        # Set while the system runs; handed to the worker components
        self.isRunning = threading.Event()
        # Set between the 'scan.start' and 'scan.finished' signals
        self.isScanning = threading.Event()

    def loadConfig(self, filename):
        """
        Read the YAML file at `filename` into self.config and open the HIT store.
        """
        with open(filename, 'r') as fp:
            self.logger.debug(f'Load config from {filename}')
            self.config = yaml.safe_load(fp)

        varDb = os.path.join(
            # self.config['storage_dir'],
            'hit_store.db'
        )
        storeLogLevel = logging.DEBUG if self.debug else logging.INFO
        self.store = HITStore.Store(varDb, logLevel=storeLogLevel)

        self.logger.debug(f"Loaded configuration: {self.config}")

    def start(self):
        """
        Connect to Mechanical Turk, start all worker threads and block until
        isRunning is cleared.

        Requires loadConfig() to have been called. On exit (normal or through
        an exception) isRunning is cleared and the webserver, if it was
        created, is stopped.
        """
        self.isRunning.set()

        try:
            # M-turk connection
            MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com'
            # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client
            self.mturk = boto3.client(
                'mturk',
                aws_access_key_id=self.config['amazon']['user_id'],
                aws_secret_access_key=self.config['amazon']['user_secret'],
                region_name='us-east-1',
                endpoint_url=MTURK_SANDBOX
            )

            self.logger.info(f"Mechanical turk: {self.mturk.get_account_balance()}")

            self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning)
            sqsThread = threading.Thread(target=self.sqs.start, name='sqs')
            sqsThread.start()

            # the plotter itself
            self.plotter = Plotter(self.config, self.eventQueue, self.isRunning)
            plotterThread = threading.Thread(target=self.plotter.start, name='plotter')
            plotterThread.start()

            # webserver for turks and status
            self.server = Server(self.config, self.eventQueue, self.isRunning, self.plotter.q, self.store)
            serverThread = threading.Thread(target=self.server.start, name='server')
            serverThread.start()

            # event listener:
            dispatcherThread = threading.Thread(target=self.eventListener, name='dispatcher')
            dispatcherThread.start()

            # Kick off the first HIT
            self.eventQueue.put(Signal('start', {'ding': 'test'}))

            while self.isRunning.is_set():
                time.sleep(.5)

        finally:
            self.isRunning.clear()
            # Startup may have failed before the server was created; calling
            # stop() unconditionally would then raise AttributeError and hide
            # the exception that actually caused the shutdown.
            server = getattr(self, 'server', None)
            if server is not None:
                server.stop()

    def eventListener(self):
        """
        Dispatcher loop: take signals from eventQueue while isRunning is set.

        Possible events:
        - SQS events: accept/abandoned/returned/submitted
        - webserver events: open, draw, submit
        - scan complete
        - HIT created
        - Plotter complete
        """
        while self.isRunning.is_set():
            try:
                # Time out every second so a cleared isRunning is noticed
                signal = self.eventQueue.get(True, 1)
            except queue.Empty:
                continue

            try:
                self._handleSignal(signal)
            except Exception:
                # An uncaught exception would end this thread and with it all
                # event processing, while the other threads keep running.
                self.logger.exception(f"Error while handling signal: {signal}")

    def _handleSignal(self, signal):
        """Route a single signal to the matching action."""
        #TODO: make level debug()
        self.logger.warning(f"SIGNAL: {signal}")

        if signal.name == 'start':
            self.makeHit()
        elif signal.name == 'hit.scanned':
            # TODO: wrap up hit & make new HIT
            self.currentHit.scanned_at = datetime.datetime.utcnow()
            # Save before makeHit() replaces currentHit, like the other
            # handlers do after setting a timestamp.
            self.store.saveHIT(self.currentHit)
            self.server.statusPage.set('state', self.currentHit.getStatus())
            self.makeHit()
        elif signal.name == 'scan.start':
            self.isScanning.set()
        elif signal.name == 'scan.finished':
            self.isScanning.clear()
        elif signal.name == 'hit.info':
            self._handleHitInfo(signal)
        elif signal.name == 'server.open':
            self.currentHit.open_page_at = datetime.datetime.utcnow()
            self.store.saveHIT(self.currentHit)
            self.setLight(True)
        elif signal.name == 'server.submit':
            self.currentHit.submit_page_at = datetime.datetime.utcnow()
            self.store.saveHIT(self.currentHit)
            self.plotter.park()
            self.setLight(False)
            # park always triggers a plotter.finished after being processed
        elif signal.name.startswith('sqs.'):
            self._handleSqsSignal(signal)
        elif signal.name == 'plotter.finished':
            # Only scan once the turker has submitted the page
            if self.currentHit.submit_page_at:
                scan = threading.Thread(target=self.scanImage, name='scan')
                scan.start()
        else:
            # NOTE(review): cleanDrawing() emits 'system.reset', which has no
            # branch above and therefore ends up here - confirm that is intended.
            self.logger.critical(f"Unknown signal: {signal.name}")

    def _handleHitInfo(self, signal):
        """Copy the params of a 'hit.info' signal onto the current HIT and the status page."""
        if signal.params['hit_id'] != self.currentHit.id:
            self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}")
            return

        for name, value in signal.params.items():
            if name == 'hit_id':
                continue
            if name == 'ip':
                self.currentHit.turk_ip = value
            if name == 'location':
                self.currentHit.turk_country = value

            self.logger.debug(f'Set status: {name} to {value}')
            self.server.statusPage.set(name, value)

    def _handleSqsSignal(self, signal):
        """
        Process an Amazon notification ('sqs.<EventType>') and save the HIT it is about.

        Example of signal.params:
        {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
        AssignmentSubmitted events additionally carry an 'Answer' key holding
        the QuestionFormAnswers XML.
        """
        event = signal.params['event']

        # Amazon's HITId is stored in hit_id by makeHit(); .id is the local
        # number that is substituted for {HIT_NR} in the task.
        if event['HITId'] != self.currentHit.hit_id:
            self.logger.warning(f"SQS hit.info hit_id != currenthit.id: {signal}, update status for older HIT")
            sqsHit = self.store.getHitByRemoteId(event['HITId'])
            updateStatus = False
            if sqsHit is None:
                self.logger.critical(f"SQS event for unknown HIT: {event['HITId']}")
                return
        else:
            sqsHit = self.currentHit
            updateStatus = True

        if signal.name == 'sqs.AssignmentAccepted':
            self.logger.info('Set status progress to accepted')
            sqsHit.accept_time = datetime.datetime.strptime(event['EventTimestamp'], self.SQS_TIMESTAMP_FORMAT)
            sqsHit.worker_id = event['WorkerId']
            if updateStatus:
                self.server.statusPage.set('worker_id', sqsHit.worker_id)
        elif signal.name == 'sqs.AssignmentAbandoned':
            self.logger.info('Set status progress to abandoned')
            self._releaseAssignment(sqsHit, updateStatus)
        elif signal.name == 'sqs.AssignmentReturned':
            self.logger.info('Set status progress to returned')
            self._releaseAssignment(sqsHit, updateStatus)
        elif signal.name == 'sqs.AssignmentSubmitted':
            self.logger.info('Set status progress to submitted')
            # TODO: validate the content of the submission by parsing event['Answer'] and comparing it with sqsHit.uuid
            sqsHit.answer = event['Answer']
            if sqsHit.uuid not in sqsHit.answer:
                self.logger.critical(f"Not a valid answer given?! {sqsHit.answer}")

            sqsHit.submit_hit_at = datetime.datetime.strptime(event['EventTimestamp'], self.SQS_TIMESTAMP_FORMAT)

        self.store.saveHIT(sqsHit)

        if updateStatus:
            # TODO: have HITStore/HIT take care of this by emitting a signal
            # only update status if it is the currentHit
            self.server.statusPage.set('state', sqsHit.getStatus())

    def _releaseAssignment(self, sqsHit, updateStatus):
        """Undo the acceptance of a HIT after the worker abandoned or returned it."""
        sqsHit.accept_time = None
        sqsHit.open_page_at = None
        self.reset()
        if updateStatus:
            self.setLight(False)

    def makeHit(self):
        """
        Create a new HIT in the store and on Mechanical Turk and make it the current HIT.

        The reward is derived from the average duration of the previous 5 HITs
        and config['hour_rate_aim']. If an SQS url is configured, assignment
        notifications for the HIT type are (re)enabled.
        """
        self.server.statusPage.reset()
        self.currentHit = self.store.createHIT()

        self.logger.info(f"Make HIT {self.currentHit.id}")

        # Use a context manager so the task template is closed after reading
        with open(self.config['amazon']['task_xml'], mode='r') as fp:
            question = fp.read().replace("{HIT_NR}", str(self.currentHit.id))

        # presumably in seconds, given the division by 3600 below - confirm in HITStore
        estimatedHitDuration = self.store.getAvgDurationOfPreviousNHits(5)

        fee = (self.config['hour_rate_aim']/3600.) * estimatedHitDuration
        self.currentHit.fee = fee
        self.logger.info(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}")

        new_hit = self.mturk.create_hit(
            Title='Trace the drawn line',
            Description='Draw a line over the sketched line in the image',
            Keywords='polygons, trace, draw',
            Reward="{:.2f}".format(fee),
            MaxAssignments=1,
            LifetimeInSeconds=self.config['hit_lifetime'],
            AssignmentDurationInSeconds=self.config['hit_assignment_duration'],
            AutoApprovalDelayInSeconds=self.config['hit_autoapprove_delay'],
            Question=question,
        )

        self.logger.info(f"Created hit: {new_hit}")
        self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])

        # Remember Amazon's id so SQS events can be matched to this HIT
        self.currentHit.hit_id = new_hit['HIT']['HITId']

        self.store.saveHIT(self.currentHit)
        # TODO: have HITStore/HIT take care of this by emitting a signal
        self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])

        # mturk.send_test_event_notification()
        if self.config['amazon']['sqs_url']:
            notification_info = self.mturk.update_notification_settings(
                HITTypeId=new_hit['HIT']['HITTypeId'],
                Notification={
                    'Destination': self.config['amazon']['sqs_url'],
                    'Transport': 'SQS',
                    'Version': '2014-08-15',
                    'EventTypes': [
                        'AssignmentAccepted',
                        'AssignmentAbandoned',
                        'AssignmentReturned',
                        'AssignmentSubmitted',
                        # Not subscribed to: AssignmentRejected, AssignmentApproved,
                        # HITCreated, HITExpired, HITReviewable, HITExtended,
                        # HITDisposed, Ping
                    ]
                },
                Active=True
            )
            self.logger.debug(notification_info)

    def _runScanner(self, cmd, stdout):
        """
        Run a scanimage command, waiting at most SCAN_TIMEOUT seconds.

        cmd: argument list for subprocess.Popen.
        stdout: where the scan output goes (subprocess.PIPE or subprocess.DEVNULL).

        Returns a tuple (finished, output): finished is False when the command
        timed out; output is whatever was captured on stdout (None when stdout
        is not a pipe or on timeout). Anything on stderr is logged as critical.
        """
        proc = subprocess.Popen(cmd, stdout=stdout, stderr=subprocess.PIPE)
        try:
            # opens connection to scanner, but only starts scanning when output becomes ready:
            out, err = proc.communicate(timeout=self.SCAN_TIMEOUT)
        except subprocess.TimeoutExpired:
            self.logger.critical(f"Scanner did not finish within {self.SCAN_TIMEOUT} seconds")
            try:
                proc.kill()
            except OSError as killError:
                # The command runs through sudo, so we may not be permitted to signal it
                self.logger.warning(f"Could not kill scanner process: {killError}")
            return False, None

        if err:
            self.logger.critical(f"Scanner caused: {err.decode()}")
        return True, out

    def cleanDrawing(self):
        """
        Run the scanner without keeping the image, to reset after an abandoned or returned HIT.

        Emits 'scan.start', then 'system.reset' when the scan command finished,
        and always 'scan.finished' so isScanning cannot stay set.
        """
        self.eventQueue.put(Signal('scan.start'))
        try:
            # Scan to reset
            cmd = [
                'sudo', 'scanimage', '-d', 'epkowa'
            ]
            finished, _ = self._runScanner(cmd, subprocess.DEVNULL)
            if finished:
                self.eventQueue.put(Signal('system.reset'))
        finally:
            self.eventQueue.put(Signal('scan.finished'))

    def reset(self) -> None:
        """Start cleanDrawing() in a background thread named 'reset'."""
        # TODO: for returns & abandons
        scan = threading.Thread(target=self.cleanDrawing, name='reset')
        scan.start()

    def scanImage(self) -> None:
        """
        Run scanimage on the scanner and save the rotated scan at the current HIT's image path.

        Emits 'scan.start', then 'hit.scanned' when the image was saved, and
        always 'scan.finished'. Raises an Exception when a scan is already in
        progress.
        """
        if self.isScanning.is_set():
            raise Exception("Already scanning!")

        self.eventQueue.put(Signal('scan.start'))
        try:
            cmd = [
                'sudo', 'scanimage', '-d', 'epkowa', '--format', 'jpeg',
                '--resolution=100', '-l', '20', '-t', '30', '-x', str(255),
                '-y', str(185)
            ]
            self.logger.info(f"{cmd}")
            filename = self.currentHit.getImagePath()

            finished, scanData = self._runScanner(cmd, subprocess.PIPE)
            #TODO: should clear self.isRunning.clear() ?
            if not finished or not scanData:
                self.logger.critical("Scanner returned no image, HIT is not marked as scanned")
                return

            img = Image.open(io.BytesIO(scanData))
            img = img.transpose(Image.ROTATE_90)
            img.save(filename)

            self.eventQueue.put(Signal('hit.scanned', {'hit_id': self.currentHit.id}))
        finally:
            # Always release the scanning flag, also when the scan failed
            self.eventQueue.put(Signal('scan.finished'))

    def setLight(self, on):
        """
        Switch the usbrelay channel HURTM_1 on (truthy `on`) or off.

        A non-zero exit status of usbrelay is logged as a warning.
        """
        value = 1 if on else 0
        cmd = [
            'usbrelay', f'HURTM_1={value}'
        ]
        self.logger.info(f"Trigger light {cmd}")
        code = subprocess.call(cmd)
        # Negative codes mean the process was killed by a signal; that is a failure too
        if code != 0:
            self.logger.warning(f"Error on light change: {code}")
|
|
|
|
|