/*
 * Decompiled with CFR 0.152.
 */
package kd.bos.algo.dataset.store.sort;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import kd.bos.algo.AlgoException;
import kd.bos.algo.Row;
import kd.bos.algo.RowMeta;
import kd.bos.algo.dataset.Iterators;
import kd.bos.algo.dataset.store.impl.RowConvert;
import kd.bos.algo.dataset.store.sort.MergeIterable;
import kd.bos.algo.dataset.store.sort.RowOrderComparator;
import kd.bos.algo.dataset.store.sort.SortThreadPools;
import kd.bos.algo.dataset.store.sort.TashaSortBuffer;
import kd.bos.algo.dataset.store.spill.DummyInvokable;
import kd.bos.algo.dataset.store.spill.MemIO;
import kd.bos.algo.dataset.store.spill.MemIOFactory;
import kd.bos.algo.dataset.store.spill.SpillingWriter;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;

public class TashaSortMerger {
    private BlockingQueue<InputElement> inputQueue = new LinkedBlockingQueue<InputElement>();
    private BlockingQueue<BufferElement> emptyQueue = new LinkedBlockingQueue<BufferElement>();
    private BlockingQueue<BufferElement> sortQueue = new LinkedBlockingQueue<BufferElement>();
    private BlockingQueue<BufferElement> spillQueue = new LinkedBlockingQueue<BufferElement>();
    private boolean startSpill = false;
    private RowOrderComparator comparator = null;
    private RowMeta rowMeta;
    private boolean threadStarted = false;
    private boolean readFinished;
    private int rowCount = 0;
    private Object rowCountLock = new Object();
    private boolean closed;
    private static BufferElement SPILL_ELEMENT = new BufferElement();
    private static BufferElement EOF_ELEMENT = new BufferElement();
    private static InputElement EOF_INPUT = new InputElement(null);
    private Iterable<Row> result;
    private Exception resultException;
    private ArrayList<SpillingWriter> writers = new ArrayList();
    private Object resultLock = new Object();

    public TashaSortMerger(RowMeta rowMeta, int[] orderIndexes, boolean[] descs, int bufferCount, int bufferMaxRowCount) {
        this.rowMeta = rowMeta;
        this.comparator = new RowOrderComparator(orderIndexes, descs);
        for (int i = 0; i < bufferCount; ++i) {
            TashaSortBuffer buffer = new TashaSortBuffer(this.comparator, bufferMaxRowCount);
            this.emptyQueue.add(new BufferElement(buffer));
        }
    }

    public void writeInput(Iterator<Row> input) {
        this.inputQueue.offer(new InputElement(input));
        if (!this.threadStarted) {
            this.initThread(1, 1);
            this.threadStarted = true;
        }
    }

