xaonei:《分布式事务系列教程-第四章-XA分布式事务解决方案》

《分布式事务系列教程-第四章-XA分布式事务解决方案》

一、 XA解决方案

XA是一个分布式事务协议,由Tuxedo提出。XA中大致分为两部分:事务管理器(TM)和资源管理器(RM)。其中资源管理器(RM)由数据库实现,比如Oracle、DB2这些数据库都实现了XA规范的接口,而TM作为全局的调度者,负责各个RM的提交和回滚。XA协议也分为2PC和3PC,本章讨论的是XA协议的2PC;

1.1 MySQL的XA实现

MySQL XA是基于DTP分布式事务处理模型标准实现的,支持多数据源的分布式事务。

命令如下:

  • 开启一个全局事务:
xa start xidxa start "001";
  • 将全局事务置于空闲状态(此状态时所有的RM资源已经锁定):
xa end xidxa start "001";
  • 准备阶段:
xa prepare xidxa prepare "001"
  • 提交阶段:
xa commit xidxa commit "001";
  • 不进入两段式直接提交:
xa commit xid one phase;xa commit "001" one phase;
XA执行流程:

创建一张测试表:

create table user(id int, username varchar(30));insert into user values(1,'zs');

案例:

mysql> xa start '001';Query OK, 0 rows affected (0.00 sec)mysql> update user set username='ls';Query OK, 1 row affected (0.00 sec)Rows matched: 1 Changed: 1 Warnings: 0mysql> xa end '001';Query OK, 0 rows affected (0.00 sec)mysql> xa prepare '001';Query OK, 0 rows affected (0.00 sec)mysql> xa commit '001';# 释放锁资源Query OK, 0 rows affected (0.00 sec)mysql>

Tips:在MySQL控制台中演示不出一个全局事务中执行多个分支事务,我们可以在下一小结使用Java客户端在一个全局事务操作多个分支事务。

1.2 Atomikos实现分布式事务

JTA(Java Transaction API): Java事务API(编程接口),是XA在Java上的一种实现。JTA底层采用的是2PC(两阶段提交协议)

在JTA中存在以下三种角色:

  • TransactionManager:事务管理器

  • XAResource:资源管理器。代表每一个数据源(RM)

  • XID:事务ID,每一个独立的数据源都会分配一个事务ID(前面的xa start xid)

JTA只是Java提供的一个分布式多数据源的一套编程接口,来帮助我们解决分布式事务,我们具体的实现采用Atomikos,Atomikos是一个为Java平台提供增值服务的并且开源类事务管理器

搭建工程

创建订单数据库、库存数据库:

create database orders;use orders;CREATE TABLE `t_orders` ( `id` varchar(30) NOT NULL, `count` int(11) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;insert into `t_orders`(`id`,`count`) values ('1',101);create database store;use store;CREATE TABLE `t_store` ( `id` varchar(30) NOT NULL, `count` int(11) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;insert into `t_store`(`id`,`count`) values ('1',100);

pom.xml:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.nc</groupId> <artifactId>xa_transaction</artifactId> <version>0.0.1-SNAPSHOT</version> <name>xa_transaction</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.17</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>

DataSourceConfig:

package com.lscl.config;import com.atomikos.icatch.jta.UserTransactionManager;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.transaction.jta.JtaTransactionManager;import java.util.Properties;@Configurationpublic class DataSourceConfig { /** * t_orders数据源 * * @return */ @Bean(name = "ordersDS") @Qualifier("ordersDS") public AtomikosDataSourceBean ordersDS() { AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setUniqueResourceName("ordersDS"); atomikosDataSourceBean.setXaDataSourceClassName( "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"); Properties properties = new Properties(); properties.put("URL","jdbc:mysql://localhost:3306/orders"); properties.put("user", "root"); properties.put("password", "admin"); atomikosDataSourceBean.setXaProperties(properties); return atomikosDataSourceBean; } /** * t_store数据源 * * @return */ @Bean(name = "storeDS") @Qualifier("storeDS") public AtomikosDataSourceBean storeDS() { AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setUniqueResourceName("storeDS"); atomikosDataSourceBean.setXaDataSourceClassName( "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"); Properties properties = new Properties(); properties.put("URL", "jdbc:mysql://localhost:3306/store"); properties.put("user", "root"); properties.put("password", "admin"); atomikosDataSourceBean.setXaProperties(properties); return atomikosDataSourceBean; } /** * transaction manager * * @return */ @Bean public UserTransactionManager userTransactionManager() { UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(true); return userTransactionManager; } /** * jta transactionManager * * @return */ @Bean public JtaTransactionManager transactionManager() { JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(); jtaTransactionManager.setTransactionManager(userTransactionManager()); return jtaTransactionManager; }}

Controller:

package com.lscl.controller;import com.lscl.service.OrdersService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class OrdersController { @Autowired private OrdersService ordersService; @RequestMapping("/addOrders/{flag}") public String add(@PathVariable Integer flag) throws Exception{ ordersService.add(flag);; return "ok"; }}

Service:

package com.lscl.service;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;import org.springframework.stereotype.Service;import javax.transaction.Transactional;import java.sql.Connection;import java.sql.SQLException;import java.sql.Statement;@Servicepublic class OrdersService { @Autowired @Qualifier("ordersDS") private AtomikosDataSourceBean ordersDS; //RM1 @Autowired @Qualifier("storeDS") private AtomikosDataSourceBean storeDS; //RM2 @Transactional public void add(Integer flag) throws Exception { Connection ordersConn = null; Connection storeConn = null; try { ordersConn = ordersDS.getConnection(); storeConn = storeDS.getConnection(); // 订单+1 String ordersSQL = "update t_orders set count=count+1"; // 库存-1 String storeSQL = "update t_store set count=count-1"; // 执行订单+1 Statement ordersState = ordersConn.createStatement();// prepare 阶段 ordersState.execute(ordersSQL); if (flag == 500) { int i = 1 / 0; // 模拟异常 } // 执行库存-1 Statement storeState = storeConn.createStatement();// prepare 阶段 storeState.execute(storeSQL); // 代码没有问题准备发起全局提交 commit 阶段 } catch (SQLException e) { e.printStackTrace(); } finally { if (ordersConn != null) { ordersConn.close(); } if (storeConn != null) { storeConn.close(); } } }}

调用图解:

1.3 总结

1)在执行分支事务时,会将RM资源锁住,需要等到所有的RM响应,等到第二阶段执行完毕时(提交/回滚),RM的锁才会释放,在高并发场所不适用

2)XA方案依赖于本地数据库对XA协议的支持,如果本地数据库不支持XA协议那么第三方程序(Java)将操作不了。例如许多非关系型数据库并没有支持XA。

3)MySQL对XA方案支持的不太友好,MySQL的XA实现,没有记录prepare阶段日志。

相关推荐

相关文章