Commit 1ee4839b by zhangxingmin

push

parent 0f54f404
...@@ -6,6 +6,7 @@ import com.yd.common.utils.RandomStringGenerator; ...@@ -6,6 +6,7 @@ import com.yd.common.utils.RandomStringGenerator;
import com.yd.csf.api.dto.AlgorithmCollectResDto; import com.yd.csf.api.dto.AlgorithmCollectResDto;
import com.yd.csf.api.dto.AlgorithmResDto; import com.yd.csf.api.dto.AlgorithmResDto;
import com.yd.csf.api.service.ApiExpectedFortuneService; import com.yd.csf.api.service.ApiExpectedFortuneService;
import com.yd.csf.api.service.XxlJobService;
import com.yd.csf.feign.request.expectedfortune.ApiGenerateExpectedFortuneRequest; import com.yd.csf.feign.request.expectedfortune.ApiGenerateExpectedFortuneRequest;
import com.yd.csf.feign.response.expectedfortune.ApiGenerateExpectedFortuneResponse; import com.yd.csf.feign.response.expectedfortune.ApiGenerateExpectedFortuneResponse;
import com.yd.csf.service.model.CalmTask; import com.yd.csf.service.model.CalmTask;
...@@ -46,6 +47,8 @@ public class ApiExpectedFortuneAsyncService { ...@@ -46,6 +47,8 @@ public class ApiExpectedFortuneAsyncService {
private PolicyService policyService; private PolicyService policyService;
@Resource @Resource
private ICalmTaskService iCalmTaskService; private ICalmTaskService iCalmTaskService;
@Resource
private XxlJobService xxlJobService;
/** /**
* 异步处理-执行预计发拥数据 * 异步处理-执行预计发拥数据
...@@ -158,6 +161,13 @@ public class ApiExpectedFortuneAsyncService { ...@@ -158,6 +161,13 @@ public class ApiExpectedFortuneAsyncService {
log.info("完成基本法分组计算,policyNo: {}, 分组后用户数: {}", policyNo, resultList.size()); log.info("完成基本法分组计算,policyNo: {}, 分组后用户数: {}", policyNo, resultList.size());
// 构建冷静期定时任务逻辑 // 构建冷静期定时任务逻辑
//创建XXL-Job定时任务
if (CollectionUtils.isNotEmpty(resultList)) {
//循环创建定时任务(多个冷静期的定时任务)
for (CalmTask calmTask : resultList) {
xxlJobService.addScheduleJob(calmTask.getCalmTaskBizId(),"冷静期定时发送任务-","calmSendJobHandler",calmTask.getCoolingOffEndDate());
}
}
} }
......
package com.yd.csf.api.handler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 冷静期定时发送任务处理器 - XXL-Job定时任务执行器
* 使用@XxlJob注解方式
*/
@Component
@Slf4j
public class CalmSendJobHandler {
/**
* XXL-Job任务执行入口方法
*/
@XxlJob("calmSendJobHandler")
public void execute() throws Exception {
}
}
\ No newline at end of file
package com.yd.csf.api.service;
import java.util.Date;
public interface XxlJobService {
String addScheduleJob(String taskBizId, String jobDesc, String executorHandler, Date scheduleTime);
}
package com.yd.csf.api.service.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yd.csf.api.service.XxlJobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import java.util.Calendar;
import java.util.Date;
@Service
@Slf4j
public class XxlJobServiceImpl implements XxlJobService {
@Value("${xxl.job.admin.addresses:http://139.224.145.34:8686/xxl-job-admin}")
private String adminAddresses;
@Value("${xxl.job.executor.appname:csf-executor}")
private String appName;
@Value("${xxl.job.admin.username:admin}")
private String adminUsername;
@Value("${xxl.job.admin.password:123456}")
private String adminPassword;
@Autowired
private RestTemplate restTemplate;
// 添加认证Cookie存储
private String authCookie;
// 添加API路径常量
private static final String API_LOGIN = "/login";
private static final String API_JOB_GROUP_LIST = "/jobgroup/pageList";
private static final String API_JOB_ADD = "/jobinfo/add";
private static final String API_JOB_START = "/jobinfo/start";
/**
* 登录XXL-Job获取认证Cookie
* @return
*/
private boolean loginXxlJob() {
try {
String loginUrl = adminAddresses + "/login";
MultiValueMap<String, String> loginParams = new LinkedMultiValueMap<>();
loginParams.add("userName", adminUsername);
loginParams.add("password", adminPassword);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity<MultiValueMap<String, String>> loginEntity = new HttpEntity<>(loginParams, headers);
ResponseEntity<String> loginResponse = restTemplate.postForEntity(loginUrl, loginEntity, String.class);
if (loginResponse.getStatusCode().is2xxSuccessful()) {
// 从响应头中获取Cookie
HttpHeaders responseHeaders = loginResponse.getHeaders();
if (responseHeaders.containsKey("Set-Cookie")) {
authCookie = responseHeaders.getFirst("Set-Cookie");
log.info("XXL-Job登录成功");
return true;
}
}
log.error("XXL-Job登录失败: {}", loginResponse.getStatusCode());
return false;
} catch (Exception e) {
log.error("XXL-Job登录异常", e);
return false;
}
}
/**
* 创建XXL-Job定时任务
* @param taskBizId 任务唯一业务ID(任务表的唯一业务ID)
* @param jobDesc 任务描述(此次任务的名称)
* @param executorHandler 监听类(不同业务的定时,不一样,单独实现)
* @param scheduleTime 定时时间(定时发送的时间)
* @return
*/
@Override
public String addScheduleJob(String taskBizId, String jobDesc, String executorHandler, Date scheduleTime) {
try {
// 登录认证
if (authCookie == null && !loginXxlJob()) {
throw new RuntimeException("XXL-Job认证失败");
}
// 先获取执行器ID
Integer jobGroupId = getExecutorGroupId(appName);
if (jobGroupId == null) {
throw new RuntimeException("获取执行器ID失败,执行器可能未注册: " + appName);
}
String cronExpression = convertDateToCron(scheduleTime);
MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
params.add("jobGroup", jobGroupId.toString()); // 使用动态获取的执行器ID
// params.add("jobDesc", "邮件发送任务-" + taskBizId);
params.add("jobDesc", jobDesc + taskBizId);
params.add("author", "system");
params.add("scheduleType", "CRON");
params.add("scheduleConf", cronExpression);
params.add("glueType", "BEAN");
// params.add("executorHandler", "emailSendJobHandler");
params.add("executorHandler", executorHandler);
params.add("executorParam", taskBizId);
params.add("executorRouteStrategy", "FIRST");
params.add("misfireStrategy", "DO_NOTHING");
params.add("executorBlockStrategy", "SERIAL_EXECUTION");
String url = adminAddresses + "/jobinfo/add";
HttpHeaders headers = new HttpHeaders();
headers.add("Cookie", authCookie);
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity<MultiValueMap<String, String>> entity = new HttpEntity<>(params, headers);
ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.POST, entity, String.class);
if (response.getStatusCode().is2xxSuccessful()) {
String jobId = extractJobId(response.getBody());
log.info("创建XXL-Job任务成功, taskId: {}, jobId: {}", taskBizId, jobId);
// 创建成功后自动启动任务
if (startJob(jobId)) {
log.info("自动启动XXL-Job任务成功, jobId: {}", jobId);
} else {
log.warn("自动启动XXL-Job任务失败, jobId: {}", jobId);
}
return jobId;
} else {
throw new RuntimeException("XXL-Job API调用失败: " + response.getStatusCode());
}
} catch (Exception e) {
log.error("创建XXL-Job任务失败", e);
throw new RuntimeException("创建定时任务失败: " + e.getMessage());
}
}
// 添加版本检测方法
private void checkApiPaths() {
log.info("正在检测XXL-Job Admin API路径...");
// 可以添加API路径检测逻辑
}
/**
* 根据执行器AppName获取执行器ID
* @param appName
* @return
*/
private Integer getExecutorGroupId(String appName) {
try {
// 尝试不同的API路径
String[] possiblePaths = {
"/jobgroup/pageList", // XXL-Job 2.3.0+
"/jobgroup/list", // 旧版本
"/jobgroup/listByApp" // 其他可能路径
};
for (String path : possiblePaths) {
Integer groupId = tryGetExecutorGroupId(appName, path);
if (groupId != null) {
log.info("成功获取执行器ID: {},使用的API路径: {}", groupId, path);
return groupId;
}
}
log.error("所有API路径尝试均失败,无法获取执行器ID");
return null;
} catch (Exception e) {
log.error("获取执行器ID异常", e);
return null;
}
}
/**
* 获取执行器ID
* @param appName
* @param apiPath
* @return
*/
private Integer tryGetExecutorGroupId(String appName, String apiPath) {
try {
String url = adminAddresses + apiPath;
log.info("尝试API路径: {}", url);
HttpHeaders headers = new HttpHeaders();
headers.add("Cookie", authCookie);
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
// 对于分页接口可能需要添加参数
MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
if (apiPath.equals("/jobgroup/pageList")) {
params.add("start", "0");
params.add("length", "100");
params.add("appname", appName);
}
HttpEntity<?> entity = params.isEmpty() ?
new HttpEntity<>(headers) :
new HttpEntity<>(params, headers);
ResponseEntity<String> response = restTemplate.exchange(
url, HttpMethod.POST, entity, String.class);
if (response.getStatusCode().is2xxSuccessful()) {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(response.getBody());
log.info("API响应: {}", response.getBody());
// 解析响应,根据不同的API路径处理
if (jsonNode.has("data")) {
JsonNode dataNode = jsonNode.get("data");
if (dataNode.isArray()) {
// 数组格式的响应
for (JsonNode item : dataNode) {
if (item.has("appname") && appName.equals(item.get("appname").asText())) {
return item.get("id").asInt();
}
}
} else if (dataNode.has("data")) {
// 分页格式的响应
JsonNode dataArray = dataNode.get("data");
if (dataArray.isArray()) {
for (JsonNode item : dataArray) {
if (item.has("appname") && appName.equals(item.get("appname").asText())) {
return item.get("id").asInt();
}
}
}
}
}
}
return null;
} catch (Exception e) {
log.warn("API路径 {} 尝试失败: {}", apiPath, e.getMessage());
return null;
}
}
/**
* 启动XXL-Job任务
* @param jobId
* @return
*/
private boolean startJob(String jobId) {
try {
String startUrl = adminAddresses + "/jobinfo/start";
MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
params.add("id", jobId);
HttpHeaders headers = new HttpHeaders();
headers.add("Cookie", authCookie);
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity<MultiValueMap<String, String>> entity = new HttpEntity<>(params, headers);
ResponseEntity<String> response = restTemplate.exchange(startUrl, HttpMethod.POST, entity, String.class);
if (response.getStatusCode().is2xxSuccessful()) {
log.info("启动任务成功, jobId: {}", jobId);
return true;
} else {
log.error("启动任务失败, jobId: {}, 状态码: {}, 响应: {}", jobId, response.getStatusCode(), response.getBody());
return false;
}
} catch (Exception e) {
log.error("启动任务异常, jobId: {}", jobId, e);
return false;
}
}
/**
* date转成Cron表达式
* @param date
* @return
*/
private String convertDateToCron(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
int second = calendar.get(Calendar.SECOND);
int minute = calendar.get(Calendar.MINUTE);
int hour = calendar.get(Calendar.HOUR_OF_DAY);
int day = calendar.get(Calendar.DAY_OF_MONTH);
int month = calendar.get(Calendar.MONTH) + 1;
int year = calendar.get(Calendar.YEAR);
return String.format("%d %d %d %d %d ? %d", second, minute, hour, day, month, year);
}
/**
* 从XXL-Job响应中提取真实的jobId
* 响应格式: {"code":200,"msg":null,"content":"6"}
*/
private String extractJobId(String responseBody) {
try {
log.info("XXL-Job响应: {}", responseBody);
if (responseBody == null || responseBody.trim().isEmpty()) {
log.warn("响应体为空");
return String.valueOf(System.currentTimeMillis());
}
// 使用JSON解析器正确解析响应
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(responseBody);
if (jsonNode.has("code") && jsonNode.get("code").asInt() == 200) {
if (jsonNode.has("content")) {
String jobId = jsonNode.get("content").asText();
log.info("提取到真实jobId: {}", jobId);
return jobId;
}
}
log.warn("无法从响应中解析jobId,响应: {}", responseBody);
return String.valueOf(System.currentTimeMillis());
} catch (Exception e) {
log.error("解析jobId异常", e);
return String.valueOf(System.currentTimeMillis());
}
}
}
\ No newline at end of file
...@@ -47,5 +47,11 @@ ...@@ -47,5 +47,11 @@
<artifactId>yd-base-feign</artifactId> <artifactId>yd-base-feign</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<!-- XXL-Job 核心依赖 -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment