中介绍了几种海量数据实时排名的方法
本文章将介绍使用Redis sorted-set及自己开发的多线程算法实现
方法一:使用Redis sorted-set
/**
* 多线程-根据Redis缓存添加往排行榜添加元素,并获取排行前10的元素* @param rankService* @throws InterruptedException*/private void test12(final IRankService rankService) throws InterruptedException{final String rankName = "rank_a";
final int threadCount = 100;final int dataCountPerThread = 100000;final int maxId = 100000;final int maxValue = 1000000;Thread[] threads = new Thread[threadCount];
final int[][] ids = new int[threadCount][];final long[][] values = new long[threadCount][];final JedisPool pool = getRedisPool(threadCount);final CountDownLatch latch = new CountDownLatch(threadCount);rankService.createRank(rankName);// 生成id和数据int index = 0;for(int i = 0;i<threadCount;i++){ ids[i] = new int[dataCountPerThread];values[i] = new long[dataCountPerThread];for(int j = 0;j<dataCountPerThread;j++){ ids[i][j] = ++index;/*randomId(maxId);*/values[i][j] = randomValue(maxValue);}}// 生成线程for(int threadI=0;threadI<threadCount ;threadI++){ final int threadIndex = threadI;Thread thread = new Thread("threadIndex"+threadIndex){ @Overridepublic void run(){ final Jedis jedis = pool.getResource();for(int i=0;i<dataCountPerThread;i++){ jedis.zadd(rankName, values[threadIndex][i], ""+ids[threadIndex][i]);}jedis.close();latch.countDown();}};threads[threadI] = thread;}// 执行long t1 = System.nanoTime();for(int threadI=0;threadI<threadCount ;threadI++){ threads[threadI].start();}latch.await();long t2 = System.nanoTime();log.info("put useTime:"+(t2-t1)/1000000);// getfinal Jedis jedis = pool.getResource();/*int testId=30;Long jedisValue = jedis.zrevrank(rankName, ""+testId);log.info("redis:"+jedisValue);*/Set<String> set = jedis.zrangeByScore(rankName, 1, 10);int index1 = 0;for(String string : set){ log.info("Rank"+(++index1)+":"+string);/*Long jedisValue = jedis.zrevrank(rankName, string);log.info("redis:"+jedisValue);*/}long t3 = System.nanoTime();log.info("get useTime:"+(t3-t2)/1000000);}方法二:自己开发的多线程算法实现海量数据实时排行(码云Git地址:https://gitee.com/barrywang/hqrank.git)
实现算法描述:
(1) node分为elementNode和rankElementNode,上诉图中的node为elementNode
(2)elementNode链表按照从大到小记录所有value,每个elementNode节点记录一个分数,保存一个element节点链表
(3)每个element链表按照时间从小到大记录该value的id,每个element节点记录一个id
(4)step链表(1)是对node链表的索引,提高对node的定位速度
(5)step链表(2)是对step链表(1)的索引,提高对step(1)的定位速度,从而提高对node的定位速度
(6)step链表(3)是对element链表的索引,提高对element的定位速度
(7)step和node中保存有其索引范围内的element数量,即排行所需数据
(8)对于多字段查询,除了最后一个字段层级使用elementNode用于存储element,其余字段层级使用rankElementNode,每个rankElementNode保存一个新的rank
/**
* 多线程-多线程添加往排行榜添加元素,并获取排行前10的元素* @param rankService* @throws InterruptedException*/private void test11(final IRankService rankService) throws InterruptedException{ Thread[] threads = new Thread[threadCount];final int[][] ids = new int[threadCount][];final long[][] values = new long[threadCount][];final CountDownLatch latch = new CountDownLatch(threadCount);rankService.createRank(rankName);// 生成id和数据int index = 0;for(int i = 0;i<threadCount;i++){ ids[i] = new int[dataCountPerThread];values[i] = new long[dataCountPerThread];for(int j = 0;j<dataCountPerThread;j++){ ids[i][j] = ++index;/*randomId(maxId);*/values[i][j] = randomValue(maxValue);}}// 生成线程for(int threadI=0;threadI<threadCount ;threadI++){ final int threadIndex = threadI;Thread thread = new Thread("threadIndex"+threadIndex){ @Overridepublic void run(){ for(int i=0;i<dataCountPerThread;i++){ rankService.put(rankName, ids[threadIndex][i], values[threadIndex][i]);}latch.countDown();}};threads[threadI] = thread;}// 执行long t1 = System.nanoTime();for(int threadI=0;threadI<threadCount ;threadI++){ threads[threadI].start();}latch.await();long t2 = System.nanoTime();log.info("put useTime:"+(t2-t1)/1000000);// get/*int testId=30;for(int i=0;i<10;i++){ RankData rankData = rankService.getRankDataById(rankName, testId+i);log.info("rankData1:"+rankData);}rankService.put(rankName, testId, 1);RankData rankData2 = rankService.getRankDataById(rankName, testId);log.info("rankData2:"+rankData2);*/for(int i=1; i<=10; i++){ RankData rankData = rankService.getRankDataByRankNum(rankName, i);log.info("Rank"+i+":["+rankData+"]");}long t3 = System.nanoTime();log.info("get useTime:"+(t3-t2)/1000000);}
实测(MacBook Pro,Core(TM) i7-3520M CPU @ 2.90GHz 双核(四逻辑核心),DDR3 8G 1600HZ,-Xmx4096m):1000万数据排行,200线程共每分钟访问50万(均匀访问)排行请求,cpu利用率维持在100%(共400%),访问复杂度基本不变,注意:当内存即将耗尽,CPU会更加多的走高,直到资源耗尽(gc频率增加)
自己实现的方式比Redis实现的性能要好很多