/*
 * Decompiled with CFR 0.152.
 */
package kd.bos.algox.flink.core.translate;

import java.util.Map;
import kd.bos.algo.AlgoException;
import kd.bos.algo.util.Tuple2;
import kd.bos.algox.DataSetX;
import kd.bos.algox.JoinDataSetX;
import kd.bos.algox.RowX;
import kd.bos.algox.core.AbstractDataSetX;
import kd.bos.algox.core.JoinSelectedField;
import kd.bos.algox.flink.core.myfunc.MyJoinFunction;
import kd.bos.algox.flink.core.myfunc.MyJoinFunction2;
import kd.bos.algox.flink.core.myfunc.MyJoinFunction3;
import kd.bos.algox.flink.core.translate.Translate;
import kd.bos.algox.flink.core.translate.Translator;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.JoinOperator;

public class JoinTranslator
extends Translator<JoinDataSetX> {
    public JoinTranslator(JoinDataSetX x) {
        super(x);
    }

    @Override
    public DataSet<RowX> go(ExecutionEnvironment env, Map<DataSetX, DataSet<RowX>> dup) {
        JoinOperator.JoinOperatorSets joins;
        AbstractDataSetX xs1 = (AbstractDataSetX)((JoinDataSetX)this.x).getSource();
        DataSet<RowX> source1 = Translate.translateDataSet(env, (DataSetX)xs1, dup);
        AbstractDataSetX xs2 = (AbstractDataSetX)((JoinDataSetX)this.x).getSources().get(1);
        DataSet<RowX> source2 = Translate.translateDataSet(env, (DataSetX)xs2, dup);
        int onSize = ((JoinDataSetX)this.x).getOnList().size();
        if (onSize == 0) {
            throw new AlgoException("Join on required.");
        }
        int[] whereFields = new int[onSize];
        int[] equalsFields = new int[onSize];
        for (int i = 0; i < onSize; ++i) {
            Tuple2 tuple = (Tuple2)((JoinDataSetX)this.x).getOnList().get(i);
            whereFields[i] = xs1.getRowMeta().getFieldIndex((String)tuple.t1);
            equalsFields[i] = xs2.getRowMeta().getFieldIndex((String)tuple.t2);
        }
        RichJoinFunction<RowX, RowX, RowX> func = this.createJoin();
        switch (((JoinDataSetX)this.x).getJoinType()) {
            case INNER: {
                joins = source1.join(source2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE);
                break;
            }
            case LEFT: {
                joins = source1.leftOuterJoin(source2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE);
                break;
            }
            case RIGHT: {
                joins = source1.rightOuterJoin(source2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE);
                break;
            }
            case FULL: {
                joins = source1.fullOuterJoin(source2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE);
                break;
            }
            default: {
                throw new AlgoException("not support join type now.");
            }
        }
        JoinOperator operator = joins.where(whereFields).equalTo(equalsFields).with(func);
        operator.name("Join at " + ((JoinDataSetX)this.x).getLocation());
        return operator;
    }

    private RichJoinFunction<RowX, RowX, RowX> createJoin() {
        if (((JoinDataSetX)this.x).getFunc() != null) {
            return new MyJoinFunction(((JoinDataSetX)this.x).getFunc());
        }
        if (((JoinDataSetX)this.x).getSelectFields().isEmpty()) {
            return new MyJoinFunction3(((JoinDataSetX)this.x).getRowMeta());
        }
        int[] indexes = new int[((JoinDataSetX)this.x).getSelectFields().size()];
        boolean[] lefts = new boolean[indexes.length];
        for (int i = 0; i < indexes.length; ++i) {
            JoinSelectedField sf = (JoinSelectedField)((JoinDataSetX)this.x).getSelectFields().get(i);
            indexes[i] = sf.fieldIndex;
            lefts[i] = sf.left;
        }
        return new MyJoinFunction2(((JoinDataSetX)this.x).getRowMeta(), indexes, lefts);
    }
}

