1 package dst.ass3.event.impl;
3 import dst.ass3.event.Constants;
4 import dst.ass3.event.EventSubscriber;
5 import dst.ass3.event.IEventSourceFunction;
6 import dst.ass3.event.model.domain.IUploadEventInfo;
7 import org.apache.flink.api.common.functions.IterationRuntimeContext;
8 import org.apache.flink.api.common.functions.RuntimeContext;
9 import org.apache.flink.configuration.Configuration;
11 import java.net.InetSocketAddress;
13 public class EventSourceFunction implements IEventSourceFunction {
14 private EventSubscriber es;
15 private boolean isRunning;
16 private RuntimeContext runtimeContext;
19 public void open(Configuration parameters) throws Exception {
20 es = EventSubscriber.subscribe(new InetSocketAddress(Constants.EVENT_PUBLISHER_PORT));
24 public void close() throws Exception {
25 if (es == null) return;
32 public RuntimeContext getRuntimeContext() {
33 return runtimeContext;
37 public void setRuntimeContext(RuntimeContext runtimeContext) {
38 this.runtimeContext = runtimeContext;
42 public IterationRuntimeContext getIterationRuntimeContext() {
47 public void run(SourceContext<IUploadEventInfo> ctx) throws Exception {
50 IUploadEventInfo event = es.receive();
55 ctx.collectWithTimestamp(event, event.getTimestamp());
60 public void cancel() {