引言

因为暂时在写的APP接口这边有这么一个需求 ,双方在进行交易的时候,买方上传完了支付凭证之后需要卖方这边确认,然后会有xx流向买方账户,这里就有一个超时自动确认的功能,类似淘宝的自动确认收货吧

一开始我没有准备去用延时队列写的,就想用一个定时任务去完成,在每天的xx时间查出所有已经达到自动确认收货要求的订单,然后批量修改,不过文档里有个坑,上面的要求是小时,而不是天,这样的话定时任务就行不通,必然也不能把间隔改小

然后我就找了找关于延时队列的文章,花了半天时间去看了一下关于DelayQueue和有关线程池的文章

Code

package org.opsli.modulars.otc.task;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author xiamo
 * @Description: otc-xd交易自动确认支付/买家xd入账
 * @ClassName: DelayedOtcPayConfirm
 * @date 2021/6/28 10:24
 */
public class DelayedOtcPayConfirm implements Delayed {


    /**
     * xd交易订单id - otc_market_order id
     */
    private String orderId;

    /**
     * 订单自动确认收货时间
     */
    private long expire;

    /**
     * 创建时间
     */
    private long createTime;

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public long getExpire() {
        return expire;
    }

    public void setExpire(long expire) {
        this.expire = expire;
    }

    public long getCreateTime() {
        return createTime;
    }

    public void setCreateTime(long createTime) {
        this.createTime = createTime;
    }

    /**
     * Instantiates a new Delayed otc pay confirm.
     *
     * @param orderId the order id  订单id
     * @param delay   the delay     延时时间
     */
    public DelayedOtcPayConfirm(String orderId, long delay) {
        this.orderId = orderId;
        //到期时间 = 当前时间+延迟时间
        expire = System.currentTimeMillis() + delay;
        createTime = System.currentTimeMillis();
    }
    @Override
    public long getDelay(TimeUnit timeUnit) {
        return timeUnit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed delayed) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - delayed.getDelay(TimeUnit.MILLISECONDS));
    }
}
package org.opsli.modulars.otc.task;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.opsli.api.wrapper.otc.OtcMarketModel;
import org.opsli.api.wrapper.otc.OtcMarketOrderModel;
import org.opsli.api.wrapper.otc.OtcXdCongealModel;
import org.opsli.common.thread.refuse.AsyncProcessQueueReFuse;
import org.opsli.common.thread.refuse.AsyncProcessorReFuse;
import org.opsli.modulars.otc.entity.OtcMarket;
import org.opsli.modulars.otc.entity.OtcMarketOrder;
import org.opsli.modulars.otc.service.IOtcMarketOrderService;
import org.opsli.modulars.otc.service.IOtcMarketService;
import org.opsli.modulars.otc.service.IOtcXdCongealService;
import org.opsli.modulars.otc.service.impl.OtcMarketOrderServiceImpl;
import org.opsli.modulars.xxb.enums.UserAssetOptTypeEnum;
import org.opsli.modulars.xxb.receiver.UserAsset;
import org.opsli.modulars.xxb.service.IXxbUserAssetService;
import org.opsli.modulars.xxb.wrapper.XxbUserAssetModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.Calendar;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @author xiamo
 * @Description: otc-xd交易自动确认支付/买家xd入账
 * @ClassName: OtcPayConfirmTask
 * @date 2021/6/28 10:41
 */
@Slf4j
@Component
public class OtcPayConfirmTask {

    @Autowired
    private IOtcMarketOrderService otcMarketOrderService;

    @Autowired
    private IOtcXdCongealService otcXdCongealService;

    @Autowired
    private IXxbUserAssetService xxbUserAssetService;

    @Autowired
    private IOtcMarketService otcMarketService;

    private static DelayQueue<DelayedOtcPayConfirm> delayQueue = new DelayQueue<DelayedOtcPayConfirm>();

    private static OtcPayConfirmTask instance = new OtcPayConfirmTask();

    /**
     * Get instance otc pay confirm task.
     *
     * @return the otc pay confirm task
     */
    public static OtcPayConfirmTask getInstance() {
        return instance;
    }

    /**
     * Create task.
     * 创建延时任务
     *
     * @param orderId the order id
     */
    public void createTask(String orderId) {
        producer(delayQueue, orderId);
        consumer(delayQueue);
        System.out.println("delayQueue size:" + delayQueue.size());
    }

    /**
     * Producer.
     *
     * @param delayQueue the delay queue
     * @param orderId    the order id
     */
    private void producer(final DelayQueue<DelayedOtcPayConfirm> delayQueue, String orderId) {
        long delay = 10000;

        DelayedOtcPayConfirm data = new DelayedOtcPayConfirm(orderId, delay);
        delayQueue.offer(data);
    }

    /**
     * Consumer.
     *
     * @param delayQueue the delay queue
     */
    private void consumer(final DelayQueue<DelayedOtcPayConfirm> delayQueue) {
        // 执行
        AsyncProcessorReFuse.executeTask(new AsyncProcessQueueReFuse.TaskWrapper(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    DelayedOtcPayConfirm element = null;
                    try {
                        // 没有满足延时的元素 用poll返回 null
                        // 没有满足延时的元素 用take会阻塞
                        element = delayQueue.take();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    String orderId = element.getOrderId();
                    OtcMarketOrderModel order = otcMarketOrderService.get(orderId);
                    // 判断交易订单是否已完成
                    if (!("2".equals(order.getRecordStatus()))) {
                        // 未完成则自动确认
                        log.info("订单:{} 未确认支付,执行自动确认操作!", orderId);
                        // 查出该笔订单中的卖方和买方
                        Map<String, String> userIds = otcMarketOrderService.getCheckUserId(order, 1);
                        // 入库
                        otcMarketOrderService.userConfirm(order, userIds.get("sellUserId"), userIds.get("buyUserId"));

                    } else {
                        log.info("订单:{} 已确认支付", orderId);
                    }
                }
            }
        }));

    }

}
package org.opsli.common.thread.refuse;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.concurrent.*;

/**
 * @Author: 一枝花算不算浪漫
 * @CreateTime: 2020-10-08 10:24
 * @Description: 自定义线程执行器 - 线程超时自动拒绝
 */
@Slf4j
public class AsyncProcessorReFuse {

    /**
     * 默认最大并发数<br>
     */
    private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;

    /**
     * 线程池名称格式
     */
    private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-Refuse-%d";

    /**
     * 线程工厂名称
     */
    private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder()
            .namingPattern(THREAD_POOL_NAME)
            .daemon(true).build();

    /**
     * 默认队列大小
     */
    private static final int DEFAULT_SIZE = 500;

    /**
     * 默认线程等待时间 秒
     */
    private static final int DEFAULT_WAIT_TIME = 10;

    /**
     * 默认线程存活时间
     */
    private static final long DEFAULT_KEEP_ALIVE = 60L;

    /**NewEntryServiceImpl.java:689
     * Executor
     */
    private static final ExecutorService EXECUTOR;

    /**
     * 执行队列
     */
    private static final BlockingQueue<Runnable> EXECUTOR_QUEUE = new ArrayBlockingQueue<>(DEFAULT_SIZE);

    static {
        // 创建 Executor
        // 此处默认最大值改为处理器数量的 4 倍
        try {
           EXECUTOR = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
               TimeUnit.SECONDS, EXECUTOR_QUEUE, FACTORY);

           // 关闭事件的挂钩
           Runtime.getRuntime().addShutdownHook(new Thread(() -> {
              log.info("AsyncProcessor shutting down.");

              EXECUTOR.shutdown();

              try {
                 // 等待1秒执行关闭
                 if (!EXECUTOR.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) {
                     log.error("AsyncProcessor shutdown immediately due to wait timeout.");
                     EXECUTOR.shutdownNow();
                 }
              } catch (InterruptedException e) {
                 log.error("AsyncProcessor shutdown interrupted.");
                 EXECUTOR.shutdownNow();
              }

              log.info("AsyncProcessor shutdown complete.");
           }));
        } catch (Exception e) {
            log.error("AsyncProcessor init error.", e);
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * 此类型无法实例化
     */
    private AsyncProcessorReFuse() {
    }

    /**
     * 执行任务,不管是否成功<br>
     * 其实也就是包装以后的 {@link } 方法
     *
     * @param task
     * @return
     */
    public static boolean executeTask(Runnable task) {
        try {
            EXECUTOR.execute(task);
        } catch (RejectedExecutionException e) {
            log.error("Task executing was rejected.", e);
            return false;
        }
        return true;
    }

    /**
     * 提交任务,并可以在稍后获取其执行情况<br>
     * 当提交失败时,会抛出 {@link }
     *
     * @param task
     * @return
     */
    public static <T> Future<T> submitTask(Callable<T> task) {
        try {
            return EXECUTOR.submit(task);
        } catch (RejectedExecutionException e) {
            log.error("Task executing was rejected.", e);
            throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
        }
    }
}
// 调用
OtcPayConfirmTask.getInstance().createTask(orderId);

Ex - ploooosion!