引言
我目前在写的APP接口有这么一个需求:双方进行交易时,买方上传完支付凭证之后需要卖方确认,确认后会有xx流向买方账户。这里就需要一个超时自动确认的功能,类似淘宝的自动确认收货。
一开始我没有准备用延时队列来写,只想用一个定时任务去完成:在每天的xx时间查出所有已经达到自动确认收货要求的订单,然后批量修改。不过文档里有个坑,超时时间的单位是小时,而不是天,这样的话每天执行一次的定时任务就行不通了,也不能简单地把定时任务的执行间隔改小。
然后我就找了找关于延时队列的文章,花了半天时间去看了一下关于DelayQueue和有关线程池的文章
Code
package org.opsli.modulars.otc.task;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * Delay-queue element for the otc-xd trade: it becomes available once the configured delay has
 * elapsed, at which point the order is due for automatic payment confirmation / crediting of
 * the buyer's xd.
 *
 * @author xiamo
 * @date 2021/6/28 10:24
 */
public class DelayedOtcPayConfirm implements Delayed {

    /** Id of the xd trade order (otc_market_order id). */
    private String orderId;

    /** Absolute time, in epoch milliseconds, at which the order is automatically confirmed. */
    private long expire;

    /** Time, in epoch milliseconds, at which this element was created. */
    private long createTime;

    /**
     * Instantiates a new delayed otc pay confirm element.
     *
     * @param orderId the order id
     * @param delay   the delay in milliseconds, counted from now
     */
    public DelayedOtcPayConfirm(String orderId, long delay) {
        // Read the clock once so that expire is exactly createTime + delay.
        long now = System.currentTimeMillis();
        this.orderId = orderId;
        this.expire = now + delay;
        this.createTime = now;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public long getExpire() {
        return expire;
    }

    public void setExpire(long expire) {
        this.expire = expire;
    }

    public long getCreateTime() {
        return createTime;
    }

    public void setCreateTime(long createTime) {
        this.createTime = createTime;
    }

    /**
     * Returns the remaining delay; zero or negative once the element has expired.
     *
     * @param timeUnit the unit of the returned value
     * @return the remaining delay converted to {@code timeUnit}
     */
    @Override
    public long getDelay(TimeUnit timeUnit) {
        long remainingMillis = this.expire - System.currentTimeMillis();
        return timeUnit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders elements by expiry time, earliest first.
     *
     * @param delayed the element to compare with
     * @return a negative value, zero or a positive value as this element expires before, at the
     *         same time as, or after {@code delayed}
     */
    @Override
    public int compareTo(Delayed delayed) {
        if (delayed == this) {
            return 0;
        }
        if (delayed instanceof DelayedOtcPayConfirm) {
            // Compare the fixed expiry instants: no clock reads, and no narrowing of a long
            // difference to int, which flips the sign once the gap exceeds Integer.MAX_VALUE ms.
            return Long.compare(this.expire, ((DelayedOtcPayConfirm) delayed).expire);
        }
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), delayed.getDelay(TimeUnit.MILLISECONDS));
    }
}
package org.opsli.modulars.otc.task;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.opsli.api.wrapper.otc.OtcMarketModel;
import org.opsli.api.wrapper.otc.OtcMarketOrderModel;
import org.opsli.api.wrapper.otc.OtcXdCongealModel;
import org.opsli.common.thread.refuse.AsyncProcessQueueReFuse;
import org.opsli.common.thread.refuse.AsyncProcessorReFuse;
import org.opsli.modulars.otc.entity.OtcMarket;
import org.opsli.modulars.otc.entity.OtcMarketOrder;
import org.opsli.modulars.otc.service.IOtcMarketOrderService;
import org.opsli.modulars.otc.service.IOtcMarketService;
import org.opsli.modulars.otc.service.IOtcXdCongealService;
import org.opsli.modulars.otc.service.impl.OtcMarketOrderServiceImpl;
import org.opsli.modulars.xxb.enums.UserAssetOptTypeEnum;
import org.opsli.modulars.xxb.receiver.UserAsset;
import org.opsli.modulars.xxb.service.IXxbUserAssetService;
import org.opsli.modulars.xxb.wrapper.XxbUserAssetModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.*;
/**
* @author xiamo
* @Description: otc-xd交易自动确认支付/买家xd入账
* @ClassName: OtcPayConfirmTask
* @date 2021/6/28 10:41
*/
@Slf4j
@Component
public class OtcPayConfirmTask {
@Autowired
private IOtcMarketOrderService otcMarketOrderService;
@Autowired
private IOtcXdCongealService otcXdCongealService;
@Autowired
private IXxbUserAssetService xxbUserAssetService;
@Autowired
private IOtcMarketService otcMarketService;
private static DelayQueue<DelayedOtcPayConfirm> delayQueue = new DelayQueue<DelayedOtcPayConfirm>();
private static OtcPayConfirmTask instance = new OtcPayConfirmTask();
/**
* Get instance otc pay confirm task.
*
* @return the otc pay confirm task
*/
public static OtcPayConfirmTask getInstance() {
return instance;
}
/**
* Create task.
* 创建延时任务
*
* @param orderId the order id
*/
public void createTask(String orderId) {
producer(delayQueue, orderId);
consumer(delayQueue);
System.out.println("delayQueue size:" + delayQueue.size());
}
/**
* Producer.
*
* @param delayQueue the delay queue
* @param orderId the order id
*/
private void producer(final DelayQueue<DelayedOtcPayConfirm> delayQueue, String orderId) {
long delay = 10000;
DelayedOtcPayConfirm data = new DelayedOtcPayConfirm(orderId, delay);
delayQueue.offer(data);
}
/**
* Consumer.
*
* @param delayQueue the delay queue
*/
private void consumer(final DelayQueue<DelayedOtcPayConfirm> delayQueue) {
// 执行
AsyncProcessorReFuse.executeTask(new AsyncProcessQueueReFuse.TaskWrapper(new Runnable() {
@Override
public void run() {
while (true) {
DelayedOtcPayConfirm element = null;
try {
// 没有满足延时的元素 用poll返回 null
// 没有满足延时的元素 用take会阻塞
element = delayQueue.take();
} catch (Exception e) {
e.printStackTrace();
}
String orderId = element.getOrderId();
OtcMarketOrderModel order = otcMarketOrderService.get(orderId);
// 判断交易订单是否已完成
if (!("2".equals(order.getRecordStatus()))) {
// 未完成则自动确认
log.info("订单:{} 未确认支付,执行自动确认操作!", orderId);
// 查出该笔订单中的卖方和买方
Map<String, String> userIds = otcMarketOrderService.getCheckUserId(order, 1);
// 入库
otcMarketOrderService.userConfirm(order, userIds.get("sellUserId"), userIds.get("buyUserId"));
} else {
log.info("订单:{} 已确认支付", orderId);
}
}
}
}));
}
}
package org.opsli.common.thread.refuse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import java.util.concurrent.*;
/**
 * Custom thread executor backed by a bounded work queue; tasks that the pool cannot accept are
 * rejected.
 *
 * @Author: 一枝花算不算浪漫
 * @CreateTime: 2020-10-08 10:24
 */
