WIP on the server

This commit is contained in:
Ruben van de Ven 2019-10-23 22:33:37 +02:00
parent 189223fdba
commit a9e8ec4069
17 changed files with 501 additions and 23711 deletions

View file

@ -9,6 +9,9 @@ coloredlogs = "*"
boto3 = "*" boto3 = "*"
PyYAML = "*" PyYAML = "*"
SQLAlchemy = "*" SQLAlchemy = "*"
httpagentparser = "*"
geoip2 = "*"
ink-extensions = "*"
[dev-packages] [dev-packages]

99
Pipfile.lock generated
View file

@ -1,7 +1,7 @@
{ {
"_meta": { "_meta": {
"hash": { "hash": {
"sha256": "2ecd89c76c2fb319746f9616100668ec8f556d502ec72fe2644bc5b491307fa7" "sha256": "860fb04e54e9877d9409d7e4853e80ab3d7c0bee9fe3a538d99a73f460dc363d"
}, },
"pipfile-spec": 6, "pipfile-spec": 6,
"requires": { "requires": {
@ -21,18 +21,32 @@
}, },
"boto3": { "boto3": {
"hashes": [ "hashes": [
"sha256:839285fbd6f3ab16170af449ae9e33d0eccf97ca22de17d9ff68b8da2310ea06", "sha256:2edee79d0e78c08b6d14d4dd91c0e4b3438dd4c90c859f06a397268b1cac17b2",
"sha256:d93f1774c4bc66e02acdda2067291acb9e228a035435753cb75f83ad2904cbe3" "sha256:3cd2078144c10417eb04e4bb263ea8e50a21c4aceafb52db33e3fe71e73b48aa"
], ],
"index": "pypi", "index": "pypi",
"version": "==1.9.253" "version": "==1.10.0"
}, },
"botocore": { "botocore": {
"hashes": [ "hashes": [
"sha256:3baf129118575602ada9926f5166d82d02273c250d0feb313fc270944b27c48b", "sha256:507b8f13583a64ec2c9c112ff6e3dd8b548060adc7e1f57f25fda9fa34c2dfdb",
"sha256:dc080aed4f9b220a9e916ca29ca97a9d37e8e1d296fe89cbaeef929bf0c8066b" "sha256:c4b2ffb0f6ed7169beb260485bf5a42ee72a0a02f49f48b0557ed5e32bcf9e79"
], ],
"version": "==1.12.253" "version": "==1.13.0"
},
"certifi": {
"hashes": [
"sha256:e4f3620cfea4f83eedc95b24abd9cd56f3c4b146dd0177e83a21b4eb49e21e50",
"sha256:fd7c7c74727ddcf00e9acd26bba8da604ffec95bf1c2144e67aff7a8b50e6cef"
],
"version": "==2019.9.11"
},
"chardet": {
"hashes": [
"sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae",
"sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691"
],
"version": "==3.0.4"
}, },
"coloredlogs": { "coloredlogs": {
"hashes": [ "hashes": [
@ -50,6 +64,21 @@
], ],
"version": "==0.15.2" "version": "==0.15.2"
}, },
"geoip2": {
"hashes": [
"sha256:a37ddac2d200ffb97c736da8b8ba9d5d8dc47da6ec0f162a461b681ecac53a14",
"sha256:f7ffe9d258e71a42cf622ce6350d976de1d0312b9f2fbce3975c7d838b57ecf0"
],
"index": "pypi",
"version": "==2.9.0"
},
"httpagentparser": {
"hashes": [
"sha256:23805523b61b04b9412cd98cd1fe0415401045fd5447d3b62a02ae60d22814ed"
],
"index": "pypi",
"version": "==1.9.0"
},
"humanfriendly": { "humanfriendly": {
"hashes": [ "hashes": [
"sha256:23057b10ad6f782e7bc3a20e3cb6768ab919f619bbdc0dd75691121bbde5591d", "sha256:23057b10ad6f782e7bc3a20e3cb6768ab919f619bbdc0dd75691121bbde5591d",
@ -57,6 +86,21 @@
], ],
"version": "==4.18" "version": "==4.18"
}, },
"idna": {
"hashes": [
"sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407",
"sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c"
],
"version": "==2.8"
},
"ink-extensions": {
"hashes": [
"sha256:0157737f30063e48bbfb45beeef0545b01bb7d9b182b3f39695e9411f4d029eb",
"sha256:76e6e460307c96b05995790f30207fc1cc0f42d8795a4bb7b34cd2476f5ab70b"
],
"index": "pypi",
"version": "==1.0.2"
},
"jmespath": { "jmespath": {
"hashes": [ "hashes": [
"sha256:3720a4b1bd659dd2eecad0666459b9788813e032b83e7ba58578e48254e0a0e6", "sha256:3720a4b1bd659dd2eecad0666459b9788813e032b83e7ba58578e48254e0a0e6",
@ -64,6 +108,39 @@
], ],
"version": "==0.9.4" "version": "==0.9.4"
}, },
"lxml": {
"hashes": [
"sha256:02ca7bf899da57084041bb0f6095333e4d239948ad3169443f454add9f4e9cb4",
"sha256:096b82c5e0ea27ce9138bcbb205313343ee66a6e132f25c5ed67e2c8d960a1bc",
"sha256:0a920ff98cf1aac310470c644bc23b326402d3ef667ddafecb024e1713d485f1",
"sha256:17cae1730a782858a6e2758fd20dd0ef7567916c47757b694a06ffafdec20046",
"sha256:17e3950add54c882e032527795c625929613adbd2ce5162b94667334458b5a36",
"sha256:1f4f214337f6ee5825bf90a65d04d70aab05526c08191ab888cb5149501923c5",
"sha256:2e8f77db25b0a96af679e64ff9bf9dddb27d379c9900c3272f3041c4d1327c9d",
"sha256:4dffd405390a45ecb95ab5ab1c1b847553c18b0ef8ed01e10c1c8b1a76452916",
"sha256:6b899931a5648862c7b88c795eddff7588fb585e81cecce20f8d9da16eff96e0",
"sha256:726c17f3e0d7a7200718c9a890ccfeab391c9133e363a577a44717c85c71db27",
"sha256:760c12276fee05c36f95f8040180abc7fbebb9e5011447a97cdc289b5d6ab6fc",
"sha256:796685d3969815a633827c818863ee199440696b0961e200b011d79b9394bbe7",
"sha256:891fe897b49abb7db470c55664b198b1095e4943b9f82b7dcab317a19116cd38",
"sha256:a471628e20f03dcdfde00770eeaf9c77811f0c331c8805219ca7b87ac17576c5",
"sha256:a63b4fd3e2cabdcc9d918ed280bdde3e8e9641e04f3c59a2a3109644a07b9832",
"sha256:b0b84408d4eabc6de9dd1e1e0bc63e7731e890c0b378a62443e5741cfd0ae90a",
"sha256:be78485e5d5f3684e875dab60f40cddace2f5b2a8f7fede412358ab3214c3a6f",
"sha256:c27eaed872185f047bb7f7da2d21a7d8913457678c9a100a50db6da890bc28b9",
"sha256:c81cb40bff373ab7a7446d6bbca0190bccc5be3448b47b51d729e37799bb5692",
"sha256:d11874b3c33ee441059464711cd365b89fa1a9cf19ae75b0c189b01fbf735b84",
"sha256:e9c028b5897901361d81a4718d1db217b716424a0283afe9d6735fe0caf70f79",
"sha256:fe489d486cd00b739be826e8c1be188ddb74c7a1ca784d93d06fda882a6a1681"
],
"version": "==4.4.1"
},
"maxminddb": {
"hashes": [
"sha256:449a1713d37320d777d0db286286ab22890f0a176492ecf3ad8d9319108f2f79"
],
"version": "==1.5.1"
},
"python-dateutil": { "python-dateutil": {
"hashes": [ "hashes": [
"sha256:7e6584c74aeed623791615e26efd690f29817a27c73085b78e4bad02493df2fb", "sha256:7e6584c74aeed623791615e26efd690f29817a27c73085b78e4bad02493df2fb",
@ -91,6 +168,13 @@
"index": "pypi", "index": "pypi",
"version": "==5.1.2" "version": "==5.1.2"
}, },
"requests": {
"hashes": [
"sha256:11e007a8a2aa0323f5a921e9e6a2d7e4e67d9877e85773fba9ba6419025cbeb4",
"sha256:9cf5292fcd0f598c671cfc1e0d7d1a7f13bb8085e9a590f48c010551dc6c4b31"
],
"version": "==2.22.0"
},
"s3transfer": { "s3transfer": {
"hashes": [ "hashes": [
"sha256:6efc926738a3cd576c2a79725fed9afde92378aa5c6a957e3af010cb019fac9d", "sha256:6efc926738a3cd576c2a79725fed9afde92378aa5c6a957e3af010cb019fac9d",
@ -130,7 +214,6 @@
"sha256:3de946ffbed6e6746608990594d08faac602528ac7015ac28d33cee6a45b7398", "sha256:3de946ffbed6e6746608990594d08faac602528ac7015ac28d33cee6a45b7398",
"sha256:9a107b99a5393caf59c7aa3c1249c16e6879447533d0887f4336dde834c7be86" "sha256:9a107b99a5393caf59c7aa3c1249c16e6879447533d0887f4336dde834c7be86"
], ],
"markers": "python_version >= '3.4'",
"version": "==1.25.6" "version": "==1.25.6"
} }
}, },

22
README.md Normal file
View file

@ -0,0 +1,22 @@
## Webserver
Webserver is published to the web trough ssh remote forward. In /etc/ssh/sshd_config set `GatewayPorts yes`.
Then start `autossh` to maintain the connection:
```bash
autossh -M 0 -o "ServerAliveInterval 30" -o "ServerAliveCountMax 3" -R 8888:localhost:8888 here.rubenvandeven.com
```
To resolve the country the (imprecise) GeoLite2 Free Country dataset is used. Download it [from MaxMind](https://dev.maxmind.com/geoip/geoip2/geolite2/) and store it in the project root folder.
## Scanning
For scanning run visudo and add to the sudoers file:
```
mt ALL=(ALL) NOPASSWD: /usr/bin/scanimage
```

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -42,7 +42,7 @@ class HIT(Base):
hit_id = Column(String(255)) # amazon's hit id hit_id = Column(String(255)) # amazon's hit id
created_at = Column(DateTime, default=datetime.datetime.now()) created_at = Column(DateTime, default=datetime.datetime.now())
updated_at = Column(DateTime, default=datetime.datetime.now()) updated_at = Column(DateTime, default=datetime.datetime.now())
uniqid = Column(String(32), default=uuid.uuid4().hex) uuid = Column(String(32), default=lambda : uuid.uuid4().hex)
assignment_id = Column(String(255), default = None) assignment_id = Column(String(255), default = None)
worker_id = Column(String(255), default = None) worker_id = Column(String(255), default = None)
accept_time = Column(DateTime, default=None) accept_time = Column(DateTime, default=None)
@ -56,38 +56,73 @@ class HIT(Base):
turk_screen_height = Column(Integer, default = None) turk_screen_height = Column(Integer, default = None)
def getImagePath(self):
return os.path.join('www', self.getImageUrl())
def getImageUrl(self):
return f"scans/{self.id}.png"
class Store: class Store:
def __init__(self, db_filename, logLevel=0): def __init__(self, db_filename, logLevel=0):
path = os.path.abspath(db_filename) path = os.path.abspath(db_filename)
if logLevel <= logging.DEBUG: if logLevel <= logging.DEBUG:
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
self.engine = create_engine('sqlite:///'+path, echo=False) self.engine = create_engine('sqlite:///'+path, echo=False, connect_args={'check_same_thread': False})
Base.metadata.create_all(self.engine) Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine) self.Session = sessionmaker(bind=self.engine)
self.session = self.Session()
@contextmanager @contextmanager
def getSession(self): def getSession(self):
"""Provide a transactional scope around a series of operations.""" """Provide a transactional scope around a series of operations."""
session = self.Session()
try: try:
yield session yield self.session
session.commit() self.session.commit()
except: except:
session.rollback() self.session.rollback()
raise raise
finally:
session.close()
def getHits(self, session): def getHits(self, session):
return session.query(Source).order_by(HIT.created_at.desc()) return self.session.query(Source).order_by(HIT.created_at.desc())
def getHitById(self, hitId):
return self.session.query(HIT).\
filter(HIT.id==hitId).one()
def getHitByRemoteId(self, amazonHitId):
return self.session.query(HIT).\
filter(HIT.hit_id==amazonHitId).one()
def getLastSubmittedHit(self):
return self.session.query(HIT).\
filter(HIT.submit_page_at!=None).\
order_by(HIT.submit_page_at.desc()).first()
def createHIT(self):
with self.getSession() as s:
hit = HIT()
s.add(hit)
s.flush()
s.refresh(hit)
logger.info(f"Created HIT {hit.id}")
return hit
def saveHIT(self, hit):
with self.getSession() as s:
logger.info(f"Updating hit! {hit.id}")
# s.flush()
def addHIT(self, hit: HIT): def addHIT(self, hit: HIT):
with self.getSession() as s: with self.getSession() as s:
s.add(hit) s.add(hit)
s.flush() s.flush()
s.refresh(hit) s.refresh(hit)
logging.info(f"Added {hit.id}") logger.info(f"Added {hit.id}")
# def rmSource(self, id: int): # def rmSource(self, id: int):
# with self.getSession() as session: # with self.getSession() as session:

View file

@ -5,6 +5,9 @@ class Signal:
Named 'signal' to avoid confusion with threading.Event Named 'signal' to avoid confusion with threading.Event
""" """
def __init__(self, name: str, params: dict): def __init__(self, name: str, params: dict = None):
self.name = name self.name = name
self.params = params self.params = params
def __repr__(self):
return f"<Signal {self.name}: {self.params}>"

View file

@ -11,6 +11,10 @@ import queue
from sorteerhoed.sqs import SqsListener from sorteerhoed.sqs import SqsListener
from sorteerhoed.webserver import Server from sorteerhoed.webserver import Server
import time import time
from sorteerhoed.Signal import Signal
import io
from PIL import Image
import datetime
class CentralManagement(): class CentralManagement():
@ -67,27 +71,30 @@ class CentralManagement():
self.logger.info(f"Mechanical turk: {self.mturk.get_account_balance()}") self.logger.info(f"Mechanical turk: {self.mturk.get_account_balance()}")
self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning) self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning)
sqsThread = threading.Thread(target=self.sqs.start) sqsThread = threading.Thread(target=self.sqs.start, name='sqs')
sqsThread.start() sqsThread.start()
# the plotter itself # the plotter itself
self.plotter = Plotter(self.config, self.eventQueue, self.isRunning) self.plotter = Plotter(self.config, self.eventQueue, self.isRunning)
plotterThread = threading.Thread(target=self.plotter.start) plotterThread = threading.Thread(target=self.plotter.start, name='plotter')
plotterThread.start() plotterThread.start()
# webserver for turks and status # webserver for turks and status
self.server = Server(self.config, self.eventQueue, self.isRunning, self.plotter.q) self.server = Server(self.config, self.eventQueue, self.isRunning, self.plotter.q, self.store)
serverThread = threading.Thread(target=self.server.start) serverThread = threading.Thread(target=self.server.start, name='server')
serverThread.start() serverThread.start()
# event listener: # event listener:
dispatcherThread = threading.Thread(target=self.eventListener) dispatcherThread = threading.Thread(target=self.eventListener, name='dispatcher')
dispatcherThread.start() dispatcherThread.start()
#
#
self.makeHit() self.eventQueue.put(Signal('start', {'ding':'test'}))
while self.isRunning.is_set():
time.sleep(.5)
finally: finally:
self.isRunning.clear() self.isRunning.clear()
@ -111,19 +118,55 @@ class CentralManagement():
- Plotter complete - Plotter complete
- -
""" """
print(signal) #TODO: make level debug()
self.logger.warn(f"SIGNAL: {signal}")
if signal.name == 'start':
self.makeHit()
pass
elif signal.name == 'hit.scanned':
# TODO: wrap up hit & make new HIT
pass
elif signal.name == 'hit.info':
if signal.params['hit_id'] != self.currentHit.id:
self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}")
continue
for name, value in enumerate(signal.params):
self.logger.debug(f'Set status: {name} to {value}')
self.server.statusPage.set(name, value)
elif signal.name == 'server.submit':
self.currentHit.submit_page_at = datetime.datetime.now()
self.store.saveHIT(self.currentHit)
self.plotter.park()
# park should alway triggers a plotter.finished after being processed
elif signal.name == 'sqs.AssignmentAccepted':
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
pass
elif signal.name == 'sqs.AssignmentAbandoned':
#{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
pass
elif signal.name == 'sqs.AssignmentReturned':
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
pass
elif signal.name == 'sqs.AssignmentSubmitted':
pass
elif signal.name == 'plotter.finished':
if self.currentHit.submit_page_at:
# TODO: scan!
pass
elif signal.name == '':
pass
# handle singals/events: # handle singals/events:
# TODO: next steps # TODO: next steps
# TODO: update status # TODO: update status
def makeHit(self): def makeHit(self):
self.currentHit = HITStore.HIT() self.currentHit = self.store.createHIT()
self.store.addHIT(self.currentHit)
self.logger(f"Make HIT {self.currentHit.id}") self.logger.info(f"Make HIT {self.currentHit.id}")
question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",self.currentHit.id) question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id))
new_hit = self.mturk.create_hit( new_hit = self.mturk.create_hit(
Title = 'Trace the drawn line', Title = 'Trace the drawn line',
Description = 'Draw a line over the sketched line in the image', Description = 'Draw a line over the sketched line in the image',
@ -136,12 +179,11 @@ class CentralManagement():
Question = question, Question = question,
) )
self.logger.info("Created hit:", new_hit) self.logger.info(f"Created hit: {new_hit}")
self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId']) self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])
self.currentHit.hit_id = new_hit['HIT']['HITId'] self.currentHit.hit_id = new_hit['HIT']['HITId']
self.store.saveHIT(self.currentHit)
print(self.currentHit)
# mturk.send_test_event_notification() # mturk.send_test_event_notification()
if self.config['amazon']['sqs_url']: if self.config['amazon']['sqs_url']:
@ -175,10 +217,18 @@ class CentralManagement():
Run scanimage on scaner and returns a string with the filename Run scanimage on scaner and returns a string with the filename
""" """
cmd = [ cmd = [
'sudo', 'scanimage' 'sudo', 'scanimage', '-d', 'epkowa'
] ]
filename = "" filename = self.currentHit.getImagePath()
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
o, e = proc.communicate(60) # opens connection to scanner, but only starts scanning when output becomes ready:
o, e = proc.communicate(80)
if e:
self.logger.critical(f"Scanner caused: {e.decode()}")
f = io.BytesIO(o)
img = Image.open(f)
img.save(filename)
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id}))
exec

