/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.reader;

import java.io.IOException;
import kd.bos.algox.AlgoXException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.hack.io.network.LocalMode;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.TaskEventHandler;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.util.Preconditions;

/**
 * Base class for readers bound to a single {@link InputGate}.
 *
 * <p>Besides delegating to the gate, it forwards {@link TaskEvent}s to the listeners
 * registered through {@link #registerTaskEventListener(EventListener, Class)} and, once
 * switched to iterative mode, counts {@link EndOfSuperstepEvent}s against the number of
 * input channels of the gate.
 *
 * <p>NOTE(review): no synchronization is visible in this class; presumably each instance
 * is driven by a single thread — confirm against callers.
 */
public abstract class AbstractReader implements ReaderBase {

    /** The input gate this reader delegates to. */
    protected final InputGate inputGate;

    /** Dispatches task events to the listeners subscribed on this reader. */
    private final TaskEventHandler taskEventHandler = new TaskEventHandler();

    /** Turned on by {@link #setIterativeReader()}; never reset within this class. */
    private boolean isIterative;

    /**
     * Number of {@link EndOfSuperstepEvent}s counted since construction or since the last
     * call to {@link #startNextSuperstep()}.
     */
    private int currentNumberOfEndOfSuperstepEvents;

    /**
     * Creates a reader on top of the given gate.
     *
     * @param inputGate the gate to read from; stored as-is, without a null check
     */
    protected AbstractReader(InputGate inputGate) {
        this.inputGate = inputGate;
    }

    /**
     * Reports whether the underlying input gate is finished.
     *
     * @return the value of {@link InputGate#isFinished()}
     */
    @Override
    public boolean isFinished() {
        return inputGate.isFinished();
    }

    /**
     * Subscribes a listener for task events of the given type.
     *
     * @param listener the listener to subscribe
     * @param eventType the event class the listener is subscribed for
     */
    @Override
    public void registerTaskEventListener(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType) {
        taskEventHandler.subscribe(listener, eventType);
    }

    /**
     * Sends a task event through the underlying input gate.
     *
     * @param event the event to send
     * @throws IOException if the input gate fails to send the event
     */
    @Override
    public void sendTaskEvent(TaskEvent event) throws IOException {
        inputGate.sendTaskEvent(event);
    }

    /**
     * Handles an event received by this reader.
     *
     * <p>When {@link LocalMode#isLocal()} is true, only {@link EndOfPartitionEvent} is
     * accepted; any other event type raises an {@code AlgoXException}, which is thrown
     * directly and is not wrapped in an {@link IOException}.
     *
     * <p>Otherwise:
     * <ul>
     *   <li>{@link EndOfPartitionEvent} returns {@code true};</li>
     *   <li>{@link EndOfSuperstepEvent} increments the superstep counter and returns whether
     *       the counter now equals the number of input channels;</li>
     *   <li>any {@link TaskEvent} is published to the subscribed listeners and returns
     *       {@code false};</li>
     *   <li>every other event type is rejected.</li>
     * </ul>
     *
     * <p>Event classes are compared by identity, so subclasses of
     * {@link EndOfPartitionEvent} or {@link EndOfSuperstepEvent} do not match those branches.
     *
     * @param event the received event
     * @return {@code true} for an end-of-partition event or when the end of the superstep
     *     has been reached, {@code false} for a published task event
     * @throws IOException wrapping any {@link Throwable} raised while handling the event
     *     outside local mode, including the rejection of an unexpected event type
     */
    protected boolean handleEvent(AbstractEvent event) throws IOException {
        final Class<?> eventType = event.getClass();

        if (LocalMode.isLocal()) {
            if (eventType == EndOfPartitionEvent.class) {
                return true;
            }
            // Deliberately outside the try block below: this failure is not wrapped.
            throw new AlgoXException("Don't call me when local model.");
        }

        try {
            if (eventType == EndOfPartitionEvent.class) {
                return true;
            }
            if (eventType == EndOfSuperstepEvent.class) {
                return incrementEndOfSuperstepEventAndCheck();
            }
            if (event instanceof TaskEvent) {
                taskEventHandler.publish((TaskEvent) event);
                return false;
            }
            throw new IllegalStateException("Received unexpected event of type " + eventType + " at reader.");
        } catch (Throwable t) {
            // Catches everything raised above (including the IllegalStateException) so that
            // callers only ever see an IOException from the non-local path.
            throw new IOException("Error while handling event of type " + eventType + ": " + t.getMessage(), t);
        }
    }

    /**
     * Publishes a task event directly to the listeners subscribed on this reader,
     * without going through {@link #handleEvent(AbstractEvent)}.
     *
     * @param event the event to publish
     */
    public void publish(TaskEvent event) {
        taskEventHandler.publish(event);
    }

    /** Marks this reader as iterative, which enables the superstep bookkeeping. */
    @Override
    public void setIterativeReader() {
        isIterative = true;
    }

    /**
     * Resets the end-of-superstep counter to zero for the next superstep.
     *
     * <p>Both preconditions are enforced through {@link Preconditions#checkState}: the reader
     * must be iterative, and the counter must equal the number of input channels of the gate.
     */
    @Override
    public void startNextSuperstep() {
        Preconditions.checkState(isIterative, "Tried to start next superstep in a non-iterative reader.");
        Preconditions.checkState(
                currentNumberOfEndOfSuperstepEvents == inputGate.getNumberOfInputChannels(),
                "Tried to start next superstep before reaching end of previous superstep.");
        currentNumberOfEndOfSuperstepEvents = 0;
    }

    /**
     * Reports whether the current superstep is complete.
     *
     * @return {@code true} only if this reader is iterative and the end-of-superstep counter
     *     equals the number of input channels of the gate
     */
    @Override
    public boolean hasReachedEndOfSuperstep() {
        return isIterative
                && currentNumberOfEndOfSuperstepEvents == inputGate.getNumberOfInputChannels();
    }

    /**
     * Counts one more end-of-superstep event.
     *
     * <p>Through {@link Preconditions#checkState}, requires the reader to be iterative and
     * the incremented counter not to exceed the number of input channels. The failure message
     * of the second check reports the counter value from before the increment.
     *
     * @return {@code true} if the counter equals the number of input channels after the
     *     increment
     */
    private boolean incrementEndOfSuperstepEventAndCheck() {
        Preconditions.checkState(isIterative, "Tried to increment superstep count in a non-iterative reader.");
        Preconditions.checkState(
                currentNumberOfEndOfSuperstepEvents + 1 <= inputGate.getNumberOfInputChannels(),
                "Received too many (" + currentNumberOfEndOfSuperstepEvents + ") end of superstep events.");
        return ++currentNumberOfEndOfSuperstepEvents == inputGate.getNumberOfInputChannels();
    }
}

