影落离风

The shadow falls away from the wind

0%

伴随我经历过三家公司优化过各种线程执行效率代码示例

前言

之前呆过一家公司做的生鲜配送,有个业务要针对每个用户的购买信息给他打上标签便于更好的营销

有一堆用户集合,然后去查询每个用户的最近购买、购买频次、下单金额等,再去计算他的标签

这是一个复杂的业务,只能对用户一个一个的去生成

我们当然能想到开定时任务去处理,每天或周一凌晨去执行

但是在海量用户下,每次生成一个用户标签肯定是很慢的

由此我们可以想到开多个线程,每次处理多条数据

比如每次开10个线程,每个同时处理10条数据,这样的话一次就能处理100条数据,大大提高生产力

思路

  1. 对要处理的数据集合进行分页获取数据
  2. 根据自己设置的值创建线程集合,将分页后的数据传到对应线程去执行
  3. 执行线程集合收集结果
  4. 校验结果为有效值,继续执行;无效值为(分页后的数据为空或size<pagesize)认为执行完成

代码

  1. 批量线程处理器核心代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    package com.mwk.thread.task;

    import cn.hutool.core.date.DateUtil;
    import cn.hutool.core.date.TimeInterval;
    import com.mwk.utils.Pager;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    /**
    * 批量线程处理器
    *
    * @author MinWeikai
    * @date 2021/7/22 11:03
    */
    public class ThreadPoolTaskExecutorBatch {

    private static final Logger log = LoggerFactory.getLogger(ThreadPoolTaskExecutorBatch.class);

    /**
    * 每轮线程数
    */
    private int poolSize = 10;

    /**
    * 每页数量,每轮处理数据量
    */
    private int pageSize = 10;


    private int maxPoolSize = 10;

    private int maxPageSize = 10;

    private Class abstractBatchCallable;

    /**
    * 自动分配线程数
    */
    private boolean autoPoolSize = true;

    /**
    * 需要批量处理的数据集
    */
    private List list;


    public static ThreadPoolTaskExecutorBatch build() {
    return new ThreadPoolTaskExecutorBatch();
    }

    public ThreadPoolTaskExecutorBatch setAbstractBatchCallable(Class abstractBatchCallable) {
    this.abstractBatchCallable = abstractBatchCallable;
    return this;
    }

    public static AbstractBatchCallable getInstance(Class batchCallable) throws Exception {
    return (AbstractBatchCallable) batchCallable.newInstance();
    }

    public ThreadPoolTaskExecutorBatch setList(List list) {
    this.list = list;
    return this;
    }

    public ThreadPoolTaskExecutorBatch setPoolSize(int poolSize) {
    this.poolSize = poolSize;
    return this;
    }

    public ThreadPoolTaskExecutorBatch setPageSize(int pageSize) {
    this.pageSize = pageSize;
    return this;
    }

    public ThreadPoolTaskExecutorBatch setAutoPoolSize(boolean autoPoolSize) {
    this.autoPoolSize = autoPoolSize;
    return this;
    }

    public ThreadPoolTaskExecutorBatch setMaxPoolSize(int maxPoolSize) {
    this.maxPoolSize = maxPoolSize;
    return this;
    }

    public ThreadPoolTaskExecutorBatch setMaxPageSize(int maxPageSize) {
    this.maxPageSize = maxPageSize;
    return this;
    }

    public void start() {
    TimeInterval timer = DateUtil.timer();
    log.info("----开始生成数据----");
    if(!this.autoPoolSizeByList()){
    return;
    }
    log.debug("批任务处理信息:autoPoolSize={} poolSize={} pageSize={} maxPoolSize={} maxPageSize={}",
    this.autoPoolSize, this.poolSize, this.pageSize, this.maxPoolSize, this.maxPageSize);
    //是否继续
    boolean proceed = true;
    //线程创建轮数
    int rounds = 0;
    //起始页数
    int page = 0;
    List<Callable<Integer>> list = new ArrayList<>();
    Pager pager;
    while (proceed) {
    rounds++;
    int temp = 0;
    for (int k = 0; k < this.poolSize; k++) {
    page++;
    try {
    pager = new Pager<>(this.list, page, pageSize);
    if(pager.getContent().size() == 0){
    break;
    }
    list.add(getInstance(abstractBatchCallable).setPager(pager));
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    try {

    ExecutorService executor = Executors.newFixedThreadPool(this.poolSize);
    List<Future<Integer>> results = executor.invokeAll(list);
    executor.shutdown();
    for (Future<Integer> result : results) {
    temp += result.get();
    }
    } catch (Exception e) {
    log.error("生成数据出错" + e.getMessage(), e);
    }
    log.info("----线程创建轮【" + rounds + "】,页数:" + page + ",当前轮结束状态" + temp);
    if (temp < this.poolSize) {
    proceed = false;
    }
    list.clear();
    }

    log.info("----总轮数:" + rounds + ",总页数:" + page + ",耗时:" + timer.intervalMs());
    }

    /**
    * 自动计算批任务执行线程数、每线程执行任务数
    * @return
    */
    private boolean autoPoolSizeByList(){
    if(!this.autoPoolSize){
    return true;
    }
    int allSize = this.list.size();
    if(allSize == 0){
    return false;
    }
    // 任务总数小于等于最大线程数,则创建任务数线程,每线程执行1任务
    if(allSize <= this.maxPoolSize){
    this.setPoolSize(allSize);
    this.setPageSize(1);
    return true;
    }
    // 任务总数小于等于最大线程数*最大线程执行任务数,则线程数最大值,每线程执行任务总数除以线程数最大值进位值
    int rem = allSize % this.maxPoolSize;
    int value = allSize / this.maxPoolSize;
    if(allSize <= this.maxPoolSize * this.maxPageSize){
    this.setPoolSize(this.maxPoolSize);
    this.setPageSize(rem == 0 ? value : (value + 1));
    return true;
    }
    return true;
    }

    }
  2. 线程批量处理执行抽象类,需要批量处理数据的任务都可以继承此抽象类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    package com.mwk.thread.task;


    import com.mwk.utils.Pager;
    import org.springframework.util.CollectionUtils;

    import java.util.List;
    import java.util.concurrent.Callable;

    /**
    * 线程批量处理执行抽象类,需要批量处理数据的任务都可以继承此抽象类
    *
    * @author MinWeikai
    * @date 2021/8/7 10:44
    */
    public abstract class AbstractBatchCallable implements Callable<Integer> {

    /**
    * 需要批量处理的数据集
    */
    protected List list;

    private Pager pager;

    @Override
    public Integer call() {
    if (CollectionUtils.isEmpty(list)) {
    return 0;
    }
    this.exec();
    return list.size() < pager.getPageSize() ? 0 : 1;
    }

    /**
    * 自定义的执行方法
    */
    protected abstract void exec();

    public AbstractBatchCallable setPager(Pager pager) {
    this.pager = pager;
    this.list = pager.getContent();
    return this;
    }
    }
  3. 测试任务类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    package com.mwk.thread.task;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.List;

    /**
    * 测试任务类
    *
    * @author MinWeikai
    * @date 2021/8/7 10:48
    */
    public class MytBatchCallableTest extends AbstractBatchCallable {

    private static final Logger log = LoggerFactory.getLogger(MytBatchCallableTest.class);

    @Override
    public void exec() {
    List<Integer> list = (List<Integer>) this.list;
    log.debug("集合值:{}", list);
    }
    }
  4. 测试处理集合调用方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    package com.mwk.thread.task;

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    /**
    * 测试处理集合调用方法
    *
    * @author MinWeikai
    * @date 2021/8/13 22:04
    */
    public class ThreadPoolTaskExecutorBatchTest {

    public static void main(String[] args) {
    List<Integer> list = IntStream.range(1, 1995).boxed().collect(Collectors.toList());
    System.out.println("待执行集合:" + list);
    ThreadPoolTaskExecutorBatch
    .build()
    .setAbstractBatchCallable(MytBatchCallableTest.class)
    .setList(list)
    // .setAutoPoolSize(false)
    // .setPoolSize(10)
    // .setPageSize(50)
    .start();
    }
    }

代码

博客代码路径

用到的分页工具