View file

@ -14,6 +14,16 @@ class Plotter:
self.isRunning = runningEvent self.isRunning = runningEvent
self.logger = logging.getLogger("sorteerhoed").getChild("plotter") self.logger = logging.getLogger("sorteerhoed").getChild("plotter")
def park(self):
self.logger.info("Queue to park plotter")
if self.config['dummy_plotter']:
# fastest way home :-)
self.q.put([0,0,0])
else:
# TODO: find a nice way to park the axidraw in the 0 position
self.q.put([0,0,0])
def start(self): def start(self):
self.axiDrawCueListener() self.axiDrawCueListener()
@ -31,24 +41,24 @@ class Plotter:
self.eventQ.put(Signal('plotter.finished')) self.eventQ.put(Signal('plotter.finished'))
else: else:
time.sleep(.05) time.sleep(.05)
self.logging.debug(f'Dummy plotter move: {move}') self.logger.debug(f'Dummy plotter move: {move}')
self.logger.info("Stopping dummy plotter") self.logger.info("Stopping dummy plotter")
else: else:
ad = axidraw.AxiDraw() self.ad = axidraw.AxiDraw()
ad.interactive() self.ad.interactive()
connected = ad.connect() connected = self.ad.connect()
if not connected: if not connected:
raise Exception("Cannot connect to Axidraw") raise Exception("Cannot connect to Axidraw")
try: try:
ad.options.units = 1 # set to use centimeters instead of inches self.ad.options.units = 1 # set to use centimeters instead of inches
ad.options.accel = 100; self.ad.options.accel = 100;
ad.options.speed_penup = 100 self.ad.options.speed_penup = 100
ad.options.speed_pendown = 100 self.ad.options.speed_pendown = 100
ad.options.model = 2 # A3, set to 1 for A4 self.ad.options.model = 1 # A3, set to 1 for A4
ad.moveto(0,0) self.ad.moveto(0,0)
plotterWidth = 22 plotterWidth = 22
plotterHeight = 18 # 16? plotterHeight = 18 # 16?
@ -65,14 +75,14 @@ class Plotter:
plotterRan = False plotterRan = False
self.eventQ.put(Signal('plotter.finished')) self.eventQ.put(Signal('plotter.finished'))
else: else:
ad.moveto(move[0]* plotterWidth, move[1]*plotterHeight) self.ad.moveto(move[0]* plotterWidth, move[1]*plotterHeight)
self.logging.debug(f'handler! {move}') self.logger.debug(f'handler! {move}')
except Exception as e: except Exception as e:
self.logger.exception(e) self.logger.exception(e)
finally: finally:
self.logger.warning("Close Axidraw connection") self.logger.warning("Close Axidraw connection")
ad.moveto(0,0) self.ad.moveto(0,0)
ad.disconnect() self.ad.disconnect()
# send shutdown signal (if not already set) # send shutdown signal (if not already set)
self.isRunning.clear() self.isRunning.clear()