@Slf4j
public class AsyncProcessorReFuse {

    /**
     * Core pool size: twice the number of available processors.
     */
    private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;

    /**
     * Name pattern for the pool's threads.
     */
    private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-Refuse-%d";

    /**
     * Thread factory: named daemon threads.
     */
    private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder()
            .namingPattern(THREAD_POOL_NAME)
            .daemon(true).build();

    /**
     * Capacity of the work queue.
     */
    private static final int DEFAULT_SIZE = 500;

    /**
     * How long the shutdown hook waits for running tasks, in seconds.
     */
    private static final int DEFAULT_WAIT_TIME = 10;

    /**
     * Keep-alive time of idle threads above the core size, in seconds.
     */
    private static final long DEFAULT_KEEP_ALIVE = 60L;

    /**
     * Executor
     */
    private static final ExecutorService EXECUTOR;

    /**
     * Work queue of the executor.
     */
    private static final BlockingQueue<Runnable> EXECUTOR_QUEUE = new ArrayBlockingQueue<>(DEFAULT_SIZE);

    static {
        // Create the executor. The maximum pool size is 4x the core size,
        // i.e. 8x the number of available processors.
        try {
            EXECUTOR = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
                    TimeUnit.SECONDS, EXECUTOR_QUEUE, FACTORY);
            // Hook that shuts the pool down when the JVM exits.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                log.info("AsyncProcessor shutting down.");
                EXECUTOR.shutdown();
                try {
                    // Wait up to DEFAULT_WAIT_TIME seconds for running tasks, then force shutdown.
                    if (!EXECUTOR.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) {
                        log.error("AsyncProcessor shutdown immediately due to wait timeout.");
                        EXECUTOR.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    log.error("AsyncProcessor shutdown interrupted.");
                    EXECUTOR.shutdownNow();
                    // Preserve the interrupt status instead of silently consuming it.
                    Thread.currentThread().interrupt();
                }
                log.info("AsyncProcessor shutdown complete.");
            }));
        } catch (Exception e) {
            log.error("AsyncProcessor init error.", e);
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * This type cannot be instantiated.
     */
    private AsyncProcessorReFuse() {
    }

    /**
     * Executes a task and reports whether the pool accepted it.<br>
     * This is a thin wrapper around {@link ExecutorService#execute(Runnable)}.
     *
     * @param task the task to run
     * @return {@code true} if the task was accepted, {@code false} if it was rejected
     */
    public static boolean executeTask(Runnable task) {
        try {
            EXECUTOR.execute(task);
        } catch (RejectedExecutionException e) {
            log.error("Task executing was rejected.", e);
            return false;
        }
        return true;
    }

    /**
     * Submits a task whose result can be retrieved later.<br>
     * When the submission is rejected, an {@link UnsupportedOperationException} is thrown.
     *
     * @param task the task to run
     * @param <T>  the result type of the task
     * @return a future representing the pending result of the task
     * @throws UnsupportedOperationException if the pool rejects the task
     */
    public static <T> Future<T> submitTask(Callable<T> task) {
        try {
            return EXECUTOR.submit(task);
        } catch (RejectedExecutionException e) {
            log.error("Task executing was rejected.", e);
            throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
        }
    }
}
// Usage: create the delayed task for an order
OtcPayConfirmTask.getInstance().createTask(orderId);
Comments | NOTHING