    public void writeEof() {
        if (this.result == null && this.resultException == null) {
            this.inputQueue.offer(EOF_INPUT);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getRowCount() {
        while (!this.readFinished && this.resultException == null) {
            Object object = this.rowCountLock;
            synchronized (object) {
                try {
                    this.rowCountLock.wait(5L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
        }
        if (this.resultException != null) {
            throw new AlgoException("Error obtaining the sorted input: " + this.resultException.getMessage(), this.resultException);
        }
        return this.rowCount;
    }

    public Iterator<Row> takeInput() {
        while (true) {
            try {
                InputElement element;
                while ((element = this.inputQueue.take()) == null) {
                }
                if (element == EOF_INPUT) {
                    return null;
                }
                return element.iter;
            }
            catch (InterruptedException interruptedException) {
                continue;
            }
            break;
        }
    }

    public void close() {
        this.closed = true;
        this.emptyQueue.clear();
        this.sortQueue.clear();
        this.spillQueue.clear();
        this.writers.forEach(writer -> {
            try {
                writer.close();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        });
    }

    private void initThread(int sortParallel, int spillParallel) {
        int i;
        ExceptionHandler<IOException> exceptionHandler = new ExceptionHandler<IOException>(){

            public void handleException(IOException exception) {
                if (!TashaSortMerger.this.closed) {
                    TashaSortMerger.this.setResultException(exception);
                    TashaSortMerger.this.close();
                }
            }
        };
        RunnableBase runnable = new ReadingRunnable(exceptionHandler, "Algo-SortMerger-Read");
        SortThreadPools.execute(runnable);
        for (i = 0; i < sortParallel; ++i) {
            runnable = new SortRunnable(exceptionHandler, "Algo-SortMerger-Sort");
            SortThreadPools.execute(runnable);
        }
        for (i = 0; i < spillParallel; ++i) {
            runnable = new SpillRunnable(exceptionHandler, "Algo-SortMerger-Spill");
            SortThreadPools.execute(runnable);
        }
    }

    private void registerWriter(SpillingWriter writer) {
        this.writers.add(writer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setResult(Iterable<Row> result) {
        Object object = this.resultLock;
        synchronized (object) {
            if (this.resultException == null) {
                this.result = result;
                this.resultLock.notify();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setResultException(Exception exception) {
        Object object = this.resultLock;
        synchronized (object) {
            if (this.resultException == null) {
                this.resultException = exception;
                this.resultLock.notify();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Iterator<Row> getIterator() throws InterruptedException {
        Object object = this.resultLock;
        synchronized (object) {
            while (this.result == null && this.resultException == null) {
                this.resultLock.wait();
            }
            if (this.resultException != null) {
                throw new AlgoException("Error obtaining the sorted input: " + this.resultException.getMessage(), this.resultException);
            }
            return this.result.iterator();
        }
    }

    protected abstract class RunnableBase
    implements Runnable,
    Thread.UncaughtExceptionHandler {
        private final ExceptionHandler<IOException> exceptionHandler;
        private volatile boolean alive;
        private String name;

        protected RunnableBase(ExceptionHandler<IOException> exceptionHandler, String name) {
            this.name = name;
            this.exceptionHandler = exceptionHandler;
            this.alive = true;
        }

        @Override
        public void run() {
            String oldName = Thread.currentThread().getName();
            try {
                Thread.currentThread().setName(this.name);
                Thread.currentThread().setUncaughtExceptionHandler(this);
                this.go();
            }
            catch (Throwable t) {
                this.internalHandleException(new IOException("Thread '" + this.name + "' terminated due to an exception: " + t.getMessage(), t));
            }
            finally {
                if (oldName != null) {
                    Thread.currentThread().setName(oldName);
                }
                Thread.currentThread().setUncaughtExceptionHandler(null);
            }
        }

        protected abstract void go() throws IOException;

        public boolean isRunning() {
            return this.alive;
        }

        public void shutdown() {
            this.alive = false;
        }

        protected final void internalHandleException(IOException ioex) {
            if (!this.isRunning()) {
                return;
            }
            if (this.exceptionHandler != null) {
                try {
                    this.exceptionHandler.handleException((Throwable)ioex);
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            this.internalHandleException(new IOException("Thread '" + t.getName() + "' terminated due to an uncaught exception: " + e.getMessage(), e));
        }
    }

    private class SpillRunnable
    extends RunnableBase {
        protected SpillRunnable(ExceptionHandler<IOException> exceptionHandler, String name) {
            super(exceptionHandler, name);
        }

        @Override
        protected void go() throws IOException {
            BufferElement element;
            ArrayDeque<BufferElement> cache = new ArrayDeque<BufferElement>();
            boolean cacheOnly = false;
            while (!TashaSortMerger.this.closed && this.isRunning()) {
                try {
                    element = (BufferElement)TashaSortMerger.this.spillQueue.poll(100L, TimeUnit.MILLISECONDS);
                    if (element == null) {
                        continue;
                    }
                }
                catch (InterruptedException iex) {
                    throw new IOException("The spilling thread was interrupted.");
                }
                if (element == SPILL_ELEMENT) break;
                if (element == EOF_ELEMENT) {
                    cacheOnly = true;
                    break;
                }
                cache.add(element);
            }
            if (TashaSortMerger.this.closed || !this.isRunning()) {
                return;
            }
            if (cacheOnly) {
                if (cache.isEmpty()) {
                    TashaSortMerger.this.setResult(Iterators.emptyable);
                } else if (cache.size() == 1) {
                    TashaSortMerger.this.setResult(((BufferElement)cache.poll()).buffer);
                } else {
                    ArrayList<Iterable<Row>> iterables = new ArrayList<Iterable<Row>>();
                    for (BufferElement ele : cache) {
                        iterables.add(ele.buffer);
                    }
                    MergeIterable mergeIterable = new MergeIterable(iterables, TashaSortMerger.this.comparator);
                    TashaSortMerger.this.setResult(mergeIterable);
                }
                return;
            }
            while (!TashaSortMerger.this.closed) {
                try {
                    element = this.takeNext(cache);
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) continue;
                    return;
                }
                if (TashaSortMerger.this.closed || !this.isRunning()) {
                    return;
                }
                if (element == EOF_ELEMENT) break;
                MemIO memIO = MemIOFactory.createForSpillBuffer();
                SpillingWriter writer = new SpillingWriter(TashaSortMerger.this.rowMeta, memIO.getMemoryManager(), memIO.getIoManager(), memIO.getMemoryManager().getTotalNumPages(), new DummyInvokable());
                TashaSortMerger.this.registerWriter(writer);
                writer.writeRowIter(element.buffer.iterator());
                element.buffer.reset();
                TashaSortMerger.this.emptyQueue.add(element);
            }
            ArrayList<Iterable<Row>> iterables = new ArrayList<Iterable<Row>>();
            for (SpillingWriter writer : TashaSortMerger.this.writers) {
                Iterable<Row> iter = RowConvert.toRowIterable(TashaSortMerger.this.rowMeta, writer);
                iterables.add(iter);
            }
            MergeIterable mergeIterable = new MergeIterable(iterables, TashaSortMerger.this.comparator);
            TashaSortMerger.this.setResult(mergeIterable);
        }

        private BufferElement takeNext(Queue<BufferElement> cache) throws InterruptedException {
            return cache.isEmpty() ? (BufferElement)TashaSortMerger.this.spillQueue.take() : cache.poll();
        }
    }

    private class SortRunnable
    extends RunnableBase {
        protected SortRunnable(ExceptionHandler<IOException> exceptionHandler, String name) {
            super(exceptionHandler, name);
        }

        @Override
        protected void go() throws IOException {
            BufferElement element = null;
            while (!TashaSortMerger.this.closed) {
                while (!TashaSortMerger.this.closed) {
                    try {
                        element = (BufferElement)TashaSortMerger.this.sortQueue.poll(100L, TimeUnit.MILLISECONDS);
                        if (element == null) continue;
                        if (element == SPILL_ELEMENT) {
                            TashaSortMerger.this.spillQueue.add(element);
                            continue;
                        }
                        if (element == EOF_ELEMENT) {
                            TashaSortMerger.this.spillQueue.add(element);
                            this.end();
                            return;
                        }
                        element.buffer.doSort();
                        TashaSortMerger.this.spillQueue.add(element);
                        element = null;
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
            TashaSortMerger.this.spillQueue.add(EOF_ELEMENT);
        }

        private void end() {
        }
    }

    private class ReadingRunnable
    extends RunnableBase {
        protected ReadingRunnable(ExceptionHandler<IOException> exceptionHandler, String name) {
            super(exceptionHandler, name);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void go() throws IOException {
            Iterator<Row> input = null;
            BufferElement element = null;
            TashaSortBuffer buffer = null;
            Row failedRow = null;
            block5: while (!TashaSortMerger.this.closed && (input = TashaSortMerger.this.takeInput()) != null) {
                while (!TashaSortMerger.this.closed) {
                    while (!TashaSortMerger.this.closed && buffer == null) {
                        try {
                            element = (BufferElement)TashaSortMerger.this.emptyQueue.poll(100L, TimeUnit.MILLISECONDS);
                            if (element == null) continue;
                            buffer = element.buffer;
                            if (failedRow == null) break;
                            buffer.addRow(failedRow);
                            failedRow = null;
                            break;
                        }
                        catch (InterruptedException interruptedException) {
                        }
                    }
                    boolean fullBuffer = false;
                    while (!TashaSortMerger.this.closed && input.hasNext()) {
                        Row row = input.next();
                        TashaSortMerger.this.rowCount++;
                        if (buffer.addRow(row)) continue;
                        failedRow = row;
                        fullBuffer = true;
                        break;
                    }
                    if (!fullBuffer) continue block5;
                    TashaSortMerger.this.sortQueue.add(element);
                    element = null;
                    buffer = null;
                    if (TashaSortMerger.this.startSpill) continue;
                    TashaSortMerger.this.startSpill = true;
                    TashaSortMerger.this.sortQueue.add(SPILL_ELEMENT);
                }
            }
            if (element != null) {
                TashaSortMerger.this.sortQueue.add(element);
            }
            TashaSortMerger.this.sortQueue.add(EOF_ELEMENT);
            TashaSortMerger.this.readFinished = true;
            Object object = TashaSortMerger.this.rowCountLock;
            synchronized (object) {
                TashaSortMerger.this.rowCountLock.notifyAll();
            }
        }
    }

    private static class InputElement {
        Iterator<Row> iter;

        InputElement(Iterator<Row> iter) {
            this.iter = iter;
        }
    }

    private static class BufferElement {
        TashaSortBuffer buffer;

        BufferElement() {
        }

        BufferElement(TashaSortBuffer buffer) {
            this.buffer = buffer;
        }
    }
}

