/*
 * Decompiled with CFR 0.152.
 */
package com.kingdee.bos.qing.datasource.join.worknodes;

import com.kingdee.bos.qing.common.memory.MemWarningLevel;
import com.kingdee.bos.qing.common.memory.MemoryObserver;
import com.kingdee.bos.qing.datasource.exception.JoinRejectedExecutionException;
import com.kingdee.bos.qing.datasource.join.base.ForkJoinPool;
import com.kingdee.bos.qing.datasource.join.config.QingJoinConfig;
import com.kingdee.bos.qing.datasource.join.task.JoinRootTask;
import com.kingdee.bos.qing.datasource.join.task.JoinTaskRequest;
import com.kingdee.bos.qing.datasource.join.task.JoinTaskRuntime;
import com.kingdee.bos.qing.datasource.join.task.TaskEvent;
import com.kingdee.bos.qing.datasource.join.task.TaskEventListener;
import com.kingdee.bos.qing.datasource.join.task.TaskScale;
import com.kingdee.bos.qing.datasource.join.taskadvise.TaskCancelType;
import com.kingdee.bos.qing.datasource.join.util.JoinUtil;
import com.kingdee.bos.qing.datasource.join.worknodes.GlobalTaskCounter;
import com.kingdee.bos.qing.datasource.join.worknodes.JoinTaskManager;
import com.kingdee.bos.qing.datasource.join.worknodes.NodeState;
import com.kingdee.bos.qing.datasource.join.worknodes.NodeStateChangLisener;
import com.kingdee.bos.qing.datasource.join.worknodes.NodeStateChangeEvent;
import com.kingdee.bos.qing.datasource.join.worknodes.NodeStateType;
import com.kingdee.bos.qing.datasource.join.worknodes.TaskRetryExecutor;
import com.kingdee.bos.qing.util.LogUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A single join work node: it owns one {@link ForkJoinPool} (the project's own
 * implementation, not {@code java.util.concurrent.ForkJoinPool}) and a dedicated
 * single-threaded event loop.
 *
 * <p>Every piece of bookkeeping (submit, finish, retry, steal, scale change,
 * long-running check, memory-pressure cancellation) is wrapped in an
 * {@link EventRunner}, put on {@link #eventQueues} and executed one at a time by
 * {@link EventExecuteLoop}. Within this class {@link #taskRuntimeMaps} is only read
 * or modified from those event handlers, which is why a plain {@link HashMap} is
 * used for it.
 */
class JoinWorkNode {
    /** Seconds a caller waits for its submit event before trying to abandon it. */
    private static final long SUBMIT_WAIT_SECONDS = 15L;
    /** Milliseconds a caller waits for its steal event before trying to abandon it. */
    private static final long STEAL_WAIT_MILLIS = 30000L;
    /** A waiting task whose stolen count has reached this value is no longer a steal candidate. */
    private static final int MAX_STOLEN_COUNT = 3;
    /** Minimum number of milliseconds between two steals of the same task. */
    private static final long MIN_STEAL_INTERVAL_MILLIS = 60000L;

    /**
     * Source of the numeric suffix of work-node ids. Atomic so that nodes constructed
     * concurrently can never be handed the same id.
     */
    private static final AtomicInteger workNodeCount = new AtomicInteger(0);

    private final ForkJoinPool forkJoinPool;
    /** Task id to runtime for the tasks currently tracked by this node. */
    private final Map<String, JoinTaskRuntime> taskRuntimeMaps = new HashMap<String, JoinTaskRuntime>();
    private final TaskEventListener taskEventListener = new TaskListener();
    /** May be {@code null} until {@link #setNodeStateListener} has been called. */
    private NodeStateChangLisener stateChangLisener;
    private volatile NodeState nodeState;
    private final String myWorkNodeId;
    private final LinkedBlockingQueue<EventRunner> eventQueues = new LinkedBlockingQueue<EventRunner>();
    private final ExecutorService eventExecutor = Executors.newSingleThreadExecutor();
    private final GlobalTaskCounter globalTaskCounter;

    /**
     * Creates the node, assigns it the next {@code WorkNode_<n>} id and starts its event loop.
     *
     * @param poolSize size passed to the fork-join pool and to {@link NodeState#initState}
     * @param globalTaskCounter counter that is released whenever a task of this node finishes
     */
    JoinWorkNode(int poolSize, GlobalTaskCounter globalTaskCounter) {
        this.forkJoinPool = new ForkJoinPool(poolSize);
        this.nodeState = NodeState.initState(poolSize);
        this.globalTaskCounter = globalTaskCounter;
        this.myWorkNodeId = "WorkNode_" + workNodeCount.getAndIncrement();
        this.eventExecutor.submit(new EventExecuteLoop());
    }

    /**
     * Builds a runtime for the request, queues a submit event for it and blocks until the
     * event loop has handled that event (or the wait has been abandoned).
     *
     * @param taskRequest the join request to run on this node
     * @return the runtime created for the request
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws JoinRejectedExecutionException if the submit event failed or timed out before it started
     */
    JoinTaskRuntime submitJoin(JoinTaskRequest taskRequest) throws InterruptedException, JoinRejectedExecutionException {
        JoinTaskRuntime runtime = this.createTaskRuntime(taskRequest);
        SubmitEvent event = new SubmitEvent(runtime);
        if (!this.eventQueues.offer(event)) {
            LogUtil.warn("task join the queue failed,taskId:" + taskRequest.getTaskId());
        }
        if (!event.getSubmitResult()) {
            throw new JoinRejectedExecutionException("submit task failed,taskId:" + taskRequest.getTaskId());
        }
        return runtime;
    }

    /**
     * Re-homes an existing runtime on this node (sets this node's id and task listener on it),
     * queues a submit event for it and blocks until that event has been handled.
     *
     * @param runtime an already created runtime
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws JoinRejectedExecutionException if the submit event failed or timed out before it started
     */
    void submitJoin(JoinTaskRuntime runtime) throws InterruptedException, JoinRejectedExecutionException {
        runtime.setWorkNodeId(this.myWorkNodeId);
        runtime.setTaskListener(this.taskEventListener);
        SubmitEvent event = new SubmitEvent(runtime);
        if (!this.eventQueues.offer(event)) {
            LogUtil.warn("task join the queue failed,taskId:" + runtime.getTaskId());
        }
        if (!event.getSubmitResult()) {
            throw new JoinRejectedExecutionException("submit task failed,taskId:" + runtime.getTaskId());
        }
    }

    /** @return the id assigned to this node at construction time */
    public String getMyWorkNodeId() {
        return this.myWorkNodeId;
    }

    /** @return the max-running-task limit reported by the current node state */
    public int getMaxLimit() {
        return this.nodeState.getMaxRunningTaskLimit();
    }

    /**
     * Asks the event loop to detach one waiting task from this node so that it can be run elsewhere.
     *
     * @return the detached runtime, or {@code null} if none qualified or the request timed out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    JoinTaskRuntime tryRemoveWaitingTask() throws InterruptedException {
        StealWaitingTaskEvent stealEvent = new StealWaitingTaskEvent();
        if (!this.eventQueues.offer(stealEvent)) {
            LogUtil.warn("task join the queue failed");
        }
        return stealEvent.getResult();
    }

    /** @return a copy of the current node state, as produced by {@code NodeState.newCopy()} */
    NodeState getNodeStateCopy() {
        return this.nodeState.newCopy();
    }

    /**
     * Sets the listener that is told about node state transitions.
     *
     * @param stateChangLisener the listener to notify; while it is {@code null} notifications are skipped
     */
    void setNodeStateListener(NodeStateChangLisener stateChangLisener) {
        this.stateChangLisener = stateChangLisener;
    }

    /** Creates a runtime bound to this node and registers the timeout/interrupt listeners on it. */
    private JoinTaskRuntime createTaskRuntime(JoinTaskRequest taskRequest) {
        JoinTaskRuntime runtime = new JoinTaskRuntime(taskRequest);
        runtime.setWorkNodeId(this.myWorkNodeId);
        runtime.setTaskListener(this.taskEventListener);
        runtime.registerTimeoutListener(TaskRetryExecutor.getInstance().timeoutListener());
        runtime.registerTimeoutListener(JoinTaskManager.getInstance().getTaskMonitorDataRecorder());
        runtime.registerInterruptedListener(JoinTaskManager.getInstance().getTaskMonitorDataRecorder());
        return runtime;
    }

    /** Queues a check that cancels timed-out tasks and flags long-running ones; does not wait for it. */
    void checkLongTimeRunningTask() {
        if (!this.eventQueues.offer(new LongRunningCheckEvent())) {
            LogUtil.warn("task join the queue failed");
        }
    }

    /**
     * Queues a finish event for the given runtime without waiting for it to be handled.
     *
     * @param runtime the runtime to finish
     */
    void forceFinishRetryTaskOnTimeout(JoinTaskRuntime runtime) {
        FinishEvent event = new FinishEvent(runtime);
        if (!this.eventQueues.offer(event)) {
            LogUtil.warn("task join the queue failed,taskId:" + runtime.getTaskId());
        }
    }

    /**
     * Notifies the registered state listener, if any, about a transition from {@code sourceState}
     * to the node's current state. A missing listener is tolerated so that it cannot make an
     * otherwise successful event fail.
     */
    private void fireNodeStateChanged(NodeStateType sourceState) {
        NodeStateChangLisener listener = this.stateChangLisener;
        if (listener == null) {
            return;
        }
        listener.onNodeStateChanged(new NodeStateChangeEvent(this.myWorkNodeId, sourceState, this.nodeState.getState()));
    }

    /**
     * Cancels every tracked task whose running time has reached the configured timeout and
     * collects the tasks that should newly be flagged as long-running.
     *
     * @return tasks not yet known to the node state as long-running, with status 1 and a running
     *         time of at least the configured long-join time
     */
    private List<JoinTaskRuntime> selectLongRunningTask() {
        List<JoinTaskRuntime> longRunningTaskRuntimes = new ArrayList<JoinTaskRuntime>();
        // Both thresholds are configured in seconds and compared against millisecond running times.
        long timeoutMillis = (long) QingJoinConfig.getInstance().getTimeout() * 1000L;
        long longJoinMillis = QingJoinConfig.getInstance().getLongJoinTime() * 1000L;
        for (JoinTaskRuntime taskRuntime : this.taskRuntimeMaps.values()) {
            long runningTime = taskRuntime.getRunningTime();
            if (runningTime >= timeoutMillis) {
                taskRuntime.cancel(TaskCancelType.TIMEOUT);
                continue;
            }
            if (this.nodeState.longTimeRunTaskExist(taskRuntime.getTaskId())) {
                continue;
            }
            // NOTE(review): status 1 presumably means "running" - confirm against JoinTaskRuntime.
            if (taskRuntime.getStatus() != 1 || runningTime < longJoinMillis) {
                continue;
            }
            longRunningTaskRuntimes.add(taskRuntime);
        }
        return longRunningTaskRuntimes;
    }

    /**
     * Cancels unfinished tasks whose progress exceeds the configured max join data limit.
     * Does nothing when the limit is {@code -1} or when {@code currentLevel} has a lower
     * priority than the configured trigger level.
     */
    private void cancelExceedMaxLimitTask(MemWarningLevel currentLevel) {
        long maxLimit = QingJoinConfig.getInstance().getMaxJoinDataLimit();
        MemWarningLevel triggerLevel = QingJoinConfig.getInstance().getMaxJoinDataTriggerLevel();
        if (maxLimit == -1L || currentLevel.getPriority() < triggerLevel.getPriority()) {
            return;
        }
        for (JoinTaskRuntime runtime : this.taskRuntimeMaps.values()) {
            if (runtime.isFinish()) {
                continue;
            }
            if (runtime.getTaskProgress().exceedMaxLimit(maxLimit)) {
                runtime.cancel(TaskCancelType.EXCEED_MAX_LIMIT);
            }
        }
    }

    /**
     * Cancels every unfinished task except those that are both small (total advise rows within
     * the small-join threshold) and safe with respect to the safe join data limit.
     */
    private void cancelUnsafeTask() {
        long safeLimit = QingJoinConfig.getInstance().getSafeJoinDataLimit();
        long smallTaskLimit = QingJoinConfig.getInstance().getSmallJoinThreadShold();
        long maxLimit = QingJoinConfig.getInstance().getMaxJoinDataLimit();
        for (JoinTaskRuntime runtime : this.taskRuntimeMaps.values()) {
            if (runtime.isFinish()) {
                continue;
            }
            boolean smallAndSafe = runtime.getTotalAdviseRow() <= smallTaskLimit
                    && runtime.getTaskProgress().isSafe(safeLimit);
            if (smallAndSafe) {
                continue;
            }
            this.cancelUnderMemoryPressure(runtime, maxLimit);
        }
    }

    /** Cancels every unfinished task, subject to {@code survivalNotCancel()} for retry cancels. */
    private void cancelAll() {
        long maxLimit = QingJoinConfig.getInstance().getMaxJoinDataLimit();
        for (JoinTaskRuntime runtime : this.taskRuntimeMaps.values()) {
            if (runtime.isFinish()) {
                continue;
            }
            this.cancelUnderMemoryPressure(runtime, maxLimit);
        }
    }

    /**
     * Cancels one task: as EXCEED_MAX_LIMIT when its progress exceeds {@code maxLimit}, otherwise
     * as RETRY unless {@code survivalNotCancel()} reports that it must be left alone.
     */
    private void cancelUnderMemoryPressure(JoinTaskRuntime runtime, long maxLimit) {
        if (runtime.getTaskProgress().exceedMaxLimit(maxLimit)) {
            runtime.cancel(TaskCancelType.EXCEED_MAX_LIMIT);
            return;
        }
        if (!runtime.survivalNotCancel()) {
            runtime.cancel(TaskCancelType.RETRY);
        }
    }

    /** @return a new memory observer that turns memory warnings into cancel events on this node */
    public MemoryObserver newMemObserver() {
        return new HeapMemObserver();
    }

    /** Memory observer that forwards each warning to the event loop as a {@link CancelTaskEvent}. */
    private class HeapMemObserver
    implements MemoryObserver {
        private HeapMemObserver() {
        }

        public void notifyMemoryWarning(MemWarningLevel warningLevel, double currentOldSpaceRatio) {
            if (!JoinWorkNode.this.eventQueues.offer(new CancelTaskEvent(warningLevel))) {
                LogUtil.warn("task join the queue failed");
            }
        }

        public MemWarningLevel lowestLevel() {
            return MemWarningLevel.BLUE;
        }

        public int getPriority() {
            return 10;
        }

        public String getObserverId() {
            return JoinWorkNode.this.myWorkNodeId;
        }
    }

    /** Picks a cancellation strategy from the memory warning level it was created with. */
    private class CancelTaskEvent
    extends EventRunnerAdapter {
        private final MemWarningLevel warningLevel;

        public CancelTaskEvent(MemWarningLevel warningLevel) {
            this.warningLevel = warningLevel;
        }

        @Override
        protected void runInternal() {
            switch (this.warningLevel) {
                case YELLOW:
                case BLUE: {
                    JoinWorkNode.this.cancelExceedMaxLimitTask(this.warningLevel);
                    break;
                }
                case ORANGE: {
                    JoinWorkNode.this.cancelUnsafeTask();
                    break;
                }
                case RED: {
                    JoinWorkNode.this.cancelAll();
                    break;
                }
                default: {
                    // Any other level triggers no cancellation.
                    break;
                }
            }
        }
    }

    /**
     * Registers a runtime with this node and hands it to the fork-join pool. The submitting
     * thread blocks in {@link #getSubmitResult()} until the event has been handled.
     */
    private class SubmitEvent
    extends EventRunnerAdapter {
        private final CountDownLatch latch;
        // Written by the event thread before latch.countDown(), read by the caller after await().
        private boolean isSucceed;
        private final JoinTaskRuntime runtime;

        public SubmitEvent(JoinTaskRuntime taskRuntime) {
            this.latch = new CountDownLatch(1);
            this.isSucceed = true;
            this.runtime = taskRuntime;
        }

        @Override
        protected void runInternal() {
            NodeStateType sourceState = JoinWorkNode.this.nodeState.getState();
            LogUtil.info(JoinUtil.joinLogPrefix() + "task Event:" + TaskEvent.EventType.SUBMIT + ",taskId:" + this.runtime.getTaskId() + ",nodeId:" + JoinWorkNode.this.myWorkNodeId);
            JoinWorkNode.this.taskRuntimeMaps.put(this.runtime.getTaskId(), this.runtime);
            // The SUBMIT state change is applied only on the first attempt, not on retries.
            if (this.runtime.getRetryTimes() == 0) {
                JoinWorkNode.this.nodeState.changeState(new TaskEvent(this.runtime, TaskEvent.EventType.SUBMIT));
            }
            JoinWorkNode.this.forkJoinPool.submit(new JoinRootTask(this.runtime));
            JoinWorkNode.this.fireNodeStateChanged(sourceState);
            this.latch.countDown();
        }

        @Override
        public void handleException(Exception e) {
            this.isSucceed = false;
            this.latch.countDown();
        }

        /**
         * Waits for the event to be handled.
         *
         * @return {@code true} if the event ran without an exception; {@code false} if it failed,
         *         or if it had not started within the wait and could be marked as timed out
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public boolean getSubmitResult() throws InterruptedException {
            if (this.latch.await(SUBMIT_WAIT_SECONDS, TimeUnit.SECONDS)) {
                return this.isSucceed;
            }
            if (this.setTimeout()) {
                // The event never started; the loop will skip it when it is dequeued.
                return false;
            }
            // The event is already running, so it must be allowed to finish.
            this.latch.await();
            return this.isSucceed;
        }
    }

    /** Applies a LONGTIME_RUNNING state change for every task picked by selectLongRunningTask(). */
    private class LongRunningCheckEvent
    extends EventRunnerAdapter {
        private LongRunningCheckEvent() {
        }

        @Override
        protected void runInternal() {
            List<JoinTaskRuntime> longRunningTaskRuntimes = JoinWorkNode.this.selectLongRunningTask();
            for (JoinTaskRuntime runtime : longRunningTaskRuntimes) {
                JoinWorkNode.this.nodeState.changeState(new TaskEvent(runtime, TaskEvent.EventType.LONGTIME_RUNNING));
            }
        }
    }

    /** Removes a finished task from this node's bookkeeping and publishes the state change. */
    private class FinishEvent
    extends EventRunnerAdapter {
        private final JoinTaskRuntime joinTaskRuntime;

        public FinishEvent(JoinTaskRuntime joinTaskRuntime) {
            this.joinTaskRuntime = joinTaskRuntime;
        }

        @Override
        protected void runInternal() {
            NodeStateType sourceState = JoinWorkNode.this.nodeState.getState();
            LogUtil.info(JoinUtil.joinLogPrefix() + "task Event:" + TaskEvent.EventType.FINISHED + ",taskId:" + this.joinTaskRuntime.getTaskId() + ",nodeId:" + JoinWorkNode.this.myWorkNodeId);
            JoinWorkNode.this.taskRuntimeMaps.remove(this.joinTaskRuntime.getTaskId());
            if (this.joinTaskRuntime.getScale() == TaskScale.LARGE) {
                JoinWorkNode.this.nodeState.descreamentLargeScale();
            }
            if (this.joinTaskRuntime.getRetryTimes() > 0) {
                TaskRetryExecutor.getInstance().retryFinish(this.joinTaskRuntime.getTaskId());
            }
            JoinWorkNode.this.globalTaskCounter.release(this.joinTaskRuntime.isCanceled() ? -1 : 0);
            JoinWorkNode.this.nodeState.changeState(new TaskEvent(this.joinTaskRuntime, TaskEvent.EventType.FINISHED));
            JoinWorkNode.this.fireNodeStateChanged(sourceState);
        }
    }

    /** Drops a task from this node's bookkeeping and hands it to the retry executor. */
    private class RetryEvent
    extends EventRunnerAdapter {
        private final JoinTaskRuntime runtime;

        public RetryEvent(JoinTaskRuntime joinTaskRuntime) {
            this.runtime = joinTaskRuntime;
        }

        @Override
        protected void runInternal() {
            LogUtil.info(JoinUtil.joinLogPrefix() + "task Event:" + TaskEvent.EventType.RETRY + ",taskId:" + this.runtime.getTaskId() + ",nodeId:" + JoinWorkNode.this.myWorkNodeId);
            JoinWorkNode.this.taskRuntimeMaps.remove(this.runtime.getTaskId());
            if (this.runtime.getScale() == TaskScale.LARGE) {
                JoinWorkNode.this.nodeState.descreamentLargeScale();
            }
            TaskRetryExecutor.getInstance().submitRetryTask(JoinWorkNode.this, this.runtime);
        }
    }

    /**
     * Looks for one waiting task that may be detached from this node, removes it from the
     * bookkeeping and publishes a STOLEN state change. The requesting thread blocks in
     * {@link #getResult()}.
     */
    private class StealWaitingTaskEvent
    extends EventRunnerAdapter {
        private final CountDownLatch latch;
        // Written by the event thread before latch.countDown(), read by the caller after await().
        private JoinTaskRuntime result;

        private StealWaitingTaskEvent() {
            this.latch = new CountDownLatch(1);
        }

        @Override
        protected void runInternal() {
            String removedTaskId = null;
            long currentTime = System.currentTimeMillis();
            for (JoinTaskRuntime runtime : JoinWorkNode.this.taskRuntimeMaps.values()) {
                if (!runtime.isWaiting() || runtime.getStolenCount() >= MAX_STOLEN_COUNT) {
                    continue;
                }
                long preStolenTime = runtime.getPreStolenTime();
                boolean stolenRecently = preStolenTime > 0L
                        && currentTime - preStolenTime < MIN_STEAL_INTERVAL_MILLIS;
                // trySetNotRunInThisWorkNode() is only attempted for tasks that pass the interval check.
                if (stolenRecently || !runtime.trySetNotRunInThisWorkNode()) {
                    continue;
                }
                removedTaskId = runtime.getTaskId();
                break;
            }
            // Removal happens after the loop so the map is not modified while it is being iterated.
            if (removedTaskId != null) {
                this.result = JoinWorkNode.this.taskRuntimeMaps.remove(removedTaskId);
            }
            if (this.result != null) {
                NodeStateType sourceState = JoinWorkNode.this.nodeState.getState();
                JoinWorkNode.this.nodeState.changeState(new TaskEvent(this.result, TaskEvent.EventType.STOLEN));
                JoinWorkNode.this.fireNodeStateChanged(sourceState);
            }
            this.latch.countDown();
        }

        /**
         * Waits for the event to be handled.
         *
         * @return the detached runtime, or {@code null} if none was detached or the event had not
         *         started within the wait and could be marked as timed out
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public JoinTaskRuntime getResult() throws InterruptedException {
            if (this.latch.await(STEAL_WAIT_MILLIS, TimeUnit.MILLISECONDS)) {
                return this.result;
            }
            if (this.setTimeout()) {
                // The event never started; the loop will skip it when it is dequeued.
                return null;
            }
            // The event is already running, so it must be allowed to finish.
            this.latch.await();
            return this.result;
        }

        @Override
        public void handleException(Exception e) {
            this.latch.countDown();
        }
    }

    /** Increments the node's large-scale counter when the task's scale is LARGE at handling time. */
    private class ScaleChangedEvent
    extends EventRunnerAdapter {
        private final JoinTaskRuntime runtime;

        public ScaleChangedEvent(JoinTaskRuntime joinTaskRuntime) {
            this.runtime = joinTaskRuntime;
        }

        @Override
        protected void runInternal() {
            if (this.runtime.getScale() == TaskScale.LARGE) {
                JoinWorkNode.this.nodeState.increamentLargeScale();
            }
        }
    }

    /**
     * Base class for events. It carries a small status machine that lets a waiting caller and
     * the event loop race for the event: whichever moves it out of WAITING first wins, so an
     * event either runs or is marked as timed out, never both.
     */
    private abstract class EventRunnerAdapter
    implements EventRunner {
        public static final int STATUS_WAITING = 0;
        public static final int STATUS_RUNNING = 1;
        public static final int STATUS_TIMEOUT = 2;
        protected final AtomicInteger stats = new AtomicInteger(STATUS_WAITING);

        private EventRunnerAdapter() {
        }

        /** @return {@code true} if the event moved from WAITING to RUNNING */
        protected boolean beginRun() {
            return this.stats.compareAndSet(STATUS_WAITING, STATUS_RUNNING);
        }

        /** @return {@code true} if the event moved from WAITING to TIMEOUT, i.e. it will never run */
        protected boolean setTimeout() {
            return this.stats.compareAndSet(STATUS_WAITING, STATUS_TIMEOUT);
        }

        @Override
        public void runEvent() {
            if (!this.beginRun()) {
                // Already marked as timed out by the caller that was waiting for it.
                return;
            }
            this.runInternal();
        }

        /** The actual work of the event; only invoked when the event won the race against a timeout. */
        protected abstract void runInternal();

        @Override
        public void handleException(Exception e) {
        }
    }

    /** A unit of work executed by the event loop. */
    private static interface EventRunner {
        /** Runs the event. */
        public void runEvent();

        /** Called by the event loop when {@link #runEvent()} threw the given exception. */
        public void handleException(Exception var1);
    }

    /** Takes events from the queue one at a time and runs them until stopped or interrupted. */
    private class EventExecuteLoop
    implements Runnable {
        // volatile so that a stop() issued from another thread is seen by the loop.
        private volatile boolean running = true;

        private EventExecuteLoop() {
        }

        @Override
        public void run() {
            try {
                while (this.running) {
                    EventRunner event = JoinWorkNode.this.eventQueues.take();
                    this.executeEvent(event);
                }
            }
            catch (InterruptedException e) {
                LogUtil.error("Event executor thread interrupted", (Throwable) e);
                // Restore the interrupt status so the owning executor can observe it.
                Thread.currentThread().interrupt();
            }
        }

        /** Requests the loop to exit; it takes effect after the event currently awaited has been processed. */
        public void stop() {
            this.running = false;
        }

        /** Runs one event; an exception is reported to the event and logged, it never ends the loop. */
        private void executeEvent(EventRunner event) {
            try {
                event.runEvent();
            }
            catch (Exception e) {
                event.handleException(e);
                LogUtil.error("event run failed", (Throwable) e);
            }
        }
    }

    /** Translates task callbacks into events on this node's queue; other event types are ignored. */
    private class TaskListener
    implements TaskEventListener {
        private TaskListener() {
        }

        @Override
        public void fireTaskEvent(JoinTaskRuntime runtime, TaskEvent.EventType eventType) {
            EventRunner event;
            switch (eventType) {
                case FINISHED: {
                    event = new FinishEvent(runtime);
                    break;
                }
                case RETRY: {
                    event = new RetryEvent(runtime);
                    break;
                }
                case SCALECHANGED: {
                    event = new ScaleChangedEvent(runtime);
                    break;
                }
                default: {
                    return;
                }
            }
            if (!JoinWorkNode.this.eventQueues.offer(event)) {
                LogUtil.warn("task join the queue failed,taskId:" + runtime.getTaskId());
            }
        }
    }
}

