前言
之前有在b站找过一些关于多线程的视频看,这些视频大多以模拟高并发为主题来讲,比如说抢票之类的
这就更像是一种API压测了
使得我早期一度以为多线程的使用场景非常有限
而实际则是SpringBoot把活给干完了 ,比如最常见的请求,每个请求都会对应一个线程
异步(多线程)&事务
通过手动事务实现
1️⃣ 主线程
public String rollbackTest() {
PlatformTransactionManager transactionManager = SpringUtil.getBean(DataSourceTransactionManager.class);
// 用于阻塞子线程,等待主线程判断是否需要回滚
CountDownLatch latch = new CountDownLatch(1);
// 用于阻塞主线程,等待所有子线程执行完毕
CountDownLatch mainLatch = new CountDownLatch(2);
// 回滚标识
AtomicBoolean isRollback = new AtomicBoolean(false);
try {
// 两个异步函数
asyncService.rollbackTestOne(transactionManager, latch, mainLatch, isRollback);
asyncService.rollbackTestTwo(transactionManager, latch, mainLatch, isRollback);
// 阻塞主线程
mainLatch.await();
// commit
latch.countDown();
} catch (Exception e) {
System.out.println(e);
// rollback
isRollback.set(true);
latch.countDown();
}
return "233333";
}
2️⃣ rollbackTestOne
@Async
public void rollbackTestOne(PlatformTransactionManager transactionManager, CountDownLatch latch, CountDownLatch mainLatch, AtomicBoolean isRollback) {
DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
defaultTransactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus transStatus = transactionManager.getTransaction(defaultTransactionDefinition);
log.info("=============== sql one start at " + System.currentTimeMillis());
ThreadUtil.sleep(2000);
TbGuildMessageC entity = mapper.selectById(4053);
entity.setId(null);
mapper.insert(entity);
log.info("=============== sql one end at " + System.currentTimeMillis());
try {
mainLatch.countDown();
log.warn("阻塞中...");
latch.await();
if (isRollback.get()) transactionManager.rollback(transStatus);
else transactionManager.commit(transStatus);
} catch (Exception e) {
log.error(e.getMessage());
}
}
3️⃣ rollbackTestTwo
@Async
public void rollbackTestOne(PlatformTransactionManager transactionManager, CountDownLatch latch, CountDownLatch mainLatch, AtomicBoolean isRollback) {
DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
defaultTransactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus transStatus = transactionManager.getTransaction(defaultTransactionDefinition);
log.info("=============== sql two start at " + System.currentTimeMillis());
ThreadUtil.sleep(2000);
TbGuildMessageC entity = mapper.selectById(4053);
// entity.setId(null);
mapper.insert(entity);
log.info("=============== sql two end at " + System.currentTimeMillis());
try {
mainLatch.countDown();
log.warn("阻塞中...");
latch.await();
if (isRollback.get()) transactionManager.rollback(transStatus);
else transactionManager.commit(transStatus);
} catch (Exception e) {
log.error(e.getMessage());
}
}
这里会有异常产生,即主键冲突
Another
原本我想通过 List<TransactionStatus> 来批量回滚/提交事务(在主线程中完成,子线程调用 transactionManage.getTransaction 后对该集合进行赋值
显然这样做是不行的,会抛出异常
No value for key [HikariDataSource (HikariPool-1)] bound to thread
而这个异常则来自这里
Thread t = Thread.currentThread();
不难发现这个报错是因为线程不一致导致的
也可以观察事务创建时对 ThreadLocalMap 的操作
其中
Object key & Object actualKey 均为 HikariDataSource
Object value 为 ConnectionHolder
最后以 HikariDataSource 作为 key,ConnectionHolder 作为 value 保存在了当前线程的 ThreadLocalMap 中
后续进行事务提交/回滚的时候主线程拿不到保存在子线程 ThreadLocalMap 中的数据库连接信息,所以就抛出了上文中的异常
顺便再mark一篇关于事务源码的文章 http://www.itsoku.com/course/12/129
Go - goroutine
算是一个入门案例,之前并没有去了解过(因为用不上
- go fun() - 启动一个协程
- chan - 协程间的通信
package main
import (
"fmt"
"github.com/kataras/iris/v12"
"github.com/mattn/go-colorable"
"github.com/rs/cors"
log "github.com/sirupsen/logrus"
"time"
)
var ch = make(chan string)
func main() {
// log样式配置
log.SetFormatter(&log.TextFormatter{ForceColors: true})
log.SetOutput(colorable.NewColorableStdout())
// 读取消息
go read()
app := iris.New()
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
AllowCredentials: true,
Debug: false,
})
app.WrapRouter(c.ServeHTTP)
app.Get("/send", Send)
err := app.Listen(":7072")
if err != nil {
return
}
}
func Send(ctx iris.Context) {
go save(ctx.URLParam("str"))
Result(ctx, 200, "success", "")
}
//wg *sync.WaitGroup
func save(str string) {
ch <- str
}
func read() {
for v := range ch {
fmt.Println(v + " Time === " + time.Now().String())
}
}
// Result
// @Description:
// @param ctx
// @param code
// @param msg
// @param data
func Result(ctx iris.Context, code int, msg interface{}, data interface{}) {
res := iris.Map{
"code": code,
"msg": msg,
}
if msg == "请先登陆!" {
res["code"] = 403
}
if code == 500 {
log.Error("failed by ["+ctx.GetHeader("X-Real-Ip")+"]: ", msg)
} else if code == 200 {
res["data"] = data
}
_, err := ctx.JSON(res)
if err != nil {
return
}
}
Comments | 1 条评论
博主 903783278
代码是不是有问题呀,rollBackTestTwo中的mapper.insert(entity);抛出异常,后面还能执行吗