584 lines
29 KiB
Python
584 lines
29 KiB
Python
import logging
|
|
import yaml
|
|
from sorteerhoed import HITStore
|
|
import os
|
|
import subprocess
|
|
import boto3
|
|
import threading
|
|
from queue import Queue
|
|
from sorteerhoed.plotter import Plotter
|
|
import queue
|
|
from sorteerhoed.sqs import SqsListener
|
|
from sorteerhoed.webserver import Server
|
|
import time
|
|
from sorteerhoed.Signal import Signal
|
|
import io
|
|
from PIL import Image
|
|
import datetime
|
|
from shutil import copyfile
|
|
import colorsys
|
|
import tqdm
|
|
|
|
|
|
class Level(object):
|
|
"""
|
|
Level image effect adapted from https://stackoverflow.com/a/3125421
|
|
"""
|
|
|
|
def __init__(self, minv, maxv, gamma):
|
|
self.minv= minv/255.0
|
|
self.maxv= maxv/255.0
|
|
self._interval= self.maxv - self.minv
|
|
self._invgamma= 1.0/gamma
|
|
|
|
def new_level(self, value):
|
|
if value <= self.minv: return 0.0
|
|
if value >= self.maxv: return 1.0
|
|
return ((value - self.minv)/self._interval)**self._invgamma
|
|
|
|
def convert_and_level(self, band_values):
|
|
h, s, v= colorsys.rgb_to_hsv(*(i/255.0 for i in band_values))
|
|
new_v= self.new_level(v)
|
|
return tuple(int(255*i)
|
|
for i
|
|
in colorsys.hsv_to_rgb(h, s, new_v))
|
|
|
|
@classmethod
|
|
def level_image(cls, image, minv=0, maxv=255, gamma=1.0):
|
|
"""Level the brightness of image (a PIL.Image instance)
|
|
All values ≤ minv will become 0
|
|
All values ≥ maxv will become 255
|
|
gamma controls the curve for all values between minv and maxv"""
|
|
|
|
if image.mode != "RGB":
|
|
raise ValueError("this works with RGB images only")
|
|
|
|
new_image= image.copy()
|
|
|
|
leveller= cls(minv, maxv, gamma)
|
|
levelled_data= [
|
|
leveller.convert_and_level(data)
|
|
for data in image.getdata()]
|
|
new_image.putdata(levelled_data)
|
|
return new_image
|
|
|
|
class CentralManagement():
|
|
"""
|
|
Central management reads config and controls process flow
|
|
|
|
The HIT Store is the archive of hits
|
|
mturk thread communicates with mturk
|
|
server thread is tornado communicating with the turkers and with the status interface on the installation
|
|
Plotter thread reads plotter queue and sends it to there
|
|
Scanner is for now a simpe imagescan command
|
|
SQS: thread that listens for updates from Amazon
|
|
"""
|
|
def __init__(self, debug_mode):
|
|
self.logger = logging.getLogger("sorteerhoed").getChild('management')
|
|
self.debug = debug_mode
|
|
self.currentHit = None
|
|
self.lastHitTime = None
|
|
|
|
self.eventQueue = Queue()
|
|
self.statusPageQueue = Queue()
|
|
self.isRunning = threading.Event()
|
|
self.isScanning = threading.Event()
|
|
self.scanLock = threading.Lock()
|
|
self.notPaused = threading.Event()
|
|
self.lightStatus = 0
|
|
|
|
|
|
def loadConfig(self, filename, args):
|
|
self.configFile = filename
|
|
self.args = args
|
|
|
|
self.reloadConfig()
|
|
|
|
varDb = os.path.join(
|
|
# self.config['storage_dir'],
|
|
'hit_store.db'
|
|
)
|
|
self.store = HITStore.Store(varDb, logLevel=logging.DEBUG if self.debug else logging.INFO)
|
|
self.store.registerUpdateHook(self.updateLightHook) # change light based on status
|
|
|
|
self.logger.debug(f"Loaded configuration: {self.config}")
|
|
|
|
def reloadConfig(self):
|
|
# reload config settings
|
|
with open(self.configFile, 'r') as fp:
|
|
self.logger.debug('Load config from {}'.format(self.configFile))
|
|
self.config = yaml.safe_load(fp)
|
|
|
|
if self.args.no_plotter:
|
|
self.config['dummy_plotter'] = True
|
|
self.config['for_real'] = self.args.for_real
|
|
|
|
|
|
# self.panopticon = Panopticon(self, self.config, self.voiceStorage)
|
|
def start(self):
|
|
self.isRunning.set()
|
|
self.notPaused.set()
|
|
|
|
try:
|
|
|
|
# M-turk connection
|
|
MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com'
|
|
MTURK_REAL = 'https://mturk-requester.us-east-1.amazonaws.com'
|
|
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client
|
|
self.mturk = boto3.client('mturk',
|
|
aws_access_key_id = self.config['amazon']['user_id'],
|
|
aws_secret_access_key = self.config['amazon']['user_secret'],
|
|
region_name='us-east-1',
|
|
endpoint_url = MTURK_REAL if self.config['for_real'] else MTURK_SANDBOX
|
|
)
|
|
|
|
self.logger.info(f"Mechanical turk account balance: {self.mturk.get_account_balance()['AvailableBalance']}")
|
|
if not self.config['for_real']:
|
|
self.logger.info("Remove block from sandbox worker account")
|
|
self.mturk.delete_worker_block(WorkerId='A1CK46PK9VEUH5', Reason='Myself on Sandbox')
|
|
|
|
# clear any pending hits:
|
|
pending_hits = self.mturk.list_hits(MaxResults=100)
|
|
for pending_hit in pending_hits['HITs']:
|
|
# print(pending_hit['HITId'], pending_hit['HITStatus'])
|
|
if pending_hit['HITStatus'] == 'Assignable':
|
|
self.logger.warn(f"Expire stale hit: {pending_hit['HITId']}: {pending_hit['HITStatus']}")
|
|
self.mturk.update_expiration_for_hit(
|
|
HITId=pending_hit['HITId'],
|
|
ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
|
|
)
|
|
self.mturk.delete_hit(HITId=pending_hit['HITId'])
|
|
staleHit = self.store.getHitByRemoteId(pending_hit['HITId'])
|
|
staleHit.delete()
|
|
self.store.saveHIT(staleHit)
|
|
|
|
|
|
self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning)
|
|
sqsThread = threading.Thread(target=self.sqs.start, name='sqs')
|
|
sqsThread.start()
|
|
|
|
# the plotter itself
|
|
self.plotter = Plotter(self.config, self.eventQueue, self.isRunning, self.scanLock)
|
|
plotterThread = threading.Thread(target=self.plotter.start, name='plotter')
|
|
plotterThread.start()
|
|
|
|
# webserver for turks and status
|
|
self.server = Server(self.config, self.eventQueue, self.isRunning, self.plotter.q, self.store)
|
|
serverThread = threading.Thread(target=self.server.start, name='server')
|
|
serverThread.start()
|
|
|
|
# event listener:
|
|
dispatcherThread = threading.Thread(target=self.eventListener, name='dispatcher')
|
|
dispatcherThread.start()
|
|
|
|
self.eventQueue.put(Signal('start', {'ding':'test'}))
|
|
|
|
while self.isRunning.is_set():
|
|
time.sleep(.5)
|
|
except Exception as e:
|
|
self.logger.exception(e)
|
|
finally:
|
|
self.logger.warning("Stopping Central Managment")
|
|
self.isRunning.clear()
|
|
self.server.stop()
|
|
|
|
self.expireCurrentHit()
|
|
|
|
def expireCurrentHit(self):
|
|
if self.currentHit and not self.currentHit.isConfirmed():
|
|
if self.currentHit.hit_id: # hit pending at Amazon
|
|
self.logger.warn(f"Expire hit: {self.currentHit.hit_id}")
|
|
self.mturk.update_expiration_for_hit(
|
|
HITId=self.currentHit.hit_id,
|
|
ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
|
|
)
|
|
try:
|
|
self.mturk.delete_hit(HITId=self.currentHit.hit_id)
|
|
except Exception as e:
|
|
self.logger.exception(e)
|
|
|
|
if not self.currentHit.isSubmitted():
|
|
self.currentHit.delete()
|
|
self.store.saveHIT(self.currentHit)
|
|
|
|
|
|
def eventListener(self):
|
|
while self.isRunning.is_set():
|
|
try:
|
|
signal = self.eventQueue.get(True, 1)
|
|
except queue.Empty:
|
|
pass
|
|
# self.logger.log(5, "Empty queue.")
|
|
else:
|
|
try:
|
|
"""
|
|
Possible events:
|
|
- SQS events: accept/abandoned/returned/submitted
|
|
- webserver events: open, draw, submit
|
|
- scan complete
|
|
- HIT created
|
|
- Plotter complete
|
|
-
|
|
"""
|
|
self.logger.info(f"SIGNAL: {signal}")
|
|
if signal.name == 'start':
|
|
self.makeHit()
|
|
self.lastHitTime = datetime.datetime.now()
|
|
elif signal.name == 'hit.scan':
|
|
# start a scan
|
|
if signal.params['id'] != self.currentHit.id:
|
|
self.logger.info(f"Hit.scan had wrong id: {signal}")
|
|
continue
|
|
# self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='scanning'))
|
|
|
|
elif signal.name == 'hit.scanned':
|
|
# TODO: wrap up hit & make new HIT
|
|
if signal.params['hit_id'] != self.currentHit.id:
|
|
self.logger.info(f"Hit.scanned had wrong id: {signal}")
|
|
continue
|
|
|
|
self.currentHit.scanned_at = datetime.datetime.utcnow()
|
|
self.store.saveHIT(self.currentHit)
|
|
|
|
time_diff = datetime.datetime.now() - self.lastHitTime
|
|
to_wait = 10 - time_diff.total_seconds()
|
|
# self.statusPageQueue.add(dict(hit_id=self.currentHit.id, state='scan'))
|
|
|
|
if to_wait > 0:
|
|
self.logger.warn(f"Sleep until next hit: {to_wait}s")
|
|
time.sleep(to_wait)
|
|
else:
|
|
self.logger.info(f"No need to wait: {to_wait}s")
|
|
|
|
self.makeHit()
|
|
self.lastHitTime = datetime.datetime.now()
|
|
elif signal.name == 'hit.creating':
|
|
# self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='create_hit'))
|
|
pass
|
|
elif signal.name == 'hit.created':
|
|
# self.statusPageQueue.add(dict(hit_id=signal.params['id'], remote_id=signal.params['remote_id'], state='hit'))
|
|
pass
|
|
elif signal.name == 'scan.start':
|
|
pass
|
|
elif signal.name == 'scan.finished':
|
|
# probably see hit.scanned
|
|
pass
|
|
|
|
elif signal.name == 'hit.assignment':
|
|
# Create new assignment
|
|
if signal.params['hit_id'] != self.currentHit.id:
|
|
continue
|
|
assignment = self.currentHit.getAssignmentById(signal.params['assignment_id'])
|
|
|
|
elif signal.name == 'assignment.info':
|
|
assignment = self.currentHit.getAssignmentById(signal.params['assignment_id'])
|
|
if not assignment:
|
|
self.logger.warning(f"assignment.info assignment.id not for current hit assignments: {signal}")
|
|
|
|
change = False
|
|
for name, value in signal.params.items():
|
|
if name == 'ip':
|
|
assignment.turk_ip = value
|
|
if name == 'location':
|
|
assignment.turk_country = value
|
|
if name == 'os':
|
|
assignment.turk_os = value
|
|
if name == 'browser':
|
|
assignment.turk_browser = value
|
|
change = True
|
|
self.logger.debug(f'Set assignment: {name} to {value}')
|
|
|
|
if change:
|
|
self.store.saveAssignment(assignment)
|
|
|
|
elif signal.name == 'server.open':
|
|
self.currentHit.open_page_at = datetime.datetime.utcnow()
|
|
self.store.saveHIT(self.currentHit)
|
|
self.setLight(True)
|
|
elif signal.name == 'server.close':
|
|
if not signal.params['abandoned']:
|
|
continue
|
|
a = self.currentHit.getLastAssignment()
|
|
if a.assignment_id != signal.params['assignment_id']:
|
|
self.logger.info(f"Close of older assignment_id: {signal}")
|
|
continue
|
|
self.logger.critical(f"Websocket closed of active assignment_id: {signal}")
|
|
a.abandoned_at = datetime.datetime.utcnow()
|
|
self.store.saveAssignment(a)
|
|
self.plotter.park()
|
|
self.setLight(False)
|
|
elif signal.name == 'assignment.submit':
|
|
a = self.currentHit.getLastAssignment()
|
|
if a.assignment_id != signal.params['assignment_id']:
|
|
self.logger.critical(f"Submit of invalid assignment_id: {signal}")
|
|
|
|
a.submit_page_at = datetime.datetime.utcnow()
|
|
self.store.saveAssignment(a)
|
|
self.plotter.park()
|
|
# park always triggers a plotter.finished after being processed
|
|
|
|
elif signal.name[:4] == 'sqs.':
|
|
if signal.params['event']['HITId'] != self.currentHit.hit_id:
|
|
self.logger.warning(f"SQS hit.info hit_id != currenthit.id: {signal}, update status for older HIT")
|
|
sqsHit = self.store.getHitByRemoteId(signal.params['event']['HITId'])
|
|
updateStatus = False
|
|
else:
|
|
sqsHit = self.currentHit
|
|
updateStatus = True
|
|
|
|
sqsAssignment = sqsHit.getAssignmentById(signal.params['event']['AssignmentId'])
|
|
if not sqsAssignment:
|
|
self.logger.critical(f"Invalid assignmentId given for hit: {signal.params['event']}")
|
|
continue
|
|
|
|
if signal.name == 'sqs.AssignmentAccepted':
|
|
self.logger.info(f'Set status progress to accepted')
|
|
sqsAssignment.accept_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
|
|
sqsAssignment.worker_id = signal.params['event']['WorkerId']
|
|
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
|
|
elif signal.name == 'sqs.AssignmentAbandoned':
|
|
self.logger.info(f'Set status progress to abandoned')
|
|
#{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
|
|
sqsAssignment.abandoned_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
|
|
# if updateStatus:
|
|
# self.setLight(False)
|
|
self.mturk.create_worker_block(WorkerId=signal.params['event']['WorkerId'], Reason='Accepted task without working on it.')
|
|
|
|
elif signal.name == 'sqs.AssignmentReturned':
|
|
self.logger.info(f'Set status progress to returned')
|
|
sqsAssignment.returned_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
|
|
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
|
|
elif signal.name == 'sqs.AssignmentSubmitted':
|
|
# {'MessageId': '4b37dfdf-6a12-455d-a111-9a361eb54d88', 'ReceiptHandle': 'AQEBHc0yAdIrEmAV3S8TIoDCRxrItDEvjy0VQko56/Lb+ifszC0gdZ0Bbed24HGHZYr5DSnWkgBJ/H59ZXxFS1iVEH9sC8+YrmKKOTrKvW3gj6xYiBU2fBb8JRq+sEiNSxWLw2waxr1VYdpn/SWcoOJCv6PlS7P9EB/2IQ++rCklhVwV7RfARHy4J87bjk5R3+uEXUUi00INhCeunCbu642Mq4c239TFRHq3mwM6gkdydK+AP1MrXGKKAE1W5nMbwEWAwAN8KfoM1NkkUg5rTSYWmxxZMdVs/QRNcMFKVSf1bop2eCALSoG6l3Iu7+UXIl4HLh+rHp4bc8NoftbUJUii8YXeiNGU3wCM9T1kOerwYVgksK93KQrioD3ee8navYExQRXne2+TrUZUDkxRIdtPGA==', 'MD5OfBody': '01ccb1efe47a84b68704c4dc611a4d8d', 'Body': '{"Events":[{"Answer":"<?xml version=\\"1.0\\" encoding=\\"ASCII\\"?><QuestionFormAnswers xmlns=\\"http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd\\"><Answer><QuestionIdentifier>surveycode<\\/QuestionIdentifier><FreeText>test<\\/FreeText><\\/Answer><\\/QuestionFormAnswers>","HITGroupId":"301G7MYOAJ85NEW128ZDGF5DSBW53S","EventType":"AssignmentSubmitted","EventTimestamp":"2019-10-30T08:01:43Z","HITId":"3NSCTNUR2ZY42ZXASI4CS5YWV0S5AB","AssignmentId":"3ZAZR5XV02TTOCBR9MCLCNQV1XKCZL","WorkerId":"A1CK46PK9VEUH5","HITTypeId":"3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0"}],"EventDocId":"34af4cd7f2829216f222d4b6e66f3a3ff9ad8ea6","SourceAccount":"600103077174","CustomerId":"A1CK46PK9VEUH5","EventDocVersion":"2014-08-15"}'}
|
|
self.logger.info(f'Set status progress to submitted')
|
|
sqsAssignment.answer = signal.params['event']['Answer']
|
|
if sqsAssignment.uuid not in sqsAssignment.answer:
|
|
self.logger.critical(f"Not a valid answer given?! {sqsAssignment.answer}")
|
|
|
|
if not sqsAssignment.submit_page_at:
|
|
# page not submitted, hit is. Nevertheless, create new hit.
|
|
try:
|
|
self.mturk.reject_assignment(AssignmentId=signal.params['event']['AssignmentId'], RequesterFeedback='Did not do the assignment')
|
|
except Exception as e:
|
|
self.logger.exception(e)
|
|
self.makeHit()
|
|
else:
|
|
sqsAssignment.confirmed_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
# block de worker na succesvolle submit, om dubbele workers te voorkomen
|
|
# TODO: Disabled after worker mail, use quals instead
|
|
#self.mturk.create_worker_block(WorkerId=signal.params['event']['WorkerId'], Reason='Every worker can only work once on the taks.')
|
|
#self.logger.warn("Block worker after submission")
|
|
|
|
|
|
self.store.saveHIT(sqsHit)
|
|
|
|
elif signal.name == 'plotter.finished':
|
|
# is _always_ triggered after submit due to plotter.park()
|
|
if self.currentHit and self.currentHit.isSubmitted():
|
|
self.currentHit.plotted_at = datetime.datetime.utcnow()
|
|
self.store.saveHIT(self.currentHit)
|
|
|
|
self.logger.info("Start scan thread")
|
|
scan = threading.Thread(target=self.scanImage, name='scan')
|
|
scan.start()
|
|
elif signal.name == 'plotter.parked':
|
|
# should this have the code from plotter.finished?
|
|
pass
|
|
else:
|
|
self.logger.critical(f"Unknown signal: {signal.name}")
|
|
except Exception as e:
|
|
self.logger.critical(f"Exception on handling {signal}")
|
|
self.logger.exception(e)
|
|
|
|
def makeHit(self):
|
|
self.expireCurrentHit() # expire hit if it is there
|
|
|
|
self.eventQueue.put(Signal('hit.creating', {'id': self.currentHit.id if self.currentHit else 'start'}))
|
|
|
|
self.reloadConfig() # reload new config values if they are set
|
|
|
|
# self.notPaused.wait()
|
|
|
|
self.currentHit = self.store.createHIT()
|
|
self.store.currentHit = self.currentHit
|
|
|
|
self.logger.info(f"Make HIT {self.currentHit.id}")
|
|
|
|
question = '''<?xml version="1.0" encoding="UTF-8"?>
|
|
<ExternalQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd">
|
|
<ExternalURL>https://guest.rubenvandeven.com:8888/draw?id={HIT_NR}</ExternalURL>
|
|
<FrameHeight>0</FrameHeight>
|
|
</ExternalQuestion>
|
|
'''.replace("{HIT_NR}",str(self.currentHit.id))
|
|
estimatedHitDuration = self.store.getEstimatedHitDuration()
|
|
|
|
|
|
# set minimum rate, if they take longer we increase the pay
|
|
fee = max(self.config['minimum_fee'], (self.config['hour_rate_aim']/3600.) * estimatedHitDuration)
|
|
self.currentHit.fee = fee
|
|
self.logger.info(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}")
|
|
new_hit = self.mturk.create_hit(
|
|
Title = 'Trace the drawn line',
|
|
Description = 'Draw a line over the sketched line in the image',
|
|
Keywords = 'polygons, trace, draw',
|
|
Reward = "{:.2f}".format(fee),
|
|
MaxAssignments = 1,
|
|
LifetimeInSeconds = self.config['hit_lifetime'],
|
|
AssignmentDurationInSeconds = self.store.getHitTimeout(),
|
|
AutoApprovalDelayInSeconds = self.config['hit_autoapprove_delay'],
|
|
Question = question,
|
|
)
|
|
|
|
self.logger.info(f"Created hit: {new_hit}")
|
|
if not self.config['for_real']:
|
|
self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])
|
|
else:
|
|
self.logger.info("https://worker.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])
|
|
|
|
self.currentHit.hit_id = new_hit['HIT']['HITId']
|
|
|
|
self.store.saveHIT(self.currentHit)
|
|
# self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])
|
|
# self.server.statusPage.set('hit_created', self.currentHit.created_at)
|
|
# self.server.statusPage.set('fee', f"${self.currentHit.fee:.2f}")
|
|
# self.server.statusPage.set('state', self.currentHit.getStatus())
|
|
|
|
self.eventQueue.put(Signal('hit.created', {'id': self.currentHit.id, 'remote_id': self.currentHit.hit_id}))
|
|
|
|
# mturk.send_test_event_notification()
|
|
if self.config['amazon']['sqs_url']:
|
|
notification_info= self.mturk.update_notification_settings(
|
|
HITTypeId=new_hit['HIT']['HITTypeId'],
|
|
Notification = {
|
|
'Destination' : self.config['amazon']['sqs_url'],
|
|
'Transport': 'SQS',
|
|
'Version': '2014-08-15',
|
|
'EventTypes': [
|
|
'AssignmentAccepted',
|
|
'AssignmentAbandoned',
|
|
'AssignmentReturned',
|
|
'AssignmentSubmitted',
|
|
# 'AssignmentRejected',
|
|
# 'AssignmentApproved',
|
|
# 'HITCreated',
|
|
# 'HITExpired',
|
|
# 'HITReviewable',
|
|
# 'HITExtended',
|
|
# 'HITDisposed',
|
|
# 'Ping',
|
|
]
|
|
},
|
|
Active=True
|
|
)
|
|
self.logger.debug(notification_info)
|
|
|
|
def cleanDrawing(self):
|
|
with self.scanLock:
|
|
self.eventQueue.put(Signal('scan.start'))
|
|
# Scan to reset
|
|
cmd = [
|
|
'sudo', 'scanimage', '-d', 'epkowa','--resolution=100',
|
|
'-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
|
|
,'-t','22', # x axis, margin from left side scanner (seen from the outside)
|
|
'-x',str(181),
|
|
'-y',str(245)
|
|
]
|
|
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
|
|
# opens connection to scanner, but only starts scanning when output becomes ready:
|
|
_, e = proc.communicate(80)
|
|
time.sleep(5) # sleep a few seconds for scanner to return to start position
|
|
if e:
|
|
self.logger.critical(f"Scanner caused: {e.decode()}")
|
|
|
|
self.eventQueue.put(Signal('system.reset'))
|
|
self.eventQueue.put(Signal('scan.finished'))
|
|
|
|
def reset(self) -> str:
|
|
self.plotter.park()
|
|
# Very confusing to have scanning on a reset (because often nothing has happened), so don't do itd
|
|
# scan = threading.Thread(target=self.cleanDrawing, name='reset')
|
|
# scan.start()
|
|
self.server.statusPage.clearAssignment()
|
|
|
|
def scanImage(self) -> str:
|
|
"""
|
|
Run scanimage on scaner and returns a string with the filename
|
|
"""
|
|
|
|
with self.scanLock:
|
|
if self.config['dummy_plotter']:
|
|
self.eventQueue.put(Signal('hit.scan', {'id':self.currentHit.id}))
|
|
self.eventQueue.put(Signal('scan.start'))
|
|
self.logger.warning("Fake scanner for a few seconds")
|
|
for i in tqdm.tqdm(range(5)):
|
|
time.sleep(1)
|
|
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id}))
|
|
self.eventQueue.put(Signal('scan.finished'))
|
|
return
|
|
|
|
|
|
cmd = [
|
|
'sudo', 'scanimage', '-d', 'epkowa', '--format', 'tiff',
|
|
'--resolution=100', # lower res, faster (more powerful) scan & wipe
|
|
'-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
|
|
,'-t','22', # x axis, margin from left side scanner (seen from the outside)
|
|
'-x',str(181),
|
|
'-y',str(245)
|
|
]
|
|
self.logger.info(f"{cmd}")
|
|
filename = self.currentHit.getImagePath()
|
|
|
|
self.eventQueue.put(Signal('hit.scan', {'id':self.currentHit.id}))
|
|
self.eventQueue.put(Signal('scan.start'))
|
|
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
# opens connection to scanner, but only starts scanning when output becomes ready:
|
|
o, e = proc.communicate(80)
|
|
if e:
|
|
self.logger.critical(f"Scanner caused: {e.decode()}")
|
|
# Should this clear self.isRunning.clear() ?
|
|
|
|
try:
|
|
f = io.BytesIO(o)
|
|
img = Image.open(f)
|
|
img = img.transpose(Image.ROTATE_90).transpose(Image.FLIP_TOP_BOTTOM)
|
|
tunedImg = Level.level_image(img, self.config['level']['min'], self.config['level']['max'], self.config['level']['gamma'])
|
|
tunedImg.save(filename)
|
|
except Exception as e:
|
|
self.logger.critical("Cannot create image from scan. Did scanner work?")
|
|
self.logger.exception(e)
|
|
copyfile('www/basic.svg', filename)
|
|
|
|
time.sleep(5) # sleep a few seconds for scanner to return to start position
|
|
|
|
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id}))
|
|
self.eventQueue.put(Signal('scan.finished'))
|
|
|
|
def setLight(self, on):
|
|
value = 1 if on else 0
|
|
|
|
if self.lightStatus == value:
|
|
return
|
|
self.lightStatus = value
|
|
|
|
cmd = [
|
|
'usbrelay', f'HURTM_1={value}'
|
|
]
|
|
self.logger.info(f"Trigger light {cmd}")
|
|
code = subprocess.call(cmd)
|
|
if code > 0:
|
|
self.logger.warning(f"Error on light change: {code}")
|
|
|
|
def updateLightHook(self, hit = None):
|
|
# ignore hit attribute, which comes from the HITstore
|
|
self.setLight(self.getLightStatus())
|
|
|
|
def getLightStatus(self) -> bool:
|
|
if not self.currentHit:
|
|
return False
|
|
a = self.currentHit.getLastAssignment()
|
|
if not a:
|
|
return False
|
|
if self.currentHit.plotted_at: # wait till plotter is done
|
|
return False
|
|
return True
|