Java如何利用线程池和Redis实现高效数据入库,
分享于 点击 31510 次 点评:91
Java如何利用线程池和Redis实现高效数据入库,
目录
- 利用线程池和Redis实现高效数据入库
- 主要思路和组件介绍
- 思路概述
- 主要组件
- 详细代码解析
- 详细讲解
- 总结
利用线程池和Redis实现高效数据入库
在高并发环境中,进行数据入库是一项具有挑战性的任务。
本文将介绍如何利用线程池和Redis实现数据的实时缓存和批量入库处理,确保系统的性能和稳定性。
主要思路和组件介绍
思路概述
在高并发情况下,数据入库需要解决两个主要问题:实时性和稳定性。
通过将数据首先存储在Redis缓存中,可以快速响应和处理大量的数据请求,然后利用线程池定期批量将数据从Redis取出并入库,以减少数据库压力和提高整体性能。
主要组件
- BatchDataStorageService:核心服务类,负责数据的实时缓存和定期批量入库处理。
- CacheService:简易缓存服务类,使用ConcurrentHashMap实现内存缓存,用于快速存取和处理数据。
- RedisUtils:封装了对Redis的操作,用于数据的持久化存储和高速读取。
- BatchWorker:实现了Runnable接口,处理从Redis中读取数据并执行批量入库的任务。
- BatchTimeoutCommitThread:定时监控数据是否达到设定的批次或超时时间,并触发数据入库操作。
详细代码解析
- BatchDataStorageService
package io.jack.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 数据批量入库服务类 */ @Component @Slf4j public class BatchDataStorageService implements InitializingBean { /** * 最大批次数量 */ @Value("${app.db.maxBatchCount:800}") private int maxBatchCount; /** * 最大线程数 */ @Value("${app.db.maxBatchThreads:100}") private int maxBatchThreads; /** * 超时时间,单位毫秒 */ @Value("${app.db.batchTimeout:3000}") private int batchTimeout; /** * 当前批次数量 */ private int batchCount = 0; /** * 当前批次号 */ private static long batchNo = 0; /** * 线程池执行器 */ private ExecutorService executorService = null; /** * 缓存服务 */ @Resource private CacheService cacheService; /** * 设备实时服务 */ @Resource private DeviceRealTimeService deviceRealTimeService; /** * Redis工具类 */ @Resource private RedisUtils redisUtils; /** * 初始化线程池 */ @Override public void afterPropertiesSet() { executorService = Executors.newFixedThreadPool(maxBatchThreads); } /** * 保存设备实时数据 * * @param deviceRealTimeDTO 设备实时数据传输对象 */ public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) { final String failedCacheKey = "device:real_time:failed_records"; try { // 生成批次和持续时间的缓存键 String durationKey = "device:real_time:batchDuration" + batchNo; String batchKey = "device:real_time:batch" + batchNo; // 如果当前批次持续时间不存在,则创建并启动超时处理线程 if (!cacheService.exists(durationKey)) { cacheService.put(durationKey, System.currentTimeMillis()); new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start(); } // 将设备实时数据加入当前批次 cacheService.lPush(batchKey, deviceRealTimeDTO); if (++batchCount >= maxBatchCount) { // 达到最大批次,执行入库逻辑 dataStorage(durationKey, batchKey, failedCacheKey); } } catch (Exception ex) { log.warn("[DB:FAILED] 设备上报记录入批处理集合异常: " + ex.getMessage() + ", DeviceRealTimeDTO: " + JSON.toJSONString(deviceRealTimeDTO), ex); cacheService.lPush(failedCacheKey, deviceRealTimeDTO); } finally { updateRealTimeData(deviceRealTimeDTO); } } /** * 更新实时数据到Redis * * @param deviceRealTimeDTO 设备实时数据传输对象 */ private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) { redisUtils.set("real_time:" + deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO)); } /** * 批量入库处理 * * @param durationKey 持续时间标识 * @param batchKey 批次标识 * @param failedCacheKey 错误记录标识 */ private void dataStorage(String durationKey, String batchKey, String failedCacheKey) { batchNo++; batchCount = 0; cacheService.del(durationKey); if (batchNo >= Long.MAX_VALUE) { batchNo = 0; } executorService.execute(new BatchWorker(batchKey, failedCacheKey)); } /** * 批量工作线程 */ private class BatchWorker implements Runnable { private final String failedCacheKey; private final String batchKey; public BatchWorker(String batchKey, String failedCacheKey) { this.batchKey = batchKey; this.failedCacheKey = failedCacheKey; } @Override public void run() { final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new ArrayList<>(); try { // 从缓存中获取批量数据 DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey); while (deviceRealTimeDTO != null) { deviceRealTimeDTOList.add(deviceRealTimeDTO); deviceRealTimeDTO = cacheService.lPop(batchKey); } long timeMillis = System.currentTimeMillis(); try { // 将DTO转换为实体对象并批量入库 List<DeviceRealTimeEntity> deviceRealTimeEntityList = ConvertUtils.sourceToTarget(deviceRealTimeDTOList, DeviceRealTimeEntity.class); deviceRealTimeService.insertBatch(deviceRealTimeEntityList); } finally { cacheService.del(batchKey); log.info("[DB:BATCH_WORKER] 批次:" + batchKey + ",保存设备上报记录数:" + deviceRealTimeDTOList.size() + ", 耗时:" + (System.currentTimeMillis() - timeMillis) + "ms"); } } catch (Exception e) { log.warn("[DB:FAILED] 设备上报记录批量入库失败:" + e.getMessage() + ", DeviceRealTimeDTO: " + deviceRealTimeDTOList.size(), e); for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) { cacheService.lPush(failedCacheKey, deviceRealTimeDTO); } } } } /** * 批次超时提交线程 */ class BatchTimeoutCommitThread extends Thread { private final String batchKey; private final String durationKey; private final String failedCacheKey; public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) { this.batchKey = batchKey; this.durationKey = durationKey; this.failedCacheKey = failedCacheKey; this.setName("batch-thread-" + batchKey); } @Override public void run() { try { Thread.sleep(batchTimeout); } catch (InterruptedException e) { log.error("[DB] 内部错误,直接提交:" + e.getMessage()); } if (cacheService.exists(durationKey)) { // 达到最大批次的超时间,执行入库逻辑 dataStorage(durationKey, batchKey, failedCacheKey); } } } }
- CacheService
package io.jack.service; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; /** * 缓存服务类,提供简易的缓存机制 */ @Component public class CacheService implements InitializingBean { /** * 内存缓存,用于存储数据 */ private Map<String, Object> objectCache = new ConcurrentHashMap<>(); /** * 统计缓存,用于统计计数 */ private Map<String, AtomicLong> statCache = new ConcurrentHashMap<>(); /** * 初始化统计缓存 */ @Override public void afterPropertiesSet() { statCache.put("terminals", new AtomicLong(0)); statCache.put("connections", new AtomicLong(0)); } /** * 增加指定统计项的计数 * * @param statName 统计项名称 * @return 增加后的计数值 */ public long incr (String statName) { statCache.putIfAbsent(statName, new AtomicLong(0)); return statCache.get(statName).incrementAndGet(); } /** * 减少指定统计项的计数 * * @param statName 统计项名称 * @return 减少后的计数值 */ public long decr(String statName) { statCache.putIfAbsent(statName, new AtomicLong(0)); return statCache.get(statName).decrementAndGet(); } /** * 获取指定统计项的当前计数值 * * @param statName 统计项名称 * @return 当前计数值 */ public long stat(String statName) { statCache.putIfAbsent(statName, new AtomicLong(0)); return statCache.get(statName).get(); } /** * 存储数据 * * @param key 缓存键 * @param object 缓存数据 */ public <T> void put(String key, T object) { objectCache.put(key, object); } /** * 获取数据 * * @param key 缓存键 * @return 缓存数据 */ public <T> T get(String key) { return (T) objectCache.get(key); } /** * 删除数据 * * @param key 缓存键 */ public void remove(String key) { objectCache.remove(key); } /** * 存储哈希表数据 * * @param key 哈希表键 * @param subkey 哈希表子键 * @param value 哈希表值 */ public void hSet(String key, String subkey, Object value) { synchronized (objectCache) { Map<String, Object> submap = (Map<String, Object>) objectCache.computeIfAbsent(key, k -> new ConcurrentHashMap<>()); submap.put(subkey, value); } } /** * 获取哈希表数据 * * @param key 哈希表键 * @param subkey 哈希表子键 * @return 哈希表值 */ public <T> T hGet(String key, String subkey) { synchronized (objectCache) { Map<String, Object> submap = (Map<String, Object>) objectCache.get(key); return submap != null ? (T) submap.get(subkey) : null; } } /** * 判断哈希表子键是否存在 * * @param key 哈希表键 * @param subkey 哈希表子键 * @return 是否存在 */ public boolean hExists(String key, String subkey) { synchronized (objectCache) { Map<String, Object> submap = (Map<String, Object>) objectCache.get(key); return submap != null && submap.containsKey(subkey); } } /** * 将数据推入列表 * * @param key 列表键 * @param value 列表值 */ public void lPush(String key, Object value) { synchronized (objectCache) { LinkedList<Object> queue = (LinkedList<Object>) objectCache.computeIfAbsent(key, k -> new LinkedList<>()); queue.addLast(value); } } /** * 从列表中弹出数据 * * @param key 列表键 * @return 列表值 */ public <T> T lPop(String key) { synchronized (objectCache) { LinkedList<Object> queue = (LinkedList<Object>) objectCache.get(key); return queue != null && !queue.isEmpty() ? (T) queue.removeLast() : null; } } /** * 删除缓存数据 * * @param key 缓存键 */ public void del(String key) { objectCache.remove(key); } /** * 判断缓存键是否存在 * * @param key 缓存键 * @return 是否存在 */ public boolean exists(String key) { return objectCache.containsKey(key); } }
详细讲解
BatchDataStorageService
字段和初始化:
maxBatchCount
:配置文件中指定的最大批次大小,用于控制每批处理的数据量。maxBatchThreads
:线程池的最大线程数,影响处理并发能力。batchTimeout
:批次超时时间,用于控制数据处理的最迟时间。batchCount
:当前批次中的数据条数,用于判断是否需要提交批次数据。batchNo
:批次号,用于标识不同的批次。executorService
:用于执行批量处理任务的线程池。cacheService
、deviceRealTimeService
、redisUtils
:分别用于缓存操作、数据存储和 Redis 操作。
方法详解:
afterPropertiesSet
:初始化线程池,以便在后续操作中执行批量处理任务。saveRealTimeData
:- 将实时数据推入缓存中,检查是否需要提交批次数据。
- 如果超时或数据量达到阈值,则调用
dataStorage
方法处理数据。
updateRealTimeData
:将数据更新到 Redis,确保实时数据的可用性。dataStorage
:- 执行批量数据的存储操作,并提交工作线程处理数据。
BatchWorker
:- 从缓存中获取数据并执行入库操作,将成功的数据记录日志,将失败的数据放入失败缓存。
BatchTimeoutCommitThread
:- 处理批次超时逻辑,即使在未满批次的情况下也会提交数据,确保数据及时处理。
CacheService
字段:
objectCache
:用于存储普通缓存数据。statCache
:用于存储统计数据,例如计数器等。
方法详解:
put/get/remove
:基本的缓存操作,支持存储、获取和删除数据。hSet/hGet/hExists
:- 对哈希表进行操作,支持设置、获取和检查键值对。
lPush/lPop
:- 对列表进行操作,支持推入和弹出数据。
incr/decr/stat
:- 对统计数据进行操作,支持增加、减少和获取当前值。
总结
本文介绍了如何在高并发环境下利用线程池和Redis实现高效的数据入库。通过将数据首先存入Redis缓存,并利用线程池定期批量处理入库操作,能够有效提升系统的性能和稳定性。完整代码包括核心的BatchDataStorageService服务类、CacheService缓存服务类以及RedisUtils工具类,均提供了详细的注释和解析,以便读者理解和实现类似的高并发数据处理系统
以上为个人经验,希望能给大家一个参考,也希望大家多多支持3672js教程。
您可能感兴趣的文章:- Java创建线程池的几种方式代码示例
- 简单剖析Java中动态线程池的扩容以及缩容操作
- Java线程池的工作机制详解
- Java创建线程池的方式实现
- 使用Java Executors创建线程池的9种方法
用户点评