]> git.somenet.org - pub/jan/dst18.git/blob - ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java
[3.3] first part.
[pub/jan/dst18.git] / ass3-event / src / main / java / dst / ass3 / event / impl / EventSourceFunction.java
1 package dst.ass3.event.impl;
2
3 import dst.ass3.event.Constants;
4 import dst.ass3.event.EventSubscriber;
5 import dst.ass3.event.IEventSourceFunction;
6 import dst.ass3.event.model.domain.IUploadEventInfo;
7 import org.apache.flink.api.common.functions.IterationRuntimeContext;
8 import org.apache.flink.api.common.functions.RuntimeContext;
9 import org.apache.flink.configuration.Configuration;
10
11 import java.net.InetSocketAddress;
12
13 public class EventSourceFunction implements IEventSourceFunction {
14     private EventSubscriber es;
15     private boolean isRunning;
16     private RuntimeContext runtimeContext;
17
18     @Override
19     public void open(Configuration parameters) throws Exception {
20         es = EventSubscriber.subscribe(new InetSocketAddress(Constants.EVENT_PUBLISHER_PORT));
21     }
22
23     @Override
24     public void close() throws Exception {
25         if (es == null) return;
26         cancel();
27         es.close();
28         es = null;
29     }
30
31     @Override
32     public RuntimeContext getRuntimeContext() {
33         return runtimeContext;
34     }
35
36     @Override
37     public void setRuntimeContext(RuntimeContext runtimeContext) {
38         this.runtimeContext = runtimeContext;
39     }
40
41     @Override
42     public IterationRuntimeContext getIterationRuntimeContext() {
43         return null;
44     }
45
46     @Override
47     public void run(SourceContext<IUploadEventInfo> ctx) throws Exception {
48         isRunning = true;
49         while (isRunning) {
50             IUploadEventInfo event = es.receive();
51             if (event == null) {
52                 isRunning = false;
53                 continue;
54             }
55             ctx.collectWithTimestamp(event, event.getTimestamp());
56         }
57     }
58
59     @Override
60     public void cancel() {
61         isRunning = false;
62     }
63 }