note:之前项目中一直使用的工具,最近正好有时间,看看具体实现吧。分析TCC-Transaction下面关于dubbo的sample,本文中不会按照模块分析TCC-Transaction的源码,而是按照例子中的流程来分析,GitHub源码:https://github.com/changmingxie/tcc-transaction/tree/master-1.2.x/tcc-transaction-tutorial-sample/tcc-transaction-dubbo-sample

1 流程

tcc-transaction-dubbo-sample流程:用户购买一款产品,然后使用账户余额和红包付款。用户下单为主事务,扣除账户余额和扣除红包是两个子事务,来看看TCC-Transaction中到底是怎么实现主从事务的try,confirm,cancel。
在这里插入图片描述
在这里插入图片描述

2 主事务处理

如果要使用TCC-Transaction,需要在try方法上增加@Compensable注解,在该注解中要声明confirmMethod和cancelMethod分别对应TCC事务中的confirm阶段和cancel阶段。下面是主事务的try方法上的注解:

1
2
@Compensable(confirmMethod = "confirmMakePayment",
cancelMethod = "cancelMakePayment", asyncConfirm = true)

TCC-Transaction中对于@Compensable注解是通过spring AOP来处理的,我在tcc-transaction-core模块中找到了处理@Compensable注解的相关代码。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Aspect
public abstract class CompensableTransactionAspect {
private CompensableTransactionInterceptor
compensableTransactionInterceptor;
public void setCompensableTransactionInterceptor(
CompensableTransactionInterceptor compensableTransactionInterceptor) {
this.compensableTransactionInterceptor
= compensableTransactionInterceptor;
}
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {
}
@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp)
throws Throwable {
// 调用CompensableTransactionInterceptor类中的
// interceptCompensableMethod方法来处理注解
return
compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}
public abstract int getOrder();
}

当代码中调用到带有@Compensable注解的代码时,都会调用CompensableTransactionInterceptor类中的interceptCompensableMethod方法,这个方法就是用来处理TCC事务的核心方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
// 通过上下文对象,获取待访问对象的方法名称【makePayment】
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
// 获取目标对象的Compensable注解
Compensable compensable = method.getAnnotation(Compensable.class);
// 获取目标对象的事务传播属性
Propagation propagation = compensable.propagation();
// 获取事务上下文
TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
boolean asyncConfirm = compensable.asyncConfirm();
boolean asyncCancel = compensable.asyncCancel();
// 判断当前是否有存在的事务
boolean isTransactionActive = transactionManager.isTransactionActive();
if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
}
// 判断当前事务角色
// ROOT-> 主事务 Provider->事务参与者
MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
switch (methodType) {
case ROOT:
return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
case PROVIDER:
return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
default:
return pjp.proceed();
}
}

interceptCompensableMethod方法主要做了以下几件事:

  1. 获取try方法(makePayment)的相关信息、@Compensable注解内容
  2. 获取事务的上下文
  3. 判断事务类型,是ROOT事务(主事务)、还是PROVIDER事务(子事务)
  4. 根据事务类型执行后续的方法,如主事务try,子事务try
  5. 根据try方法执行情况,执行confirm或cancel方法

@Compensable注解的定义中有一个TransactionContextEditor类,默认值是DefaultTransactionContextEditor类,这个类就是用来获取事务的上下文。具体的获取事务上下文的代码如下:

1
2
3
TransactionContext transactionContext =
FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().
get(pjp.getTarget(), method, pjp.getArgs());

在主事务的@Compensable注解中没有声明TransactionContextEditor类,那肯定是使用默认的DefaultTransactionContextEditor类,DefaultTransactionContextEditor的get方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public TransactionContext get(Object target, Method method, Object[] args) {
int position = getTransactionContextParamPosition(method.getParameterTypes());
if (position >= 0) {
return (TransactionContext) args[position];
}
return null;
}
public static int getTransactionContextParamPosition(Class<?>[] parameterTypes) {
int position = -1;
for (int i = 0; i < parameterTypes.length; i++) {
if (parameterTypes[i].equals(org.mengyun.tcctransaction.api.TransactionContext.class)) {
position = i;
break;
}
}
return position;
}

很明显主事务的makePayment方法中没有传递org.mengyun.tcctransaction.api.TransactionContext类型的参数,所以此时主事务TransactionContext transactionContext = null。transactionContext上下文中的内容如下。

