目录
  1. 背景
  2. 思路
    1. 思路一
    2. 思路二
    3. 思路三
    4. 思路四
    5. 思路五
    6. 思路六(最终思路,建议先自己思考一下后再看)
      1. 朴素思路
      2. 我的思路
  3. 附录
    1. CountLatch源码
『分治算法』记一次明细报表统计优化

背景

每个月导出当月的订单明细数据。(数据量大概1个月20W左右,附加场景: 每个月的订单数据并不是100%连续存储在数据库中的,存在极小部分数据因为后期数据修复分散在了数据表的各种角落中)

但是这些订单数据不仅仅是读SQL就行了,还需要做一些加工。举例,订单表中可能只存储了productId,但是最终导出的明细数据还需要商品名称、商品类目等,甚至是一些更复杂的加工处理。

机器性能限制: cpu 1核 + JVM内存 1G

思路

思路一

直接读出当月所有数据,然后逐条进行处理,伪代码如下: (ps: 均为伪代码,表达一下大概思想,不保证能运行)

1
2
3
4
5
6
7
8
9
10
List<Result> results = new ArrayList();
OrderQuery query = new OrderQuery();
query.setOrderTimeFrom("2021-02-01 00:00:00");
query.setOrderTimeTo("2021-03-01 00:00:00");
List<Order> orders = orderService.list(query);
for (Order order : orders) {
Result result = handle(order);
results.add(result);
}
return results;

这个方法我没有真实去试验。但是大概能够比较明显的看出两个问题:

  1. 一次性读取的订单数据量太大,很容易导致内存不够 (没有实验,不排除真的能跑得动)
  2. 后续单条单条加工数据,性能会很慢

思路二

分页处理,伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
List<Result> results = new ArrayList();
for (int page = 1; ; page++) {
OrderQuery query = new OrderQuery();
query.setOrderTimeFrom("2021-02-01 00:00:00");
query.setOrderTimeTo("2021-03-01 00:00:00");
query.setPage(page);
query.setPageSize(100); // 假设我们选择100作为分页数量
List<Order> orders = orderService.list(query);
if (orders.size() == 0) {
break;
}
List<Result> orderResults = batchHandle(orders);
results.addAll(orderResults);
}
return results;

到了这里,看起来我们解决了思路一的两个问题,终于可以松一口气了。但是相信明眼的大家已经发现,其实这玩意儿可以优化,例如完全可以多线程并发处理。于是有了思路三。

思路三

分页+多线程处理,伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int THREAD_NUM = 20;
List<Result> results = new Vector();
CountDownLatch latch = new CountDownLatch(THREAD_NUM);
for (int i = 1; i <= THREAD_NUM; i++) {
new Thread(() -> {
try {
for (int page = i; ; page += THREAD_NUM) {
OrderQuery query = new OrderQuery();
query.setOrderTimeFrom("2021-02-01 00:00:00");
query.setOrderTimeTo("2021-03-01 00:00:00");
query.setPage(page);
query.setPageSize(100);
List<Order> orders = orderService.list(query);
if (orders.size() == 0) {
break;
}
List<Result> orderResults = batchHandle(orders);
results.addAll(orderResults);
}
} finally {
latch.countDown();
}
}).start();
}
latch.await();
return results;

这样子代码就看起来很完美了。但是实际上运行效果却非常不好。最最直白的结果就是:

等了好久,但报表却怎么也导不出来!同时数据库CPU持续在100%狂飙!

那么问题来了,为什么呢?不妨来猜猜。 (谜底在下方,白色的字,选中就能看清)

mysql在limit分页到后面时,就会很慢很慢,多线程并发执行更是加剧了数据库的压力

思路四

那么针对问题三,我想都没想,直接上了最朴素的思路。没错,就是优化分页SQL。

众所周知,分页SQL可以通过这种方式来进行优化,

例,原SQL:

1
select * from `order` order by id desc limit 50,30;

优化SQL: (里面嵌了两层 select id 是因为mysql语法的限制)

1
select * from `order` where id in (select id from (select id from `order` order by id desc limit 50,30) as t);

换了一下SQL,效果的确好了很多,但是仍然不如人意。

  1. 报表生成仍然得花1个小时
  2. 数据库CPU仍然保持在100%,并没有降低

思路五

我们知道分页SQL其实还有一种优化方式,即根据idFrom来优化,即每次查询获取最大id,下次查询时从这个id开始查询。又是完美的思路!!!

但是!当你开始激动地编码的时候,你发现了现实很残酷,你没法多线程来跑了。因为每次的idFrom依赖于上次的查询结果……那么接下来怎么做呢?

…… (建议先自己思考后再往下看)

……

……

……

……

……

思路六(最终思路,建议先自己思考一下后再看)

那其实思路六就是对思路五的优化,我们唯一需要思考的问题就是如何能够多线程并发跑思路五。

其实很简单,一个idFrom不够,那再加一个idTo不就可以了吗。

但是接下来,问题来了,这个idFrom和idTo该怎么设置呢?(注意: 背景中场景描述附加了一个条件,订单表中有小部分数据是离散的)

朴素思路

