def signal_handler(signum, frame):
    """Announce which signal was received.

    Args:
        signum: Number of the signal being handled.
        frame: Interrupted stack frame; unused.

    NOTE(review): the message promises a graceful shutdown, but nothing in
    this function stops the consumer or closes the connection -- confirm
    that this happens elsewhere.
    """
    # Map signal numbers to names such as 'SIGTERM'.  'SIG_*' constants
    # (SIG_DFL, SIG_IGN, ...) are not signals and are skipped.  Names are
    # visited in ascending order and only the first name per number is kept,
    # so for aliases the alphabetically smallest name wins.
    names_by_number = {}
    for name in sorted(signal.__dict__):
        if not name.startswith('SIG') or name.startswith('SIG_'):
            continue
        names_by_number.setdefault(signal.__dict__[name], name)

    signal_name = str(names_by_number[signum])
    print('\n*** ' + signal_name + ' received. Graceful shutdown.')
def on_message(channel, method_frame, header_frame, body):
    """Simulate processing one request and publish a result message.

    The handler sleeps for a number of seconds that depends on the
    module-level ``workertype``, acknowledges the delivery and then publishes
    a JSON document to the ``dst.workers`` exchange with routing key
    ``requests.<workertype>``.  The document carries a freshly generated
    ``requestId`` and the measured ``processingTime`` (milliseconds, encoded
    as a string).

    Args:
        channel: Channel the message arrived on; used to ack and to publish.
        method_frame: Delivery metadata; only ``delivery_tag`` is read.
        header_frame: Unused.
        body: Unused -- the payload of the incoming message is ignored.
    """
    print('*** on_message(): ' + workertype + '-' + str(method_frame.delivery_tag))

    startmillis = int(round(time.time() * 1000))

    # Build the response in a fresh local dict so that no state is shared
    # between messages and the name is always defined.
    ret = {}
    ret['requestId'] = str(uuid.uuid1())

    # Inclusive (min, max) bounds, in seconds, of the simulated work.
    # Worker types that are not listed are not delayed at all.
    sleep_bounds = {
        'document': (3, 5),
        'quiz': (1, 2),
        'video': (8, 11),
    }.get(workertype)
    if sleep_bounds is not None:
        time.sleep(random.randint(*sleep_bounds))

    ret['processingTime'] = str(int(round(time.time() * 1000)) - startmillis)

    # NOTE(review): the delivery is acknowledged before the result is
    # published, so a failing publish will not cause the request to be
    # redelivered -- confirm this ordering is intended.
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    channel.basic_publish(
        exchange='dst.workers',
        routing_key='requests.' + workertype,
        body=json.dumps(ret),
        properties=pika.BasicProperties(),
    )
# Connect to the broker and start consuming from the queue of this worker type.
# NOTE(review): `workertype` is read here, but its only visible assignment is
# in the __main__ guard further down -- confirm these statements run after it.
credentials = pika.PlainCredentials('dst', 'dst')
parameters = pika.ConnectionParameters('192.168.99.99', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

queue_name = 'dst.' + workertype
channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)
# Let the broker hand out at most one unacknowledged message at a time.
channel.basic_qos(prefetch_count=1)
# NOTE(review): passing the callback as the first positional argument matches
# the pika < 1.0 signature of basic_consume -- confirm the installed version.
channel.basic_consume(on_message, queue=queue_name)

print('*** Starting consuming.')
channel.start_consuming()
if __name__ == '__main__':
    # Exactly one command-line argument (the worker type) is required.
    if len(sys.argv) != 2:
        print('Takes exactly one parameter: WORKERTYPE')
        # Stop here: without this the script would fall through and either
        # fail on sys.argv[1] or silently ignore surplus arguments.
        sys.exit(1)

    # Module-level name read by the message handler and the queue setup.
    workertype = sys.argv[1]
    print('*** Starting worker of type: ' + workertype)

    # Route both termination requests (kill / Ctrl-C) to the same handler.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)