package com.energy.online.data.service.util;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Batch-processing helpers: splitting a list into fixed-size chunks, and diffing a "new"
 * list against an "old" list by key to find the records to insert, update or delete.
 *
 * <p>All methods are static and keep no state of their own.
 *
 * @author wangwenbing
 * @date 2022-03-29
 */
public class BatchUtil {

    /**
     * Feeds {@code data} to {@code consumer} in consecutive chunks of at most
     * {@code batchNum} elements, in list order.
     *
     * <p>Each chunk is a {@link List#subList(int, int)} view of {@code data}, not a copy.
     * A {@code null} or empty {@code data} is a no-op and the consumer is never called.
     *
     * @param data     the data to process; may be {@code null} or empty
     * @param batchNum maximum number of elements handed to the consumer per call; must be positive
     * @param consumer callback invoked once per chunk
     * @param <T>      element type
     * @throws IllegalArgumentException if {@code data} is non-empty and {@code batchNum <= 0}
     * @throws NullPointerException     if {@code data} is non-empty and {@code consumer} is {@code null}
     */
    public static <T> void batchHandle(List<T> data, int batchNum, Consumer<? super List<T>> consumer) {
        if (data == null || data.isEmpty()) {
            return;
        }
        if (batchNum <= 0) {
            // A non-positive size would either process nothing or fail inside subList.
            throw new IllegalArgumentException("batchNum must be positive: " + batchNum);
        }
        Objects.requireNonNull(consumer, "consumer");

        int len = data.size();
        int fromIdx = 0;
        while (fromIdx < len) {
            // Clamp using the remaining count: fromIdx + batchNum could overflow int
            // when batchNum is close to Integer.MAX_VALUE.
            int toIdx = fromIdx + Math.min(batchNum, len - fromIdx);
            consumer.accept(data.subList(fromIdx, toIdx));
            fromIdx = toIdx;
        }
    }

    /**
     * Extracts the records to insert: every element of {@code newData} whose key does not
     * occur among the keys of {@code oldData}.
     *
     * <p>Order follows {@code newData}; elements of {@code newData} that share a key are all kept.
     *
     * @param newData     the incoming records
     * @param oldData     the existing records
     * @param keyFunction derives the comparison key of a record; keys are compared with
     *                    {@code equals}/{@code hashCode}
     * @param <T>         record type
     * @return a new mutable list of the records to insert
     * @throws NullPointerException if any argument is {@code null}
     */
    public static <T> List<T> filterInsertData(
            List<T> newData,
            List<T> oldData,
            Function<? super T, ?> keyFunction) {
        Objects.requireNonNull(newData, "newData");
        Objects.requireNonNull(oldData, "oldData");
        Objects.requireNonNull(keyFunction, "keyFunction");

        Set<Object> oldKeys = collectKeys(oldData, keyFunction);
        return newData.stream()
                .filter(e -> !oldKeys.contains(keyFunction.apply(e)))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Extracts the records to update: every element of {@code newData} whose key also
     * occurs in {@code oldData} and for which {@code isChange} reports a difference.
     *
     * <p>Before a record is added to the result, the id read from the matching old record
     * via {@code idGetter} is written onto the new record via {@code idSetter}. When several
     * old records share a key, the last one in {@code oldData} is used.
     *
     * @param newData     the incoming records; matching elements are mutated through {@code idSetter}
     * @param oldData     the existing records
     * @param keyFunction derives the comparison key of a record
     * @param isChange    called as {@code isChange(oldRecord, newRecord)}; a {@code null}
     *                    result is treated as "not changed"
     * @param idSetter    called as {@code idSetter(newRecord, id)}
     * @param idGetter    reads the id from an old record
     * @param <T>         record type
     * @return a new mutable list of the records to update, in {@code newData} order
     * @throws NullPointerException if {@code newData}, {@code oldData} or {@code keyFunction}
     *                              is {@code null}
     */
    public static <T> List<T> filterUpdateData(
            List<T> newData,
            List<T> oldData,
            Function<? super T, ?> keyFunction,
            BiFunction<? super T, ? super T, Boolean> isChange,
            BiConsumer<? super T, Object> idSetter,
            Function<? super T, ?> idGetter) {
        Objects.requireNonNull(newData, "newData");
        Objects.requireNonNull(oldData, "oldData");
        Objects.requireNonNull(keyFunction, "keyFunction");

        // Later entries overwrite earlier ones, so the last old record per key wins.
        Map<Object, T> oldByKey = new HashMap<>();
        for (T oldRecord : oldData) {
            oldByKey.put(keyFunction.apply(oldRecord), oldRecord);
        }

        List<T> result = new ArrayList<>();
        for (T candidate : newData) {
            T oldRecord = oldByKey.get(keyFunction.apply(candidate));
            if (oldRecord != null && Boolean.TRUE.equals(isChange.apply(oldRecord, candidate))) {
                idSetter.accept(candidate, idGetter.apply(oldRecord));
                result.add(candidate);
            }
        }
        return result;
    }

    /**
     * Extracts the records to delete: every element of {@code oldData} whose key does not
     * occur among the keys of {@code newData}.
     *
     * <p>Order follows {@code oldData}. An empty {@code newData} therefore selects every old record.
     *
     * @param newData     the incoming records
     * @param oldData     the existing records
     * @param keyFunction derives the comparison key of a record
     * @param <T>         record type
     * @return a new mutable list of the records to delete
     * @throws NullPointerException if any argument is {@code null}
     */
    public static <T> List<T> filterDeleteData(
            List<T> newData,
            List<T> oldData,
            Function<? super T, ?> keyFunction) {
        Objects.requireNonNull(newData, "newData");
        Objects.requireNonNull(oldData, "oldData");
        Objects.requireNonNull(keyFunction, "keyFunction");

        Set<Object> newKeys = collectKeys(newData, keyFunction);
        return oldData.stream()
                .filter(e -> !newKeys.contains(keyFunction.apply(e)))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /** Returns the set of keys that {@code keyFunction} yields for the elements of {@code data}. */
    private static <T> Set<Object> collectKeys(List<T> data, Function<? super T, ?> keyFunction) {
        Set<Object> keys = new HashSet<>();
        for (T item : data) {
            keys.add(keyFunction.apply(item));
        }
        return keys;
    }
}