1 package dst.ass3.event;
3 import java.util.ArrayList;
4 import java.util.HashMap;
7 import java.util.concurrent.BlockingQueue;
8 import java.util.concurrent.ConcurrentHashMap;
9 import java.util.concurrent.LinkedBlockingQueue;
10 import java.util.concurrent.TimeUnit;
12 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
15 * A SinkFunction that collects objects into a queue located in a shared global state. Each collector accesses a
16 * specific key in the shared state.
18 * @param <T> the sink input type
20 public class StaticQueueSink<T> implements SinkFunction<T> {
22 private static final long serialVersionUID = -3965500756295835669L;
// Registry of queues shared by every instance of this class (the field is static), keyed by string.
// The element type is a wildcard, so typed access relies on the cast performed in get(String).
// NOTE(review): the reference is not declared final - confirm nothing reassigns it, then consider making it final.
24 private static Map<String, BlockingQueue<?>> state = new ConcurrentHashMap<>();
28 public StaticQueueSink(String key) {
33 public void invoke(T value, Context context) throws Exception {
41 public List<T> take(int n) {
// The result list is pre-sized to n, the number of elements requested.
42 List<T> list = new ArrayList<>(n);
44 for (int i = 0; i < n; i++) {
// BlockingQueue.take() blocks until an element is available, so each iteration may wait.
46 list.add(get().take());
47 } catch (InterruptedException e) {
// The interruption is rethrown as an unchecked exception with the original exception as its cause.
// NOTE(review): no Thread.currentThread().interrupt() is visible here - confirm whether the interrupt flag should be restored.
48 throw new RuntimeException("Interrupted while accessing queue", e);
58 } catch (InterruptedException e) {
63 public T poll(long ms) {
// Waits at most ms milliseconds for an element; BlockingQueue.poll returns null when the timeout elapses first.
65 return get().poll(ms, TimeUnit.MILLISECONDS);
66 } catch (InterruptedException e) {
71 public synchronized BlockingQueue<T> get() {
75 @SuppressWarnings("unchecked")
76 private static <R> BlockingQueue<R> get(String key) {
// Looks up the queue stored under key in the shared map, atomically creating a LinkedBlockingQueue on first access.
// The cast is to the raw BlockingQueue type; nothing checks that R matches the element type other callers use for this key.
// NOTE(review): casting to BlockingQueue<R> would avoid the raw type while staying covered by the existing @SuppressWarnings.
77 return (BlockingQueue) state.computeIfAbsent(key, k -> new LinkedBlockingQueue<>());
80 public static void clearAll() {