package dst.ass3.event.impl;

import dst.ass3.event.Constants;
import dst.ass3.event.EventSubscriber;
import dst.ass3.event.IEventSourceFunction;
import dst.ass3.event.model.domain.IUploadEventInfo;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;

import java.net.InetSocketAddress;

/**
 * Source function that subscribes to the event publisher and forwards every received
 * {@link IUploadEventInfo} into the stream, using the event's own timestamp.
 *
 * <p>Lifecycle as implemented here: {@link #open(Configuration)} creates the subscription,
 * {@link #run(SourceContext)} forwards events until it is cancelled or the subscriber yields
 * {@code null}, and {@link #close()} stops the loop and releases the subscription.
 *
 * <p>Threading: {@link #cancel()} only flips a {@code volatile} flag, so it can be invoked from a
 * thread other than the one executing {@link #run(SourceContext)}.
 */
public class EventSourceFunction implements IEventSourceFunction {
    // Runtime-only state; transient so it is never captured if the function object is serialized.
    // NOTE(review): assumes IEventSourceFunction is Serializable via Flink's Function - confirm.
    private transient EventSubscriber es;
    private transient RuntimeContext runtimeContext;

    // Written by cancel()/close() and read by the run() loop, possibly on different threads;
    // volatile guarantees the loop observes the update.
    private volatile boolean isRunning;

    /**
     * Subscribes to the event publisher on {@link Constants#EVENT_PUBLISHER_PORT}.
     *
     * @param parameters configuration passed in by the runtime; not used here
     * @throws Exception if the subscription cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        es = EventSubscriber.subscribe(new InetSocketAddress(Constants.EVENT_PUBLISHER_PORT));
    }

    /**
     * Stops the receive loop and closes the subscription. Does nothing when there is no open
     * subscription, so repeated calls are harmless.
     *
     * @throws Exception if closing the subscriber fails
     */
    @Override
    public void close() throws Exception {
        EventSubscriber subscriber = es;
        if (subscriber == null) {
            return;
        }
        cancel();
        try {
            subscriber.close();
        } finally {
            // Drop the reference even when close() throws, so a later close() is a no-op.
            es = null;
        }
    }

    /** Returns the runtime context previously supplied via {@link #setRuntimeContext}. */
    @Override
    public RuntimeContext getRuntimeContext() {
        return runtimeContext;
    }

    /** Stores the runtime context handed in by the caller. */
    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    /**
     * Iteration contexts are not supported by this source.
     *
     * @return always {@code null}
     */
    @Override
    public IterationRuntimeContext getIterationRuntimeContext() {
        return null;
    }

    /**
     * Receives events from the subscriber and emits each one with its own timestamp until
     * {@link #cancel()} is called or the subscriber returns {@code null}.
     *
     * @param ctx context used to emit the events
     * @throws IllegalStateException if there is no open subscription (i.e. {@link #open} was not
     *         called, or {@link #close} has already run)
     * @throws Exception propagated from the subscriber or the context
     */
    @Override
    public void run(SourceContext<IUploadEventInfo> ctx) throws Exception {
        // Capture the subscriber once: close() nulls the field, and re-reading it on every
        // iteration could hit null mid-loop during a concurrent shutdown.
        EventSubscriber subscriber = es;
        if (subscriber == null) {
            throw new IllegalStateException("EventSourceFunction.run() called without an open subscription");
        }

        isRunning = true;
        while (isRunning) {
            IUploadEventInfo event = subscriber.receive();
            if (event == null) {
                // A null event is treated as the end of the stream.
                isRunning = false;
                break;
            }
            ctx.collectWithTimestamp(event, event.getTimestamp());
        }
    }

    /** Requests the receive loop in {@link #run(SourceContext)} to stop after the current event. */
    @Override
    public void cancel() {
        isRunning = false;
    }
}
