本文介绍hazelcast中Ringbuffer的使用。
Ringbuffer的基础使用
首先放上基础代码
示例代码
代码主要包含三部分:
- main方法 对Ringbuffer进行配置,开启两个线程分别进行Ringbuffer的消费和生产。
- 消费者方法 sampleRingbufferConsumer 消耗Ringbuffer中的内容。
- 生产者方法 包括两种 sampleRingbufferProducer 和sampleRingbufferProducerWithFail 主要区别是OverflowPolicy不一样。
通过运行main方法即可以看到Ringbuffer的生产和消费,虽然有报错异常,但是用来说明Ringbuffer的生产、消费机制已经足够了。
这里注意一下Ringbuffer与Queue的区别
,Queue被消耗后就不存在了,但是Ringbuffer可以让不同消费者以不同的offset进行消费,只要这个指针大于等于headSequence,小于等于tailSequence即可。
Ringbuffer的使用主要是关注headSequence,tailSequence。
headSequence就是Ringbuffer最老数据的一端,如果Ringbuffer为空的话,那么headSequence会比tailSequence大一,初始化值为0。
tailSequence就是Ringbuffer正在插入数据的一端,初始值为-1。
Ringbuffer的TTL
我们在示例代码中提供了一个capacity为100
的,名称为rb
,TTL为10秒
的Ringbuffer。为了显示TTL的作用,我们注释掉这行代码(25行)
1 | //ringbufferConfig.setTimeToLiveSeconds(10); |
我们对比之后发现主要的区别是
1 | rb0 headSequence is 0 tailSequence is 79 nowSeq is 1 remain capacity is 100 |
capacity 一直是100,这是为什么呢?
在没有设置TTL的时候,这个剩余容量永远等于初始容量。
这是TTL被影响的第一直观部分。
还有就是在com.hazelcast.ringbuffer.Ringbuffer#addAsync的参数有一个OverflowPolicy,目前有两种实现:
- OverflowPolicy.OVERWRITE 在设置为该类型后,队列满后重写最久的元素,是add的默认策略。
- OverflowPolicy.FAIL 新写入的会失败,如果没写入则返回-1,程序负责后续处理。
这两种抛弃策略都是依赖于设置了TTL才有。并且可以通过选择注释下面两行来切换观察这两种抛弃策略。
1 | executor.submit(() -> sampleRingbufferProducer(instance)); |
可以看出OVERWRITE会出现比较大的跳数,很多数据还没被消费就跳过了。官方文档也给出要使用OverflowPolicy.FAIL ,并且采用了跟示例代码类似的重试机制。
Ringbuffer的发送重试
发送重试是在设置了TTL,并且采用OverflowPolicy.FAIL的时候才进行。
当addAsync返回结果为-1,表示插入失败。则随机sleep一段时间,然后再进行插入。具体逻辑见方法sampleRingbufferProducerWithFail。
Ringbuffer的移动
对于capacity为100,理论上是headSequence,tailSequence在100以内,但实际情况是tailSequence-headSequence差值不超过100。
Ringbuffer的正确读取
示例代码V2
与示例代码相比V2增加了headSequence和tailSequence的判断,这样发现就不会出现读取时不在sequence范围内的问题了。
Ringbuffer的存疑
经过V2修正后,发现读取数据到98的时候,sequence已经切换到100了,中间的98、99已经没有了。如果要进一步支持,需要设置ringbufferStoreConfig,把内容持久化下来,这样进行消费的时候才能够准确找到。
Ringbuffer总结
- Ringbuffer可以提供一个重复消费的队列。
- 设置TTL以使OverflowPolicy生效。
- 为了能够不发生跳数,漏数据,最好设置RingbufferStore。
通过以上三步才能获取一个生产可用的环形缓存,虽然其实实现不是一个标准的Ringbuffer。