/*
 * Decompiled with CFR 0.152.
 */
package com.kingdee.bos.openapi.third.invoker;

import com.kingdee.bos.openapi.common.bo.InvocationMessage;
import com.kingdee.bos.openapi.common.mq.MessageConsumer;
import com.kingdee.bos.openapi.common.mq.MqInvokeType;
import com.kingdee.bos.openapi.common.mq.RabbitMqConnectionFactory;
import com.kingdee.bos.openapi.third.AbstractInvoker;
import com.kingdee.bos.openapi.third.OpenApiInfo;
import com.kingdee.bos.openapi.third.exception.BizException;
import com.kingdee.bos.openapi.third.exception.InvokeNetworkException;
import com.kingdee.bos.openapi.third.exception.LoginException;
import com.kingdee.bos.openapi.third.invoker.AbstractLoginContext;
import com.kingdee.bos.openapi.third.invoker.AbstractMQLoginContext;
import com.kingdee.bos.openapi.third.utils.OpenApiTools;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.log4j.Logger;

public class RabbitMqInvoker
extends AbstractInvoker {
    private static final Logger logger = Logger.getLogger(RabbitMqInvoker.class);
    private Connection connection;
    private Channel channel;

    @Override
    public String invoke(AbstractLoginContext loginCtx, OpenApiInfo apiInfo) throws BizException, LoginException, InvokeNetworkException {
        AbstractMQLoginContext ctx = (AbstractMQLoginContext)loginCtx;
        this.init(ctx);
        return this._invoke(ctx, apiInfo);
    }

    private String _invoke(AbstractMQLoginContext ctx, OpenApiInfo apiInfo) throws BizException {
        final String corrId = UUID.randomUUID().toString();
        AMQP.BasicProperties props = null;
        String replyQueueName = null;
        try {
            replyQueueName = apiInfo.getReplyQueue();
            if (OpenApiTools.isEmpty(replyQueueName)) {
                replyQueueName = this.channel.queueDeclare().getQueue();
            }
            if (MqInvokeType.NOCALLBACK != apiInfo.getMqInvokeType()) {
                props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
            }
            String message = this.initInvokeMessage(ctx, apiInfo);
            if (logger.isDebugEnabled()) {
                logger.debug((Object)(" Queue: " + apiInfo.getQueue() + " messgae : " + message));
            }
            this.channel.basicPublish("", apiInfo.getQueue(), props, message.getBytes("utf-8"));
        }
        catch (Exception e) {
            throw new BizException(e);
        }
        if (MqInvokeType.NOCALLBACK != apiInfo.getMqInvokeType()) {
            final ArrayBlockingQueue response = new ArrayBlockingQueue(1);
            final boolean isSync = apiInfo.getMqInvokeType() == MqInvokeType.CALLBACK_SYNC;
            try {
                MessageConsumer consumerTemp = null;
                if (apiInfo.getMqInvokeType() == MqInvokeType.CALLBACK_ASYNC) {
                    Class<MessageConsumer> callBackClazz = apiInfo.getCallBackClazz();
                    if (callBackClazz == null) {
                        logger.error((Object)("opanapi error. callbackClazz is null. info: " + apiInfo));
                    } else {
                        consumerTemp = callBackClazz.newInstance();
                        consumerTemp.setInfo(apiInfo);
                    }
                }
                final MessageConsumer consumer = consumerTemp;
                this.channel.basicConsume(replyQueueName, true, (Consumer)new DefaultConsumer(this.channel){

                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        if (properties.getCorrelationId().equals(corrId)) {
                            String result = new String(body, "utf-8");
                            if (isSync) {
                                response.offer(result);
                            } else if (consumer != null) {
                                consumer.setResultAsByte(body);
                                consumer.setResult(result);
                                consumer.handleResult();
                            }
                        }
                    }
                });
                if (isSync) {
                    return (String)response.take();
                }
            }
            catch (Exception e) {
                throw new BizException(e);
            }
        }
        return null;
    }

    private String initInvokeMessage(AbstractLoginContext ctx, OpenApiInfo apiInfo) throws Exception {
        InvocationMessage invokeMsg = new InvocationMessage();
        invokeMsg.setData(apiInfo);
        invokeMsg.setExtend(ctx.getloginMap());
        String message = this.dataType.parse2String(invokeMsg);
        return message;
    }

    private void init(AbstractMQLoginContext ctx) throws InvokeNetworkException {
        this.connection = RabbitMqConnectionFactory.getConnection(ctx.getConnEntity());
        try {
            this.channel = this.connection.createChannel();
        }
        catch (IOException e) {
            throw new InvokeNetworkException(e);
        }
    }

    @Override
    public void close(AbstractLoginContext ctx) throws InvokeNetworkException {
        try {
            this.channel.close();
        }
        catch (Exception e) {
            logger.error((Object)"openapi error mq ", (Throwable)e);
        }
    }
}

