RedissonDelayedQueueService
/**
* Redisson 延迟队列服务
*/
@Service
@RequiredArgsConstructor
public class RedissonDelayedQueueService {
private final RedissonClient redissonClient;
/**
* 向指定的队列添加一条延迟消息
*
* @param e 消息对象
* @param delay 延迟时间
* @param timeUnit 时间单位
* @param queueName 队列名
* @param <E> 消息对象类型
*/
public <E> void add(E e, long delay, TimeUnit timeUnit, String queueName) {
RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName);
RDelayedQueue<E> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(e, delay, timeUnit);
}
/**
* 从指定队列移除一条延迟消息
*
* @param e 待移除的消息
* @param queueName 队列名
* @param <E> 消息对象类型
*/
public <E> void remove(E e, String queueName) {
RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName);
RDelayedQueue<E> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.remove(e);
}
/**
* 从指定队列移除一条延迟消息
*
* @param filter 过滤器
* @param queueName 队列名
* @param <E> 消息对象类型
*/
public <E> void removeIf(Predicate<? super E> filter, String queueName) {
RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName);
RDelayedQueue<E> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.removeIf(filter);
}
/**
* 获取指定的阻塞队列
* 消费端使用
*
* @param queueName 队列名
* @param <E> 队列中每条消息对象的类型
* @return 指定的阻塞队列
*/
public <E> RBlockingDeque<E> getBlockingQueue(String queueName) {
return redissonClient.getBlockingDeque(queueName);
}
}
原创大约 2 分钟