延迟任务实现方案之 Redisson RDelayedQueue
原创 · 阅读时间大约 2 分钟
RedissonDelayedQueueService
/**
 * Service wrapper around Redisson's delayed queue.
 *
 * <p>Each delayed queue is layered on top of the {@code RBlockingDeque} registered under the
 * same queue name: producers go through the delayed queue ({@link #add}), consumers read from
 * the blocking deque ({@link #getBlockingQueue}).
 */
@Service
@RequiredArgsConstructor
public class RedissonDelayedQueueService {

    private final RedissonClient redissonClient;

    /**
     * Offers a message to the delayed queue backed by the named blocking deque.
     *
     * @param e         message object
     * @param delay     delay before the message is handed over to the blocking deque
     * @param timeUnit  unit of {@code delay}
     * @param queueName name of the backing blocking deque
     * @param <E>       message type
     */
    public <E> void add(E e, long delay, TimeUnit timeUnit, String queueName) {
        RDelayedQueue<E> delayedQueue = delayedQueue(queueName);
        delayedQueue.offer(e, delay, timeUnit);
    }

    /**
     * Removes a single occurrence of the given message from the delayed queue.
     *
     * @param e         message to remove
     * @param queueName name of the backing blocking deque
     * @param <E>       message type
     */
    public <E> void remove(E e, String queueName) {
        RDelayedQueue<E> delayedQueue = delayedQueue(queueName);
        delayedQueue.remove(e);
    }

    /**
     * Removes every message in the delayed queue that satisfies the given filter.
     *
     * @param filter    predicate selecting the messages to remove
     * @param queueName name of the backing blocking deque
     * @param <E>       message type
     */
    public <E> void removeIf(Predicate<? super E> filter, String queueName) {
        RDelayedQueue<E> delayedQueue = delayedQueue(queueName);
        delayedQueue.removeIf(filter);
    }

    /**
     * Returns the blocking deque that consumers should read from.
     *
     * @param queueName name of the blocking deque
     * @param <E>       type of each message in the queue
     * @return the blocking deque registered under {@code queueName}
     */
    public <E> RBlockingDeque<E> getBlockingQueue(String queueName) {
        RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName);
        // Obtain the delayed queue as well so that this client takes part in moving expired
        // messages into the deque. Without this, a consumer-only process (or a process that was
        // restarted and has not called add() yet) may never see messages that are already due.
        // NOTE(review): relies on Redisson registering its transfer task when the delayed queue
        // is obtained - confirm against the Redisson version in use.
        redissonClient.getDelayedQueue(blockingDeque);
        return blockingDeque;
    }

    /** Resolves the delayed queue layered on top of the named blocking deque. */
    private <E> RDelayedQueue<E> delayedQueue(String queueName) {
        RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName);
        return redissonClient.getDelayedQueue(blockingDeque);
    }
}
TestController
/**
 * Demo endpoints for exercising the delayed queue: one to enqueue a delayed message and one
 * to poll the destination queue.
 */
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {

    /** Name of the queue shared by both endpoints. */
    private static final String TEST_DELAY = "test:delay";

    /** Source of message ids; starts at 1 and grows by one per offered message. */
    private final AtomicLong counter = new AtomicLong(1);

    private final RedissonDelayedQueueService redissonDelayedQueueService;

    /**
     * Enqueues a message with the given content and delay.
     *
     * @param delay   delay in seconds
     * @param message message content
     * @return an empty success response
     */
    @GetMapping("/offer/{delay}/{message}")
    public R<Void> offer(@PathVariable("delay") Long delay,
                         @PathVariable("message") String message) {
        Message payload = Message.builder()
                .id(counter.getAndIncrement())
                .content(message)
                .build();
        redissonDelayedQueueService.add(payload, delay, TimeUnit.SECONDS, TEST_DELAY);
        return R.ok();
    }

    /**
     * Takes the head of the destination queue without waiting.
     *
     * @return a success response wrapping the polled message, or {@code null} content when the
     *         queue has nothing available
     */
    @GetMapping("/poll")
    public R<Message> poll() {
        RBlockingDeque<Message> deque = redissonDelayedQueueService.getBlockingQueue(TEST_DELAY);
        Message head = deque.poll();
        return R.ok(head);
    }

    /** Payload carried through the delayed queue. */
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Message implements Serializable {

        /** Sequential identifier assigned by the controller. */
        private Long id;

        /** Free-form message text. */
        private String content;
    }
}