/*
 * Decompiled with CFR 0.152.
 */
package com.kingdee.bos.qing.datasource.spec.qs;

import com.kingdee.bos.qing.datasource.exception.AbstractDataSourceException;
import com.kingdee.bos.qing.datasource.exception.DataSourcePersistenceException;
import com.kingdee.bos.qing.datasource.meta.MetaInfo;
import com.kingdee.bos.qing.datasource.spec.qs.QSDataSourceWriter;
import com.kingdee.bos.qing.filesystem.manager.api.IQingFile;
import com.kingdee.bos.qing.util.LogUtil;
import com.kingdee.bos.qing.util.ThreadPoolManage;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Writer that hands rows to a single background task instead of writing them on the
 * caller's thread.
 *
 * <p>Rows passed to {@link #writeData(Object[])} are placed on a bounded queue; a
 * {@link WriteTask} running on a one-thread executor takes them off the queue and
 * forwards each one to the superclass's {@code writeData}. When the queue is full,
 * {@link #writeData(Object[])} blocks until the task has made room.
 *
 * <p>A failure inside the background task is recorded and reported to the caller by the
 * next {@link #writeData(Object[])} call, or by {@link #finishWriteData()} if no further
 * rows are written.
 */
public class QSDataSourceAsyncWriter
extends QSDataSourceWriter {
    /** Maximum number of rows buffered between the producer and the write task. */
    private static final int QUEUE_CAPACITY = 10000;
    /** How long a single poll of the queue waits for a row, in milliseconds. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;
    /**
     * Number of consecutive empty polls after which the write task stops itself
     * (120000 polls of 100 ms each; the log message rounds this to "3 hours").
     */
    private static final long MAX_EMPTY_POLLS = 120000L;

    private final ExecutorService executor = Executors.newFixedThreadPool(1, (ThreadFactory)new ThreadPoolManage.NamedThreadFactory("Qing-QSDataSource-AsyncWriteTask"));
    /** True while the write task should keep polling; cleared on finish, failure or idle timeout. */
    private volatile boolean running = false;
    /** Guards {@link #finishWriteData()} so that its body runs at most once. */
    private final AtomicBoolean finishedWriteData = new AtomicBoolean(false);
    /** Guards {@link #close(Exception)} so that its body runs at most once. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Handle of the submitted write task; {@code null} until {@link #start(MetaInfo)} is called. */
    private Future<?> writeFuture = null;
    /** Failure recorded by the write task, or {@code null} if none occurred. */
    private volatile DataSourcePersistenceException writingException;
    private final LinkedBlockingQueue<Object[]> dataQueue = new LinkedBlockingQueue<Object[]>(QUEUE_CAPACITY);

    public QSDataSourceAsyncWriter(IQingFile dataFile) {
        super(dataFile);
    }

    /**
     * Runs the superclass start logic, then launches the background write task.
     *
     * @param metaInfo meta information passed through to the superclass
     */
    @Override
    public void start(MetaInfo metaInfo) {
        super.start(metaInfo);
        this.running = true;
        // Submit the Callable directly: wrapping it in a FutureTask first would wrap it
        // twice and hide any Throwable it raises from writeFuture.get().
        this.writeFuture = this.executor.submit(new WriteTask());
    }

    /**
     * Queues one row for the background task, blocking while the queue is full.
     *
     * @throws DataSourcePersistenceException the failure recorded by the write task if it
     *         has failed, or a generic "not running" error if the task is not running
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    private void appendRow(Object[] row) throws DataSourcePersistenceException, InterruptedException {
        if (!this.running) {
            DataSourcePersistenceException failure = this.writingException;
            if (failure != null) {
                throw failure;
            }
            throw new DataSourcePersistenceException("data write task is not running");
        }
        this.dataQueue.put(row);
    }

    /**
     * Queues {@code row} to be written asynchronously.
     *
     * @param row the row to write
     * @throws DataSourcePersistenceException if the write task has failed or is not running
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    @Override
    public void writeData(Object[] row) throws DataSourcePersistenceException, InterruptedException {
        this.appendRow(row);
    }

    /**
     * Stops the write task, waits for it to drain the queue, shuts the executor down and
     * runs the superclass finish logic. Only the first call has any effect.
     *
     * @throws DataSourcePersistenceException if the superclass finish logic fails, or if
     *         the background write task recorded a failure (which would otherwise go
     *         unnoticed when no {@code writeData} call follows the failure)
     */
    @Override
    public void finishWriteData() throws DataSourcePersistenceException {
        if (this.finishedWriteData.getAndSet(true)) {
            return;
        }
        this.running = false;
        this.waitFinish();
        this.executor.shutdown();
        super.finishWriteData();
        // Surface a failure of the asynchronous write so that lost rows are not
        // reported as a successful finish.
        DataSourcePersistenceException failure = this.writingException;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Finishes writing (logging rather than propagating any failure) and then runs the
     * superclass close logic. Only the first call has any effect.
     *
     * @param exception passed through unchanged to the superclass
     */
    @Override
    public void close(Exception exception) {
        if (this.closed.getAndSet(true)) {
            return;
        }
        try {
            this.finishWriteData();
        }
        catch (DataSourcePersistenceException e) {
            LogUtil.error((String)"finish write data failed", (Throwable)e);
        }
        super.close(exception);
    }

    /**
     * Blocks until the write task has completed, if one was started. Failures while
     * waiting are logged; an interrupt is additionally restored on the calling thread.
     */
    private void waitFinish() {
        Future<?> future = this.writeFuture;
        if (null == future) {
            return;
        }
        try {
            future.get();
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            LogUtil.error((String)"get future error", (Throwable)e);
        }
        catch (Exception e) {
            LogUtil.error((String)"get future error", (Throwable)e);
        }
    }

    /**
     * Background task that moves rows from the queue to the superclass's
     * {@code writeData} until {@code running} is cleared, then writes whatever is left.
     */
    private class WriteTask
    implements Callable<Object> {
        private WriteTask() {
        }

        /**
         * Polls the queue while the writer is running, stopping on its own after
         * {@code MAX_EMPTY_POLLS} consecutive empty polls. Any exception is recorded in
         * {@code writingException}, the writer is marked as not running and the queue is
         * cleared (which also releases a producer blocked on a full queue).
         *
         * @return always {@code null}
         */
        @Override
        public Object call() throws Exception {
            try {
                long emptyPolls = 0L;
                while (QSDataSourceAsyncWriter.this.running) {
                    Object[] row = QSDataSourceAsyncWriter.this.dataQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                    if (null != row) {
                        emptyPolls = 0L;
                        QSDataSourceAsyncWriter.super.writeData(row);
                        continue;
                    }
                    if (++emptyPolls >= MAX_EMPTY_POLLS) {
                        QSDataSourceAsyncWriter.this.running = false;
                        LogUtil.warn((String)"qs async write task is stopped because no data writable in 3 hours");
                    }
                }
                this.writeReserved();
                return null;
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt status of the worker thread.
                    Thread.currentThread().interrupt();
                }
                if (e instanceof DataSourcePersistenceException) {
                    QSDataSourceAsyncWriter.this.writingException = (DataSourcePersistenceException)e;
                } else {
                    QSDataSourceAsyncWriter.this.writingException = new DataSourcePersistenceException((Throwable)e);
                }
                QSDataSourceAsyncWriter.this.running = false;
                QSDataSourceAsyncWriter.this.dataQueue.clear();
                return null;
            }
        }

        /** Writes every row still sitting in the queue once the polling loop has ended. */
        private void writeReserved() throws AbstractDataSourceException, InterruptedException {
            ArrayList<Object[]> reserved = new ArrayList<Object[]>();
            QSDataSourceAsyncWriter.this.dataQueue.drainTo(reserved);
            for (Object[] row : reserved) {
                QSDataSourceAsyncWriter.super.writeData(row);
            }
        }
    }
}