View file

@ -4,6 +4,7 @@ from threading import Event
import logging import logging
import time import time
from sorteerhoed.Signal import Signal from sorteerhoed.Signal import Signal
import json
class SqsListener: class SqsListener:
def __init__(self, config, eventQ: Queue, runningEvent: Event): def __init__(self, config, eventQ: Queue, runningEvent: Event):
@ -29,14 +30,18 @@ class SqsListener:
if 'Messages' in messages: # when the queue is exhausted, the response dict contains no 'Messages' key if 'Messages' in messages: # when the queue is exhausted, the response dict contains no 'Messages' key
for message in messages['Messages']: # 'Messages' is a list for message in messages['Messages']: # 'Messages' is a list
# process the messages # process the messages
self.debug(f"received: {message}") self.logger.info(f"received: {message}")
try: try:
for event in message['Body']['Events']: body = json.loads(message['Body'])
# self.logger.critical(f"Try: {body['Events']}")
for event in body['Events']:
# self.logger.warning(f"{event}")
self.eventQ.put(Signal( self.eventQ.put(Signal(
f"sqs.{event['EventType']}", f"sqs.{event['EventType']}",
{'event': event} {'event': event}
)) ))
except Exception: except Exception as e:
self.logger.exception(e)
pass pass
# next, we delete the message from the queue so no one else will process it again # next, we delete the message from the queue so no one else will process it again
sqs.delete_message( sqs.delete_message(

View file

@ -1,4 +1,3 @@
import argparse
import json import json
import logging import logging
import os import os
@ -8,15 +7,13 @@ import tornado.websocket
from urllib.parse import urlparse from urllib.parse import urlparse
import uuid import uuid
import coloredlogs
import glob
from pyaxidraw import axidraw # import module
from threading import Thread, Event from threading import Thread, Event
from queue import Queue, Empty from queue import Queue, Empty
import threading
from server_test import generated_image_dir
import asyncio import asyncio
from sorteerhoed import HITStore
from sorteerhoed.Signal import Signal
import httpagentparser
import geoip2.database
logger = logging.getLogger("sorteerhoed").getChild("webserver") logger = logging.getLogger("sorteerhoed").getChild("webserver")
@ -33,9 +30,12 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
CORS_ORIGINS = ['localhost', '.mturk.com', 'here.rubenvandeven.com'] CORS_ORIGINS = ['localhost', '.mturk.com', 'here.rubenvandeven.com']
connections = set() connections = set()
def initialize(self, draw_q: Queue, generated_image_dir: str): def initialize(self, config, plotterQ: Queue, eventQ: Queue, store: HITStore, geoip_reader: geoip2.database.Reader):
self.draw_q = draw_q self.config = config
self.generated_image_dir = generated_image_dir self.plotterQ = plotterQ
self.eventQ = eventQ
self.store = store
self.geoip_reader = geoip_reader
def check_origin(self, origin): def check_origin(self, origin):
parsed_origin = urlparse(origin) parsed_origin = urlparse(origin)
@ -46,8 +46,29 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
# the client connected # the client connected
def open(self, p = None): def open(self, p = None):
self.__class__.connections.add(self) self.__class__.connections.add(self)
logger.info(f"New client connected: {self.request.remote_ip}") hit_id = self.get_query_argument('id')
self.hit = self.store.getHitById(hit_id)
if self.hit.submit_hit_at:
raise Exception("Opening websocket for already submitted hit")
logger.info(f"New client connected: {self.request.remote_ip} for {self.hit.id}/{self.hit.hit_id}")
self.eventQ.put(Signal('hit.info', dict(hit_id=self.hit.id, ip=self.request.remote_ip)))
self.strokes = [] self.strokes = []
ua = self.request.headers.get('User-Agent', None)
if ua:
ua_info = httpagentparser.detect(ua)
self.eventQ.put(Signal('hit.info', dict(hit_id=self.hit.id, os=ua_info['os']['name'], browser=ua_info['browser']['name'])))
try:
geoip = self.geoip_reader.country(self.request.remote_ip)
logger.info(f"Geo {geoip}")
self.eventQ.put(Signal('hit.info', dict(hit_id=self.hit.id, location=geoip.country.name)))
except Exception as e:
logger.exception(e)
logger.info("No geo IP possible")
self.eventQ.put(Signal('hit.info', dict(hit_id=self.hit.id, location='Unknown')))
# self.write_message("hello!") # self.write_message("hello!")
# the client sent the message # the client sent the message
@ -60,7 +81,7 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
# TODO: min/max input # TODO: min/max input
point = [float(msg['direction'][0]),float(msg['direction'][1]), bool(msg['mouse'])] point = [float(msg['direction'][0]),float(msg['direction'][1]), bool(msg['mouse'])]
self.strokes.append(point) self.strokes.append(point)
self.draw_q.put(point) self.plotterQ.put(point)
elif msg['action'] == 'up': elif msg['action'] == 'up':
logger.info(f'up: {msg}') logger.info(f'up: {msg}')
@ -68,7 +89,7 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
self.strokes.append(point) self.strokes.append(point)
elif msg['action'] == 'submit': elif msg['action'] == 'submit':
logger.info(f'up: {msg}') logger.info(f'submit: {msg}')
id = self.submit_strokes() id = self.submit_strokes()
if not id: if not id:
self.write_message(json.dumps('error')) self.write_message(json.dumps('error'))
@ -76,11 +97,19 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
self.write_message(json.dumps({ self.write_message(json.dumps({
'action': 'submitted', 'action': 'submitted',
'msg': f"Submission ok, please refer to your submission as: {id}" 'msg': f"Submission ok, please refer to your submission as: {self.hit.uuid}"
})) }))
elif msg['action'] == 'down': elif msg['action'] == 'down':
# not used, implicit in move? # not used, implicit in move?
pass pass
elif msg['action'] == 'info':
self.eventQ.put(Signal('hit.info', dict(
hit_id=self.hit.id,
resolution=msg['resolution'],
browser=msg['browser']
)))
pass
else: else:
# self.send({'alert': 'Unknown request: {}'.format(message)}) # self.send({'alert': 'Unknown request: {}'.format(message)})
logger.warn('Unknown request: {}'.format(message)) logger.warn('Unknown request: {}'.format(message))
@ -98,6 +127,9 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
if len(self.strokes) < 1: if len(self.strokes) < 1:
return False return False
self.eventQ.put(Signal("server.submit", dict(hit_id = self.hit.id)))
if self.config['dummy_plotter']:
d = strokes2D(self.strokes) d = strokes2D(self.strokes)
svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?> svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<svg viewBox="0 0 600 600" <svg viewBox="0 0 600 600"
@ -112,14 +144,15 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
</svg> </svg>
""" """
id = uuid.uuid4().hex filename = self.hit.getImagePath()
logger.info(f"Write to {filename}")
filename = os.path.join(self.generated_image_dir , id+'.svg')
with open(filename, 'w') as fp: with open(filename, 'w') as fp:
logger.info(f"Wrote {filename}")
fp.write(svg) fp.write(svg)
return id # we fake a hit.scanned event
self.eventQ.put(Signal('hit.scanned', {'hit_id':self.hit.id}))
return self.hit.uuid
@classmethod @classmethod
def rmConnection(cls, client): def rmConnection(cls, client):
@ -128,21 +161,43 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
cls.connections.remove(client) cls.connections.remove(client)
class LatestImageHandler(tornado.web.RequestHandler): class StatusWebSocketHandler(tornado.websocket.WebSocketHandler):
CORS_ORIGINS = ['localhost']
connections = set()
def initialize(self, generated_image_dir: str): def initialize(self):
self.generated_image_dir = generated_image_dir pass
def get(self): def check_origin(self, origin):
self.set_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0') parsed_origin = urlparse(origin)
self.set_header("Content-Type", "image/svg+xml") # parsed_origin.netloc.lower() gives localhost:3333
valid = any([parsed_origin.hostname.endswith(origin) for origin in self.CORS_ORIGINS])
return valid
list_of_files = glob.glob(os.path.join(self.generated_image_dir,'*.svg')) # the client connected
latest_file = max(list_of_files, key=os.path.getctime) def open(self, p = None):
with open(latest_file, 'r') as fp: self.__class__.connections.add(self)
self.write(fp.read())
# client disconnected
def on_close(self):
self.__class__.rmConnection(self)
logger.info(f"Client disconnected: {self.request.remote_ip}")
@classmethod
def rmConnection(cls, client):
if client not in cls.connections:
return
cls.connections.remove(client)
@classmethod
def update_for_all(cls, prop, value):
for connection in cls.connections:
connection.write_message(json.dumps({
'property': prop,
'value': value
}))
def strokes2D(strokes): def strokes2D(strokes):
# strokes to a d attribute for a path # strokes to a d attribute for a path
d = ""; d = "";
@ -165,7 +220,62 @@ def strokes2D(strokes):
last_stroke = stroke; last_stroke = stroke;
return d; return d;
class DrawPageHandler(tornado.web.RequestHandler):
def initialize(self, store: HITStore, path: str):
self.store = store
self.path = path
def get(self):
try:
hit_id = self.get_query_argument('id')
hit = self.store.getHitById(hit_id)
except Exception:
self.write("HIT not found")
else:
if hit.submit_page_at:
self.write("HIT already submitted")
return
previous_hit = self.store.getLastSubmittedHit()
if not previous_hit:
# start with basic svg
logger.warning("No previous HIT, start from basic svg")
image = "/basic.svg"
else:
image = previous_hit.getImageUrl()
logger.info(f"Image url: {image}")
self.set_header("Access-Control-Allow-Origin", "*")
contents = open(os.path.join(self.path, 'index.html'), 'r').read().replace("{IMAGE_URL}", image)
self.write(contents)
class StatusPage():
"""
Properties for on the status page, which are send over websockets the moment
they are altered.
"""
def __init__(self):
self.reset()
def reset(self):
self.hit_id = None
self.worker_id = None
self.ip = None
self.location = None
self.browser = None
self.os = None
self.resolution = None
self.state = None
self.fee = None
self.hit_created = None
self.hit_opened = None
def __setattr__(self, name, value):
self.__dict__[name] =value
StatusWebSocketHandler.update_for_all(name, value)
def set(self, name, value):
return self.__setattr__(name, value)
class Server: class Server:
""" """
@ -174,7 +284,7 @@ class Server:
TODO: change to have the HIT_id as param to the page. Load hit from storage with previous image TODO: change to have the HIT_id as param to the page. Load hit from storage with previous image
""" """
def __init__(self, config, eventQ: Queue, runningEvent: Event, plotterQ: Queue): def __init__(self, config, eventQ: Queue, runningEvent: Event, plotterQ: Queue, store: HITStore):
self.isRunning = runningEvent self.isRunning = runningEvent
self.eventQ = eventQ self.eventQ = eventQ
self.config = config self.config = config
@ -183,19 +293,33 @@ class Server:
self.plotterQ = plotterQ # communicate directly to plotter (skip main thread) self.plotterQ = plotterQ # communicate directly to plotter (skip main thread)
#self.config['server']['port'] #self.config['server']['port']
self.generated_image_dir = os.path.join('www','generated') self.web_root = os.path.join('www')
self.static_file_dir = os.path.join('www')
self.server_loop = None self.server_loop = None
self.store = store
self.statusPage = StatusPage()
def start(self): def start(self):
if not os.path.exists('GeoLite2-Country.mmdb'):
raise Exception("Please download the GeoLite2 Country database and place the 'GeoLite2-Country.mmdb' file in the project root.")
self.geoip_reader = geoip2.database.Reader('GeoLite2-Country.mmdb')
try: try:
asyncio.set_event_loop(asyncio.new_event_loop()) asyncio.set_event_loop(asyncio.new_event_loop())
application = tornado.web.Application([ application = tornado.web.Application([
(r"/ws(.*)", WebSocketHandler, {'draw_q': self.plotterQ, 'generated_image_dir': self.generated_image_dir}), (r"/ws(.*)", WebSocketHandler, {
(r"/latest.svg", LatestImageHandler, {'generated_image_dir': self.generated_image_dir}), # TODO: have js request the right image, based on a 'start' button. This way we can trace the history of a drawing 'config': self.config,
'plotterQ': self.plotterQ,
'eventQ': self.eventQ,
'store': self.store,
'geoip_reader': self.geoip_reader
}),
(r"/status/ws", StatusWebSocketHandler),
(r"/draw", DrawPageHandler,
dict(store = self.store, path=self.web_root)),
(r"/(.*)", StaticFileWithHeaderHandler, (r"/(.*)", StaticFileWithHeaderHandler,
{"path": self.static_file_dir, "default_filename": 'index.html'}), {"path": self.web_root}),
], debug=True, autoreload=False) ], debug=True, autoreload=False)
application.listen(self.config['server']['port']) application.listen(self.config['server']['port'])
self.server_loop = tornado.ioloop.IOLoop.current() self.server_loop = tornado.ioloop.IOLoop.current()

38
www/basic.svg Normal file
View file

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<svg
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:cc="http://creativecommons.org/ns#"
xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns:svg="http://www.w3.org/2000/svg"
xmlns="http://www.w3.org/2000/svg"
id="svg8"
version="1.1"
viewBox="0 0 210 210"
height="210mm"
width="210mm">
<defs
id="defs2" />
<metadata
id="metadata5">
<rdf:RDF>
<cc:Work
rdf:about="">
<dc:format>image/svg+xml</dc:format>
<dc:type
rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
<dc:title></dc:title>
</cc:Work>
</rdf:RDF>
</metadata>
<g
transform="translate(0,-87)"
id="layer1">
<rect
y="138.32738"
x="51.327381"
height="107.34524"
width="107.34524"
id="rect815"
style="fill:none;fill-opacity:1;stroke:white;stroke-width:1;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0.75590557;stroke-opacity:1" />
</g>
</svg>

After

Width:  |  Height:  |  Size: 1 KiB

BIN
www/cursor.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 344 B

View file

@ -33,7 +33,7 @@
width: 600px; width: 600px;
position: relative; position: relative;
background:#ccc; background:#ccc;
cursor: url(cursor.png) 6 6, auto;
} }
html, body{ html, body{
height: 100%; height: 100%;
@ -50,7 +50,7 @@
</ul> </ul>
<div id='interface'> <div id='interface'>
<div id='wrapper'> <div id='wrapper'>
<img src="/latest.svg" id='sample'> <img src="{IMAGE_URL}" id='sample'>
<svg id="canvas"> <svg id="canvas">
<path d="" id="stroke" /> <path d="" id="stroke" />
</svg> </svg>
@ -62,7 +62,7 @@
<div id='message'></div> <div id='message'></div>
</div> </div>
<script type="text/javascript"> <script type="text/javascript">
let url = window.location.origin.replace('http', 'ws') +'/ws'; let url = window.location.origin.replace('http', 'ws') +'/ws?' + window.location.search.substring(1);
let svgEl = document.getElementById("canvas"); let svgEl = document.getElementById("canvas");
let strokeEl = document.getElementById('stroke'); let strokeEl = document.getElementById('stroke');
let submitEl = document.getElementById('submit'); let submitEl = document.getElementById('submit');
@ -78,11 +78,12 @@
let pos = svgEl.getBoundingClientRect() let pos = svgEl.getBoundingClientRect()
let x = e.x - pos['left']; let x = e.x - pos['left'];
let y = e.y - pos['top']; let y = e.y - pos['top'];
strokes.push([x, y, 0]);
if(isDrawing) {
strokes.push([x, y, 0]);
let d = strokes2D(strokes); let d = strokes2D(strokes);
console.log(d);
strokeEl.setAttribute('d', d); strokeEl.setAttribute('d', d);
}
currentPoint = { currentPoint = {
'action': 'move', 'action': 'move',

9
www/status.html Normal file
View file

@ -0,0 +1,9 @@
<html>
<head>
<title>Status</title>
</head>
<body>
Status!
websocket at /status/ws
</body>
</html>