/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartition;
import org.apache.flink.runtime.io.network.partition.SpilledSubpartitionView;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SpillableSubpartitionView
implements ResultSubpartitionView {
    private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartitionView.class);
    private final SpillableSubpartition parent;
    private final ArrayDeque<BufferConsumer> buffers;
    private final IOManager ioManager;
    private final int memorySegmentSize;
    private final BufferAvailabilityListener listener;
    private final AtomicBoolean isReleased = new AtomicBoolean(false);
    private final long numBuffers;
    private BufferConsumer nextBuffer;
    private volatile SpilledSubpartitionView spilledView;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    SpillableSubpartitionView(SpillableSubpartition parent, ArrayDeque<BufferConsumer> buffers, IOManager ioManager, int memorySegmentSize, BufferAvailabilityListener listener) {
        this.parent = (SpillableSubpartition)Preconditions.checkNotNull((Object)parent);
        this.buffers = (ArrayDeque)Preconditions.checkNotNull(buffers);
        this.ioManager = (IOManager)Preconditions.checkNotNull((Object)ioManager);
        this.memorySegmentSize = memorySegmentSize;
        this.listener = (BufferAvailabilityListener)Preconditions.checkNotNull((Object)listener);
        ArrayDeque<BufferConsumer> arrayDeque = buffers;
        synchronized (arrayDeque) {
            this.numBuffers = buffers.size();
            this.nextBuffer = buffers.poll();
        }
        if (this.nextBuffer != null) {
            listener.notifyDataAvailable();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int releaseMemory() throws IOException {
        ArrayDeque<BufferConsumer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.spilledView != null || this.nextBuffer == null) {
                return 0;
            }
            BufferFileWriter spillWriter = this.ioManager.createBufferFileWriter(this.ioManager.createChannel());
            long spilledBytes = 0L;
            int numBuffers = this.buffers.size();
            for (int i = 0; i < numBuffers; ++i) {
                try (BufferConsumer bufferConsumer = this.buffers.remove();){
                    Buffer buffer = bufferConsumer.build();
                    Preconditions.checkState((boolean)bufferConsumer.isFinished(), (Object)"BufferConsumer must be finished before spilling. Otherwise we would not be able to simply remove it from the queue. This should be guaranteed by creating ResultSubpartitionView only once Subpartition isFinished.");
                    this.parent.updateStatistics(buffer);
                    spilledBytes += (long)buffer.getSize();
                    spillWriter.writeBlock((Object)buffer);
                    continue;
                }
            }
            this.spilledView = new SpilledSubpartitionView(this.parent, this.memorySegmentSize, spillWriter, numBuffers, this.listener);
            LOG.debug("Spilling {} bytes for sub partition {} of {}.", new Object[]{spilledBytes, this.parent.index, this.parent.parent.getPartitionId()});
            return numBuffers;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Nullable
    public ResultSubpartition.BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
        Buffer current = null;
        boolean nextBufferIsEvent = false;
        int newBacklog = 0;
        boolean isMoreAvailable = false;
        ArrayDeque<BufferConsumer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.isReleased.get()) {
                return null;
            }
            if (this.nextBuffer != null) {
                current = this.nextBuffer.build();
                Preconditions.checkState((boolean)this.nextBuffer.isFinished(), (Object)"We can only read from SpillableSubpartition after it was finished");
                newBacklog = this.parent.decreaseBuffersInBacklogUnsafe(this.nextBuffer.isBuffer());
                this.nextBuffer.close();
                this.nextBuffer = this.buffers.poll();
                if (this.nextBuffer != null) {
                    nextBufferIsEvent = !this.nextBuffer.isBuffer();
                    isMoreAvailable = true;
                }
                this.parent.updateStatistics(current);
                if (this.spilledView == null) {
                    return new ResultSubpartition.BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent);
                }
            }
        }
        SpilledSubpartitionView spilled = this.spilledView;
        if (spilled != null) {
            if (current != null) {
                return new ResultSubpartition.BufferAndBacklog(current, spilled.isAvailable(), newBacklog, spilled.nextBufferIsEvent());
            }
            return spilled.getNextBuffer();
        }
        throw new IllegalStateException("No in-memory buffers available, but also nothing spilled.");
    }

    @Override
    public void notifyDataAvailable() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void releaseAllResources() throws IOException {
        if (this.isReleased.compareAndSet(false, true)) {
            SpilledSubpartitionView spilled = this.spilledView;
            if (spilled != null) {
                spilled.releaseAllResources();
            }
            ArrayDeque<BufferConsumer> arrayDeque = this.buffers;
            synchronized (arrayDeque) {
                if (this.nextBuffer != null) {
                    this.nextBuffer.close();
                    this.nextBuffer = null;
                }
            }
        }
    }

    @Override
    public void notifySubpartitionConsumed() throws IOException {
        SpilledSubpartitionView spilled = this.spilledView;
        if (spilled != null) {
            spilled.notifySubpartitionConsumed();
        } else {
            this.parent.onConsumedSubpartition();
        }
    }

    @Override
    public boolean isReleased() {
        SpilledSubpartitionView spilled = this.spilledView;
        if (spilled != null) {
            return spilled.isReleased();
        }
        return this.parent.isReleased() || this.isReleased.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean nextBufferIsEvent() {
        ArrayDeque<BufferConsumer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.isReleased.get()) {
                return false;
            }
            if (this.nextBuffer != null) {
                return !this.nextBuffer.isBuffer();
            }
        }
        Preconditions.checkState((this.spilledView != null ? 1 : 0) != 0, (Object)"No in-memory buffers available, but also nothing spilled.");
        return this.spilledView.nextBufferIsEvent();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isAvailable() {
        ArrayDeque<BufferConsumer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.nextBuffer != null) {
                return true;
            }
            if (this.spilledView == null) {
                return false;
            }
        }
        return this.spilledView.isAvailable();
    }

    @Override
    public Throwable getFailureCause() {
        SpilledSubpartitionView spilled = this.spilledView;
        if (spilled != null) {
            return spilled.getFailureCause();
        }
        return this.parent.getFailureCause();
    }

    public String toString() {
        boolean hasSpilled = this.spilledView != null;
        return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? %b) of ResultPartition %s", this.parent.index, this.numBuffers, hasSpilled, this.parent.parent.getPartitionId());
    }

    @Override
    public ResultSubpartition getResultSubpartition() {
        return this.parent;
    }
}

