XXL-JOB发送过期邮件提醒

简介

有了前面初次使用 XXL-JOB这篇文章就可顺着往下讲了

我的需求也很简单,通过一种方式发送即将过期的提醒,当然免费方式中最好的就是邮件啦!

我这里选择的方式就是利用XXL-JOB来查询然后分组发送,当然还有其他方式,我也非常乐意尝试考虑其他方法,合适的话。

留在最后讨论吧!

调度中心

首先需要有XXL-JOB调度中心,我前面已经搭建过了,不多提了。

可以参考一下官网或我之前的初次使用 XXL-JOB

执行器

依赖

除了必要的xxl-job-core,还有springbootmailthymeleaf依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>

配置

执行器按自己需要配置,调度中心一定要对应上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://ip:port/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=default_token
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=${spring.application.name}
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=logs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30

顺带上我使用naocs的主要配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
server:
port: 3301
spring:
application:
name: @artifactId@
cloud:
nacos:
discovery:
server-addr: ${NACOS_HOST:nacos}:${NACOS_PORT:8848}
namespace: @nacosNamespace@
group: @nacos.group@
config:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
namespace: @nacosNamespace@
group: @nacos.group@
config:
import:
- optional:nacos:${spring.application.name}-${spring.profiles.active}.yml
- optional:nacos:${spring.application.name}-xxl-job-${spring.profiles.active}.properties

profiles:
active: @profiles.active@

logging:
config: classpath:log4j2-${spring.profiles.active}.xml

组件配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private final Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.address}")
private String address;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;


@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}

/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/

}

过期邮件提醒执行器

过期邮件依赖于两个组件,一个是短链接mapper,另一个是邮件服务,在后面,别急

这里要注意,XxlJobHelperXXL-JOB提供的日志工具,通过它打印的日志可以在admin管理台中直接看到的,所以为方便调试,选择打印需要的信息哦

这个过期邮件提醒逻辑也是很简单的,查询过期时间在今天和3天后这段时间内的数据,然后通过强大的stream将其按邮箱分组发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import org.okay4cloud.okay.shortlink.api.entity.LinkMap;
import org.okay4cloud.okay.shortlink.email.EmailService;
import org.okay4cloud.okay.shortlink.mapper.LinkMapMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.util.*;
import java.util.stream.Collectors;

/**
* @author wnhyang
* @date 2023/3/24
**/
@Component
@RequiredArgsConstructor
public class MyXxlJob {
private static final Logger LOGGER = LoggerFactory.getLogger(MyXxlJob.class);

private final LinkMapMapper linkMapMapper;

private final EmailService emailService;

/**
* 扫表,3天内过期的进行邮件通知
*/
@XxlJob("emailJobHandler")
public void emailJobHandler() {
LOGGER.info("emailJobHandler exec......");
XxlJobHelper.log("XXL-JOB, emailJobHandler.");
LocalDate now = LocalDate.now();
LocalDate now3 = now.plusDays(3);
// 用户分组发邮件
List<LinkMap> linkMaps = linkMapMapper.selectList(new LambdaQueryWrapper<LinkMap>()
.ge(LinkMap::getExpireTime, now)
.le(LinkMap::getExpireTime, now3));
LOGGER.info("即将过期的短链接数量{}", linkMaps.size());
XxlJobHelper.log("即将过期的短链接数量{}", linkMaps.size());
if (!CollUtil.isEmpty(linkMaps)) {
Map<String, List<LinkMap>> collect = linkMaps.stream().collect(Collectors.groupingBy(LinkMap::getEmail));
LOGGER.info("分组后{}", collect);
for (Map.Entry<String, List<LinkMap>> entry : collect.entrySet()) {
String to = entry.getKey();
if (StrUtil.isNotBlank(to)) {
Map<String, Object> model = new HashMap<>(8);
model.put("subject", "短链接即将过期提醒");
model.put("title", "您的短链接将在三天内失效,请务必确认是否还需要,过期的前三天我们都会发送此邮件来向您确认,即将过期的短链接如下:");
List<LinkMap> list = entry.getValue();
model.put("list", list);
model.put("instance", UUID.randomUUID().toString());
emailService.sendHtmlMail(to, "短链接即将过期提醒", model);
LOGGER.info("发给 {} 数量{}", to, list.size());
XxlJobHelper.log("发给 {} 数量{}", to, list.size());
}
}
}
}
}

邮件发送

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
mail:
default-encoding: UTF-8
host: smtp.xxx.qq.com
username: xxxxx
password: xxxxx
properties:
mail:
smtp:
auth: true
ssl:
enable: true
starttls:
enable: true
required: true

邮件服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.thymeleaf.TemplateEngine;
import org.thymeleaf.context.Context;

