/*
 * Decompiled with CFR 0.152.
 */
package kd.bos.algo.dataset.input;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import kd.bos.algo.AlgoException;
import kd.bos.algo.InputExecutor;
import kd.bos.algo.Row;
import kd.bos.algo.dataset.AbstractRow;
import kd.bos.algo.dataset.InnerRowIterator;

/**
 * Row iterator backed by an array of {@link InputExecutor}s.
 *
 * <p>All executors are opened when the iterator is constructed. The nested
 * {@link FetchThread} provides a background worker that pulls rows from one
 * executor in batches of {@code fetchSize} and hands them to a consumer through
 * {@link FetchThread#next()}.
 *
 * <p>In this class {@link #_hasNext()} always returns {@code false} and
 * {@link #_next()} always returns {@code null}; nothing here creates a
 * {@link FetchThread}.
 */
public class InputExecutorsRowIterator
extends InnerRowIterator {
    /** Source of unique suffixes for fetch-thread names. */
    private static final AtomicInteger THREAD_SEQUENCE = new AtomicInteger();

    private final InputExecutor<?>[] executors;
    /** Maximum number of rows pulled from an executor per fetch. */
    private final int fetchSize = 1000;
    /** Guards {@link #allFetchingCondition}. */
    private final ReentrantLock allFetchingLock = new ReentrantLock();
    /** Signalled every time any fetch thread finishes (or fails) a fetch. */
    private final Condition allFetchingCondition = this.allFetchingLock.newCondition();

    /**
     * Creates the iterator and opens every executor.
     *
     * @param executors the executors to read from
     */
    public InputExecutorsRowIterator(InputExecutor<?>[] executors) {
        this.executors = executors;
        this.init();
    }

    /** Opens every executor in declaration order. */
    private void init() {
        for (InputExecutor<?> executor : this.executors) {
            executor.open();
        }
    }

    /** Always reports that no row is available. */
    @Override
    protected boolean _hasNext() {
        return false;
    }

    /** Always returns {@code null}. */
    @Override
    protected Row _next() {
        return null;
    }

    /**
     * Background worker that fetches rows from one executor on demand.
     *
     * <p>The worker thread is started by the constructor. It sleeps until
     * {@link #next()} requests a batch, fetches up to {@code fetchSize} rows,
     * publishes them, and signals the enclosing iterator's
     * {@code allFetchingCondition}.
     *
     * <p>NOTE(review): {@link #next()} keeps unsynchronised cursor state
     * ({@code cachedRows}, {@code pos}) and so appears to be meant for a single
     * consumer thread -- confirm against the caller.
     */
    class FetchThread
    implements Runnable {
        private final InputExecutor<?> executor;
        /** Batch currently being handed out by {@link #next()}; consumer-only. */
        private List<Row> cachedRows = null;
        /**
         * Batch produced by the last fetch and not yet taken by the consumer.
         * Written by the worker before it clears {@link #fetching}, read by the
         * consumer only after it has observed {@code fetching == false}.
         */
        private List<Row> fetchingRows = null;
        private volatile boolean fetchEof = false;
        /** Written last by the worker so that it publishes the batch above. */
        private volatile boolean fetching = false;
        private volatile Exception fetchException;
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition condition = this.lock.newCondition();
        /** Set by the consumer to request a fetch; guarded by {@link #lock}. */
        private boolean fetchRequested = false;
        /** Index of the next row to return from {@link #cachedRows}. */
        int pos = 0;

        /**
         * Creates the worker and immediately starts its thread.
         *
         * @param executor the executor this worker reads from
         */
        FetchThread(InputExecutor<?> executor) {
            this.executor = executor;
            new Thread(this, "Algo-Input-FetchThread-" + THREAD_SEQUENCE.getAndIncrement()).start();
        }

        /**
         * Worker loop: waits for a fetch request, fetches one batch, and repeats
         * until the executor is exhausted or a fetch fails.
         */
        @Override
        public void run() {
            try {
                // NOTE(review): init() of the enclosing iterator also opens every
                // executor -- confirm that open() tolerates being called twice.
                this.executor.open();
            }
            catch (Exception e) {
                // Surface the failure through next() instead of dying silently.
                this.fail(e);
                return;
            }
            try {
                while (!this.fetchEof && this.fetchException == null) {
                    this.awaitFetchRequest();
                    this.fetch();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.fail(e);
            }
        }

        /**
         * Returns the next row, or a marker describing why none is available.
         *
         * @return a result carrying a row, {@link NextResult#FETCHING} while a
         *         batch is being fetched (a fetch is requested if none is
         *         running), or {@link NextResult#EOF} once every fetched row
         *         has been returned and the executor is exhausted
         * @throws AlgoException if the worker failed; the original exception is
         *         rethrown when it already is an {@code AlgoException},
         *         otherwise it is wrapped
         */
        public NextResult next() {
            // Read the volatile flag first: seeing false guarantees that the
            // results of the last fetch are visible below.
            if (this.fetching) {
                return NextResult.FETCHING;
            }
            Exception failure = this.fetchException;
            if (failure != null) {
                if (failure instanceof AlgoException) {
                    throw (AlgoException)failure;
                }
                throw new AlgoException(failure);
            }
            if (this.cachedRows != null && this.pos < this.cachedRows.size()) {
                return this.nextFromCache();
            }
            // Current batch is used up; try to take over the freshly fetched one.
            this.cachedRows = null;
            if (this.fetchingRows != null) {
                List<Row> batch = this.fetchingRows;
                this.fetchingRows = null;
                if (!batch.isEmpty()) {
                    this.cachedRows = batch;
                    this.pos = 0;
                    return this.nextFromCache();
                }
            }
            // Only report EOF once nothing is cached and nothing is pending.
            if (this.fetchEof) {
                return NextResult.EOF;
            }
            this.fetching = true;
            this.doFetch();
            return NextResult.FETCHING;
        }

        /** Asks the worker thread to fetch the next batch. */
        private void doFetch() {
            this.lock.lock();
            try {
                // The flag keeps the request alive even if the worker is not
                // waiting on the condition yet.
                this.fetchRequested = true;
                this.condition.signal();
            }
            finally {
                this.lock.unlock();
            }
        }

        /**
         * Blocks the worker until a fetch has been requested, then consumes the
         * request.
         *
         * @throws InterruptedException if the worker thread is interrupted
         */
        private void awaitFetchRequest() throws InterruptedException {
            this.lock.lock();
            try {
                // Loop on the predicate to ignore spurious wake-ups.
                while (!this.fetchRequested) {
                    this.condition.await();
                }
                this.fetchRequested = false;
            }
            finally {
                this.lock.unlock();
            }
        }

        /** Wraps the cached row at {@code pos} and advances the cursor. */
        private NextResult nextFromCache() {
            return new NextResult(this.cachedRows.get(this.pos++));
        }

        /**
         * Pulls up to {@code fetchSize} persisted rows from the executor and
         * publishes them as the pending batch. Marks EOF when the executor has
         * no more rows; on any exception marks EOF and records the exception.
         * Always signals the enclosing iterator when done.
         */
        private void fetch() {
            List<Row> batch = new ArrayList<Row>();
            try {
                this.fetching = true;
                int count = 0;
                while (count++ < InputExecutorsRowIterator.this.fetchSize) {
                    if (!this.executor.hasNext()) {
                        this.fetchEof = true;
                        break;
                    }
                    AbstractRow row = ((AbstractRow)this.executor.next()).persist();
                    batch.add(row);
                }
            }
            catch (Exception e) {
                this.fetchEof = true;
                this.fetchException = e;
            }
            finally {
                this.fetchingRows = batch;
                // Volatile write last: publishes the batch to the consumer.
                this.fetching = false;
                this.signalFetchFinished();
            }
        }

        /** Records a fatal worker failure and wakes up the consumer side. */
        private void fail(Exception e) {
            this.fetchEof = true;
            this.fetchException = e;
            this.fetching = false;
            this.signalFetchFinished();
        }

        /** Signals the enclosing iterator that this worker finished a fetch. */
        private void signalFetchFinished() {
            InputExecutorsRowIterator.this.allFetchingLock.lock();
            try {
                InputExecutorsRowIterator.this.allFetchingCondition.signal();
            }
            finally {
                InputExecutorsRowIterator.this.allFetchingLock.unlock();
            }
        }
    }

    /**
     * Outcome of {@link FetchThread#next()}: either a row, or one of the shared
     * {@link #EOF} / {@link #FETCHING} markers.
     */
    static class NextResult {
        Row row;
        boolean eof;
        boolean fetching;
        /** Marker: the executor is exhausted and all rows were returned. */
        static final NextResult EOF = new NextResult(null, true, false);
        /** Marker: a batch is being fetched; no row is available yet. */
        static final NextResult FETCHING = new NextResult(null, false, true);

        /** Creates a result carrying {@code row}; both flags are {@code false}. */
        NextResult(Row row) {
            this.row = row;
        }

        NextResult(Row row, boolean eof, boolean fetching) {
            this.row = row;
            this.eof = eof;
            this.fetching = fetching;
        }
    }
}

