/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.reader;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import kd.bos.algox.AlgoXException;
import kd.bos.algox.RowX;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.hack.io.network.DataTarget;
import org.apache.flink.runtime.hack.io.network.DataTransfer;
import org.apache.flink.runtime.hack.io.network.LocalMode;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;

/**
 * Base class for readers that turn the contents of an {@link InputGate} into records of type
 * {@code T}.
 *
 * <p>Two read paths exist:
 * <ul>
 *   <li><b>Network path</b> - buffers and events are pulled from the input gate and every input
 *       channel has its own {@link RecordDeserializer} that reassembles records from buffers.</li>
 *   <li><b>Local mode</b> - active when {@link LocalMode#isLocal()} returned {@code true} at
 *       construction time. Records are then taken as ready-made {@link RowX} instances from a
 *       {@link DataTarget} that was registered with every {@link DataTransfer} of the gate, and
 *       the deserializers are not used for reading.</li>
 * </ul>
 *
 * @param <T> the record type filled in by {@link #getNextRecord(IOReadableWritable)}
 */
abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements ReaderBase {

    /** One deserializer per input channel, indexed by channel index. */
    private final RecordDeserializer<T>[] recordDeserializers;

    /** Deserializer of the buffer currently being consumed, or {@code null} if a new buffer is needed. */
    private RecordDeserializer<T> currentRecordDeserializer;

    /** Set once the input is exhausted; afterwards {@code getNextRecord} always returns {@code false}. */
    private boolean isFinished;

    /** Transfers of the input gate this reader registered with; only assigned in local mode. */
    private DataTransfer[] dataTransfers;

    /** Source of records in local mode; {@code null} selects the network path. */
    DataTarget dataTarget;

    /**
     * Creates the reader and, in local mode, registers a fresh {@link DataTarget} with every
     * {@link DataTransfer} of the gate.
     *
     * @param inputGate the gate to read from; in local mode it must be a {@link SingleInputGate}
     *     or a {@link UnionInputGate}, any other type fails with a {@link ClassCastException}
     * @param tmpDirectories directories handed to each {@link SpillingAdaptiveSpanningRecordDeserializer}
     */
    @SuppressWarnings("unchecked") // generic array creation: every element is a RecordDeserializer<T>
    protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
        super(inputGate);

        if (LocalMode.isLocal()) {
            // NOTE(review): 10000 is presumably the capacity of the target's queue - confirm in DataTarget.
            this.dataTarget = new DataTarget(10000);
            this.dataTransfers = inputGate instanceof SingleInputGate
                    ? ((SingleInputGate) inputGate).getDataTransfers()
                    : ((UnionInputGate) inputGate).getDataTransfers();
            for (DataTransfer dataTransfer : this.dataTransfers) {
                dataTransfer.addTarget(this.dataTarget);
            }
        }

        // The deserializers are created in both modes because clearBuffers() iterates over them.
        this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
        for (int i = 0; i < this.recordDeserializers.length; ++i) {
            this.recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(tmpDirectories);
        }
    }

    /**
     * Local-mode read: waits for the next {@link RowX} on the {@link DataTarget} and installs it
     * as the instance of the given delegate.
     *
     * @param target must be a {@link DeserializationDelegate}
     * @return {@code true} if a row was handed to {@code target}; {@code false} once the data
     *     target reported that it is finished and a last poll came back empty
     * @throws InterruptedException propagated from polling the data target
     * @throws AlgoXException if {@code target} is {@code null} or not a {@link DeserializationDelegate}
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the delegate's type parameter is not known here
    private boolean getNextRecordForLocalMode(Object target) throws InterruptedException {
        if (!(target instanceof DeserializationDelegate)) {
            // Null-safe: a null target must surface as the intended exception, not as an NPE
            // raised while building the message.
            throw new AlgoXException("Not support target type: " + (target == null ? null : target.getClass()));
        }
        DeserializationDelegate delegate = (DeserializationDelegate) target;

        do {
            RowX row = (RowX) this.dataTarget.poll(1L, TimeUnit.MILLISECONDS);
            if (row != null) {
                delegate.setInstance(row);
                return true;
            }
        } while (!this.dataTarget.isFinished());

        // A row may have been added after the poll above timed out but before the finished flag
        // was observed. Poll once more so such a row is not dropped.
        // NOTE(review): assumes no rows are added after isFinished() turns true; if isFinished()
        // already implies "drained", this poll simply returns null.
        RowX lastRow = (RowX) this.dataTarget.poll(1L, TimeUnit.MILLISECONDS);
        if (lastRow != null) {
            delegate.setInstance(lastRow);
            return true;
        }

        this.isFinished = true;
        return false;
    }

    /**
     * Reads the next record into {@code target}.
     *
     * @param target the instance the next record is read into
     * @return {@code true} if {@code target} now holds a full record; {@code false} if the reader
     *     is finished or the end of the current superstep was reached
     * @throws IOException if an event arrives on a channel whose deserializer still holds a
     *     partial record
     * @throws InterruptedException if interrupted while waiting for data
     * @throws IllegalStateException if the input gate returns no buffer or event
     */
    protected boolean getNextRecord(T target) throws IOException, InterruptedException {
        if (this.isFinished) {
            return false;
        }
        if (this.dataTarget != null) {
            return this.getNextRecordForLocalMode(target);
        }

        while (true) {
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult result = this.currentRecordDeserializer.getNextRecord(target);

                if (result.isBufferConsumed()) {
                    // The buffer is fully read: recycle it and request a new one on the next round.
                    Buffer currentBuffer = this.currentRecordDeserializer.getCurrentBuffer();
                    currentBuffer.recycleBuffer();
                    this.currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    return true;
                }
            }

            BufferOrEvent bufferOrEvent = this.inputGate.getNextBufferOrEvent().orElseThrow(IllegalStateException::new);

            if (bufferOrEvent.isBuffer()) {
                this.currentRecordDeserializer = this.recordDeserializers[bufferOrEvent.getChannelIndex()];
                this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
            } else {
                // An event is only accepted when the channel's deserializer holds no partial record.
                if (this.recordDeserializers[bufferOrEvent.getChannelIndex()].hasUnfinishedData()) {
                    throw new IOException("Received an event in channel " + bufferOrEvent.getChannelIndex() + " while still having data from a record. This indicates broken serialization logic. If you are using custom serialization code (Writable or Value types), check their serialization routines. In the case of Kryo, check the respective Kryo serializer.");
                }

                if (this.handleEvent(bufferOrEvent.getEvent())) {
                    if (this.inputGate.isFinished()) {
                        this.isFinished = true;
                        return false;
                    }
                    if (this.hasReachedEndOfSuperstep()) {
                        return false;
                    }
                    // Otherwise keep reading from the gate.
                }
            }
        }
    }

    /**
     * Recycles the current buffer of every deserializer, unless it is absent or already recycled,
     * and then clears the deserializer.
     */
    public void clearBuffers() {
        for (RecordDeserializer<T> deserializer : this.recordDeserializers) {
            Buffer buffer = deserializer.getCurrentBuffer();
            if (buffer != null && !buffer.isRecycled()) {
                buffer.recycleBuffer();
            }
            deserializer.clear();
        }
    }
}

