/*
 * Decompiled with CFR 0.152.
 */
package kd.bos.algo.dataset.join;

import java.util.Iterator;
import kd.bos.algo.AlgoException;
import kd.bos.algo.Row;
import kd.bos.algo.RowMeta;
import kd.bos.algo.dataset.AbstractDataSet;
import kd.bos.algo.dataset.InnerRowIterator;
import kd.bos.algo.dataset.join.HHJMemory;
import kd.bos.algo.dataset.join.JoinDataSet;
import kd.bos.algo.dataset.join.JoinDataSetBuilder;
import kd.bos.algo.dataset.join.hhj.HhjJoinFunction;
import kd.bos.algo.dataset.join.hhj.HybridHashJoiner;
import kd.bos.algo.dataset.store.impl.RowConvert;
import kd.bos.algo.util.ConfigUtil;
import kd.bos.algox.RowX;
import kd.bos.algox.flink.type.RowXTypeInfo;
import kd.bos.algox.flink.type.TypeUtil;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.log4j.Logger;

/**
 * Join data set whose rows are produced by a {@link HybridHashJoiner}.
 *
 * <p>The left input (index 0) is fed to the joiner as the probe side and the
 * right input (index 1) as the build side. Concrete subclasses decide which
 * joiner implementation is used by implementing {@link #createHhj}.
 *
 * <p>Each call to {@link #createIterator()} acquires an {@link HHJMemory} from
 * the environment's HHJ memory cache; it is handed back to that cache when the
 * joiner is closed.
 */
