前言

之前有在b站找过一些关于多线程的视频看,这些视频大多以模拟高并发为主题来讲,比如说抢票之类的

这就更像是一种API压测了

使得我早期一度以为多线程的使用场景非常有限

而实际则是SpringBoot把活给干完了 ,比如最常见的请求,每个请求都会对应一个线程

异步(多线程)&事务

通过手动事务实现

1️⃣ 主线程

 public String rollbackTest() {
        PlatformTransactionManager transactionManager = SpringUtil.getBean(DataSourceTransactionManager.class);
        // 用于阻塞子线程,等待主线程判断是否需要回滚
        CountDownLatch latch = new CountDownLatch(1);
        // 用于阻塞主线程,等待所有子线程执行完毕
        CountDownLatch mainLatch = new CountDownLatch(2);
        // 回滚标识
        AtomicBoolean isRollback = new AtomicBoolean(false);
        try {

            // 两个异步函数
            asyncService.rollbackTestOne(transactionManager, latch, mainLatch, isRollback);
            asyncService.rollbackTestTwo(transactionManager, latch, mainLatch, isRollback);
            // 阻塞主线程
            mainLatch.await();
            // commit
            latch.countDown();
        } catch (Exception e) {
            System.out.println(e);
            // rollback
            isRollback.set(true);
            latch.countDown();
        }
        return "233333";
    }

2️⃣ rollbackTestOne

@Async
    public void rollbackTestOne(PlatformTransactionManager transactionManager, CountDownLatch latch, CountDownLatch mainLatch, AtomicBoolean isRollback) {
        DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
        defaultTransactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus transStatus = transactionManager.getTransaction(defaultTransactionDefinition);

        log.info("=============== sql one start at " + System.currentTimeMillis());
        ThreadUtil.sleep(2000);
        TbGuildMessageC entity = mapper.selectById(4053);
        entity.setId(null);
        mapper.insert(entity);
        log.info("=============== sql one end at " + System.currentTimeMillis());
        
        try {
            mainLatch.countDown();
            log.warn("阻塞中...");
            latch.await();
            if (isRollback.get()) transactionManager.rollback(transStatus);
            else transactionManager.commit(transStatus);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

3️⃣ rollbackTestTwo

@Async
    public void rollbackTestOne(PlatformTransactionManager transactionManager, CountDownLatch latch, CountDownLatch mainLatch, AtomicBoolean isRollback) {
        DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
        defaultTransactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus transStatus = transactionManager.getTransaction(defaultTransactionDefinition);

        log.info("=============== sql two start at " + System.currentTimeMillis());
        ThreadUtil.sleep(2000);
        TbGuildMessageC entity = mapper.selectById(4053);
        // entity.setId(null);
        mapper.insert(entity);
        log.info("=============== sql two end at " + System.currentTimeMillis());

        try {
            mainLatch.countDown();
            log.warn("阻塞中...");
            latch.await();
            if (isRollback.get()) transactionManager.rollback(transStatus);
            else transactionManager.commit(transStatus);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }

这里会有异常产生,即主键冲突

Another

原本我想通过 List<TransactionStatus> 来批量回滚/提交事务(在主线程中完成,子线程调用 transactionManage.getTransaction 后对该集合进行赋值

显然这样做是不行的,会抛出异常

No value for key [HikariDataSource (HikariPool-1)] bound to thread

而这个异常则来自这里

Thread t = Thread.currentThread();

不难发现这个报错是因为线程不一致导致的

也可以观察事务创建时对 ThreadLocalMap 的操作

其中

Object key & Object actualKey 均为 HikariDataSource

Object value 为 ConnectionHolder

最后以 HikariDataSource 作为 key,ConnectionHolder 作为 value 保存在了当前线程的 ThreadLocalMap 中

后续进行事务提交/回滚的时候主线程拿不到保存在子线程 ThreadLocalMap 中的数据库连接信息,所以就抛出了上文中的异常

顺便再mark一篇关于事务源码的文章 http://www.itsoku.com/course/12/129


Go - goroutine

算是一个入门案例,之前并没有去了解过(因为用不上

  • go fun() - 启动一个协程
  • chan - 协程间的通信
package main

import (
	"fmt"
	"github.com/kataras/iris/v12"
	"github.com/mattn/go-colorable"
	"github.com/rs/cors"
	log "github.com/sirupsen/logrus"
	"time"
)

var ch = make(chan string)

func main() {
	// log样式配置
	log.SetFormatter(&log.TextFormatter{ForceColors: true})
	log.SetOutput(colorable.NewColorableStdout())

	// 读取消息
	go read()
	app := iris.New()
	c := cors.New(cors.Options{
		AllowedOrigins:   []string{"*"},
		AllowCredentials: true,
		Debug:            false,
	})
	app.WrapRouter(c.ServeHTTP)
	app.Get("/send", Send)
	err := app.Listen(":7072")
	if err != nil {
		return
	}
}

func Send(ctx iris.Context) {
	go save(ctx.URLParam("str"))
	Result(ctx, 200, "success", "")
}

//wg *sync.WaitGroup
func save(str string) {
	ch <- str
}

func read() {
	for v := range ch {
		fmt.Println(v + " Time === " + time.Now().String())
	}
}

// Result
// @Description:
// @param ctx
// @param code
// @param msg
// @param data
func Result(ctx iris.Context, code int, msg interface{}, data interface{}) {
	res := iris.Map{
		"code": code,
		"msg":  msg,
	}
	if msg == "请先登陆!" {
		res["code"] = 403
	}
	if code == 500 {
		log.Error("failed by ["+ctx.GetHeader("X-Real-Ip")+"]: ", msg)
	} else if code == 200 {
		res["data"] = data
	}
	_, err := ctx.JSON(res)
	if err != nil {
		return
	}
}


Ex - ploooosion!