// TaskInfo.java
  1. package com.kingdee.eas.custom.shuiyou.task;
  2. import cn.hutool.core.util.StrUtil;
  3. import cn.hutool.cron.task.Task;
  4. import cn.hutool.json.JSONObject;
  5. import cn.hutool.json.JSONUtil;
  6. import com.google.common.collect.Maps;
  7. import com.kingdee.bos.BOSException;
  8. import com.kingdee.bos.Context;
  9. import com.kingdee.bos.dao.ormapping.ObjectUuidPK;
  10. import com.kingdee.bos.metadata.entity.*;
  11. import com.kingdee.bos.metadata.query.util.CompareType;
  12. import com.kingdee.eas.common.EASBizException;
  13. import com.kingdee.eas.custom.shuiyou.RequestStateEnum;
  14. import com.kingdee.eas.custom.shuiyou.RequestTypeEnum;
  15. import com.kingdee.eas.custom.shuiyou.uitls.ISYUtilsFacade;
  16. import com.kingdee.eas.custom.shuiyou.uitls.SYUCronTaskUtil;
  17. import com.kingdee.eas.custom.shuiyou.uitls.SYUtilsFacadeFactory;
  18. import com.kingdee.eas.util.app.DbUtil;
  19. import com.kingdee.shr.base.syssetting.MSFServiceFacadeFactory;
  20. import org.apache.log4j.Logger;
  21. import java.io.Serializable;
  22. import java.util.Date;
  23. import java.util.Map;
  24. import java.util.Optional;
  25. import java.util.concurrent.CountDownLatch;
  26. import java.util.function.Supplier;
  27. public class TaskInfo extends AbstractTaskInfo implements Serializable, Task
  28. {
  29. private static Logger logger =Logger.getLogger(TaskInfo.class);
  30. private StringBuilder msg=new StringBuilder();
  31. private boolean state=true;
  32. public static final String calculatingCode="100004";
  33. private int i=0;
  34. private Context ctx;
  35. private CountDownLatch cdl;
  36. public TaskInfo()
  37. {
  38. super();
  39. }
  40. protected TaskInfo(String pkField)
  41. {
  42. super(pkField);
  43. }
  44. /**
  45. * 设置上下文
  46. * @param context
  47. * @return
  48. */
  49. public TaskInfo setContext(Context context)
  50. {
  51. this.ctx=context;
  52. return this;
  53. }
  54. /**
  55. * 设置任务计数器
  56. * @param cdl
  57. * @return
  58. */
  59. public TaskInfo setCountDownLatch(CountDownLatch cdl)
  60. {
  61. this.cdl=cdl;
  62. return this;
  63. }
  64. /**
  65. * 当前任务状态
  66. * true 执行中
  67. * false 执行完
  68. * @return
  69. */
  70. public boolean isState() {
  71. return state;
  72. }
  73. /**
  74. * 消息
  75. * @return
  76. */
  77. public String getMsg() {
  78. return msg.toString();
  79. }
  80. @Override
  81. public void execute() {
  82. this.i=this.i+1;
  83. try {
  84. //更新为执行中
  85. if (TaskStateEnum.WAITTING.equals(this.getState())){
  86. this.setState(TaskStateEnum.RUNNING);
  87. TaskFactory.getLocalInstance(ctx).save(this);
  88. }
  89. //校验是否执行执行中
  90. TaskInfo taskPre = this.getTaskPre();
  91. //检查前置任务
  92. if(null!=taskPre&&null!=taskPre.getId()) {
  93. ITask iTask = TaskFactory.getLocalInstance(ctx);
  94. SelectorItemCollection sc = new SelectorItemCollection();
  95. sc.add(new SelectorItemInfo("id"));
  96. sc.add(new SelectorItemInfo("state"));
  97. taskPre = iTask.getTaskInfo(new ObjectUuidPK(this.getId()), sc);
  98. TaskStateEnum taskStateEnum = taskPre.getState();
  99. if (TaskStateEnum.FAIDED.equals(taskStateEnum)) {
  100. //前置任务执行失败后续任务标记失败
  101. this.setState(TaskStateEnum.FAIDED);
  102. this.setExecutionStatus(ExecutionStatusEnum.failed);
  103. //移除任务
  104. this.stop();
  105. }
  106. if (TaskStateEnum.DELETE.equals(taskStateEnum)) {
  107. //前置任务执行删除后续任务标记删除
  108. this.setState(TaskStateEnum.DELETE);
  109. this.setExecutionStatus(ExecutionStatusEnum.failed);
  110. //移除任务
  111. this.stop();
  112. }
  113. if (!TaskStateEnum.FINISHED.equals(taskStateEnum)) {
  114. //前置任务未执行完成等待下次执行
  115. return;
  116. }
  117. }
  118. Map<String,Object> paramOsf = Maps.newHashMap();
  119. Optional<TaskRequestParamInfo> requestParamInfo = Optional.ofNullable(this.getRequestParam());
  120. String param = requestParamInfo.orElse((new TaskRequestParamInfo())).getParamContent();
  121. // //拼接参数
  122. // if(StrUtil.isNotBlank(param)){
  123. // JSONObject jsonObject = JSONUtil.parseObj(param);
  124. // for(Map.Entry<String,Object> st : jsonObject.entrySet()){
  125. // Optional<Object> op = Optional.ofNullable(st.getValue());
  126. // paramOsf.put(st.getKey(),op.orElse("").toString());
  127. // }
  128. // }
  129. this.setCalTime(new Date());
  130. paramOsf.put("this_taskId",this.getId().toString());
  131. paramOsf.put("paramData",param);
  132. logger.info("任务:"+this.getId().toString()+",请求地址:"+this.getUrl()+",请求参数:"+param+"。");
  133. //调用业务实现
  134. MessageResult messageResult = this.callBackOSF(paramOsf);
  135. if(ExecutionStatusEnum.success.equals(messageResult.getStatus())){
  136. Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
  137. TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
  138. requestResult.setResultContent(messageResult.getMsg());
  139. requestResult.setTask(this);
  140. this.setRequestResult(requestResult);
  141. //设置成功
  142. this.setRequestState(RequestStateEnum.Successful);
  143. this.setState(TaskStateEnum.FINISHED);
  144. this.stop();
  145. }else if (ExecutionStatusEnum.again.equals(messageResult.getStatus())){
  146. //设置继续
  147. Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
  148. TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
  149. requestResult.setResultContent(requestResult.getResultContent()+";"+messageResult.getMsg());
  150. requestResult.setTask(this);
  151. }else if (ExecutionStatusEnum.await_w.equals(messageResult.getStatus())){
  152. //设置等待,会停止这个任务。TaskStateEnum状态需要后续子任务更新。
  153. // 即在任务中增加新的子任务,又需要等待子任务执行完成,才能继续的情况。
  154. Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
  155. TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
  156. requestResult.setResultContent(requestResult.getResultContent()+";"+messageResult.getMsg());
  157. requestResult.setTask(this);
  158. this.setRequestState(RequestStateEnum.Successful);
  159. this.setState(TaskStateEnum.RUNNING);
  160. this.stop();
  161. }else {
  162. //设置失败
  163. Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
  164. TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
  165. requestResult.setResultContent(requestResult.getResultContent()+";"+messageResult.getMsg());
  166. requestResult.setTask(this);
  167. this.setRequestResult(requestResult);
  168. this.setRequestState(RequestStateEnum.Failed);
  169. this.setState(TaskStateEnum.FAIDED);
  170. this.stop();
  171. }
  172. this.setExecutionStatus(messageResult.getStatus());
  173. }catch (Exception e){
  174. e.printStackTrace();
  175. logger.error(e);
  176. //设置失败
  177. Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
  178. TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
  179. requestResult.setResultContent(requestResult.getResultContent()+"----------"+e.getMessage());
  180. requestResult.setTask(this);
  181. //设置失败
  182. this.setRequestResult(requestResult);
  183. this.setRequestState(RequestStateEnum.Failed);
  184. this.setState(TaskStateEnum.FAIDED);
  185. this.setExecutionStatus(ExecutionStatusEnum.error);
  186. this.stop();
  187. }finally {
  188. //更新任务状态
  189. try {
  190. TaskRequestResultFactory.getLocalInstance(ctx).save(this.getRequestResult());
  191. TaskFactory.getLocalInstance(ctx).save(this);
  192. } catch (BOSException e) {
  193. e.printStackTrace();
  194. } catch (EASBizException e) {
  195. e.printStackTrace();
  196. }
  197. }
  198. }
  199. /**
  200. * 任务调用实现
  201. */
  202. @Deprecated
  203. public void execute_old() {
  204. this.i=this.i+1;
  205. try {
  206. //设置执行中
  207. ISYUtilsFacade syu = SYUtilsFacadeFactory.getLocalInstance(this.ctx);
  208. Map<String,String> map = Maps.newHashMap();
  209. map.put("requestId",this.getRequestId());
  210. Optional<TaskRequestParamInfo> requestParamInfo = Optional.ofNullable(this.getRequestParam());
  211. String param = requestParamInfo.orElse((new TaskRequestParamInfo())).getParamContent();
  212. //拼接参数
  213. if(StrUtil.isNotBlank(param)){
  214. JSONObject jsonObject = JSONUtil.parseObj(param);
  215. for(Map.Entry<String,Object> st : jsonObject.entrySet()){
  216. Optional<Object> op = Optional.ofNullable(st.getValue());
  217. map.put(st.getKey(),op.orElse("").toString());
  218. }
  219. }
  220. String getParam = JSONUtil.toJsonStr(map);
  221. logger.info("任务:"+this.getId().toString()+",请求地址:"+this.getUrl()+",请求参数:"+getParam+"。");
  222. //调用远程接口
  223. String string = null;
  224. this.setCalTime(new Date());
  225. if(RequestTypeEnum.GET_VALUE.equals(this.getRequestType().getValue())){
  226. string = syu.get(this.getUrl(),getParam);
  227. }else{
  228. string = syu.post(this.getUrl(),getParam);
  229. }
  230. JSONObject jsonObject = JSONUtil.parseObj(string);
  231. JSONObject head = jsonObject.getJSONObject("head");
  232. if(null!=head){
  233. String status = (String) head.get("status");
  234. String code = (String) head.get("code");
  235. if("Y".equalsIgnoreCase(status)){
  236. logger.info("请求成功"+(this.i)+",结果:"+string);
  237. //请求成功停止这个任务
  238. this.stop();
  239. Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
  240. TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
  241. requestResult.setResultContent(string);
  242. requestResult.setTask(this);
  243. this.setRequestResult(requestResult);
  244. //设置成功
  245. this.setRequestState(RequestStateEnum.Successful);
  246. this.setState(TaskStateEnum.FINISHED);
  247. Map<String,Object> paramOsf = Maps.newHashMap();
  248. paramOsf.put("paramData",getParam);
  249. paramOsf.put("taskId",this.getId().toString());
  250. paramOsf.put("result",string);
  251. this.callBackOSF(paramOsf);
  252. } else if(calculatingCode.equals(code)&&StrUtil.isNotBlank(this.getRequestId())){
  253. logger.info("请求"+(this.i)+"计算中,结果:"+string);
  254. //继续
  255. } else {
  256. logger.info("请求"+(this.i)+"失败,结果:"+string);
  257. //请求失败停止这个任务
  258. this.stop();
  259. //设置返回值
  260. Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
  261. TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
  262. requestResult.setResultContent(string);
  263. requestResult.setTask(this);
  264. //设置失败
  265. this.setRequestResult(requestResult);
  266. this.setRequestState(RequestStateEnum.Successful);
  267. this.setState(TaskStateEnum.FAIDED);
  268. Map<String,Object> paramOsf = Maps.newHashMap();
  269. paramOsf.put("paramData",getParam);
  270. paramOsf.put("taskId",this.getId().toString());
  271. paramOsf.put("result",string);
  272. this.callBackOSF(paramOsf);
  273. }
  274. }
  275. }catch (Exception e){
  276. e.printStackTrace();
  277. logger.error(e);
  278. //设置失败
  279. Optional<TaskRequestResultInfo> resultInfo = Optional.ofNullable(this.getRequestResult());
  280. TaskRequestResultInfo requestResult = resultInfo.orElse(new TaskRequestResultInfo());
  281. requestResult.setResultContent(requestResult.getResultContent()+"----------"+e.getMessage());
  282. requestResult.setTask(this);
  283. //设置失败
  284. this.setRequestResult(requestResult);
  285. this.setRequestState(RequestStateEnum.Failed);
  286. this.setState(TaskStateEnum.FAIDED);
  287. this.stop();
  288. }finally {
  289. //更新任务状态
  290. try {
  291. TaskRequestResultFactory.getLocalInstance(ctx).save(this.getRequestResult());
  292. TaskFactory.getLocalInstance(ctx).save(this);
  293. } catch (BOSException e) {
  294. e.printStackTrace();
  295. } catch (EASBizException e) {
  296. e.printStackTrace();
  297. }
  298. }
  299. }
  300. /**
  301. * 调用osf服务
  302. * 默认会将请求参数放带上,需要自己在osf服务配置对应的参数
  303. * @param param
  304. */
  305. public MessageResult callBackOSF(Map<String,Object> param){
  306. MessageResult result=null;
  307. try {
  308. //回调osf服务
  309. if(StrUtil.isNotBlank(this.getOsfServiceName())) {
  310. Object obj = MSFServiceFacadeFactory.getLocalInstance(this.ctx).processService(this.getOsfServiceName(), param);
  311. result = (MessageResult)obj;
  312. }else {
  313. result =MessageResult.SUCCESS();
  314. }
  315. } catch (BOSException e){
  316. //e.printStackTrace();
  317. logger.error(e);
  318. result = MessageResult.ERROR(e.getMessage());
  319. } catch (EASBizException e){
  320. //e.printStackTrace();
  321. logger.error(e);
  322. result = MessageResult.ERROR(e.getMessage());
  323. }finally {
  324. }
  325. return result;
  326. }
  327. public boolean stop(){
  328. this.state=false;
  329. if(null!=cdl) {
  330. cdl.countDown();
  331. }
  332. return SYUCronTaskUtil.getInstance().delSchedule(this);
  333. }
  334. }