跳至主要內容

延迟任务实现方案之 Redisson RDelayedQueue

cpgege · 原创 · 大约 2 分钟 · 随笔 · 延时队列

RedissonDelayedQueueService

/**
 * Service wrapper around Redisson's delayed queue.
 *
 * <p>Every logical queue is addressed by name. Producers go through an
 * {@code RDelayedQueue} that is layered on top of the {@code RBlockingDeque}
 * registered under that name; consumers read from the blocking deque itself
 * (see {@link #getBlockingQueue(String)}).
 */
@Service
@RequiredArgsConstructor
public class RedissonDelayedQueueService {

    private final RedissonClient redissonClient;

    /**
     * Offers a message to the delayed queue of the named queue.
     *
     * @param e         the message object
     * @param delay     how long to delay the message
     * @param timeUnit  unit of {@code delay}
     * @param queueName name of the queue
     * @param <E>       message type
     */
    public <E> void add(E e, long delay, TimeUnit timeUnit, String queueName) {
        this.<E>delayedQueueFor(queueName).offer(e, delay, timeUnit);
    }

    /**
     * Removes the given message from the delayed queue of the named queue.
     *
     * <p>The boolean result of the underlying {@code remove} call is not
     * reported to the caller.
     *
     * @param e         the message to remove
     * @param queueName name of the queue
     * @param <E>       message type
     */
    public <E> void remove(E e, String queueName) {
        this.<E>delayedQueueFor(queueName).remove(e);
    }

    /**
     * Removes from the delayed queue of the named queue the messages that
     * satisfy the filter.
     *
     * <p>This delegates to the collection-style {@code removeIf}, so it is not
     * limited to a single message: every matching element is a candidate for
     * removal.
     *
     * @param filter    predicate selecting the messages to remove
     * @param queueName name of the queue
     * @param <E>       message type
     */
    public <E> void removeIf(Predicate<? super E> filter, String queueName) {
        this.<E>delayedQueueFor(queueName).removeIf(filter);
    }

    /**
     * Returns the blocking deque registered under the given name.
     * Intended for the consumer side.
     *
     * <p>NOTE(review): this method only fetches the blocking deque and does not
     * request an {@code RDelayedQueue}. Presumably delayed messages are moved
     * into this deque only while a delayed-queue instance for the same name
     * exists in some process -- confirm against the Redisson documentation.
     *
     * @param queueName name of the queue
     * @param <E>       type of each message in the queue
     * @return the blocking deque for {@code queueName}
     */
    public <E> RBlockingDeque<E> getBlockingQueue(String queueName) {
        return redissonClient.getBlockingDeque(queueName);
    }

    /**
     * Resolves the delayed queue layered on the blocking deque with the given name.
     * Both handles are requested from the client on every call; nothing is cached here.
     */
    private <E> RDelayedQueue<E> delayedQueueFor(String queueName) {
        RBlockingDeque<E> destination = redissonClient.getBlockingDeque(queueName);
        return redissonClient.getDelayedQueue(destination);
    }

}

TestController

/**
 * Demo endpoints that exercise {@link RedissonDelayedQueueService}:
 * one enqueues a delayed message, the other polls the consumer-side queue.
 */
@RestController
@RequiredArgsConstructor
@RequestMapping("/test")
public class TestController {

    /** Queue name shared by the offer and poll endpoints. */
    private static final String TEST_DELAY = "test:delay";

    /** Generates message ids; the first id handed out is 1. */
    private final AtomicLong counter = new AtomicLong(1);

    private final RedissonDelayedQueueService redissonDelayedQueueService;

    /**
     * Enqueues a message with the given text, delayed by {@code delay} seconds.
     *
     * @param delay   delay in seconds, taken from the path
     * @param message message text, taken from the path
     * @return an empty success response
     */
    @GetMapping("/offer/{delay}/{message}")
    public R<Void> offer(@PathVariable("delay") Long delay,
                         @PathVariable("message") String message) {
        // Build the payload first so the enqueue call reads as a single line.
        Message payload = Message.builder()
                .id(counter.getAndIncrement())
                .content(message)
                .build();
        redissonDelayedQueueService.add(payload, delay, TimeUnit.SECONDS, TEST_DELAY);
        return R.ok();
    }

    /**
     * Polls the consumer-side queue once and returns whatever {@code poll()} yields.
     *
     * @return a success response wrapping the polled message (the payload is
     *         {@code null} when {@code poll()} finds nothing, per the
     *         {@code java.util.Queue} contract)
     */
    @GetMapping("/poll")
    public R<Message> poll() {
        Message head = redissonDelayedQueueService.<Message>getBlockingQueue(TEST_DELAY).poll();
        return R.ok(head);
    }

    /** Payload carried through the delayed queue: a numeric id plus free-form text. */
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Message implements Serializable {

        /** Identifier assigned by the controller's counter. */
        private Long id;

        /** Message text supplied by the caller. */
        private String content;
    }

}

参考文献

上次编辑于: