1 package dst.ass3.event;
3 import java.util.concurrent.atomic.AtomicBoolean;
4 import java.util.function.Function;
6 import dst.ass3.event.model.domain.IUploadEventInfo;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
10 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
11 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
12 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
13 import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
14 import org.apache.flink.shaded.netty4.io.netty.channel.group.ChannelGroup;
15 import org.apache.flink.shaded.netty4.io.netty.channel.group.DefaultChannelGroup;
16 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
17 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
18 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
19 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers;
20 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder;
21 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder;
24 * An EventPublisher accepts incoming TCP socket connections on a given port and is able to broadcast {@link IUploadEventInfo}
25 * objects to these clients.
27 public class EventPublisher {
29 private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class);
31 private final Object clientChannelMonitor = new Object();
33 private final int port;
34 private final AtomicBoolean closed;
36 private EventLoopGroup bossGroup;
37 private EventLoopGroup workerGroup;
38 private ChannelGroup clientChannels;
40 public EventPublisher(int port) {
42 this.closed = new AtomicBoolean(false);
45 public int getPort() {
50 * Broadcast an event to all listening channels. Does nothing if no clients are connected.
52 * @param event the event to publish
53 * @throws IllegalStateException if the publisher hasn't been started yet or has been closed
55 public void publish(IUploadEventInfo event) {
56 if (clientChannels == null || closed.get()) {
57 throw new IllegalStateException();
60 clientChannels.writeAndFlush(event).syncUninterruptibly();
62 // wait a bit for event to propagate
65 } catch (InterruptedException e) {}
69 * Like {@link #publish(IUploadEventInfo)} but waits for a given number of milliseconds and then passes the current system
70 * time to a factory function.
72 * @param delay the delay in ms
73 * @param provider the provider
75 public void publish(long delay, Function<Long, IUploadEventInfo> provider) {
79 } catch (InterruptedException e) {
82 publish(provider.apply(System.currentTimeMillis()));
86 * This method blocks if no clients are connected, and is notified as soon as a client connects. If clients are
87 * connected, the method returns immediately.
89 public void waitForClients() {
90 if (clientChannels.isEmpty()) {
91 LOG.debug("Waiting for clients to connect...");
92 synchronized (clientChannelMonitor) {
94 clientChannelMonitor.wait();
95 } catch (InterruptedException e) {
96 LOG.debug("Interrupted while waiting on client connections", e);
102 public int getConnectedClientCount() {
103 if (clientChannels == null || closed.get()) {
104 throw new IllegalStateException();
106 return clientChannels.size();
110 * Closes all active client connections.
112 public void dropClients() {
113 if (clientChannels == null || closed.get()) {
114 throw new IllegalStateException();
116 clientChannels.close().syncUninterruptibly().group().clear();
120 * Start the server and accept incoming connections. Will call {@link #close()} if an error occurs during
123 public void start() {
124 bossGroup = new NioEventLoopGroup();
125 workerGroup = new NioEventLoopGroup();
126 clientChannels = new DefaultChannelGroup(workerGroup.next());
128 ServerBootstrap b = new ServerBootstrap();
129 b.group(bossGroup, workerGroup)
130 .channel(NioServerSocketChannel.class)
131 .childHandler(new ClientChannelInitializer());
133 // Bind and start to accept incoming connections
134 ChannelFuture f = b.bind(port).addListener(future -> {
135 if (!future.isSuccess()) {
136 LOG.error("Error while binding socket");
139 }).syncUninterruptibly();
140 LOG.info("Accepting connections on {}", f.channel());
144 * Closes all channels and resources.
146 public void close() {
147 if (closed.compareAndSet(false, true)) {
148 LOG.info("Shutting down event loops");
149 if (bossGroup != null) {
150 bossGroup.shutdownGracefully();
152 if (workerGroup != null) {
153 workerGroup.shutdownGracefully();
158 private class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
160 public void initChannel(SocketChannel ch) throws Exception {
161 LOG.info("Initializing client channel {}", ch);
162 clientChannels.add(ch);
165 .addFirst(new ObjectEncoder())
166 .addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader())));
168 synchronized (clientChannelMonitor) {
169 clientChannelMonitor.notifyAll();