1
2
3
4
5
6
7
8
9
10
public class TransactionContext implements Serializable {
private static final long serialVersionUID = -8199390103169700387L;
// 事务ID
private TransactionXid xid;
// 事务状态,try、confirm、cancel
private int status;
private Map<String, String> attachments = new ConcurrentHashMap<String, String>();
...
}

在TCC-Transaction中可以使用MySQL,Redis,Zookeeper对事务进行持久化,我是用的MySQL保存的,主事务详细信息对应MySQL中TCC数据库中的tcc_transaction_order表,子事务对应TCC库的tcc_transaction_red和tcc_transaction_cap表。Java中使用Transaction类对应表,每一个请求其实都会对应一个Transaction,也就是每个线程都会对应一个Transaction,最好的解决方案就是使用ThreadLocal,TCC-Transaction也是这么做的。Transaction类信息如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Transaction implements Serializable {
private static final long serialVersionUID = 7291423944314337931L;
// 主事务ID
private TransactionXid xid;
// 事务状态
private TransactionStatus status;
// 事务类型 root provider
private TransactionType transactionType;
// 重试次数
private volatile int retriedCount = 0;
// 创建时间
private Date createTime = new Date();
// 最近更新时间
private Date lastUpdateTime = new Date();
// 版本
private long version = 1;
// 事务参与者
private List<Participant> participants = new ArrayList<Participant>();
private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
...
}

transactionContext和另外一个变量isTransactionActive一同来确认当前事务的类型,是ROOT还是PROVIDER类型。isTransactionActive就是判断当前线程是否有事务。

1
2
3
4
5
6
7
private static final ThreadLocal<Deque<Transaction>> CURRENT
= new ThreadLocal<Deque<Transaction>>();
public boolean isTransactionActive() {
Deque<Transaction> transactions = CURRENT.get();
return transactions != null && !transactions.isEmpty();
}

由于上下文transactionContext是null,isTransactionActive是false,所以事务类型为ROOT。需要执行这个方法rootMethodProceed(pjp, asyncConfirm, asyncCancel),进入看一下吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
Object returnValue = null;
Transaction transaction = null;
try {
// 开启一个全新的事务
/*
1、持久化事务
2、注册一个事务【Threadlocal】
*/
transaction = transactionManager.begin();
try {
// 执行try方法(主事务makePayment方法)
returnValue = pjp.proceed();
// 如果抛出异常
} catch (Throwable tryingException) {
if (!isDelayCancelException(tryingException)) {
logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
/*
1、修改数据库状态
2、执行rollback方法
3、如果执行成功,则删除事务资源数据
*/
transactionManager.rollback(asyncCancel);
}
throw tryingException;
}
/*
1、修改数据库状态
2、执行confirm方法
3、如果执行成功,则删除事务资源数据
*/
transactionManager.commit(asyncConfirm);
} finally {
// 清除队列中的事务
transactionManager.cleanAfterCompletion(transaction);
}
return returnValue;
}

rootMethodProceed方法中主要做了一下几件事:

  1. 开启一个事务,并持久化、保存到ThreadLocal
  2. 执行主事务的try方法(makePayment)
  3. 判断try方法执行过程中是否抛出异常:如果抛出异常,执行rollback;否则执行commit
  4. 清除ThreadLocal

在执行上面的第2步时,就进入了子事务的处理流程,这里暂时不做分析。

下面来看看commit方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void commit(boolean asyncCommit) {
// 从ThreadLocal中获取当前线程的Transaction
final Transaction transaction = getCurrentTransaction();
// 修改当前事务的状态为confirm
transaction.changeStatus(TransactionStatus.CONFIRMING);
// 更新数据库
transactionRepository.update(transaction);
// 如果是异步
if (asyncCommit) {
try {
Long statTime = System.currentTimeMillis();
executorService.submit(new Runnable() {
@Override
public void run() {
// 执行事务
commitTransaction(transaction);
}
});
logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
} catch (Throwable commitException) {
logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);
throw new ConfirmingException(commitException);
}
} else { // 如果是同步
commitTransaction(transaction);
}
}

commit方法最终会通过反射机制执行主事务try方法的confirm方法。rollback方法与commit方法类似,最终会通过反射机制执行主事务的cancel方法,执行完成后,删除当前事务在数据库中的数据。这其中就有一个问题,如果执行confirm或者执行cancel方法时又发送了错误,这就会导致数据的不一致性。

