/*
 * Decompiled with CFR 0.152.
 */
package com.kingdee.eas.csinterface.agent.service.work.basedata.thread;

import com.kingdee.eas.csinterface.agent.service.work.basedata.thread.DiskQueueBucket;
import com.kingdee.util.marshal.IMarshalObject;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DiskBufferedQueue<E extends IMarshalObject> {
    private static final long MIN_EACH_FILE_SIZE = 0x3200000L;
    private List<DiskQueueBucket<E>> diskBuckets;
    private LinkedBlockingQueue<E> ramQueue;
    private AtomicBoolean isInit;
    private final int bucketCount;
    private int writeBucketIndex;
    private int readBucketIndex;
    private Thread porterThread;
    private Object diskBucketReadableLock = new Object();
    private AtomicInteger elementCount = new AtomicInteger();
    private volatile boolean finishWrite;

    public DiskBufferedQueue(File queueDataDir, String fileName, int bucketCount, long maxTatalFileSize, int ramQueueSize) {
        if (bucketCount <= 1) {
            throw new IllegalArgumentException("bucketCount must greater than 1");
        }
        long minTotalMaxFileSize = (long)bucketCount * 0x3200000L;
        if (maxTatalFileSize < minTotalMaxFileSize) {
            throw new IllegalArgumentException("tatalMaxFileSize must greater than or equals to " + minTotalMaxFileSize);
        }
        long eachBucketSize = (long)Math.floor(maxTatalFileSize / (long)bucketCount);
        this.diskBuckets = new ArrayList<DiskQueueBucket<E>>(bucketCount);
        this.isInit = new AtomicBoolean(true);
        for (int i = 0; i < bucketCount; ++i) {
            this.diskBuckets.add(new DiskQueueBucket(queueDataDir, fileName + "_" + i, eachBucketSize));
        }
        this.bucketCount = bucketCount;
        this.ramQueue = new LinkedBlockingQueue(ramQueueSize);
        this.porterThread = new PorterThread(this);
        this.porterThread.setName("PorterThread-for-" + fileName);
        this.porterThread.setDaemon(true);
        this.porterThread.start();
    }

    public DiskBufferedQueue(File queueDataDir, String fileName, long maxTatalFileSize, int ramQueueSize) {
        this(queueDataDir, fileName, 2, maxTatalFileSize, ramQueueSize);
    }

    public DiskBufferedQueue(File queueDataDir, String fileName, long maxTatalFileSize) {
        this(queueDataDir, fileName, 2, maxTatalFileSize, 3000);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        for (int i = 0; i < this.diskBuckets.size(); ++i) {
            this.diskBuckets.get(i).close();
        }
        Object object = this.diskBucketReadableLock;
        synchronized (object) {
            this.porterThread.interrupt();
        }
    }

    public boolean isWritable() {
        DiskQueueBucket<E> writeBucket = this.diskBuckets.get(this.writeBucketIndex);
        return writeBucket.isReadyToWrite();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void finishWrite() {
        this.finishWrite = true;
        this.diskBuckets.get(this.writeBucketIndex).setReadyToRead();
        Object object = this.diskBucketReadableLock;
        synchronized (object) {
            this.diskBucketReadableLock.notify();
        }
    }

    public boolean isFinishWrite() {
        return this.finishWrite;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean offer(E e) {
        if (this.finishWrite) {
            throw new IllegalStateException("queue is finishWrite!");
        }
        if (this.isInit.get()) {
            if (!this.ramQueue.offer(e)) {
                DiskQueueBucket<E> writeBucket = this.diskBuckets.get(0);
                if (writeBucket.isReadyToWrite()) {
                    if (!writeBucket.add(e)) {
                        Object object = this.diskBucketReadableLock;
                        synchronized (object) {
                            this.diskBucketReadableLock.notify();
                        }
                        this.isInit.set(false);
                        ++this.writeBucketIndex;
                        if (this.writeBucketIndex >= this.bucketCount) {
                            this.writeBucketIndex = 0;
                        }
                        return false;
                    }
                    this.elementCount.incrementAndGet();
                    return true;
                }
                return false;
            }
            this.elementCount.incrementAndGet();
            return true;
        }
        DiskQueueBucket<E> writeBucket = this.diskBuckets.get(this.writeBucketIndex);
        while (writeBucket.isReadyToWrite()) {
            if (!writeBucket.add(e)) {
                Object object = this.diskBucketReadableLock;
                synchronized (object) {
                    this.diskBucketReadableLock.notify();
                }
                ++this.writeBucketIndex;
                if (this.writeBucketIndex >= this.bucketCount) {
                    this.writeBucketIndex = 0;
                }
                System.out.println("switch to next writeBucketIndex : " + this.writeBucketIndex);
                writeBucket = this.diskBuckets.get(this.writeBucketIndex);
                continue;
            }
            this.elementCount.incrementAndGet();
            return true;
        }
        return false;
    }

    public int size() {
        return this.elementCount.get();
    }

    public E take() throws InterruptedException {
        IMarshalObject e = (IMarshalObject)this.ramQueue.take();
        this.elementCount.decrementAndGet();
        return (E)e;
    }

    private static class PorterThread<E extends IMarshalObject>
    extends Thread {
        DiskBufferedQueue<E> diskBufferedQueue;

        public PorterThread(DiskBufferedQueue<E> diskBufferedQueue) {
            this.diskBufferedQueue = diskBufferedQueue;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            DiskQueueBucket queueBucket = (DiskQueueBucket)((DiskBufferedQueue)this.diskBufferedQueue).diskBuckets.get(((DiskBufferedQueue)this.diskBufferedQueue).readBucketIndex);
            while (true) {
                if (!queueBucket.isReadyToRead()) {
                    Object object = ((DiskBufferedQueue)this.diskBufferedQueue).diskBucketReadableLock;
                    synchronized (object) {
                        if (!queueBucket.isReadyToRead()) {
                            try {
                                ((DiskBufferedQueue)this.diskBufferedQueue).diskBucketReadableLock.wait();
                            }
                            catch (InterruptedException _e) {
                                return;
                            }
                        }
                    }
                }
                while (queueBucket.size() > 0) {
                    Object e = queueBucket.poll();
                    try {
                        ((DiskBufferedQueue)this.diskBufferedQueue).ramQueue.put(e);
                    }
                    catch (InterruptedException _e) {
                        _e.printStackTrace();
                        return;
                    }
                }
                ((DiskBufferedQueue)this.diskBufferedQueue).readBucketIndex++;
                if (((DiskBufferedQueue)this.diskBufferedQueue).readBucketIndex >= ((DiskBufferedQueue)this.diskBufferedQueue).bucketCount) {
                    ((DiskBufferedQueue)this.diskBufferedQueue).readBucketIndex = 0;
                }
                System.out.println("switch to next readBucketIndex : " + ((DiskBufferedQueue)this.diskBufferedQueue).readBucketIndex);
                queueBucket = (DiskQueueBucket)((DiskBufferedQueue)this.diskBufferedQueue).diskBuckets.get(((DiskBufferedQueue)this.diskBufferedQueue).readBucketIndex);
            }
        }
    }
}

