]> git.somenet.org - pub/jan/dst18.git/blob - ass3-event/src/main/java/dst/ass3/event/EventPublisher.java
[3.1] refactored queues - removed manual bindings.
[pub/jan/dst18.git] / ass3-event / src / main / java / dst / ass3 / event / EventPublisher.java
1 package dst.ass3.event;
2
3 import java.util.concurrent.atomic.AtomicBoolean;
4 import java.util.function.Function;
5
6 import dst.ass3.event.model.domain.IUploadEventInfo;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9
10 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
11 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
12 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
13 import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
14 import org.apache.flink.shaded.netty4.io.netty.channel.group.ChannelGroup;
15 import org.apache.flink.shaded.netty4.io.netty.channel.group.DefaultChannelGroup;
16 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
17 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
18 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
19 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers;
20 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder;
21 import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder;
22
23 /**
24  * An EventPublisher accepts incoming TCP socket connections on a given port and is able to broadcast {@link IUploadEventInfo}
25  * objects to these clients.
26  */
27 public class EventPublisher {
28
29     private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class);
30
31     private final Object clientChannelMonitor = new Object();
32
33     private final int port;
34     private final AtomicBoolean closed;
35
36     private EventLoopGroup bossGroup;
37     private EventLoopGroup workerGroup;
38     private ChannelGroup clientChannels;
39
40     public EventPublisher(int port) {
41         this.port = port;
42         this.closed = new AtomicBoolean(false);
43     }
44
45     public int getPort() {
46         return port;
47     }
48
49     /**
50      * Broadcast an event to all listening channels. Does nothing if no clients are connected.
51      *
52      * @param event the event to publish
53      * @throws IllegalStateException if the publisher hasn't been started yet or has been closed
54      */
55     public void publish(IUploadEventInfo event) {
56         if (clientChannels == null || closed.get()) {
57             throw new IllegalStateException();
58         }
59
60         clientChannels.writeAndFlush(event).syncUninterruptibly();
61
62         // wait a bit for event to propagate
63         try {
64             Thread.sleep(10);
65         } catch (InterruptedException e) {}
66     }
67
68     /**
69      * Like {@link #publish(IUploadEventInfo)} but waits for a given number of milliseconds and then passes the current system
70      * time to a factory function.
71      *
72      * @param delay the delay in ms
73      * @param provider the provider
74      */
75     public void publish(long delay, Function<Long, IUploadEventInfo> provider) {
76         if (delay > 0) {
77             try {
78                 Thread.sleep(delay);
79             } catch (InterruptedException e) {
80             }
81         }
82         publish(provider.apply(System.currentTimeMillis()));
83     }
84
85     /**
86      * This method blocks if no clients are connected, and is notified as soon as a client connects. If clients are
87      * connected, the method returns immediately.
88      */
89     public void waitForClients() {
90         if (clientChannels.isEmpty()) {
91             LOG.debug("Waiting for clients to connect...");
92             synchronized (clientChannelMonitor) {
93                 try {
94                     clientChannelMonitor.wait();
95                 } catch (InterruptedException e) {
96                     LOG.debug("Interrupted while waiting on client connections", e);
97                 }
98             }
99         }
100     }
101
102     public int getConnectedClientCount() {
103         if (clientChannels == null || closed.get()) {
104             throw new IllegalStateException();
105         }
106         return clientChannels.size();
107     }
108
109     /**
110      * Closes all active client connections.
111      */
112     public void dropClients() {
113         if (clientChannels == null || closed.get()) {
114             throw new IllegalStateException();
115         }
116         clientChannels.close().syncUninterruptibly().group().clear();
117     }
118
119     /**
120      * Start the server and accept incoming connections. Will call {@link #close()} if an error occurs during
121      * connection.
122      */
123     public void start() {
124         bossGroup = new NioEventLoopGroup();
125         workerGroup = new NioEventLoopGroup();
126         clientChannels = new DefaultChannelGroup(workerGroup.next());
127
128         ServerBootstrap b = new ServerBootstrap();
129         b.group(bossGroup, workerGroup)
130                 .channel(NioServerSocketChannel.class)
131                 .childHandler(new ClientChannelInitializer());
132
133         // Bind and start to accept incoming connections
134         ChannelFuture f = b.bind(port).addListener(future -> {
135             if (!future.isSuccess()) {
136                 LOG.error("Error while binding socket");
137                 close();
138             }
139         }).syncUninterruptibly();
140         LOG.info("Accepting connections on {}", f.channel());
141     }
142
143     /**
144      * Closes all channels and resources.
145      */
146     public void close() {
147         if (closed.compareAndSet(false, true)) {
148             LOG.info("Shutting down event loops");
149             if (bossGroup != null) {
150                 bossGroup.shutdownGracefully();
151             }
152             if (workerGroup != null) {
153                 workerGroup.shutdownGracefully();
154             }
155         }
156     }
157
158     private class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
159         @Override
160         public void initChannel(SocketChannel ch) throws Exception {
161             LOG.info("Initializing client channel {}", ch);
162             clientChannels.add(ch);
163
164             ch.pipeline()
165                     .addFirst(new ObjectEncoder())
166                     .addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader())));
167
168             synchronized (clientChannelMonitor) {
169                 clientChannelMonitor.notifyAll();
170             }
171         }
172     }
173 }