guest_worker/sorteerhoed/central_management.py

506 lines
25 KiB
Python

import logging
import yaml
from sorteerhoed import HITStore
import os
import subprocess
import boto3
import threading
from queue import Queue
from sorteerhoed.plotter import Plotter
import queue
from sorteerhoed.sqs import SqsListener
from sorteerhoed.webserver import Server
import time
from sorteerhoed.Signal import Signal
import io
from PIL import Image
import datetime
from shutil import copyfile
import colorsys
class Level(object):
# Level effect adapted from https://stackoverflow.com/a/3125421
def __init__(self, minv, maxv, gamma):
self.minv= minv/255.0
self.maxv= maxv/255.0
self._interval= self.maxv - self.minv
self._invgamma= 1.0/gamma
def new_level(self, value):
if value <= self.minv: return 0.0
if value >= self.maxv: return 1.0
return ((value - self.minv)/self._interval)**self._invgamma
def convert_and_level(self, band_values):
h, s, v= colorsys.rgb_to_hsv(*(i/255.0 for i in band_values))
new_v= self.new_level(v)
return tuple(int(255*i)
for i
in colorsys.hsv_to_rgb(h, s, new_v))
@classmethod
def level_image(cls, image, minv=0, maxv=255, gamma=1.0):
"""Level the brightness of image (a PIL.Image instance)
All values ≤ minv will become 0
All values ≥ maxv will become 255
gamma controls the curve for all values between minv and maxv"""
if image.mode != "RGB":
raise ValueError("this works with RGB images only")
new_image= image.copy()
leveller= cls(minv, maxv, gamma)
levelled_data= [
leveller.convert_and_level(data)
for data in image.getdata()]
new_image.putdata(levelled_data)
return new_image
class CentralManagement():
"""
Central management reads config and controls process flow
The HIT Store is the archive of hits
mturk thread communicates with mturk
server thread is tornado communicating with the turkers and with the status interface on the installation
Plotter thread reads plotter queue and sends it to there
Scanner is for now a simpe imagescan command
SQS: thread that listens for updates from Amazon
"""
def __init__(self, debug_mode):
self.logger = logging.getLogger("sorteerhoed").getChild('management')
self.debug = debug_mode
self.currentHit = None
self.lastHitTime = None
self.eventQueue = Queue()
self.isRunning = threading.Event()
self.isScanning = threading.Event()
self.scanLock = threading.Lock()
self.notPaused = threading.Event()
def loadConfig(self, filename, args):
self.configFile = filename
self.args = args
self.reloadConfig()
varDb = os.path.join(
# self.config['storage_dir'],
'hit_store.db'
)
self.store = HITStore.Store(varDb, logLevel=logging.DEBUG if self.debug else logging.INFO)
self.logger.debug(f"Loaded configuration: {self.config}")
def reloadConfig(self):
# reload config settings
with open(self.configFile, 'r') as fp:
self.logger.debug('Load config from {}'.format(self.configFile))
self.config = yaml.safe_load(fp)
if self.args.no_plotter:
self.config['dummy_plotter'] = True
self.config['for_real'] = self.args.for_real
# self.panopticon = Panopticon(self, self.config, self.voiceStorage)
def start(self):
self.isRunning.set()
self.notPaused.set()
try:
# M-turk connection
MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com'
MTURK_REAL = 'https://mturk-requester.us-east-1.amazonaws.com'
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client
self.mturk = boto3.client('mturk',
aws_access_key_id = self.config['amazon']['user_id'],
aws_secret_access_key = self.config['amazon']['user_secret'],
region_name='us-east-1',
endpoint_url = MTURK_REAL if self.config['for_real'] else MTURK_SANDBOX
)
self.logger.info(f"Mechanical turk account balance: {self.mturk.get_account_balance()['AvailableBalance']}")
if not self.config['for_real']:
self.logger.info("Remove block from sandbox worker account")
self.mturk.delete_worker_block(WorkerId='A1CK46PK9VEUH5', Reason='Myself on Sandbox')
# clear any pending hits:
pending_hits = self.mturk.list_hits(MaxResults=100)
for pending_hit in pending_hits['HITs']:
# print(pending_hit['HITId'], pending_hit['HITStatus'])
if pending_hit['HITStatus'] == 'Assignable':
self.logger.warn(f"Expire stale hit: {pending_hit['HITId']}: {pending_hit['HITStatus']}")
self.mturk.update_expiration_for_hit(
HITId=pending_hit['HITId'],
ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
)
self.mturk.delete_hit(HITId=pending_hit['HITId'])
self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning)
sqsThread = threading.Thread(target=self.sqs.start, name='sqs')
sqsThread.start()
# the plotter itself
self.plotter = Plotter(self.config, self.eventQueue, self.isRunning, self.scanLock)
plotterThread = threading.Thread(target=self.plotter.start, name='plotter')
plotterThread.start()
# webserver for turks and status
self.server = Server(self.config, self.eventQueue, self.isRunning, self.plotter.q, self.store)
serverThread = threading.Thread(target=self.server.start, name='server')
serverThread.start()
# event listener:
dispatcherThread = threading.Thread(target=self.eventListener, name='dispatcher')
dispatcherThread.start()
#
#
self.eventQueue.put(Signal('start', {'ding':'test'}))
while self.isRunning.is_set():
time.sleep(.5)
except Exception as e:
self.logger.exception(e)
finally:
self.logger.warning("Stopping Central Managment")
self.isRunning.clear()
self.server.stop()
self.expireCurrentHit()
def expireCurrentHit(self):
if self.currentHit and self.currentHit.hit_id: # hit pending
self.logger.warn(f"Delete hit: {self.currentHit.hit_id}")
self.mturk.update_expiration_for_hit(
HITId=self.currentHit.hit_id,
ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
)
try:
self.mturk.delete_hit(HITId=self.currentHit.hit_id)
except Exception as e:
self.logger.exception(e)
def eventListener(self):
while self.isRunning.is_set():
try:
signal = self.eventQueue.get(True, 1)
except queue.Empty:
pass
# self.logger.log(5, "Empty queue.")
else:
try:
"""
Possible events:
- SQS events: accept/abandoned/returned/submitted
- webserver events: open, draw, submit
- scan complete
- HIT created
- Plotter complete
-
"""
#TODO: make level debug()
self.logger.info(f"SIGNAL: {signal}")
if signal.name == 'start':
self.makeHit()
self.lastHitTime = datetime.datetime.now()
pass
elif signal.name == 'hit.scanned':
# TODO: wrap up hit & make new HIT
self.currentHit.scanned_at = datetime.datetime.utcnow()
self.server.statusPage.set('state', self.currentHit.getStatus())
time_diff = datetime.datetime.now() - self.lastHitTime
to_wait = 10 - time_diff.total_seconds()
if to_wait > 0:
self.logger.warn(f"Sleep until next hit: {to_wait}s")
time.sleep(to_wait)
else:
self.logger.info(f"No need to wait: {to_wait}s")
self.makeHit()
self.lastHitTime = datetime.datetime.now()
elif signal.name == 'scan.start':
pass
elif signal.name == 'scan.finished':
pass
elif signal.name == 'hit.info':
if signal.params['hit_id'] != self.currentHit.id:
self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}")
continue
for name, value in signal.params.items():
if name == 'hit_id':
continue
if name == 'ip':
self.currentHit.turk_ip = value
if name == 'location':
self.currentHit.turk_country = value
self.logger.debug(f'Set status: {name} to {value}')
self.server.statusPage.set(name, value)
elif signal.name == 'server.open':
self.currentHit.open_page_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit)
self.setLight(True)
self.server.statusPage.set('state', self.currentHit.getStatus())
self.server.statusPage.set('hit_opened', self.currentHit.open_page_at)
elif signal.name == 'server.submit':
self.currentHit.submit_page_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit)
self.plotter.park()
self.server.statusPage.set('hit_opened', self.currentHit.open_page_at)
# park always triggers a plotter.finished after being processed
elif signal.name[:4] == 'sqs.':
if signal.params['event']['HITId'] != self.currentHit.hit_id:
self.logger.warning(f"SQS hit.info hit_id != currenthit.id: {signal}, update status for older HIT")
sqsHit = self.store.getHitByRemoteId(signal.params['event']['HITId'])
updateStatus = False
else:
sqsHit = self.currentHit
updateStatus = True
if signal.name == 'sqs.AssignmentAccepted':
self.logger.info(f'Set status progress to accepted')
sqsHit.accept_time = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
sqsHit.worker_id = signal.params['event']['WorkerId']
if updateStatus:
self.server.statusPage.set('worker_id', sqsHit.worker_id)
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
elif signal.name == 'sqs.AssignmentAbandoned':
self.logger.info(f'Set status progress to abandoned')
#{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
sqsHit.accept_time = None
sqsHit.open_page_at = None
if self.currentHit.id == sqsHit.id:
if not sqsHit.submit_page_at:
self.reset()
else:
sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit
if updateStatus:
self.setLight(False)
self.mturk.create_worker_block(WorkerId=signal.params['event']['WorkerId'], Reason='Accepted task without working on it.')
elif signal.name == 'sqs.AssignmentReturned':
self.logger.info(f'Set status progress to returned')
sqsHit.accept_time = None
sqsHit.open_page_at = None
if self.currentHit.id == sqsHit.id:
if not sqsHit.submit_page_at:
self.reset()
else:
sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit
if updateStatus:
self.setLight(False)
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
elif signal.name == 'sqs.AssignmentSubmitted':
# {'MessageId': '4b37dfdf-6a12-455d-a111-9a361eb54d88', 'ReceiptHandle': 'AQEBHc0yAdIrEmAV3S8TIoDCRxrItDEvjy0VQko56/Lb+ifszC0gdZ0Bbed24HGHZYr5DSnWkgBJ/H59ZXxFS1iVEH9sC8+YrmKKOTrKvW3gj6xYiBU2fBb8JRq+sEiNSxWLw2waxr1VYdpn/SWcoOJCv6PlS7P9EB/2IQ++rCklhVwV7RfARHy4J87bjk5R3+uEXUUi00INhCeunCbu642Mq4c239TFRHq3mwM6gkdydK+AP1MrXGKKAE1W5nMbwEWAwAN8KfoM1NkkUg5rTSYWmxxZMdVs/QRNcMFKVSf1bop2eCALSoG6l3Iu7+UXIl4HLh+rHp4bc8NoftbUJUii8YXeiNGU3wCM9T1kOerwYVgksK93KQrioD3ee8navYExQRXne2+TrUZUDkxRIdtPGA==', 'MD5OfBody': '01ccb1efe47a84b68704c4dc611a4d8d', 'Body': '{"Events":[{"Answer":"<?xml version=\\"1.0\\" encoding=\\"ASCII\\"?><QuestionFormAnswers xmlns=\\"http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd\\"><Answer><QuestionIdentifier>surveycode<\\/QuestionIdentifier><FreeText>test<\\/FreeText><\\/Answer><\\/QuestionFormAnswers>","HITGroupId":"301G7MYOAJ85NEW128ZDGF5DSBW53S","EventType":"AssignmentSubmitted","EventTimestamp":"2019-10-30T08:01:43Z","HITId":"3NSCTNUR2ZY42ZXASI4CS5YWV0S5AB","AssignmentId":"3ZAZR5XV02TTOCBR9MCLCNQV1XKCZL","WorkerId":"A1CK46PK9VEUH5","HITTypeId":"3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0"}],"EventDocId":"34af4cd7f2829216f222d4b6e66f3a3ff9ad8ea6","SourceAccount":"600103077174","CustomerId":"A1CK46PK9VEUH5","EventDocVersion":"2014-08-15"}'}
self.logger.info(f'Set status progress to submitted')
# TODO: validate the content of the submission by parsing signal.params['event']['Answer'] and comparing it with sqsHit.uuid
sqsHit.answer = signal.params['event']['Answer']
if sqsHit.uuid not in sqsHit.answer:
self.logger.critical(f"Not a valid answer given?! {sqsHit.answer}")
if not sqsHit.submit_page_at:
# page not submitted, hit is. Nevertheless, create new hit.
try:
self.mturk.reject_assignment(AssignmentId=signal.params['event']['AssignmentId'], RequesterFeedback='Did not do the assignment')
except Exception as e:
self.logger.exception(e)
self.makeHit()
else:
sqsHit.submit_hit_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
# block de worker na succesvolle submit, om dubbele workers te voorkomen
# TODO: Disabled after worker mail, use quals instead
#self.mturk.create_worker_block(WorkerId=signal.params['event']['WorkerId'], Reason='Every worker can only work once on the taks.')
#self.logger.warn("Block worker after submission")
self.store.saveHIT(sqsHit)
if updateStatus:
self.logger.warning(f'update status: {sqsHit.getStatus()}')
# TODO: have HITStore/HIT take care of this by emitting a signal
# only update status if it is the currentHit
self.server.statusPage.set('state', sqsHit.getStatus())
else:
self.logger.warning('DO NOT update status')
elif signal.name == 'plotter.finished':
if self.currentHit and self.currentHit.submit_page_at:
self.setLight(False)
scan = threading.Thread(target=self.scanImage, name='scan')
scan.start()
self.server.statusPage.set('hit_submitted', self.currentHit.submit_page_at)
self.server.statusPage.set('state', self.currentHit.getStatus())
else:
self.logger.critical(f"Unknown signal: {signal.name}")
except Exception as e:
self.logger.critical(f"Exception on handling {signal}")
self.logger.exception(e)
def makeHit(self):
self.expireCurrentHit() # expire hit if it is there
self.server.statusPage.reset()
self.reloadConfig() # reload new config values if they are set
# self.notPaused.wait()
self.currentHit = self.store.createHIT()
self.store.currentHit = self.currentHit
self.logger.info(f"Make HIT {self.currentHit.id}")
# question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id))
question = '''<?xml version="1.0" encoding="UTF-8"?>
<ExternalQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd">
<ExternalURL>https://guest.rubenvandeven.com:8888/draw?id={HIT_NR}</ExternalURL>
<FrameHeight>0</FrameHeight>
</ExternalQuestion>
'''.replace("{HIT_NR}",str(self.currentHit.id))
estimatedHitDuration = self.store.getEstimatedHitDuration()
# set minimum rate, if they take longer we increase the pay
fee = max(self.config['minimum_fee'], (self.config['hour_rate_aim']/3600.) * estimatedHitDuration)
self.currentHit.fee = fee
self.logger.info(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}")
new_hit = self.mturk.create_hit(
Title = 'Trace the drawn line',
Description = 'Draw a line over the sketched line in the image',
Keywords = 'polygons, trace, draw',
Reward = "{:.2f}".format(fee),
MaxAssignments = 1,
LifetimeInSeconds = self.config['hit_lifetime'],
AssignmentDurationInSeconds = self.store.getHitTimeout(),
AutoApprovalDelayInSeconds = self.config['hit_autoapprove_delay'],
Question = question,
)
self.logger.info(f"Created hit: {new_hit}")
if not self.config['for_real']:
self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])
else:
self.logger.info("https://worker.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])
self.currentHit.hit_id = new_hit['HIT']['HITId']
self.store.saveHIT(self.currentHit)
# TODO: have HITStore/HIT take care of this by emitting a signal
self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])
self.server.statusPage.set('hit_created', self.currentHit.created_at)
self.server.statusPage.set('fee', f"${self.currentHit.fee:.2f}")
self.server.statusPage.set('state', self.currentHit.getStatus())
# mturk.send_test_event_notification()
if self.config['amazon']['sqs_url']:
notification_info= self.mturk.update_notification_settings(
HITTypeId=new_hit['HIT']['HITTypeId'],
Notification = {
'Destination' : self.config['amazon']['sqs_url'],
'Transport': 'SQS',
'Version': '2014-08-15',
'EventTypes': [
'AssignmentAccepted',
'AssignmentAbandoned',
'AssignmentReturned',
'AssignmentSubmitted',
# 'AssignmentRejected',
# 'AssignmentApproved',
# 'HITCreated',
# 'HITExpired',
# 'HITReviewable',
# 'HITExtended',
# 'HITDisposed',
# 'Ping',
]
},
Active=True
)
self.logger.debug(notification_info)
def cleanDrawing(self):
with self.scanLock:
self.eventQueue.put(Signal('scan.start'))
# Scan to reset
cmd = [
'sudo', 'scanimage', '-d', 'epkowa','--resolution=100',
'-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
,'-t','22', # x axis, margin from left side scanner (seen from the outside)
'-x',str(181),
'-y',str(245)
]
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
# opens connection to scanner, but only starts scanning when output becomes ready:
_, e = proc.communicate(80)
time.sleep(5) # sleep a few seconds for scanner to return to start position
if e:
self.logger.critical(f"Scanner caused: {e.decode()}")
self.eventQueue.put(Signal('system.reset'))
self.eventQueue.put(Signal('scan.finished'))
def reset(self) -> str:
self.plotter.park()
# Very confusing to have scanning on a reset (because often nothing has happened), so don't do itd
# scan = threading.Thread(target=self.cleanDrawing, name='reset')
# scan.start()
self.server.statusPage.clearAssignment()
def scanImage(self) -> str:
"""
Run scanimage on scaner and returns a string with the filename
"""
cmd = [
'sudo', 'scanimage', '-d', 'epkowa', '--format', 'tiff',
'--resolution=100', # lower res, faster (more powerful) scan & wipe
'-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
,'-t','22', # x axis, margin from left side scanner (seen from the outside)
'-x',str(181),
'-y',str(245)
]
self.logger.info(f"{cmd}")
filename = self.currentHit.getImagePath()
with self.scanLock:
self.eventQueue.put(Signal('scan.start'))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# opens connection to scanner, but only starts scanning when output becomes ready:
o, e = proc.communicate(80)
if e:
self.logger.critical(f"Scanner caused: {e.decode()}")
#TODO: should clear self.isRunning.clear() ?
try:
f = io.BytesIO(o)
img = Image.open(f)
img = img.transpose(Image.ROTATE_90).transpose(Image.FLIP_TOP_BOTTOM)
tunedImg = Level.level_image(img, self.config['level']['min'], self.config['level']['max'], self.config['level']['gamma'])
tunedImg.save(filename)
except Exception as e:
self.logger.critical("Cannot create image from scan. Did scanner work?")
self.logger.exception(e)
# TODO: create
copyfile('www/basic.svg', filename)
time.sleep(5) # sleep a few seconds for scanner to return to start position
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.finished'))
def setLight(self, on):
value = 1 if on else 0
cmd = [
'usbrelay', f'HURTM_1={value}'
]
self.logger.info(f"Trigger light {cmd}")
code = subprocess.call(cmd)
if code > 0:
self.logger.warning(f"Error on light change: {code}")