跳至主要內容
延迟任务实现方案之 Redisson RDelayedQueue

RedissonDelayedQueueService

/**
 * Redisson 延迟队列服务
 */
@Service
@RequiredArgsConstructor
public class RedissonDelayedQueueService {

    private final RedissonClient redissonClient;

    /**
     * 向指定的队列添加一条延迟消息
     *
     * @param e         消息对象
     * @param delay     延迟时间
     * @param timeUnit  时间单位
     * @param queueName 队列名
     * @param <E>       消息对象类型
     */
    public <E> void add(E e, long delay, TimeUnit timeUnit, String queueName) {
        RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<E> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        delayedQueue.offer(e, delay, timeUnit);
    }

    /**
     * 从指定队列移除一条延迟消息
     *
     * @param e         待移除的消息
     * @param queueName 队列名
     * @param <E>       消息对象类型
     */
    public <E> void remove(E e, String queueName) {
        RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<E> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        delayedQueue.remove(e);
    }

    /**
     * 从指定队列移除一条延迟消息
     *
     * @param filter    过滤器
     * @param queueName 队列名
     * @param <E>       消息对象类型
     */
    public <E> void removeIf(Predicate<? super E> filter, String queueName) {
        RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<E> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        delayedQueue.removeIf(filter);
    }

    /**
     * 获取指定的阻塞队列
     * 消费端使用
     *
     * @param queueName 队列名
     * @param <E>       队列中每条消息对象的类型
     * @return 指定的阻塞队列
     */
    public <E> RBlockingDeque<E> getBlockingQueue(String queueName) {
        return redissonClient.getBlockingDeque(queueName);
    }

}

cpgege原创大约 2 分钟随笔延时队列