0
点赞
收藏
分享

微信扫一扫

MongoDB批量导入Redis优化迭代笔记

背景

统计最近五天所有content信息的正文字节数(正文字段占用较多),然后根据这个大小,推送存在redis要配置多少的内存。

统计方法

1.在mongodb中查询

db.content_.aggregate([
  {
    $match: {
      updatetime: {
        $gte: 1686134400000,  // 对应日期 "2023-06-07T00:00:00Z" 的毫秒数
        $lte: 1686671999000   // 对应日期 "2023-06-12T23:59:59Z" 的毫秒数
      }
    }
  }
  ,
  {
    $group: {
      _id: null,
      avgSize: {
        $avg: { $strLenBytes: "$content", $encoding: "utf8" }
      }
    }
  }
])

**测试结果:**content字段每条约占3000字节,一天接近20万数据,redis每天存最新一天的数据,设置过期时间5天, 5天x20万数据x3000字节约等于3GB数据。

同步redis采用生产消费者模式

static MyBlockingQueue<Iterator<DBObject>> queue = new MyBlockingQueue<>(24);

private static final ExecutorService executorService = Executors.newFixedThreadPool(6*4); //创建一个固定大小的线程池,线程池大小,根据任务类型,因为都是io查询,线程大部分在等待带宽,所以线程数=cupx2或者3,4都可以,需要测试哪种性能最好。我本地cup6核。
   public static void main(String[] args) throws UnknownHostException {
        long startTime = System.currentTimeMillis();

        //1.连接mongodb,查询时间范围的数据
        int totalPages = 0;
        // 输入日期
        long startTimestamp = LocalDate.parse("2023-06-15").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
        long endTimestamp = LocalDate.parse("2023-06-16").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
        long ExTimestamp = LocalDate.parse("2023-06-11").atStartOfDay(ZoneOffset.UTC).toEpochSecond();

        // 创建 MongoTemplate 实例
        MongoClientURI uri = new MongoClientURI("mongodb://127.0.0.1:2580/zb_contxxx");
        SimpleMongoDbFactory factory = new SimpleMongoDbFactory(uri);
        MongoTemplate mongoTemplate = new MongoTemplate(factory);

        System.out.println("开始查询");
        //添加一个标志,用来判断是否为最后一个查询任务
        Object queryFinishedMarker = new Object();
        for (long i = startTimestamp; i <endTimestamp ; i+=3600) {
            long startTimestamp2 = i;
            executorService.execute(() -> {
                Query query = new Query(Criteria.where("updatetime").gte(startTimestamp2).lte(endTimestamp));

                Iterator<DBObject> cursor = mongoTemplate.getCollection("zb_content_document")
                        //组装查询条件
                        .find(query.getQueryObject())
                        //设置批量从数据库中获取的数据量
                        .batchSize(2000)
                        .iterator();
                try {
                    queue.put(cursor);
                     // 判断是否为最后一个查询任务,是则添加标记对象
                    if (startTimestampTemp + 3600 >= endTimestamp) {
                        queue.put((Iterator<DBObject>) queryFinishedMarker);
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }

        System.out.println("查询完毕");
        //3.添加到redis中
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        JedisPool jedisPool = new JedisPool(poolConfig, "172.0.0.1", 6379);
        Jedis jedis;
        jedis = jedisPool.getResource();
        Pipeline pipeline = jedis.pipelined();
        try {
            while (true){
                Iterator<DBObject> cursor = queue.take();
                if (cursor == queryFinishedMarker) {
                    // 所有数据已经消费完毕,退出循环
                    break;
                }
                while (cursor.hasNext()) {
                    DBObject document = cursor.next();
                    //过期时间为数据跟新时间减去自定义时间达到,早入库的早过期,晚入库的晚过期的效果,比如今天是20号,20号入库的数据5天后过期,19号入库的数据4天后过期。
                    pipeline.setex(document.get("_id").toString(), (int) ((Long)document.get("updatetime") - ExTimestamp), (String) document.get("content"));
                    System.out.println(document.toString());
                    System.out.println("总数:" + totalPages++);
                    pipeline.sync();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 异常处理
        } finally {
            if (jedis != null) {
                jedis.close(); // 释放连接
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("执行总耗时:"+(endTime - startTime));
    }

在上述代码中:

1.我们采用redis pipeline管道进行数据批量提交,而不是一条一条提交,提高效率。

2.将mongodb查库采用每小时一个查询,弃用翻页查询采用游标查询,一次查询2000条。

3.采用多线程生产者消费者模式,每个线程查询一个小时的数据,将结果加入到队列中,redis负责消费队列中的数据。

测试结果:

消费者一直处于饥饿状态,生产者的速度太慢,生产消费者模式效果达不到预期。

改进建议:

删除生产者消费者模式,改为多线程查询数据直接插入redis中。

最终完整代码:

public static void main(String[] args) throws UnknownHostException {
        long startTime = System.currentTimeMillis();

        //1.连接mongodb,查询时间范围的数据,翻页查询
        int totalPages = 0;
        // 输入日期
        long startTimestamp = LocalDate.parse("2023-06-17").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
        long endTimestamp = LocalDate.parse("2023-06-18").atStartOfDay(ZoneOffset.UTC).toEpochSecond();
        long ExTimestamp = LocalDate.parse("2023-06-12").atStartOfDay(ZoneOffset.UTC).toEpochSecond();

        //查询语句

        // 创建 MongoTemplate 实例
        MongoClientURI uri = new MongoClientURI("mongodb://127.0.0.1:xxx/zb_contxxx");
        SimpleMongoDbFactory factory = new SimpleMongoDbFactory(uri);
        MongoTemplate mongoTemplate = new MongoTemplate(factory);

        System.out.println("开始查询");
        //添加一个标志,用来判断是否为最后一个查询任务
        Object queryFinishedMarker = new Object();

        JedisPoolConfig poolConfig = new JedisPoolConfig();
        JedisPool jedisPool = new JedisPool(poolConfig, "127.0.0.1", 6379);
        Jedis jedis;
        jedis = jedisPool.getResource();

        for (long i = startTimestamp; i <endTimestamp ; i+=3600) {
            long startTimestampTemp = i;
            //线程池数量,线程池数量根据任务类型,你是io型任务还是计算密集类型,大部分时间是花在io等待数据返回,所以线程池数量可以开到cpu数 x2最少,你可以从x2开始往上试,×3,x4,甚至x5
            executorService.execute(() -> {
                //看下updatetime字段的索引,mongodb的底层也是b+树,看看他有没有索引,索引的区分度高不高,如果不高,需要换查询方法,换成id之类的区分度高的有索引的。
                Query query = new Query(Criteria.where("updatetime").gte(startTimestampTemp).lte(endTimestamp));

                Iterator<DBObject> cursor = mongoTemplate.getCollection("zb_content_document")
                        //组装查询条件
                        .find(query.getQueryObject())
                        //设置批量从数据库中获取的数据量
                        .batchSize(5000)
                        .iterator();

                Pipeline pipeline = jedis.pipelined();
                while (cursor.hasNext()) {
                    DBObject document = cursor.next();
                    pipeline.setex(document.get("_id").toString(), (int) ((Long)document.get("updatetime") - ExTimestamp), (String) document.get("content"));
                    System.out.println(document.toString());
                    pipeline.sync();
                }
            });

        }
        long endTime = System.currentTimeMillis();
        System.out.println("执行总耗时:"+(endTime - startTime));
    }

代码导入版本迭代记录

  1. 将mongodb查出来,redis插入数据
  2. redis改成管道,一次批量提交数据
  3. 优化mongodb查询,mongodb底层是b+树,需要我们看一下查询时的索引是否存在,以及索引的分辨度高不高。
  4. mongodb改成使用迭代器分批查询 75万数据80分钟
  5. 添加线程池,使用多线程生产消费者模式
  6. 添加标志位用来结束程序
  7. 实际测试,消费者一直处于饥饿状态,删除生产消费者模式,改为mongodb和redis用同一个线程
  8. 优化线程池数量,线程执行任务都是io处理,所以线程的大部分时间是等待io返回,带宽处理,所以线程数可以多开一些,从cup数x2往上测试。

结果总结

经过五天的测试,每天加入一天的数据,并设置5天过期时间,加上内存碎片率的为1.4,实际需要内存最少3GBx1.4=4.2GB内存,为了增加一些冗余,设置redis内存5个GB。

举报

相关推荐

0 条评论