public abstract class HHJDataSet
extends JoinDataSet {
    private static final Logger log = Logger.getLogger(HHJDataSet.class);
    /** Configuration key read by {@link #getThreadLimit()}. */
    private static final String KEY_THREADLIMIT = "algo.join.hhj.maxPerThread";
    /** Value used when {@link #KEY_THREADLIMIT} is not configured. */
    private static final int DEFAULT_THREAD_LIMIT = 2;
    /** Memory acquired for the current joiner; {@code null} once recycled. */
    private HHJMemory memory;
    /** The open joiner; {@code null} before creation and after it has been closed. */
    private HybridHashJoiner hhj;

    /**
     * Reads the joiner limit from configuration.
     *
     * @return the configured value of {@code algo.join.hhj.maxPerThread}, or
     *         {@link #DEFAULT_THREAD_LIMIT} as the fallback passed to the lookup
     */
    private static int getThreadLimit() {
        return ConfigUtil.getInteger(KEY_THREADLIMIT, DEFAULT_THREAD_LIMIT);
    }

    /**
     * Creates the data set; all arguments are forwarded unchanged to {@link JoinDataSet}.
     *
     * @param name         name of this data set
     * @param leftDataSet  left input, used as the probe side of the join
     * @param rightDataSet right input, used as the build side of the join
     * @param builder      join builder handed to the superclass
     */
    public HHJDataSet(String name, AbstractDataSet leftDataSet, AbstractDataSet rightDataSet, JoinDataSetBuilder builder) {
        super(name, leftDataSet, rightDataSet, builder);
    }

    /**
     * Builds serializers and comparators for both inputs, opens a hybrid hash
     * joiner over them and returns an iterator over the joined rows.
     *
     * @return iterator over the rows produced by the join
     * @throws AlgoException if creating or opening the joiner, or building the
     *                       iterator, fails; any other exception is wrapped in
     *                       an {@code AlgoException}
     */
    @Override
    protected InnerRowIterator createIterator() {
        this.checkClosed();
        AbstractDataSet leftDataSet = super.getInput(0);
        AbstractDataSet rightDataSet = super.getInput(1);
        final RowMeta leftRowMeta = leftDataSet.getRowMeta();
        final RowMeta rightRowMeta = rightDataSet.getRowMeta();

        // Probe side: left input.
        RowXTypeInfo probeTypeInfo = TypeUtil.toRowXType(leftRowMeta);
        TypeSerializer<RowX> probeSerializer = probeTypeInfo.createSerializer(leftRowMeta);
        TypeComparator<RowX> probeComparator = probeTypeInfo.createComparator(leftRowMeta, this.leftOrderItems);

        // Build side: right input.
        RowXTypeInfo buildTypeInfo = TypeUtil.toRowXType(rightRowMeta);
        TypeSerializer<RowX> buildSerializer = buildTypeInfo.createSerializer(rightRowMeta);
        TypeComparator<RowX> buildComparator = buildTypeInfo.createComparator(rightRowMeta, this.rightOrderItems);

        TypePairComparator<RowX, RowX> pairComparator = new GenericPairComparator<>(probeComparator, buildComparator);
        MutableObjectIterator<RowX> probeX = RowConvert.toMutableRowXIterator(leftRowMeta, leftDataSet.innerIterator());
        MutableObjectIterator<RowX> buildX = RowConvert.toMutableRowXIterator(rightRowMeta, rightDataSet.innerIterator());

        // Converts each matched RowX pair back to Row and lets the superclass build the output row.
        HhjJoinFunction func = new HhjJoinFunction(){

            @Override
            public Row join(RowX probeRowX, RowX buildRowX) {
                return HHJDataSet.this.makeTargetRow(RowConvert.toRow(leftRowMeta, probeRowX), RowConvert.toRow(rightRowMeta, buildRowX));
            }
        };

        this.memory = this.getEnvironment().getHHJMemCache().acquire();
        try {
            this.hhj = this.createHhj(probeX, buildX, probeSerializer, probeComparator, buildSerializer, buildComparator, pairComparator, func, this.memory);
            this.hhj.open();
            return this.makeIterator();
        }
        catch (Exception e) {
            // Release the joiner and the acquired memory before propagating the failure.
            this.closeHHJ();
            if (e instanceof AlgoException) {
                throw (AlgoException)e;
            }
            throw new AlgoException(e);
        }
    }

    /**
     * Wraps the joiner's row iterator for the caller.
     *
     * <p>When the environment's joiner count has reached the limit, the join
     * result is first written to the store and the joiner is closed right away;
     * otherwise the joiner stays open and is registered with the environment.
     *
     * @return iterator over the joined rows
     */
    private InnerRowIterator makeIterator() {
        Iterator<Row> iter = this.hhj.getRowIterator();
        if (this.exceedThreadLimit()) {
            iter = this.convertToStore(iter);
            this.closeHHJ();
            return InnerRowIterator.wrapper(iter);
        }
        this.getEnvironment().registerHybridHashJoiner(this.hhj);
        return InnerRowIterator.wrapper(iter);
    }

    /**
     * Passes the rows to the inherited {@code createStore} and returns the
     * store's own row iterator.
     *
     * @param iter rows produced by the joiner
     * @return iterator obtained from the store
     */
    private Iterator<Row> convertToStore(Iterator<Row> iter) {
        this.createStore(iter);
        return this.store.getRowIterator();
    }

    /**
     * @return {@code true} when the environment's HHJ count is greater than or
     *         equal to {@link #getThreadLimit()}
     */
    private boolean exceedThreadLimit() {
        return this.getEnvironment().getHhjCount() >= HHJDataSet.getThreadLimit();
    }

    /**
     * Creates the concrete joiner for this data set. The joiner is opened by
     * the caller, not by this method.
     *
     * @param probeX          probe-side rows (left input)
     * @param buildX          build-side rows (right input)
     * @param probeSerializer serializer for probe-side rows
     * @param probeComparator comparator for probe-side rows
     * @param buildSerializer serializer for build-side rows
     * @param buildComparator comparator for build-side rows
     * @param pairComparator  comparator between a probe-side and a build-side row
     * @param func            function that turns a matched pair into an output row
     * @param memory          memory acquired for this joiner
     * @return the joiner to open
     * @throws MemoryAllocationException declared for implementations that fail to allocate memory
     */
    protected abstract HybridHashJoiner createHhj(MutableObjectIterator<RowX> probeX, MutableObjectIterator<RowX> buildX, TypeSerializer<RowX> probeSerializer, TypeComparator<RowX> probeComparator, TypeSerializer<RowX> buildSerializer, TypeComparator<RowX> buildComparator, TypePairComparator<RowX, RowX> pairComparator, HhjJoinFunction func, HHJMemory memory) throws MemoryAllocationException;

    /**
     * Closes the joiner, recycles its memory and unregisters the joiner from
     * the environment.
     */
    @Override
    public void realClose() {
        // closeHHJ() clears the field, so keep the reference: otherwise the
        // environment would be asked to unregister null instead of the joiner.
        HybridHashJoiner joiner = this.hhj;
        try {
            this.closeHHJ();
        }
        finally {
            this.getEnvironment().unregisterHybridHashJoiner(joiner);
        }
    }

    /**
     * Closes the joiner (if any) and returns the acquired memory (if any) to
     * the environment's cache. Both fields are cleared, so calling this method
     * again is a no-op and the memory is never recycled twice.
     */
    private void closeHHJ() {
        HybridHashJoiner joiner = this.hhj;
        this.hhj = null;
        if (joiner != null) {
            try {
                joiner.closeAndGetFreedMemory();
            }
            catch (Exception e) {
                // Best effort: a failing close must not prevent the memory from being recycled.
                log.error("Failed to close hybrid hash joiner", e);
            }
        }
        HHJMemory acquired = this.memory;
        this.memory = null;
        if (acquired != null) {
            this.getEnvironment().getHHJMemCache().recycle(acquired);
        }
    }
}

