/*
 * Decompiled with CFR 0.152.
 */
package kd.bos.algo.dataset.cache.kv;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import kd.bos.algo.AlgoException;
import kd.bos.algo.CacheHint;
import kd.bos.algo.Row;
import kd.bos.algo.RowMeta;
import kd.bos.algo.dataset.AbstractRow;
import kd.bos.algo.dataset.cache.DataSetCacheMeta;
import kd.bos.algo.dataset.cache.DataSetCacheSpi;
import kd.bos.algo.dataset.cache.SimpleMetaImpl;
import kd.bos.algo.dataset.cache.TimeoutInfo;
import kd.bos.algo.serde.RowSerde;
import kd.bos.algo.storage.KVStorage;
import kd.bos.metric.Meter;
import kd.bos.metric.MetricSystem;
import kd.bos.metric.Timer;
import kd.bos.trace.TraceSpan;
import kd.bos.trace.Tracer;

public class KVSpiImpl
implements DataSetCacheSpi {
    private KVStorage storage;
    private RowSerde rowSerde;
    private Timer timer;
    private Meter meter;
    private Meter errorMeter;
    private Timer timerByName;
    private Meter meterByName;
    private Meter errorByName;
    MainThread mainThread = null;

    public KVSpiImpl(KVStorage storage) {
        this.storage = storage;
        this.timer = MetricSystem.timer((String)"kd.metrics.algo.cache.saveTimer");
        this.meter = MetricSystem.meter((String)"kd.metrics.algo.cache.saveMeter");
        this.errorMeter = MetricSystem.meter((String)"kd.metrics.algo.cache.saveMeter.error");
        this.timerByName = MetricSystem.timer((String)("kd.metrics.algo.cache.saveTimer." + storage.getName()));
        this.meterByName = MetricSystem.meter((String)("kd.metrics.algo.cache.saveMeter." + storage.getName()));
        this.errorByName = MetricSystem.meter((String)("kd.metrics.algo.cache.saveMeter.error." + storage.getName()));
    }

    private String getCacheId(CacheHint hint) {
        if (hint.getCacheId() != null) {
            return hint.getCacheId();
        }
        return this.generateId();
    }

    private String generateId() {
        return "datasetcache-" + UUID.randomUUID().toString().replace("-", "");
    }

    public String getPageKey(String key1, int page) {
        return "page" + page;
    }

    @Override
    public DataSetCacheMeta save(RowMeta rowMeta, Iterator<Row> iter, CacheHint hint) {
        if (this.rowSerde == null) {
            this.rowSerde = RowSerde.Factory.get(rowMeta);
        }
        Timer.Context context = this.timer.time();
        Timer.Context contextByName = this.timerByName.time();
        String key1 = this.getCacheId(hint);
        KVStorage.KVWriter writer = null;
        try {
            writer = this.storage.create(key1, hint.getTimeout());
            TraceSpan span = Tracer.getCurrentSpan();
            int rowCount = this.writeRows2(span, writer, key1, rowMeta, iter, hint);
            this.meter.mark((long)rowCount);
            this.meterByName.mark((long)rowCount);
            SimpleMetaImpl meta = new SimpleMetaImpl(key1, rowMeta, rowCount, hint.getPageSize());
            this.writeMeta(writer, key1, meta);
            this.writeTimeout(writer, key1, hint);
            if (writer != null) {
                try {
                    writer.flush();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
            SimpleMetaImpl simpleMetaImpl = meta;
            return simpleMetaImpl;
        }
        catch (IOException e) {
            this.errorMeter.mark();
            this.errorByName.mark();
            throw new AlgoException("can't write page: " + e.getMessage(), e);
        }
        finally {
            context.stop();
            contextByName.stop();
        }
    }

    private void writeTimeout(KVStorage.KVWriter writer, String key1, CacheHint hint) throws IOException {
        long timeout = hint.getTimeout();
        long createTime = System.currentTimeMillis();
        writer.put("timeout", ("" + timeout).getBytes());
        writer.put("createTime", ("" + createTime).getBytes());
    }

    private void writeMeta(KVStorage.KVWriter writer, String key1, SimpleMetaImpl meta) throws IOException {
        String key2 = "meta";
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(out);
        oos.writeObject(meta);
        out.flush();
        writer.put(key2, out.toByteArray());
    }

    private int writeRows(KVStorage.KVWriter writer, String key1, RowMeta rowMeta, Iterator<Row> iter, CacheHint hint) throws IOException {
        int rowCount = 0;
        while (iter.hasNext()) {
            int pageId = rowCount / hint.getPageSize();
            int writeRowCount = this.writePage(writer, key1, pageId, rowMeta, iter, hint);
            rowCount += writeRowCount;
        }
        return rowCount;
    }

    private int writeRows2(TraceSpan span, KVStorage.KVWriter writer, String key1, RowMeta rowMeta, Iterator<Row> iter, CacheHint hint) throws IOException {
        long t1 = System.currentTimeMillis();
        int rowCount = 0;
        ArrayList<Row> list = new ArrayList<Row>();
        int pageSize = hint.getPageSize();
        int pageId = 0;
        while (iter.hasNext()) {
            pageId = rowCount / pageSize;
            Row row = iter.next();
            row = ((AbstractRow)row).persist();
            list.add(row);
            if (++rowCount % pageSize != 0) continue;
            Page page = new Page(pageId, list);
            if (this.mainThread == null) {
                this.mainThread = new MainThread(writer, key1, rowMeta, hint);
            }
            this.mainThread.addPage(page);
            list = new ArrayList();
        }
        if (list.size() > 0) {
            Page page = new Page(pageId, list);
            if (this.mainThread != null) {
                this.mainThread.addPage(page);
            }
        }
        long cost = System.currentTimeMillis() - t1;
        span.addTag("iteratorRowCost", "" + cost, true);
        if (this.mainThread != null) {
            this.mainThread.end();
            this.mainThread.waitDone();
            span.addTag("processRowCost", "" + this.mainThread.cost, true);
        } else if (list.size() > 0) {
            Page page = new Page(pageId, list);
            this.writePage(writer, key1, rowMeta, page, hint);
        }
        return rowCount;
    }

    private int writePage(KVStorage.KVWriter writer, String key1, RowMeta rowMeta, Page page, CacheHint hint) throws IOException {
        return this.writePage(writer, key1, page.pageId, rowMeta, page.list.iterator(), hint);
    }

    private int writePage(KVStorage.KVWriter writer, String key1, int pageId, RowMeta rowMeta, Iterator<Row> iter, CacheHint hint) throws IOException {
        int rowCount;
        String key2 = this.getPageKey(key1, pageId);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(out);
        for (rowCount = 0; iter.hasNext() && rowCount < hint.getPageSize(); ++rowCount) {
            Row row = iter.next();
            this.writeRow(rowMeta, row, dout);
        }
        this.rowSerde.flush(dout);
        dout.flush();
        byte[] bytes = out.toByteArray();
        writer.put(key2, bytes);
        return rowCount;
    }

    private void writeRow(RowMeta rowMeta, Row row, DataOutputStream out) {
        this.rowSerde.write(rowMeta, row, out);
    }

    private Row readRow(RowMeta rowMeta, DataInputStream in) {
        if (this.rowSerde == null) {
            this.rowSerde = RowSerde.Factory.get(rowMeta);
        }
        return this.rowSerde.read(rowMeta, in);
    }

    @Override
    public DataSetCacheMeta getMeta(String id) {
        KVStorage.KVReader reader = null;
        try {
            reader = this.storage.open(id);
            if (reader == null) {
                return null;
            }
            byte[] bytes = reader.get("meta");
            if (bytes == null) {
                return null;
            }
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return (DataSetCacheMeta)ois.readObject();
        }
        catch (Exception e) {
            throw new AlgoException("can't getMeta: " + e.getMessage(), e);
        }
    }

    @Override
    public TimeoutInfo getTimeoutInfo(String id) {
        KVStorage.KVReader reader = null;
        try {
            reader = this.storage.open(id);
            String[] keys = new String[]{"timeout", "createTime"};
            List<byte[]> bytess = reader.get(keys);
            String sTimeout = new String(bytess.get(0));
            String sCreateTime = new String(bytess.get(1));
            TimeoutInfo info = new TimeoutInfo();
            info.setCreateTime(Long.parseLong(sCreateTime));
            info.setTimeout(Integer.parseInt(sTimeout));
            return info;
        }
        catch (IOException e) {
            throw new AlgoException("error getTimeoutInfo: " + e.getMessage(), e);
        }
    }

    @Override
    public void delete(DataSetCacheMeta meta) {
        this.delete(meta.getId());
    }

    @Override
    public void delete(String id) {
        try {
            this.storage.delete(id);
        }
        catch (IOException e) {
            throw new AlgoException("error delete: " + e.getMessage(), e);
        }
    }

    @Override
    public Iterator<Row> getIterator(DataSetCacheMeta meta, int begin, int length) {
        return this.getList(meta, begin, length).iterator();
    }

    @Override
    public List<Row> getList(DataSetCacheMeta meta, int begin, int length) {
        Throwable throwable = null;
        try (TraceSpan span = Tracer.create((String)"DataSet", (String)"Cache.KVSpi.getList");){
            int offset;
            int pageId;
            int pageSize;
            int rowCount;
            RowMeta rowMeta;
            String id = meta.getId();
            span.addTag("id", id);
            span.addTag("begin", String.valueOf(begin));
            span.addTag("length", String.valueOf(length));
            KVStorage.KVReader reader = null;
            try {
                reader = this.storage.open(id);
                rowMeta = meta.getRowMeta();
                rowCount = meta.getRowCount();
                if (rowCount == 0) {
                    List<Row> list = Collections.emptyList();
                    return list;
                }
                pageSize = meta.getPageSize();
                pageId = begin / pageSize;
                offset = begin % pageSize;
            }
            catch (IOException e) {
                try {
                    throw new AlgoException("error when get dataset cache iterator.", e);
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
            }
            String key2 = this.getPageKey(id, pageId);
            byte[] bytes = reader.get(key2);
            if (bytes == null) {
                throw AlgoException.create("kv cache not exists for id:%s,pageId:%d", id, pageId);
            }
            ByteArrayInputStream in = new ByteArrayInputStream(bytes);
            DataInputStream din = new DataInputStream(in);
            ArrayList<Row> result = new ArrayList<Row>();
            int index = 0;
            int add = 0;
            while (add < length && begin + add < rowCount) {
                Row row = this.readRow(rowMeta, din);
                if (index < offset) {
                    ++index;
                    continue;
                }
                result.add(row);
                if (++index % pageSize != 0 || begin + ++add >= rowCount) continue;
                offset = 0;
                ((InputStream)in).close();
                key2 = this.getPageKey(id, ++pageId);
                bytes = reader.get(key2);
                in = new ByteArrayInputStream(bytes);
                din = new DataInputStream(in);
            }
            ArrayList<Row> arrayList = result;
            return arrayList;
        }
    }

    class Page {
        List<Row> list;
        int pageId;

        Page(int pageId, List<Row> list) {
            this.pageId = pageId;
            this.list = list;
        }
    }

    class MainThread
    extends Thread {
        LinkedBlockingQueue<Page> queue = new LinkedBlockingQueue(4);
        boolean started = false;
        boolean end = false;
        CountDownLatch latch = new CountDownLatch(1);
        long cost = 0L;
        private KVStorage.KVWriter writer;
        private String key1;
        private RowMeta rowMeta;
        private CacheHint hint;
        private AlgoException error;

        public MainThread(KVStorage.KVWriter writer, String key1, RowMeta rowMeta, CacheHint hint) {
            this.writer = writer;
            this.key1 = key1;
            this.rowMeta = rowMeta;
            this.hint = hint;
        }

        void end() {
            this.end = true;
        }

        void addPage(Page page) {
            if (this.error != null) {
                this.latch.countDown();
                throw this.error;
            }
            try {
                this.queue.put(page);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            if (!this.started) {
                this.start();
                this.started = true;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Page page = null;
            try {
                while (true) {
                    try {
                        page = this.queue.poll(10L, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    if (page == null) {
                        if (!this.end) continue;
                        break;
                    }
                    long t1 = System.currentTimeMillis();
                    this.processOne(page);
                    this.cost += System.currentTimeMillis() - t1;
                }
            }
            finally {
                this.latch.countDown();
            }
        }

        private void processOne(Page page) {
            try {
                KVSpiImpl.this.writePage(this.writer, this.key1, this.rowMeta, page, this.hint);
            }
            catch (AlgoException e) {
                this.error = e;
                this.end = true;
            }
            catch (Exception e) {
                this.error = new AlgoException(e);
                this.end = true;
            }
        }

        public void waitDone() {
            try {
                this.latch.await();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