在执行commit方法时会用到Transaction类中的participants属性,该属性是通过另外一个AOP注入的,在ResourceCoordinatorAspect和ResourceCoordinatorInterceptor类中实现。主要做了两件事:

  1. 注入Transaction中的participants
  2. 将transactionContext存到Dubbo隐式参数中

ResourceCoordinatorInterceptor类中的部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
Transaction transaction = transactionManager.getCurrentTransaction();
if (transaction != null) {
switch (transaction.getStatus()) {
case TRYING:
// 1.注入Transaction中的participants
// 2.将transactionContext存到Dubbo隐式参数中
enlistParticipant(pjp);
break;
case CONFIRMING:
break;
case CANCELLING:
break;
}
}
return pjp.proceed(pjp.getArgs());
}

3 子事务处理

子事务处理流程与主事务处理流程类似,最大的区别在于判断当前事务的类型后,如果是主事务会执行rootMethodProceed(pjp, asyncConfirm, asyncCancel),如果是子事务会执行providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel),如下:

1
2
3
4
5
6
7
8
9
switch (methodType) {
case ROOT:
// 主事务
return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
case PROVIDER:
// 子事务
return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
default:
return pjp.proceed();

在分析主事务流程的过程中,我们知道使用DefaultTransactionContextEditor的get方法获取上下文transactionContext,子事务不是这样的,下面来看一下子事务红包的@Compensable注解:

1
2
3
@Compensable(confirmMethod = "confirmRecord",
cancelMethod = "cancelRecord",
transactionContextEditor = DubboTransactionContextEditor.class)

在子事务的@Compensable注解中使用到DubboTransactionContextEditor的get方法去获取transactionContext,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DubboTransactionContextEditor implements TransactionContextEditor {
@Override
public TransactionContext get(Object target, Method method, Object[] args) {
// 使用Dubbo的隐式参数获取transactionContext
String context = RpcContext.getContext().getAttachment(TransactionContextConstants.TRANSACTION_CONTEXT);
if (StringUtils.isNotEmpty(context)) {
return JSON.parseObject(context, TransactionContext.class);
}
return null;
}
...

在DubboTransactionContextEditor的get方法中,使用Dubbo的隐式参数获取transactionContext,那么transactionContext是在什么时候存到Dubbo的隐式参数中的呢?上面有介绍。

所以子事务的transactionContext不为null,且isTransactionActive为True,从而判断子事务的类型为PROVIDER,执行providerMethodProceed方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
Transaction transaction = null;
try {
switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
// 如果事务状态为try
case TRYING:
transaction = transactionManager.propagationNewBegin(transactionContext);
return pjp.proceed();
// 如果事务状态为confirm
case CONFIRMING:
try {
transaction = transactionManager.propagationExistBegin(transactionContext);
transactionManager.commit(asyncConfirm);
} catch (NoExistedTransactionException excepton) {
//the transaction has been commit,ignore it.
}
break;
// 如果事务状态为cancel
case CANCELLING:
try {
transaction = transactionManager.propagationExistBegin(transactionContext);
transactionManager.rollback(asyncCancel);
} catch (NoExistedTransactionException exception) {
//the transaction has been rollback,ignore it.
}
break;
}
} finally {
transactionManager.cleanAfterCompletion(transaction);
}
...
}

按照我的理解,providerMethodProceed方法中TransactionStatus的状态基本上都是TRYING,只有在做重试JOB时会是其他状态。

4 总结

在这里插入图片描述

在执行主事务的ResourceCoordinatorInterceptor时,会将主事务添加到participants中,然后执行子事务的ResourceCoordinatorInterceptor时,会把所有的子事务添加到participants中。所以到最后执行主事务CompensableTransactionInterceptor 中的confirm或者cancel方法,会把执行所有参与者的confirm和cancel。

在前面我有说一个问题:就是当执行confirm或者cancel出错,那么怎么办?

在TCC-Transaction中,主要分成以下几个模块:
在这里插入图片描述
其中有一个事务处理JOB,这是一个定时任务,定时去重试confirm或者cancel失败的事务。可以设置重试次数,如果多次重试都未成功,那可能就需要人工干预了。在重试时,会调用providerMethodProceed方法,此时TransactionStatus的状态就可能是CONFIRMING或者CANCELLING,这时需要实现confirm和cancel方法的幂等性。