]> git.somenet.org - pub/jan/dst18.git/blob - ass3-worker/worker.py
GITOLITE.txt
[pub/jan/dst18.git] / ass3-worker / worker.py
1 #!/usr/bin/env python3
2 #
3
4 import sys
5 import signal
6 import pika
7 import time
8 import json
9 import uuid
10 import random
11
# Kind of work this process handles. Stays None until the __main__ block
# assigns the command-line argument; read by on_message() and mainloop().
workertype = None
13
def signal_handler(signum, frame):
    """Announce the received signal and terminate the process with status 0.

    Args:
        signum: Number of the signal that was delivered.
        frame: Interrupted stack frame (unused; required by the handler
            signature expected by ``signal.signal``).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    try:
        # The stdlib enum maps a signal number straight to its symbolic name,
        # so no reverse lookup table has to be rebuilt on every call.
        signame = signal.Signals(signum).name
    except ValueError:
        # Unknown number: report it as-is rather than failing in the handler.
        signame = str(signum)
    print('\n*** '+signame+' received. Graceful shutdown.')
    sys.exit(0)
18
19
def on_message(channel, method_frame, header_frame, body):
    """Handle one delivery: simulate work, acknowledge it, publish a result.

    The "work" is a random sleep whose range depends on the module-level
    ``workertype``; any other worker type does not sleep at all.  The result
    is a JSON object with a freshly generated ``requestId`` and the elapsed
    ``processingTime`` in milliseconds (as a string).

    Args:
        channel: Channel the message arrived on; used to ack and publish.
        method_frame: Delivery metadata; only ``delivery_tag`` is read.
        header_frame: Message properties (unused).
        body: Raw message payload; only printed, never parsed.
    """
    tag = method_frame.delivery_tag
    print('*** on_message(): '+workertype+'-'+str(tag))
    print(body)

    began_ms = int(round(time.time() * 1000))
    request_id = str(uuid.uuid1())

    # pseudo-work: (min, max) sleep seconds per known worker type
    sleep_ranges = {
        'document': (3, 5),
        'quiz': (1, 2),
        'video': (8, 11),
    }
    seconds_range = sleep_ranges.get(workertype)
    if seconds_range is not None:
        time.sleep(random.randint(*seconds_range))

    finished_ms = int(round(time.time() * 1000))
    reply = {
        'requestId': request_id,
        'processingTime': str(finished_ms - began_ms),
    }

    # The delivery is acknowledged first, then the result is published.
    channel.basic_ack(delivery_tag=tag)
    channel.basic_publish('dst.workers', 'requests.'+workertype, json.dumps(reply), pika.BasicProperties())
39
40
def mainloop():
    """Connect to the broker and consume this worker's queue until stopped.

    Declares the durable queue ``dst.<workertype>``, sets a prefetch count of
    one, registers ``on_message`` as the consumer callback and then blocks in
    ``start_consuming()``.  The connection is closed on the way out, whatever
    the reason for leaving (including ``SystemExit`` raised by a signal
    handler), so the socket is not simply dropped.
    """
    queue_name = 'dst.'+workertype
    credentials = pika.PlainCredentials('dst', 'dst')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.99.99',credentials=credentials))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)
        channel.basic_qos(prefetch_count=1)
        # NOTE(review): callback-first call form; newer pika releases appear to
        # expect basic_consume(queue, on_message_callback) -- confirm against
        # the pika version this project pins before upgrading.
        channel.basic_consume(on_message, queue=queue_name)

        print('*** Starting consuming.')
        channel.start_consuming()
    finally:
        # Close only if still open; the broker may already have dropped us.
        if connection.is_open:
            connection.close()
50
51
if __name__ == '__main__':
    # Script entry point: expects exactly one argument, the worker type.
    if len(sys.argv) != 2:
        # A usage error is reported on stderr with a non-zero status so that
        # callers can tell it apart from a clean shutdown (which exits 0).
        print('Takes exactly one parameter: WORKERTYPE', file=sys.stderr)
        sys.exit(2)

    workertype = sys.argv[1]
    print('*** Starting worker of type: '+workertype)
    # Install the handlers before consuming so SIGTERM/SIGINT exit cleanly.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    mainloop()
61     mainloop()