import javax.mail.internet.MimeMessage;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
* @author wnhyang
* @date 2023/3/25
**/
@Component
@RequiredArgsConstructor
public class EmailService {
private static final Logger LOGGER = LoggerFactory.getLogger(EmailService.class);

private final JavaMailSenderImpl javaMailSender;

private final TemplateEngine templateEngine;

@Value("${spring.mail.username}")
private String sender;

@Async
public void sendHtmlMail(String to, String subject, Map<String, Object> model) {
MimeMessage mimeMailMessage;
try {
mimeMailMessage = javaMailSender.createMimeMessage();
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMailMessage,
MimeMessageHelper.MULTIPART_MODE_MIXED_RELATED, StandardCharsets.UTF_8.name());
mimeMessageHelper.setFrom(sender);
mimeMessageHelper.setTo(to);
mimeMessageHelper.setSubject(subject);
Context context = new Context();
context.setVariables(model);
String html = templateEngine.process("expire_mail", context);
mimeMessageHelper.setText(html, true);
javaMailSender.send(mimeMailMessage);
LOGGER.info("sendHtmlMail邮件发送成功 {};{};{}", to, subject, model);
} catch (Exception e) {
LOGGER.error("sendHtmlMail邮件发送成功 {};{};{}", to, subject, model);
e.printStackTrace();
}
}
}

过期邮件模版

使用thymeleaf的原因就是需要能根据模版数据动态生成邮件内容,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>${subject}</title>
</head>
<body>
<p th:text="${title}"></p>
<table border="1" cellpadding="3" style="border-collapse:collapse; width:80%;">
<thead style="font-weight: bold;color: #ffffff;background-color: #ff8c00;">
<tr>
<td style="width: auto">短链接</td>
<td style="width: auto">链接</td>
<td style="width: auto">描述</td>
<td style="width: auto">过期时间</td>
</tr>
</thead>
<tbody>
<!--/*@thymesVar id="list" type="List<org.okay4cloud.okay.shortlink.api.entity.LinkMap>"*/-->
<tr th:each="item : ${list}">
<td th:text="${item.code}"></td>
<td th:text="${item.link}"></td>
<td th:text="${item.remark}"></td>
<td th:text="${item.expireTime}"></td>
</tr>
</tbody>
</table>
</body>
</html>

简单的异步

不知道你有没有注意上面邮件服务有一个注解@Async,这个是SpringBoot提供一种很简单的异步方法调用,被@Async注解的方法会在调用时创建一个新的线程,并在新的线程中执行方法体,不会阻塞主线程。

使用方法也很简单。

配置

不需要额外的依赖

1
2
3
4
5
6
7
8
spring:
task:
execution:
pool:
core-size: 8
max-size: 20
queue-capacity: 100
keep-alive: 60

配置类

这里就创建了并开启了异步配置,一个核心线程数为10,最大线程数为20,队列容量为100,活动秒数为60,直接拒绝的线程池。

使用的话就加上@Async注解即可,验证的话可以查看日志中线程名是不是我们配置的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
* @author wnhyang
* @date 2023/3/25
**/
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

@Value("${spring.task.execution.pool.core-size}")
private int corePoolSize;

@Value("${spring.task.execution.pool.max-size}")
private int maxPoolSize;

@Value("${spring.task.execution.pool.queue-capacity}")
private int queueCapacity;

@Value("${spring.task.execution.pool.keep-alive}")
private int keepAliveSeconds;

@Bean("asyncThreadPool")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}

调度中心配置与日志

这个同上吧,可以参考一下官网或我之前的初次使用 XXL-JOB

重复说同一件事实在没必要。

我目前配置的是每天8点0 0 8 * * ?

总结

关于做这个过期邮件提醒的方案我也想过很多:

1、新建/修改数据时创建/更新定时通知任务

  • 直观逻辑上也很容易想到,我设置过期时间,就在过期时间提醒我嘛,但是,这是要维护好多的执行器吗?还是怎么做,我是没想明白。

2、消息队列延时消息

  • 消息队列又可以削峰解耦,还是异步,很适合呀,但,消息生产是创建和修改的动作吗,然后消费者发送时再检查一下是否需要发送吗?另外有这么久的延时消息啊,额。。。

3、定时扫表

  • 这个在我看来还是比较能接受的,缺点可能就是数据量过大时的查表问题吧

取长补短,优化一下:

定时扫表+消息队列

定时扫表需要解决的是拆分,不管是通过底层的拆表还是上层sql和执行器都有可操作的空间

通过定时扫表将任务细化送给消息队列,消费者发送前再次检查,体验感会更强

抛砖引玉了。。。