最朴素的想法,借鉴思路四的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int THREAD_NUM = 20;
int PAGE_SIZE = 100;
List<Result> results = new Vector();
CountDownLatch latch = new CountDownLatch(THREAD_NUM);
for (int i = 1; i <= THREAD_NUM; i++) {
new Thread(() -> {
try {
for (int idFrom = i; ; idFrom += THREAD_NUM * PAGE_SIZE) {
OrderQuery query = new OrderQuery();
query.setOrderTimeFrom("2021-02-01 00:00:00");
query.setOrderTimeTo("2021-03-01 00:00:00");
query.setIdFrom(idFrom);
query.setIdTo(idFrom + PAGE_SIZE);
List<Order> orders = orderService.list(query);
if (orders.size() == 0) {
break;
}
List<Result> orderResults = batchHandle(orders);
results.addAll(orderResults);
}
} finally {
latch.countDown();
}
}).start();
}
latch.await();
return results;

但是这样写,显然是有一定问题的。你相当于把整个订单表都给读了一遍,本来只要1个月的数据,结果你读了所有的数据,那性能肯定要下降不少。

我的思路

想到idFrom和idTo,其实很容易想到一个算法,二分,二分不就是给一个左边界和右边界,然后进行计算的算法吗?

那同样的,我们这边也可以参照二分的思路来写,对应到这次的解法也就是分治。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
int PAGE_SIZE = 100;

void main() {
Vector<Result> results = new Vector<>();
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> binaryHandle(1, Integer.MAX_VALUE, latch, results)).start();
latch.await();
}

void binaryHandle(int left, int right, CountLatch latch, Vector<Result> results) {
try {
OrderQuery query = new OrderQuery();
query.setOrderTimeFrom("2021-02-01 00:00:00");
query.setOrderTimeTo("2021-03-01 00:00:00");
query.setIdFrom(left);
query.setIdTo(right);
int count = orderService.count(query);
if (count <= 0) {
return;
} else if (count <= PAGE_SIZE) { // 少于分页数量,直接处理
// handle
} else {
CountDownLatch countDownLatch = new CountDownLatch(2);
int mid = (left + right) / 2;
new Thread(() -> binaryHandle(left, mid, countDownLatch, results)).start();
new Thread(() -> binaryHandle(mid, right, countDownLatch, results)).start();
countDownLatch.await();
}
} finally {
latch.countDown();
}
}

你认为代码这样就行了吗?

不,不行,你会发现跑着跑着,线程就创建不了了,因为你稍微算一下,就会发现这种算法的线程量消耗是非常夸张的。

那我们怎么办?用线程池呗,但是线程池真的没问题吗?

是的,线程池是有问题的,会卡住。因为countDownLatch的原因子线程一直卡着父线程。

那么怎么解?自定义一个CountLatch (CountLatch的代码见附录),同时支持countDown()和countUp()操作,然后改改代码就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int PAGE_SIZE = 100;
ThreadPoolExecutor pool = new ThreadPoolExecutor(20, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); // 等待队列长度无限


void main() {
Vector<Result> results = new Vector<>();
CountLatch latch = new CountLatch();
latch.countUp();
pool.execute(() -> binaryHandle(1, Integer.MAX_VALUE, latch, results));
latch.await();
}

void binaryHandle(int left, int right, CountLatch latch, Vector<Result> results) {
try {
OrderQuery query = new OrderQuery();
query.setOrderTimeFrom("2021-02-01 00:00:00");
query.setOrderTimeTo("2021-03-01 00:00:00");
query.setIdFrom(left);
query.setIdTo(right);
int count = orderService.count(query);
if (count <= 0) {
return;
} else if (count <= PAGE_SIZE) {
// handle
} else {
int mid = (left + right) / 2;
latch.countUp();
pool.execute(() -> binaryHandle(left, mid, countDownLatch, results));
latch.countUp();
pool.execute(() -> binaryHandle(mid, right, countDownLatch, results));
}
} finally {
latch.countDown();
}
}

最终整个报表跑完只花了5分钟!

优化完成,到此结束!!!虽然最终应用本身的cpu会跑到100%,但是调调线程池大小就可以很容易地处理了。

附录

CountLatch源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class CountLatch {

private volatile int count = 0;
private final Object object = new Object();

public void countUp() {
synchronized (object) {
count += 1;
}
}

public void countDown() {
synchronized (object) {
count -= 1;
if (count <= 0) {
innerNotifyAll();
}
}
}

public void await() throws InterruptedException {
if (count <= 0) {
return;
}
innerWait();
}

private void innerWait() throws InterruptedException {
synchronized (this) {
this.wait();
}
}

private void innerNotifyAll() {
synchronized (this) {
this.notifyAll();
}
}
}
文章作者: 谷河
文章链接: https://www.lyytaw.com/java/%E3%80%8E%E5%88%86%E6%B2%BB%E7%AE%97%E6%B3%95%E3%80%8F%E8%AE%B0%E4%B8%80%E6%AC%A1%E6%8A%A5%E8%A1%A8%E7%BB%9F%E8%AE%A1%E4%BC%98%E5%8C%96/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 谷河|BLOG