1 package dst.ass3.event;
3 import java.lang.reflect.Proxy;
4 import java.net.SocketAddress;
5 import java.util.concurrent.BlockingQueue;
6 import java.util.concurrent.LinkedBlockingQueue;
8 import dst.ass3.event.model.domain.IUploadEventInfo;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
12 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
13 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
14 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
15 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
16 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
17 import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
18 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
19 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
20 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers;
21 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder;
22 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder;
23 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
26 * An EventSubscriber receives IUploadEventInfo objects through a netty SocketChannel. Create and connect an
27 * EventSubscriber using {@link #subscribe(SocketAddress)}. To receive events, call {@link #receive()}.
// Client-side subscriber: events read from a netty channel are buffered in a queue and handed out by receive().
// Instances are created through the static subscribe(SocketAddress) factory; the constructor is private.
29 public class EventSubscriber {
// SLF4J logger shared by the subscriber and its inner channel handler.
31 private static final Logger LOG = LoggerFactory.getLogger(EventSubscriber.class);
// End-of-stream marker: a dynamic proxy implementing IUploadEventInfo whose invocation handler returns null
// for every call. close() enqueues it and receive() recognises it by identity (==), so it is never meant to be
// used as a real event. NOTE(review): per the java.lang.reflect.InvocationHandler contract, invoking a method
// with a primitive return type on this proxy would throw a NullPointerException — IUploadEventInfo's methods are
// not visible here, so confirm nobody calls into the marker.
33 private static final IUploadEventInfo POISON_PILL = (IUploadEventInfo) Proxy.newProxyInstance(
34 IUploadEventInfo.class.getClassLoader(), new Class[]{IUploadEventInfo.class}, (p, m, a) -> null);
// Address of the publisher; passed to Bootstrap.connect(...) in start().
36 private final SocketAddress publisherAddress;
// Buffer between the netty handler (producer) and receive() (consumer). The same object is also used as the
// monitor in synchronized blocks in receive(), close() and the channel handler.
38 private final BlockingQueue<IUploadEventInfo> queue;
// Read by receive() together with queue.isEmpty() to decide whether to throw IllegalStateException.
// NOTE(review): no assignment to this flag is visible in this listing — presumably it is set in close() on a
// line that is not shown; confirm against the full file.
40 private volatile boolean closed;
// Connected channel, assigned in start(); its closeFuture() is awaited by the task start() submits.
42 private Channel channel;
// Event loop group created in start() and shut down in close(). It is null until start() has run.
43 private EventLoopGroup loop;
// Private constructor: only stores the publisher address and creates the event buffer. No connection is opened
// here; that happens in start(), which the subscribe(SocketAddress) factory calls right after construction.
45 private EventSubscriber(SocketAddress publisherAddress) {
46 this.publisherAddress = publisherAddress;
// No-arg LinkedBlockingQueue: effectively unbounded (capacity Integer.MAX_VALUE), so add() in the handler
// does not fail for capacity reasons in practice.
47 this.queue = new LinkedBlockingQueue<>();
51 * Blocks to receive the next IUploadEventInfo published into the channel. Returns {@code null} if the underlying
52 * channel has been closed or the thread was interrupted.
54 * @return the next IUploadEventInfo object
55 * @throws IllegalStateException if the subscriber has been closed and no queued events remain (for example, when a previous call already returned null)
// Hands the next buffered event to the caller. Only part of this method is visible in this listing: the lines
// between the embedded numbers (61-63, 65-67, 69-72, 74 onwards) are not shown.
57 public IUploadEventInfo receive() throws IllegalStateException {
// The queue object doubles as the monitor, so the closed/empty check cannot interleave with the synchronized
// sections in close() and in the channel handler.
58 synchronized (queue) {
// Fail fast once the subscriber is closed and nothing (not even the end-of-stream marker) is left to hand out.
59 if (closed && queue.isEmpty()) {
60 throw new IllegalStateException();
// NOTE(review): the statement that assigns 'event' is not shown here — presumably a blocking read from the
// queue, as the method's Javadoc says it blocks; confirm against the full file.
64 IUploadEventInfo event;
// Identity comparison is deliberate: POISON_PILL is a single proxy instance used purely as a marker.
68 if (event == POISON_PILL) {
// NOTE(review): the handler body is not visible. The Javadoc says null is returned on interruption; if the
// interrupt status is not restored there, callers lose it — consider Thread.currentThread().interrupt().
73 } catch (InterruptedException e) {
// Creates the event loop, connects to the publisher and returns the Future of a task that waits for the channel
// to close. Several lines of this method are not shown in this listing (embedded numbers 80, 82, 90-92, 94-95,
// 99, 102 onwards).
78 private Future<?> start() {
79 loop = new NioEventLoopGroup();
// NOTE(review): Bootstrap requires an event loop group before connect(); the .group(...) call is presumably on
// the line (82) that is not shown here — confirm.
81 channel = new Bootstrap()
83 .channel(NioSocketChannel.class)
// TCP_NODELAY disables Nagle's algorithm so small event messages are not delayed.
84 .option(ChannelOption.TCP_NODELAY, true)
// The handler installs the object codec once the channel is active and feeds received events into the queue.
85 .handler(new EventSubscriberHandler())
86 .connect(publisherAddress) // ChannelFuture
87 .addListener(future -> {
88 if (!future.isSuccess()) {
// NOTE(review): the failure cause (future.cause()) is not logged, so the reason for a failed connect is lost.
89 LOG.error("Error while connecting");
// Waits for the connect attempt to finish without reacting to interrupts; per the netty Future contract a
// failed attempt is rethrown from here. NOTE(review): the expression that yields the Channel assigned above is
// not shown (presumably .channel() on the next line).
93 .syncUninterruptibly()
96 LOG.info("Connected to channel {}", channel);
// The task is submitted to the event loop group itself and blocks in sync() until the channel is closed, so it
// occupies one of the group's threads for the lifetime of the connection.
98 return loop.submit(() -> {
100 channel.closeFuture().sync();
// NOTE(review): the catch body and anything after it (e.g. a finally block) are not visible in this listing.
101 } catch (InterruptedException e) {
110 * Closes all resources and threads used by the EventSubscriber.
// Shuts down the event loop and enqueues the end-of-stream marker. Several guard lines are not shown in this
// listing (embedded numbers 113-114, 119-122, 124, 126, 128 onwards).
112 public void close() {
// First critical section on the queue monitor: event loop shutdown.
115 synchronized (queue) {
// Only request shutdown if none has been requested or completed yet, so repeated close() calls do not
// trigger it again.
116 if (!loop.isShutdown() && !loop.isTerminated() && !loop.isShuttingDown()) {
117 LOG.info("Shutting down event loop");
// The Future returned by shutdownGracefully() is discarded, i.e. this method does not wait for termination.
118 loop.shutdownGracefully();
// Second critical section on the same monitor: publish the end-of-stream marker.
// NOTE(review): the guard around it and the place where 'closed' is set are not visible here — presumably on
// the lines not shown (124, 126); confirm the marker is added only once.
123 synchronized (queue) {
125 LOG.debug("Adding poison pill to queue");
// receive() compares dequeued elements against POISON_PILL to detect the end of the stream.
127 queue.add(POISON_PILL);
134 * Creates a new EventSubscriber that connects to given SocketAddress.
136 * @param address the socket address
137 * @return a new EventSubscriber
// Static factory and the only way to obtain an instance, because the constructor is private.
139 public static EventSubscriber subscribe(SocketAddress address) {
140 EventSubscriber eventSubscriber = new EventSubscriber(address);
// Connects before returning. The Future returned by start() (the task waiting for the channel to close) is
// discarded, so callers cannot observe it through this method.
141 eventSubscriber.start();
142 return eventSubscriber;
// Inbound netty handler. It is a non-static inner class because it uses the enclosing subscriber's queue.
// Several lines of this class are not shown in this listing (embedded numbers 146, 148-149, 152-154, 156,
// 158-162, 165-169, 171, 174 onwards).
145 private class EventSubscriberHandler extends ChannelInboundHandlerAdapter {
// Called for every decoded inbound message.
147 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Anything that is not an IUploadEventInfo is logged as an error. NOTE(review): the statement following the
// log call is not shown — presumably an early return, otherwise the cast below would fail; confirm.
150 if (!(msg instanceof IUploadEventInfo)) {
151 LOG.error("Unknown message type received {}", msg);
// Enqueue under the queue monitor, the same lock receive() and close() use. NOTE(review): a line between the
// synchronized statement and the add (156) is not shown — possibly a guard on 'closed'; confirm.
155 synchronized (queue) {
157 queue.add((IUploadEventInfo) msg);
// Logs pipeline errors with their cause. NOTE(review): any follow-up (closing the context or the subscriber) is
// on lines not shown here.
163 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
164 LOG.error("EventSubscriberHandler caught an exception", cause);
// Installs the Java-serialization codec when the connection becomes active. NOTE(review): the receiver of the
// addFirst chain is not shown (presumably ctx.pipeline()).
170 public void channelActive(ChannelHandlerContext ctx) {
// Both handlers are added with addFirst, so the decoder (added last) ends up in front of the encoder.
172 .addFirst(new ObjectEncoder())
// Classes are resolved through the system class loader without caching.
// NOTE(review): ObjectDecoder performs Java deserialization of bytes received from the network; this is only
// safe when the publisher is trusted — consider a restricted class resolver or a different wire format.
173 .addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader())));