欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

Java如何利用线程池和Redis实现高效数据入库,

来源: javaer 分享于  点击 31510 次 点评:91

Java如何利用线程池和Redis实现高效数据入库,


目录
  • 利用线程池和Redis实现高效数据入库
  • 主要思路和组件介绍
    • 思路概述
    • 主要组件
  • 详细代码解析
    • 详细讲解
      • 总结

        利用线程池和Redis实现高效数据入库

        在高并发环境中,进行数据入库是一项具有挑战性的任务。

        本文将介绍如何利用线程池和Redis实现数据的实时缓存和批量入库处理,确保系统的性能和稳定性。

        主要思路和组件介绍

        思路概述

        在高并发情况下,数据入库需要解决两个主要问题:实时性和稳定性。

        通过将数据首先存储在Redis缓存中,可以快速响应和处理大量的数据请求,然后利用线程池定期批量将数据从Redis取出并入库,以减少数据库压力和提高整体性能。

        主要组件

        1. BatchDataStorageService:核心服务类,负责数据的实时缓存和定期批量入库处理。
        2. CacheService:简易缓存服务类,使用ConcurrentHashMap实现内存缓存,用于快速存取和处理数据。
        3. RedisUtils:封装了对Redis的操作,用于数据的持久化存储和高速读取。
        4. BatchWorker:实现了Runnable接口,处理从Redis中读取数据并执行批量入库的任务。
        5. BatchTimeoutCommitThread:定时监控数据是否达到设定的批次或超时时间,并触发数据入库操作。

        详细代码解析

        • BatchDataStorageService
        package io.jack.service.impl;
        
        import com.alibaba.fastjson.JSON;
        import com.alibaba.fastjson.JSONArray;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.beans.factory.InitializingBean;
        import org.springframework.beans.factory.annotation.Value;
        import org.springframework.stereotype.Component;
        
        import javax.annotation.Resource;
        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        
        /**
         * 数据批量入库服务类
         */
        @Component
        @Slf4j
        public class BatchDataStorageService implements InitializingBean {
        
            /**
             * 最大批次数量
             */
            @Value("${app.db.maxBatchCount:800}")
            private int maxBatchCount;
        
            /**
             * 最大线程数
             */
            @Value("${app.db.maxBatchThreads:100}")
            private int maxBatchThreads;
        
            /**
             * 超时时间,单位毫秒
             */
            @Value("${app.db.batchTimeout:3000}")
            private int batchTimeout;
        
            /**
             * 当前批次数量
             */
            private int batchCount = 0;
        
            /**
             * 当前批次号
             */
            private static long batchNo = 0;
        
            /**
             * 线程池执行器
             */
            private ExecutorService executorService = null;
        
            /**
             * 缓存服务
             */
            @Resource
            private CacheService cacheService;
        
            /**
             * 设备实时服务
             */
            @Resource
            private DeviceRealTimeService deviceRealTimeService;
        
            /**
             * Redis工具类
             */
            @Resource
            private RedisUtils redisUtils;
        
            /**
             * 初始化线程池
             */
            @Override
            public void afterPropertiesSet() {
                executorService = Executors.newFixedThreadPool(maxBatchThreads);
            }
        
            /**
             * 保存设备实时数据
             * 
             * @param deviceRealTimeDTO 设备实时数据传输对象
             */
            public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
                final String failedCacheKey = "device:real_time:failed_records";
        
                try {
                    // 生成批次和持续时间的缓存键
                    String durationKey = "device:real_time:batchDuration" + batchNo;
                    String batchKey = "device:real_time:batch" + batchNo;
        
                    // 如果当前批次持续时间不存在,则创建并启动超时处理线程
                    if (!cacheService.exists(durationKey)) {
                        cacheService.put(durationKey, System.currentTimeMillis());
                        new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start();
                    }
        
                    // 将设备实时数据加入当前批次
                    cacheService.lPush(batchKey, deviceRealTimeDTO);
                    if (++batchCount >= maxBatchCount) {
                        // 达到最大批次,执行入库逻辑
                        dataStorage(durationKey, batchKey, failedCacheKey);
                    }
        
                } catch (Exception ex) {
                    log.warn("[DB:FAILED] 设备上报记录入批处理集合异常: " + ex.getMessage() + ", DeviceRealTimeDTO: " + JSON.toJSONString(deviceRealTimeDTO), ex);
                    cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
                } finally {
                    updateRealTimeData(deviceRealTimeDTO);
                }
            }
        
            /**
             * 更新实时数据到Redis
             * 
             * @param deviceRealTimeDTO 设备实时数据传输对象
             */
            private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
                redisUtils.set("real_time:" + deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO));
            }
        
            /**
             * 批量入库处理
             * 
             * @param durationKey 持续时间标识
             * @param batchKey    批次标识
             * @param failedCacheKey 错误记录标识
             */
            private void dataStorage(String durationKey, String batchKey, String failedCacheKey) {
                batchNo++;
                batchCount = 0;
                cacheService.del(durationKey);
                if (batchNo >= Long.MAX_VALUE) {
                    batchNo = 0;
                }
                executorService.execute(new BatchWorker(batchKey, failedCacheKey));
            }
        
            /**
             * 批量工作线程
             */
            private class BatchWorker implements Runnable {
        
                private final String failedCacheKey;
                private final String batchKey;
        
                public BatchWorker(String batchKey, String failedCacheKey) {
                    this.batchKey = batchKey;
                    this.failedCacheKey = failedCacheKey;
                }
        
                @Override
                public void run() {
                    final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new ArrayList<>();
                    try {
                        // 从缓存中获取批量数据
                        DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey);
                        while (deviceRealTimeDTO != null) {
                            deviceRealTimeDTOList.add(deviceRealTimeDTO);
                            deviceRealTimeDTO = cacheService.lPop(batchKey);
                        }
        
                        long timeMillis = System.currentTimeMillis();
        
                        try {
                            // 将DTO转换为实体对象并批量入库
                            List<DeviceRealTimeEntity> deviceRealTimeEntityList = ConvertUtils.sourceToTarget(deviceRealTimeDTOList, DeviceRealTimeEntity.class);
                            deviceRealTimeService.insertBatch(deviceRealTimeEntityList);
                        } finally {
                            cacheService.del(batchKey);
                            log.info("[DB:BATCH_WORKER] 批次:" + batchKey + ",保存设备上报记录数:" + deviceRealTimeDTOList.size() + ", 耗时:" + (System.currentTimeMillis() - timeMillis) + "ms");
                        }
                    } catch (Exception e) {
                        log.warn("[DB:FAILED] 设备上报记录批量入库失败:" + e.getMessage() + ", DeviceRealTimeDTO: " + deviceRealTimeDTOList.size(), e);
                        for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) {
                            cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
                        }
                    }
                }
            }
        
            /**
             * 批次超时提交线程
             */
            class BatchTimeoutCommitThread extends Thread {
        
                private final String batchKey;
                private final String durationKey;
                private final String failedCacheKey;
        
                public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) {
                    this.batchKey = batchKey;
                    this.durationKey = durationKey;
                    this.failedCacheKey = failedCacheKey;
                    this.setName("batch-thread-" + batchKey);
                }
        
                @Override
                public void run() {
                    try {
                        Thread.sleep(batchTimeout);
                    } catch (InterruptedException e) {
                        log.error("[DB] 内部错误,直接提交:" + e.getMessage());
                    }
        
                    if (cacheService.exists(durationKey)) {
                        // 达到最大批次的超时间,执行入库逻辑
                        dataStorage(durationKey, batchKey, failedCacheKey);
                    }
                }
            }
        }
        • CacheService
        package io.jack.service;
        
        import org.springframework.beans.factory.InitializingBean;
        import org.springframework.stereotype.Component;
        
        import java.util.LinkedList;
        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.atomic.AtomicLong;
        
        /**
         * 缓存服务类,提供简易的缓存机制
         */
        @Component
        public class CacheService implements InitializingBean {
        
            /**
             * 内存缓存,用于存储数据
             */
            private Map<String, Object> objectCache = new ConcurrentHashMap<>();
        
            /**
             * 统计缓存,用于统计计数
             */
            private Map<String, AtomicLong> statCache = new ConcurrentHashMap<>();
        
            /**
             * 初始化统计缓存
             */
            @Override
            public void afterPropertiesSet() {
                statCache.put("terminals", new AtomicLong(0));
                statCache.put("connections", new AtomicLong(0));
            }
        
            /**
             * 增加指定统计项的计数
             * 
             * @param statName 统计项名称
             * @return 增加后的计数值
             */
            public long incr
        
        (String statName) {
                statCache.putIfAbsent(statName, new AtomicLong(0));
                return statCache.get(statName).incrementAndGet();
            }
        
            /**
             * 减少指定统计项的计数
             * 
             * @param statName 统计项名称
             * @return 减少后的计数值
             */
            public long decr(String statName) {
                statCache.putIfAbsent(statName, new AtomicLong(0));
                return statCache.get(statName).decrementAndGet();
            }
        
            /**
             * 获取指定统计项的当前计数值
             * 
             * @param statName 统计项名称
             * @return 当前计数值
             */
            public long stat(String statName) {
                statCache.putIfAbsent(statName, new AtomicLong(0));
                return statCache.get(statName).get();
            }
        
            /**
             * 存储数据
             * 
             * @param key 缓存键
             * @param object 缓存数据
             */
            public <T> void put(String key, T object) {
                objectCache.put(key, object);
            }
        
            /**
             * 获取数据
             * 
             * @param key 缓存键
             * @return 缓存数据
             */
            public <T> T get(String key) {
                return (T) objectCache.get(key);
            }
        
            /**
             * 删除数据
             * 
             * @param key 缓存键
             */
            public void remove(String key) {
                objectCache.remove(key);
            }
        
            /**
             * 存储哈希表数据
             * 
             * @param key 哈希表键
             * @param subkey 哈希表子键
             * @param value 哈希表值
             */
            public void hSet(String key, String subkey, Object value) {
                synchronized (objectCache) {
                    Map<String, Object> submap = (Map<String, Object>) objectCache.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
                    submap.put(subkey, value);
                }
            }
        
            /**
             * 获取哈希表数据
             * 
             * @param key 哈希表键
             * @param subkey 哈希表子键
             * @return 哈希表值
             */
            public <T> T hGet(String key, String subkey) {
                synchronized (objectCache) {
                    Map<String, Object> submap = (Map<String, Object>) objectCache.get(key);
                    return submap != null ? (T) submap.get(subkey) : null;
                }
            }
        
            /**
             * 判断哈希表子键是否存在
             * 
             * @param key 哈希表键
             * @param subkey 哈希表子键
             * @return 是否存在
             */
            public boolean hExists(String key, String subkey) {
                synchronized (objectCache) {
                    Map<String, Object> submap = (Map<String, Object>) objectCache.get(key);
                    return submap != null && submap.containsKey(subkey);
                }
            }
        
            /**
             * 将数据推入列表
             * 
             * @param key 列表键
             * @param value 列表值
             */
            public void lPush(String key, Object value) {
                synchronized (objectCache) {
                    LinkedList<Object> queue = (LinkedList<Object>) objectCache.computeIfAbsent(key, k -> new LinkedList<>());
                    queue.addLast(value);
                }
            }
        
            /**
             * 从列表中弹出数据
             * 
             * @param key 列表键
             * @return 列表值
             */
            public <T> T lPop(String key) {
                synchronized (objectCache) {
                    LinkedList<Object> queue = (LinkedList<Object>) objectCache.get(key);
                    return queue != null && !queue.isEmpty() ? (T) queue.removeLast() : null;
                }
            }
        
            /**
             * 删除缓存数据
             * 
             * @param key 缓存键
             */
            public void del(String key) {
                objectCache.remove(key);
            }
        
            /**
             * 判断缓存键是否存在
             * 
             * @param key 缓存键
             * @return 是否存在
             */
            public boolean exists(String key) {
                return objectCache.containsKey(key);
            }
        }

        详细讲解

        BatchDataStorageService

        字段和初始化

        • maxBatchCount:配置文件中指定的最大批次大小,用于控制每批处理的数据量。
        • maxBatchThreads:线程池的最大线程数,影响处理并发能力。
        • batchTimeout:批次超时时间,用于控制数据处理的最迟时间。
        • batchCount:当前批次中的数据条数,用于判断是否需要提交批次数据。
        • batchNo:批次号,用于标识不同的批次。
        • executorService:用于执行批量处理任务的线程池。
        • cacheServicedeviceRealTimeServiceredisUtils:分别用于缓存操作、数据存储和 Redis 操作。

        方法详解

        • afterPropertiesSet:初始化线程池,以便在后续操作中执行批量处理任务。
        • saveRealTimeData
          • 将实时数据推入缓存中,检查是否需要提交批次数据。
          • 如果超时或数据量达到阈值,则调用 dataStorage 方法处理数据。
        • updateRealTimeData:将数据更新到 Redis,确保实时数据的可用性。
        • dataStorage
          • 执行批量数据的存储操作,并提交工作线程处理数据。
        • BatchWorker
          • 从缓存中获取数据并执行入库操作,将成功的数据记录日志,将失败的数据放入失败缓存。
        • BatchTimeoutCommitThread
          • 处理批次超时逻辑,即使在未满批次的情况下也会提交数据,确保数据及时处理。

        CacheService

        字段

        • objectCache:用于存储普通缓存数据。
        • statCache:用于存储统计数据,例如计数器等。

        方法详解

        • put/get/remove:基本的缓存操作,支持存储、获取和删除数据。
        • hSet/hGet/hExists
          • 对哈希表进行操作,支持设置、获取和检查键值对。
        • lPush/lPop
          • 对列表进行操作,支持推入和弹出数据。
        • incr/decr/stat
          • 对统计数据进行操作,支持增加、减少和获取当前值。

        总结

        本文介绍了如何在高并发环境下利用线程池和Redis实现高效的数据入库。通过将数据首先存入Redis缓存,并利用线程池定期批量处理入库操作,能够有效提升系统的性能和稳定性。完整代码包括核心的BatchDataStorageService服务类、CacheService缓存服务类以及RedisUtils工具类,均提供了详细的注释和解析,以便读者理解和实现类似的高并发数据处理系统

        以上为个人经验,希望能给大家一个参考,也希望大家多多支持3672js教程。

        您可能感兴趣的文章:
        • Java创建线程池的几种方式代码示例
        • 简单剖析Java中动态线程池的扩容以及缩容操作
        • Java线程池的工作机制详解
        • Java创建线程池的方式实现
        • 使用Java Executors创建线程池的9种方法
        相关栏目:

        用户点评