前言
之前呆过一家公司做的生鲜配送,有个业务要针对每个用户的购买信息给他打上标签便于更好的营销
有一堆用户集合,然后去查询每个用户的最近购买、购买频次、下单金额等,再去计算他的标签
这是一个复杂的业务,只能对用户一个一个的去生成
我们当然能想到开定时任务去处理,每天或周一凌晨去执行
但是在海量用户下,每次生成一个用户标签肯定是很慢的
比如每次开10个线程,每个同时处理10条数据,这样的话一次就能处理100条数据,大大提高生产力
思路
- 对要处理的数据集合进行分页获取数据
- 根据自己设置的值创建线程集合,将分页后的数据传到对应线程去执行
- 执行线程集合收集结果
- 校验结果为有效值,继续执行;无效值为(分页后的数据为空或size<pagesize)认为执行完成
代码
批量线程处理器核心代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175package com.mwk.thread.task;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import com.mwk.utils.Pager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* 批量线程处理器
*
* @author MinWeikai
* @date 2021/7/22 11:03
*/
public class ThreadPoolTaskExecutorBatch {
private static final Logger log = LoggerFactory.getLogger(ThreadPoolTaskExecutorBatch.class);
/**
* 每轮线程数
*/
private int poolSize = 10;
/**
* 每页数量,每轮处理数据量
*/
private int pageSize = 10;
private int maxPoolSize = 10;
private int maxPageSize = 10;
private Class abstractBatchCallable;
/**
* 自动分配线程数
*/
private boolean autoPoolSize = true;
/**
* 需要批量处理的数据集
*/
private List list;
public static ThreadPoolTaskExecutorBatch build() {
return new ThreadPoolTaskExecutorBatch();
}
public ThreadPoolTaskExecutorBatch setAbstractBatchCallable(Class abstractBatchCallable) {
this.abstractBatchCallable = abstractBatchCallable;
return this;
}
public static AbstractBatchCallable getInstance(Class batchCallable) throws Exception {
return (AbstractBatchCallable) batchCallable.newInstance();
}
public ThreadPoolTaskExecutorBatch setList(List list) {
this.list = list;
return this;
}
public ThreadPoolTaskExecutorBatch setPoolSize(int poolSize) {
this.poolSize = poolSize;
return this;
}
public ThreadPoolTaskExecutorBatch setPageSize(int pageSize) {
this.pageSize = pageSize;
return this;
}
public ThreadPoolTaskExecutorBatch setAutoPoolSize(boolean autoPoolSize) {
this.autoPoolSize = autoPoolSize;
return this;
}
public ThreadPoolTaskExecutorBatch setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
return this;
}
public ThreadPoolTaskExecutorBatch setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
return this;
}
public void start() {
TimeInterval timer = DateUtil.timer();
log.info("----开始生成数据----");
if(!this.autoPoolSizeByList()){
return;
}
log.debug("批任务处理信息:autoPoolSize={} poolSize={} pageSize={} maxPoolSize={} maxPageSize={}",
this.autoPoolSize, this.poolSize, this.pageSize, this.maxPoolSize, this.maxPageSize);
//是否继续
boolean proceed = true;
//线程创建轮数
int rounds = 0;
//起始页数
int page = 0;
List<Callable<Integer>> list = new ArrayList<>();
Pager pager;
while (proceed) {
rounds++;
int temp = 0;
for (int k = 0; k < this.poolSize; k++) {
page++;
try {
pager = new Pager<>(this.list, page, pageSize);
if(pager.getContent().size() == 0){
break;
}
list.add(getInstance(abstractBatchCallable).setPager(pager));
} catch (Exception e) {
e.printStackTrace();
}
}
try {
ExecutorService executor = Executors.newFixedThreadPool(this.poolSize);
List<Future<Integer>> results = executor.invokeAll(list);
executor.shutdown();
for (Future<Integer> result : results) {
temp += result.get();
}
} catch (Exception e) {
log.error("生成数据出错" + e.getMessage(), e);
}
log.info("----线程创建轮【" + rounds + "】,页数:" + page + ",当前轮结束状态" + temp);
if (temp < this.poolSize) {
proceed = false;
}
list.clear();
}
log.info("----总轮数:" + rounds + ",总页数:" + page + ",耗时:" + timer.intervalMs());
}
/**
* 自动计算批任务执行线程数、每线程执行任务数
* @return
*/
private boolean autoPoolSizeByList(){
if(!this.autoPoolSize){
return true;
}
int allSize = this.list.size();
if(allSize == 0){
return false;
}
// 任务总数小于等于最大线程数,则创建任务数线程,每线程执行1任务
if(allSize <= this.maxPoolSize){
this.setPoolSize(allSize);
this.setPageSize(1);
return true;
}
// 任务总数小于等于最大线程数*最大线程执行任务数,则线程数最大值,每线程执行任务总数除以线程数最大值进位值
int rem = allSize % this.maxPoolSize;
int value = allSize / this.maxPoolSize;
if(allSize <= this.maxPoolSize * this.maxPageSize){
this.setPoolSize(this.maxPoolSize);
this.setPageSize(rem == 0 ? value : (value + 1));
return true;
}
return true;
}
}线程批量处理执行抽象类,需要批量处理数据的任务都可以继承此抽象类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44package com.mwk.thread.task;
import com.mwk.utils.Pager;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.Callable;
/**
* 线程批量处理执行抽象类,需要批量处理数据的任务都可以继承此抽象类
*
* @author MinWeikai
* @date 2021/8/7 10:44
*/
public abstract class AbstractBatchCallable implements Callable<Integer> {
/**
* 需要批量处理的数据集
*/
protected List list;
private Pager pager;
public Integer call() {
if (CollectionUtils.isEmpty(list)) {
return 0;
}
this.exec();
return list.size() < pager.getPageSize() ? 0 : 1;
}
/**
* 自定义的执行方法
*/
protected abstract void exec();
public AbstractBatchCallable setPager(Pager pager) {
this.pager = pager;
this.list = pager.getContent();
return this;
}
}测试任务类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23package com.mwk.thread.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* 测试任务类
*
* @author MinWeikai
* @date 2021/8/7 10:48
*/
public class MytBatchCallableTest extends AbstractBatchCallable {
private static final Logger log = LoggerFactory.getLogger(MytBatchCallableTest.class);
public void exec() {
List<Integer> list = (List<Integer>) this.list;
log.debug("集合值:{}", list);
}
}测试处理集合调用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27package com.mwk.thread.task;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* 测试处理集合调用方法
*
* @author MinWeikai
* @date 2021/8/13 22:04
*/
public class ThreadPoolTaskExecutorBatchTest {
public static void main(String[] args) {
List<Integer> list = IntStream.range(1, 1995).boxed().collect(Collectors.toList());
System.out.println("待执行集合:" + list);
ThreadPoolTaskExecutorBatch
.build()
.setAbstractBatchCallable(MytBatchCallableTest.class)
.setList(list)
// .setAutoPoolSize(false)
// .setPoolSize(10)
// .setPageSize(50)
.start();
}
}