背景
每个月导出当月的订单明细数据。(数据量大概1个月20W左右,附加场景: 每个月的订单数据并不是100%连续存储在数据库中的,存在极小部分数据因为后期数据修复分散在了数据表的各种角落中)
但是这些订单数据不仅仅是读SQL就行了,还需要做一些加工。举例,订单表中可能只存储了productId,但是最终导出的明细数据还需要商品名称、商品类目等,甚至是一些更复杂的加工处理。
机器性能限制: cpu 1核 + JVM内存 1G
思路
思路一
直接读出当月所有数据,然后逐条进行处理,伪代码如下: (ps: 均为伪代码,表达一下大概思想,不保证能运行)
1 2 3 4 5 6 7 8 9 10
| List<Result> results = new ArrayList(); OrderQuery query = new OrderQuery(); query.setOrderTimeFrom("2021-02-01 00:00:00"); query.setOrderTimeTo("2021-03-01 00:00:00"); List<Order> orders = orderService.list(query); for (Order order : orders) { Result result = handle(order); results.add(result); } return results;
|
这个方法我没有真实去试验。但是大概能够比较明显的看出两个问题:
- 一次性读取的订单数据量太大,很容易导致内存不够 (没有实验,不排除真的能跑得动)
- 后续单条单条加工数据,性能会很慢
思路二
分页处理,伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| List<Result> results = new ArrayList(); for (int page = 1; ; page++) { OrderQuery query = new OrderQuery(); query.setOrderTimeFrom("2021-02-01 00:00:00"); query.setOrderTimeTo("2021-03-01 00:00:00"); query.setPage(page); query.setPageSize(100); List<Order> orders = orderService.list(query); if (orders.size() == 0) { break; } List<Result> orderResults = batchHandle(orders); results.addAll(orderResults); } return results;
|
到了这里,看起来我们解决了思路一的两个问题,终于可以松一口气了。但是相信明眼的大家已经发现,其实这玩意儿可以优化,例如完全可以多线程并发处理。于是有了思路三。
思路三
分页+多线程处理,伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| int THREAD_NUM = 20; List<Result> results = new Vector(); CountDownLatch latch = new CountDownLatch(THREAD_NUM); for (int i = 1; i <= THREAD_NUM; i++) { new Thread(() -> { try { for (int page = i; ; page += THREAD_NUM) { OrderQuery query = new OrderQuery(); query.setOrderTimeFrom("2021-02-01 00:00:00"); query.setOrderTimeTo("2021-03-01 00:00:00"); query.setPage(page); query.setPageSize(100); List<Order> orders = orderService.list(query); if (orders.size() == 0) { break; } List<Result> orderResults = batchHandle(orders); results.addAll(orderResults); } } finally { latch.countDown(); } }).start(); } latch.await(); return results;
|
这样子代码就看起来很完美了。但是实际上运行效果却非常不好。最最直白的结果就是:
等了好久,但报表却怎么也导不出来!同时数据库CPU持续在100%狂飙!
那么问题来了,为什么呢?不妨来猜猜。 (谜底在下方,白色的字,选中就能看清)
mysql在limit分页到后面时,就会很慢很慢,多线程并发执行更是加剧了数据库的压力
思路四
那么针对问题三,我想都没想,直接上了最朴素的思路。没错,就是优化分页SQL。
众所周知,分页SQL可以通过这种方式来进行优化,
例,原SQL:
1
| select * from `order` order by id desc limit 50,30;
|
优化SQL: (里面嵌了两层 select id 是因为mysql语法的限制)
1
| select * from `order` where id in (select id from (select id from `order` order by id desc limit 50,30) as t);
|
换了一下SQL,效果的确好了很多,但是仍然不如人意。
- 报表生成仍然得花1个小时
- 数据库CPU仍然保持在100%,并没有降低
思路五
我们知道分页SQL其实还有一种优化方式,即根据idFrom来优化,即每次查询获取最大id,下次查询时从这个id开始查询。又是完美的思路!!!
但是!当你开始激动地编码的时候,你发现了现实很残酷,你没法多线程来跑了。因为每次的idFrom依赖于上次的查询结果……那么接下来怎么做呢?
…… (建议先自己思考后再往下看)
……
……
……
……
……
思路六(最终思路,建议先自己思考一下后再看)
那其实思路六就是对思路五的优化,我们唯一需要思考的问题就是如何能够多线程并发跑思路五。
其实很简单,一个idFrom不够,那再加一个idTo不就可以了吗。
但是接下来,问题来了,这个idFrom和idTo该怎么设置呢?(注意: 背景中场景描述附加了一个条件,订单表中有小部分数据是离散的)
朴素思路
最朴素的想法,借鉴思路四的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| int THREAD_NUM = 20; int PAGE_SIZE = 100; List<Result> results = new Vector(); CountDownLatch latch = new CountDownLatch(THREAD_NUM); for (int i = 1; i <= THREAD_NUM; i++) { new Thread(() -> { try { for (int idFrom = i; ; idFrom += THREAD_NUM * PAGE_SIZE) { OrderQuery query = new OrderQuery(); query.setOrderTimeFrom("2021-02-01 00:00:00"); query.setOrderTimeTo("2021-03-01 00:00:00"); query.setIdFrom(idFrom); query.setIdTo(idFrom + PAGE_SIZE); List<Order> orders = orderService.list(query); if (orders.size() == 0) { break; } List<Result> orderResults = batchHandle(orders); results.addAll(orderResults); } } finally { latch.countDown(); } }).start(); } latch.await(); return results;
|
但是这样写,显然是有一定问题的。你相当于把整个订单表都给读了一遍,本来只要1个月的数据,结果你读了所有的数据,那性能肯定要下降不少。
我的思路
想到idFrom和idTo,其实很容易想到一个算法,二分,二分不就是给一个左边界和右边界,然后进行计算的算法吗?
那同样的,我们这边也可以参照二分的思路来写,对应到这次的解法也就是分治。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| int PAGE_SIZE = 100;
void main() { Vector<Result> results = new Vector<>(); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> binaryHandle(1, Integer.MAX_VALUE, latch, results)).start(); latch.await(); }
void binaryHandle(int left, int right, CountLatch latch, Vector<Result> results) { try { OrderQuery query = new OrderQuery(); query.setOrderTimeFrom("2021-02-01 00:00:00"); query.setOrderTimeTo("2021-03-01 00:00:00"); query.setIdFrom(left); query.setIdTo(right); int count = orderService.count(query); if (count <= 0) { return; } else if (count <= PAGE_SIZE) { } else { CountDownLatch countDownLatch = new CountDownLatch(2); int mid = (left + right) / 2; new Thread(() -> binaryHandle(left, mid, countDownLatch, results)).start(); new Thread(() -> binaryHandle(mid, right, countDownLatch, results)).start(); countDownLatch.await(); } } finally { latch.countDown(); } }
|
你认为代码这样就行了吗?
不,不行,你会发现跑着跑着,线程就创建不了了,因为你稍微算一下,就会发现这种算法的线程量消耗是非常夸张的。
那我们怎么办?用线程池呗,但是线程池真的没问题吗?
是的,线程池是有问题的,会卡住。因为countDownLatch的原因子线程一直卡着父线程。
那么怎么解?自定义一个CountLatch (CountLatch的代码见附录),同时支持countDown()和countUp()操作,然后改改代码就可以了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| int PAGE_SIZE = 100; ThreadPoolExecutor pool = new ThreadPoolExecutor(20, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
void main() { Vector<Result> results = new Vector<>(); CountLatch latch = new CountLatch(); latch.countUp(); pool.execute(() -> binaryHandle(1, Integer.MAX_VALUE, latch, results)); latch.await(); }
void binaryHandle(int left, int right, CountLatch latch, Vector<Result> results) { try { OrderQuery query = new OrderQuery(); query.setOrderTimeFrom("2021-02-01 00:00:00"); query.setOrderTimeTo("2021-03-01 00:00:00"); query.setIdFrom(left); query.setIdTo(right); int count = orderService.count(query); if (count <= 0) { return; } else if (count <= PAGE_SIZE) { } else { int mid = (left + right) / 2; latch.countUp(); pool.execute(() -> binaryHandle(left, mid, countDownLatch, results)); latch.countUp(); pool.execute(() -> binaryHandle(mid, right, countDownLatch, results)); } } finally { latch.countDown(); } }
|
最终整个报表跑完只花了5分钟!
优化完成,到此结束!!!虽然最终应用本身的cpu会跑到100%,但是调调线程池大小就可以很容易地处理了。
附录
CountLatch源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class CountLatch {
private volatile int count = 0; private final Object object = new Object();
public void countUp() { synchronized (object) { count += 1; } }
public void countDown() { synchronized (object) { count -= 1; if (count <= 0) { innerNotifyAll(); } } }
public void await() throws InterruptedException { if (count <= 0) { return; } innerWait(); }
private void innerWait() throws InterruptedException { synchronized (this) { this.wait(); } }
private void innerNotifyAll() { synchronized (this) { this.notifyAll(); } } }
|