]> git.somenet.org - pub/jan/dst18.git/blob - ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java
Add ass3-elastic profile
[pub/jan/dst18.git] / ass3-event / src / main / java / dst / ass3 / event / EventSubscriber.java
1 package dst.ass3.event;
2
3 import java.lang.reflect.Proxy;
4 import java.net.SocketAddress;
5 import java.util.concurrent.BlockingQueue;
6 import java.util.concurrent.LinkedBlockingQueue;
7
8 import dst.ass3.event.model.domain.IUploadEventInfo;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
13 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
14 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
15 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
16 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
17 import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
18 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
19 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
20 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers;
21 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder;
22 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder;
23 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
24
25 /**
26  * An EventSubscriber receives IUploadEventInfo objects through a netty SocketChannel. Create and connect an
27  * EventSubscriber using {@link #subscribe(SocketAddress)}. To receive events, call {@link #receive()}.
28  */
29 public class EventSubscriber {
30
31     private static final Logger LOG = LoggerFactory.getLogger(EventSubscriber.class);
32
33     private static final IUploadEventInfo POISON_PILL = (IUploadEventInfo) Proxy.newProxyInstance(
34             IUploadEventInfo.class.getClassLoader(), new Class[]{IUploadEventInfo.class}, (p, m, a) -> null);
35
36     private final SocketAddress publisherAddress;
37
38     private final BlockingQueue<IUploadEventInfo> queue;
39
40     private volatile boolean closed;
41
42     private Channel channel;
43     private EventLoopGroup loop;
44
45     private EventSubscriber(SocketAddress publisherAddress) {
46         this.publisherAddress = publisherAddress;
47         this.queue = new LinkedBlockingQueue<>();
48     }
49
50     /**
51      * Blocks to receive the next IUploadEventInfo published into the channel. Returns {@code null} if the underlying
52      * channel has been closed or the thread was interrupted.
53      *
54      * @return the next IUploadEventInfo object
55      * @throws IllegalStateException thrown if the previous call returned null and the channel was closed
56      */
57     public IUploadEventInfo receive() throws IllegalStateException {
58         synchronized (queue) {
59             if (closed && queue.isEmpty()) {
60                 throw new IllegalStateException();
61             }
62         }
63
64         IUploadEventInfo event;
65         try {
66             event = queue.take();
67
68             if (event == POISON_PILL) {
69                 return null;
70             } else {
71                 return event;
72             }
73         } catch (InterruptedException e) {
74             return null;
75         }
76     }
77
78     private Future<?> start() {
79         loop = new NioEventLoopGroup();
80
81         channel = new Bootstrap()
82                 .group(loop)
83                 .channel(NioSocketChannel.class)
84                 .option(ChannelOption.TCP_NODELAY, true)
85                 .handler(new EventSubscriberHandler())
86                 .connect(publisherAddress) // ChannelFuture
87                 .addListener(future -> {
88                     if (!future.isSuccess()) {
89                         LOG.error("Error while connecting");
90                         close();
91                     }
92                 })
93                 .syncUninterruptibly()
94                 .channel();
95
96         LOG.info("Connected to channel {}", channel);
97
98         return loop.submit(() -> {
99             try {
100                 channel.closeFuture().sync();
101             } catch (InterruptedException e) {
102                 // noop
103             } finally {
104                 close();
105             }
106         });
107     }
108
109     /**
110      * Closes all resources and threads used by the EventSubscriber.
111      */
112     public void close() {
113         try {
114             if (loop != null) {
115                 synchronized (queue) {
116                     if (!loop.isShutdown() && !loop.isTerminated() && !loop.isShuttingDown()) {
117                         LOG.info("Shutting down event loop");
118                         loop.shutdownGracefully();
119                     }
120                 }
121             }
122         } finally {
123             synchronized (queue) {
124                 if (!closed) {
125                     LOG.debug("Adding poison pill to queue");
126                     closed = true;
127                     queue.add(POISON_PILL);
128                 }
129             }
130         }
131     }
132
133     /**
134      * Creates a new EventSubscriber that connects to given SocketAddress.
135      *
136      * @param address the socket address
137      * @return a new EventSubscriber
138      */
139     public static EventSubscriber subscribe(SocketAddress address) {
140         EventSubscriber eventSubscriber = new EventSubscriber(address);
141         eventSubscriber.start();
142         return eventSubscriber;
143     }
144
145     private class EventSubscriberHandler extends ChannelInboundHandlerAdapter {
146         @Override
147         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
148             ctx.read();
149
150             if (!(msg instanceof IUploadEventInfo)) {
151                 LOG.error("Unknown message type received {}", msg);
152                 return;
153             }
154
155             synchronized (queue) {
156                 if (!closed) {
157                     queue.add((IUploadEventInfo) msg);
158                 }
159             }
160         }
161
162         @Override
163         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
164             LOG.error("EventSubscriberHandler caught an exception", cause);
165             ctx.close();
166             close();
167         }
168
169         @Override
170         public void channelActive(ChannelHandlerContext ctx) {
171             ctx.pipeline()
172                     .addFirst(new ObjectEncoder())
173                     .addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader())));
174         }
175
176     }
177 }
178