简介
随着互联网行业的发展,一个非常欣喜的现象就是,越来越多的用户开始愿意为优质内容付费,这就逐渐去除了创业公司只能靠广告盈利的魔咒,可以想见,越来越多的公司会在项目中加入支付系统。那么对于程序员来讲,了解支付系统一些技术关键点就非常有必要了。
事务
我们平时对事务的了解止步于ACID
- 原子性,可以由代码实现,比如try catch
- 一致性,一致性的概念是基于特定于业务的,比如转账。
- 隔离性,意味着一个事务的效果不影响正在同时执行的其他事务。从事务的角度讲,它意味着事务按顺序执行而不是并行执行。在数据库系统中,通常通过使用锁机制来实现隔离性。为了使应用程序获得最佳性能,有时也会对某些事务放松隔离性的要求。
- 持久性,意味着一旦成功完成某个事务,对应用程序状态所做的更改将 “经得起失败”。这个就需要我们对事务进行记录,比如日志(undo/redo log等)或数据库中。
实现ACID特性需要多个参与者共同作用
- 应用程序
- 事务监视器,TPM协调RM的活动,以确保事务的“要么全有要么全无”属性。
- 资源管理器(RM,即我们要操作的对象,诸如数据库之类)。并且根据管理RM个数的不同,事务分为local Transaction和global Transaction(管理两个及以上RM)
XA 协议
JTA接口
JTA即Java Transaction API(JTA)。
java提供一个javax.transaction
包,定义了事务(包括分布式事务)的一些接口。这些接口定义的组件间的通信用到了XA协议(javax.transaction.xa
),X/Open XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。
JTA提供了以下三个接口
1.javax.transaction.UserTransaction
,是面向开发人员的接口,能够编程地控制事务处理。
2.javax.transaction.TransactionManager
,允许应用程序服务器来控制代表正在管理的应用程序的事务。
3.javax.transaction.xa.XAResource
,面向提供商的实现接口,是一个基于X/Open CAE Specification的行业标准XA接口的Java映射。提供商在提供访问自己资源的驱动时,必须实现这样的接口。
来一个经典的转账例子,假设我们要操作账户a和账户b,本地事务处理(在一个数据库中)流程:
Connection conn = null;
try{
//若设置为 true 则数据库将会把每一次数据更新认定为一个事务并自动提交
conn.setAutoCommit(false);
// 将 A 账户中的金额减少 500
// 将 B 账户中的金额增加 500
conn.commit();
}catch(){
conn.rollback();
}
如果账户a和b不在一个数据库中,可以应用UserTransaction接口:
UserTransaction userTx = null;
Connection connA = null;
Connection connB = null;
try{
userTx.begin();
// 将 A 账户中的金额减少 500
// 将 B 账户中的金额增加 500
userTx.commit();
}catch(){
userTx.rollback();
}
此时,connection就得支持XAResource接口了。XAResource和Transaction如何关联呢?增强exec方法。Connection的exec方法除了处理数据之外,还包含和Transaction关联的操作。
public void execute(String sql) {
// 对于每次数据库操作都检查此会话所在的数据库连接是否已经被加入到事务中
associateWithTransactionIfNecessary();
try{
// 处理数据库操作的代码
} catch(SQLException sqle){
// 处理异常代码
} catch(Exception ne){
e.printStackTrace();
}
}
public void associateWithTransactionIfNecessary(){
// 获得 TransactionManager
TransactionManager tm = getTransactionManager();
Transaction tx = tm.getTransaction();
// 检查当前线程是否有分布式事务
if(tx != null){
// 在分布式事务内,通过 tx 对象判断当前数据连接是否已经被包含在事务中,
// 如果不是那么将此连接加入到事务中
Connection conn = this.getConnection();
//tx.hasCurrentResource, xaConn.getDataSource() 不是标准的 JTA
// 接口方法,是为了实现分布式事务而增加的自定义方法
if(!tx.hasCurrentResource(conn)){
XAConnection xaConn = (XAConnection)conn;
XADataSource xaSource = xaConn.getDataSource();
// 调用 Transaction 的接口方法,将数据库事务资源加入到当前事务中
tx.enListResource(xaSource.getXAResource(), 1);
}
}
}
TransactionManager本身并不承担实际的事务处理功能,它更多的是充当用户接口和实现接口之间的桥梁。此接口中的大部分事务方法与UserTransaction和 Transaction 相同。
Transaction代表了一个物理意义上的事务,在开发人员调用UserTransaction.begin()
方法时TransactionManager 会创建一个Transaction事务对象(标志着事务的开始)并把此对象通过ThreadLocal关联到当前线程。同样UserTransaction.commit()
会调用 TransactionManager.commit()
,方法将从当前线程下取出事务对象Transaction并把此对象所代表的事务提交.其它方法诸如“rollback(),getStatus()”也是如此。
我以前的文章有说过,ThreadLocal算是在线程的方法间传递参数的一种方式。此处,TransactionManager和Transaction的关系也非常值得学习,Transaction负责实现接口操作,至于这些接口方法什么时候被调用,包括它从线程上被“拿上”还是“拿下”,这个活儿由TransactionManager干。然后用户就可以不分线程的使用TransactionManager,也无需知道自己用的是哪个Transaction。
重新审视下段代码,可以翻译为:
// 创建一个Transaction,挂到当前线程上
UserTransaction userTx = null;
Connection connA = null;
Connection connB = null;
try{
userTx.begin();
// 将Connection对应的XAResource挂到当前线程对应的Transaction
connA.exec("xxx")
connB.exec("xxx")
// 找到Transaction关联的XAResource,让它们都提交
userTx.commit();
}catch(){
// 找到Transaction关联的XAResource,让它们都回滚
userTx.rollback();
}
通过上述内容,我们可以得到一个链,UserTransaction ==> TransactionManager ==> Transaction ==> 与其关联的XAResource。begin,commit,rollback操作就是这样一步步传导下来,其中Threadlocal扮演了关键角色。
TCC
《分布式协议与算法实战》:TCC 本质上是补偿事务,它的核心思想是针对每个操作都要注册一个与其对应的确认操作和补偿操作(也就是撤销操作)。 它是一个业务层面的协议,你也可以将 TCC 理解为编程模型,TCC 的 3 个操作是需要在业务代码中编码实现的,为了实现一致性,确认操作和补偿操作必须是等幂的,因为这 2 个操作可能会失败重试。
TCC 不依赖于数据库的事务,而是在业务中实现了分布式事务,这样能减轻数据库的压力,但对业务代码的入侵性也更强,实现的复杂度也更高。
https://github.com/changmingxie/tcc-transaction.git
是我司大牛的一个两阶段提交协议实现,背景可以看github上的介绍,以下简称TCC。
先从例子tcc-transaction-dubbo-sample看起
假设有两个服务,分别提供不同业务model数据库操作。要搁以前,直接增删改查数据库就好了。但现在,要先让我们的服务支持XAResource接口。与数据库事务作类比,要想支持分布式事务,数据库驱动提供商就要提供支持XA协议的Connection,那么在本示例服务中,服务间的通信使用的是dubbo,相关特性的支持不能内嵌到dubbo,而mysql数据库驱动也不支持,因此需要我们在代码中显式提供每个数据库操作对应的confirm和cancel方法。
对应例子里的CapitalTradeOrderService接口(此处以captital服务为例),它定义了void record(TransactionContext transactionContext,CapitalTradeOrderDto tradeOrderDto);
在CapitalTradeOrderService实现类CapitalTradeOrderServiceImpl里,除了record方法的实现外,附带了两个方法(通过这种方式,将自己变成支持xa协议的XAResource)
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord")
public void record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
System.out.println("capital try record called");
CapitalAccount transferFromAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());
transferFromAccount.transferFrom(tradeOrderDto.getAmount());
capitalAccountRepository.save(transferFromAccount);
}
public void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
System.out.println("capital confirm record called");
CapitalAccount transferToAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getOppositeUserId());
transferToAccount.transferTo(tradeOrderDto.getAmount());
capitalAccountRepository.save(transferToAccount);
}
public void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
System.out.println("capital cancel record called");
CapitalAccount capitalAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());
capitalAccount.cancelTransfer(tradeOrderDto.getAmount());
capitalAccountRepository.save(capitalAccount);
}
然后订单服务也有一个(负责操作captital和redpacket两个服务)
@Compensable(confirmMethod = "confirmMakePayment",cancelMethod = "cancelMakePayment")
public void makePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {
System.out.println("order try make payment called");
order.pay(redPacketPayAmount, capitalPayAmount);
orderRepository.updateOrder(order);
capitalTradeOrderService.record(null, buildCapitalTradeOrderDto(order));
redPacketTradeOrderService.record(null, buildRedPacketTradeOrderDto(order));
}
从中,我们可以建立几个假设
- tcc整个流程跟Compensable注解关系很大(compensable,英文意思是可补偿的)
- 所谓tcc,即try confirm cancel,那么record方法在这里应该就是try的作用
- 一个标有Compensable注解的方法,如果内部包括多个标有Compensable注解的方法,就会启用两阶段提交,Compensable方法是可以嵌套的。
可以知道,TCC框架只是对我们编写函数提出了要求,比如一个数据库操作要有try,confirm,cancel三个方法(到底是本地事务一个方法省事儿),但这个三个方法的调用时机不用我们关心,我们只需要用注解标识一下,其它的由框架负责。
分析源码
整个项目分为三个部分
- tcc-transaction-api,定义了三个基本的model类。
- tcc-transaction-core,实现两阶段提交的业务流程
- tcc-transaction-spring,完成将tcc-transaction-core spring化的一些封装
从Compensable注解看起,Compensable在tcc-transaction-core中,org.mengyun.tcctransaction
包下有几个model,提供了这个项目的“数据结构”,通过分析这个几个model,发现其传导流程是TransactionManager(将transaction从线程上拿上拿下) ==》Transaction ==》 participant(应该是JTA规范的XAResource) ==》 Terminator ==》 两个InvocationContext(分别是confirm和cancel(类似于数据库中的commit和rollback))
tcc-transaction-core还应用了Aop的一些理念(使用aspectj,跟spring aop的实现还不太一样),从两个方向上对代码进行增强:
-
整个操作开始和结束,实现“Transaction的创建和commit,rollback”的透明化。参见CompensableTransactionInterceptor
private void rootMethodProceed(ProceedingJoinPoint pjp) throws Throwable { transactionConfigurator.getTransactionManager().begin(); try { //这是留给用户实现的部分 pjp.proceed(); } catch (Throwable tryingException) { logger.error("compensable transaction trying failed.", tryingException); try { transactionConfigurator.getTransactionManager().rollback(); } catch (Throwable rollbackException) { logger.error("compensable transaction rollback failed.", rollbackException); throw rollbackException; } throw tryingException; } transactionConfigurator.getTransactionManager().commit(); }
-
数据操作的开始和结束,实现“创建Participant,并将participant关联到线程绑定的Transaction”的透明化。参见ResourceCoordinatorInterceptor
private Participant generateAndEnlistRootParticipant(ProceedingJoinPoint pjp) { // 获取拦截的方法信息 MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); Compensable compensable = getCompensable(pjp); String confirmMethodName = compensable.confirmMethod(); String cancelMethodName = compensable.cancelMethod(); // 获取本线程绑定的transaction Transaction transaction = transactionConfigurator.getTransactionManager().getCurrentTransaction(); TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId()); // 根据拦截的方法创建participant Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes()); InvocationContext confirmInvocation = new InvocationContext(targetClass, confirmMethodName, method.getParameterTypes(), pjp.getArgs()); InvocationContext cancelInvocation = new InvocationContext(targetClass, cancelMethodName, method.getParameterTypes(), pjp.getArgs()); Participant participant = new Participant( xid, new Terminator(confirmInvocation, cancelInvocation)); // 将participant关联到transaction transaction.enlistParticipant(participant); // 更新transaction存储 TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository(); transactionRepository.update(transaction); return participant; }
除此之外,TransactionRepository,提供Transaction的存储,可以选择不同的存储介质。
TransactionRecoveryJob和TransactionRecovery,周期性的处理未完成的Transaction,根据以重试次数决定放弃或重试。