123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- package com.kingdee.eas.custom.shuiyou.task;
- import cn.hutool.core.util.StrUtil;
- import cn.hutool.cron.task.Task;
- import cn.hutool.json.JSONObject;
- import cn.hutool.json.JSONUtil;
- import com.google.common.collect.Maps;
- import com.kingdee.bos.BOSException;
- import com.kingdee.bos.Context;
- import com.kingdee.bos.dao.ormapping.ObjectUuidPK;
- import com.kingdee.bos.metadata.entity.*;
- import com.kingdee.bos.metadata.query.util.CompareType;
- import com.kingdee.eas.common.EASBizException;
- import com.kingdee.eas.custom.shuiyou.RequestStateEnum;
- import com.kingdee.eas.custom.shuiyou.RequestTypeEnum;
- import com.kingdee.eas.custom.shuiyou.uitls.ISYUtilsFacade;
- import com.kingdee.eas.custom.shuiyou.uitls.SYUCronTaskUtil;
- import com.kingdee.eas.custom.shuiyou.uitls.SYUtilsFacadeFactory;
- import com.kingdee.eas.util.app.DbUtil;
- import com.kingdee.shr.base.syssetting.MSFServiceFacadeFactory;
- import org.apache.log4j.Logger;
- import java.io.Serializable;
- import java.util.Date;
- import java.util.Map;
- import java.util.Optional;
- import java.util.concurrent.CountDownLatch;
- import java.util.function.Supplier;
- public class TaskInfo extends AbstractTaskInfo implements Serializable, Task
- {
- private static Logger logger =Logger.getLogger(TaskInfo.class);
- private StringBuilder msg=new StringBuilder();
- private boolean state=true;
- public static final String calculatingCode="100004";
- private int i=0;
- private Context ctx;
- private CountDownLatch cdl;
- public TaskInfo()
- {
- super();
- }
- protected TaskInfo(String pkField)
- {
- super(pkField);
- }
- /**
- * 设置上下文
- * @param context
- * @return
- */
- public TaskInfo setContext(Context context)
- {
- this.ctx=context;
- return this;
- }
- /**
- * 设置任务计数器
- * @param cdl
- * @return
- */
- public TaskInfo setCountDownLatch(CountDownLatch cdl)
- {
- this.cdl=cdl;
- return this;
- }
- /**
- * 当前任务状态
- * true 执行中
- * false 执行完
- * @return
- */
- public boolean isState() {
- return state;
- }
- /**
- * 消息
- * @return
- */
- public String getMsg() {
- return msg.toString();
- }
- @Override
- public void execute() {
- this.i=this.i+1;
- try {
- //更新为执行中
- if (TaskStateEnum.WAITTING.equals(this.getState())){
- this.setState(TaskStateEnum.RUNNING);
- TaskFactory.getLocalInstance(ctx).save(this);
- }
- //校验是否执行执行中
- TaskInfo taskPre = this.getTaskPre();
- //检查前置任务
- if(null!=taskPre&&null!=taskPre.getId()) {
- ITask iTask = TaskFactory.getLocalInstance(ctx);
- SelectorItemCollection sc = new SelectorItemCollection();
- sc.add(new SelectorItemInfo("id"));
- sc.add(new SelectorItemInfo("state"));
- taskPre = iTask.getTaskInfo(new ObjectUuidPK(this.getId()), sc);
- TaskStateEnum taskStateEnum = taskPre.getState();
- if (TaskStateEnum.FAIDED.equals(taskStateEnum)) {
- //前置任务执行失败后续任务标记失败
- this.setState(TaskStateEnum.FAIDED);
- this.setExecutionStatus(ExecutionStatusEnum.failed);
- //移除任务
- this.stop();
- }
- if (TaskStateEnum.DELETE.equals(taskStateEnum)) {
- //前置任务执行删除后续任务标记删除
- this.setState(TaskStateEnum.DELETE);
- this.setExecutionStatus(ExecutionStatusEnum.failed);
- //移除任务
- this.stop();
- }
- if (!TaskStateEnum.FINISHED.equals(taskStateEnum)) {
- //前置任务未执行完成等待下次执行
- return;
- }
- }
- Map<String,Object> paramOsf = Maps.newHashMap();
- Optional<TaskRequestParamInfo> requestParamInfo = Optional.ofNullable(this.getRequestParam());
- String param = requestParamInfo.orElse((new TaskRequestParamInfo())).getParamContent();
- // //拼接参数
- // if(StrUtil.isNotBlank(param)){
- // JSONObject jsonObject = JSONUtil.parseObj(param);
- // for(Map.Entry<String,Object> st : jsonObject.entrySet()){
- // Optional<Object> op = Optional.ofNullable(st.getValue());
- // paramOsf.put(st.getKey(),op.orElse("").toString());
- // }
- // }
- this.setCalTime(new Date());
- paramOsf.put("this_taskId",this.getId().toString());
- paramOsf.put("paramData",param);
- logger.info("任务:"+this.getId().toString()+",请求地址:"+this.getUrl()+",请求参数:"+param+"。");
- //调用业务实现
- MessageResult messageResult = this.callBackOSF(paramOsf);
- if(ExecutionStatusEnum.success.equals(messageResult.getStatus())){
- Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
- TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
- requestResult.setResultContent(messageResult.getMsg());
- requestResult.setTask(this);
- this.setRequestResult(requestResult);
- //设置成功
- this.setRequestState(RequestStateEnum.Successful);
- this.setState(TaskStateEnum.FINISHED);
- this.stop();
- }else if (ExecutionStatusEnum.again.equals(messageResult.getStatus())){
- //设置继续
- Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
- TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
- requestResult.setResultContent(requestResult.getResultContent()+";"+messageResult.getMsg());
- requestResult.setTask(this);
- }else if (ExecutionStatusEnum.await_w.equals(messageResult.getStatus())){
- //设置等待,会停止这个任务。TaskStateEnum状态需要后续子任务更新。
- // 即在任务中增加新的子任务,又需要等待子任务执行完成,才能继续的情况。
- Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
- TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
- requestResult.setResultContent(requestResult.getResultContent()+";"+messageResult.getMsg());
- requestResult.setTask(this);
- this.setRequestState(RequestStateEnum.Successful);
- this.setState(TaskStateEnum.RUNNING);
- this.stop();
- }else {
- //设置失败
- Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
- TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
- requestResult.setResultContent(requestResult.getResultContent()+";"+messageResult.getMsg());
- requestResult.setTask(this);
- this.setRequestResult(requestResult);
- this.setRequestState(RequestStateEnum.Failed);
- this.setState(TaskStateEnum.FAIDED);
- this.stop();
- }
- this.setExecutionStatus(messageResult.getStatus());
- }catch (Exception e){
- e.printStackTrace();
- logger.error(e);
- //设置失败
- Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
- TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
- requestResult.setResultContent(requestResult.getResultContent()+"----------"+e.getMessage());
- requestResult.setTask(this);
- //设置失败
- this.setRequestResult(requestResult);
- this.setRequestState(RequestStateEnum.Failed);
- this.setState(TaskStateEnum.FAIDED);
- this.setExecutionStatus(ExecutionStatusEnum.error);
- this.stop();
- }finally {
- //更新任务状态
- try {
- TaskRequestResultFactory.getLocalInstance(ctx).save(this.getRequestResult());
- TaskFactory.getLocalInstance(ctx).save(this);
- } catch (BOSException e) {
- e.printStackTrace();
- } catch (EASBizException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 任务调用实现
- */
- @Deprecated
- public void execute_old() {
- this.i=this.i+1;
- try {
- //设置执行中
- ISYUtilsFacade syu = SYUtilsFacadeFactory.getLocalInstance(this.ctx);
- Map<String,String> map = Maps.newHashMap();
- map.put("requestId",this.getRequestId());
- Optional<TaskRequestParamInfo> requestParamInfo = Optional.ofNullable(this.getRequestParam());
- String param = requestParamInfo.orElse((new TaskRequestParamInfo())).getParamContent();
- //拼接参数
- if(StrUtil.isNotBlank(param)){
- JSONObject jsonObject = JSONUtil.parseObj(param);
- for(Map.Entry<String,Object> st : jsonObject.entrySet()){
- Optional<Object> op = Optional.ofNullable(st.getValue());
- map.put(st.getKey(),op.orElse("").toString());
- }
- }
- String getParam = JSONUtil.toJsonStr(map);
- logger.info("任务:"+this.getId().toString()+",请求地址:"+this.getUrl()+",请求参数:"+getParam+"。");
- //调用远程接口
- String string = null;
- this.setCalTime(new Date());
- if(RequestTypeEnum.GET_VALUE.equals(this.getRequestType().getValue())){
- string = syu.get(this.getUrl(),getParam);
- }else{
- string = syu.post(this.getUrl(),getParam);
- }
- JSONObject jsonObject = JSONUtil.parseObj(string);
- JSONObject head = jsonObject.getJSONObject("head");
- if(null!=head){
- String status = (String) head.get("status");
- String code = (String) head.get("code");
- if("Y".equalsIgnoreCase(status)){
- logger.info("请求成功"+(this.i)+",结果:"+string);
- //请求成功停止这个任务
- this.stop();
- Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
- TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
- requestResult.setResultContent(string);
- requestResult.setTask(this);
- this.setRequestResult(requestResult);
- //设置成功
- this.setRequestState(RequestStateEnum.Successful);
- this.setState(TaskStateEnum.FINISHED);
- Map<String,Object> paramOsf = Maps.newHashMap();
- paramOsf.put("paramData",getParam);
- paramOsf.put("taskId",this.getId().toString());
- paramOsf.put("result",string);
- this.callBackOSF(paramOsf);
- } else if(calculatingCode.equals(code)&&StrUtil.isNotBlank(this.getRequestId())){
- logger.info("请求"+(this.i)+"计算中,结果:"+string);
- //继续
- } else {
- logger.info("请求"+(this.i)+"失败,结果:"+string);
- //请求失败停止这个任务
- this.stop();
- //设置返回值
- Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
- TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
- requestResult.setResultContent(string);
- requestResult.setTask(this);
- //设置失败
- this.setRequestResult(requestResult);
- this.setRequestState(RequestStateEnum.Successful);
- this.setState(TaskStateEnum.FAIDED);
- Map<String,Object> paramOsf = Maps.newHashMap();
- paramOsf.put("paramData",getParam);
- paramOsf.put("taskId",this.getId().toString());
- paramOsf.put("result",string);
- this.callBackOSF(paramOsf);
- }
- }
- }catch (Exception e){
- e.printStackTrace();
- logger.error(e);
- //设置失败
- Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
- TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
- requestResult.setResultContent(requestResult.getResultContent()+"----------"+e.getMessage());
- requestResult.setTask(this);
- //设置失败
- this.setRequestResult(requestResult);
- this.setRequestState(RequestStateEnum.Failed);
- this.setState(TaskStateEnum.FAIDED);
- this.stop();
- }finally {
- //更新任务状态
- try {
- TaskRequestResultFactory.getLocalInstance(ctx).save(this.getRequestResult());
- TaskFactory.getLocalInstance(ctx).save(this);
- } catch (BOSException e) {
- e.printStackTrace();
- } catch (EASBizException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 调用osf服务
- * 默认会将请求参数放带上,需要自己在osf服务配置对应的参数
- * @param param
- */
- public MessageResult callBackOSF(Map<String,Object> param){
- MessageResult result=null;
- try {
- //回调osf服务
- if(StrUtil.isNotBlank(this.getOsfServiceName())) {
- Object obj = MSFServiceFacadeFactory.getLocalInstance(this.ctx).processService(this.getOsfServiceName(), param);
- result = (MessageResult)obj;
- }else {
- result =MessageResult.SUCCESS();
- }
- } catch (BOSException e){
- //e.printStackTrace();
- logger.error(e);
- result = MessageResult.ERROR(e.getMessage());
- } catch (EASBizException e){
- //e.printStackTrace();
- logger.error(e);
- result = MessageResult.ERROR(e.getMessage());
- }finally {
- }
- return result;
- }
- public boolean stop(){
- this.state=false;
- if(null!=cdl) {
- cdl.countDown();
- }
- return SYUCronTaskUtil.getInstance().delSchedule(this);
- }
- }
|