Skip to content

声明式事务

事务传播模型

传播行为含义
PROPAGATION_REQUIRED支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。
PROPAGATION_SUPPORTS支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY支持当前事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRED_NEW新建事务,如果当前存在事务,把当前事务挂起。
PROPAGATION_NOT_SUPPORTED以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED表示如果当前已经存在一个事务,那么该方法将会在嵌套事务中运行。嵌套的事务可以独立于当前事务进行单独的提交或回滚。如果当前事务不存在,那么其行为和 PROPAGATION_REQUIRED一样。注意各厂商对这种传播行为的支持是有所差异的,可以参考资源管理器的文档来确认它们是够支持嵌套事务

Spring事务注解-@Transactional

  • 如果加在类上,则所有public方法都会开启事务
java
//支持类和方法
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Transactional {
    @AliasFor("transactionManager")
    String value() default "";

    @AliasFor("value")
    String transactionManager() default "";

    Propagation propagation() default Propagation.REQUIRED;

    Isolation isolation() default Isolation.DEFAULT;

    int timeout() default -1;

    boolean readOnly() default false;

    Class<? extends Throwable>[] rollbackFor() default {};

    String[] rollbackForClassName() default {};

    Class<? extends Throwable>[] noRollbackFor() default {};

    String[] noRollbackForClassName() default {};
}

JDBC实现事务

  1. 获取数据库连接: 你首先需要一个到数据库的连接。这通常通过DriverManager类或DataSource来完成。
  2. 设置自动提交为false: JDBC连接默认是自动提交模式,这意味着每执行一次SQL语句,都会立即提交。为了控制事务,你需要将连接设置为非自动提交模式。
  3. 执行SQL语句: 在非自动提交模式下,你可以执行一个或多个SQL语句。
  4. 提交或回滚事务: 如果所有操作都成功,你可以调用Connection.commit()方法来提交事务。如果发生错误,你应该调用Connection.rollback()方法来回滚事务,撤销所有更改。
  5. 关闭资源: 最后,确保你关闭所有的Statement, ResultSetConnection对象。

核心类

PlatformTransactionManager

java
/**
 * 真正执行开启、提交、回滚事务的地方
 * 依赖动态代理实现,在原始Bean的基础上,扩展事务的逻辑
 *
 * @author yangjunwei
 * @date 2024/7/23
 */
public class DataSourceTransactionManager implements PlatformTransactionManager, InvocationHandler {

    /**
     * 使用ThreadLocal保存事务状态(包括事务传播行为)
     */
    static final ThreadLocal<TransactionStatus> TRANSACTION_STATUS = new ThreadLocal<>();

    final DataSource dataSource;

    public DataSourceTransactionManager(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * 基于动态代理实现事务方法
     *
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //TODO 解析proxyBean的@Transactional

        TransactionStatus transactionStatus = TRANSACTION_STATUS.get();
        //当前事务
        if (transactionStatus == null) {
            try (Connection connection = dataSource.getConnection()) {
                boolean autoCommit = connection.getAutoCommit();
                if (autoCommit) {
                    //取消自动提交,开启本地事务
                    connection.setAutoCommit(false);
                }
                try {
                    //保存当前事务
                    TRANSACTION_STATUS.set(new TransactionStatus(connection));
                    //继续执行业务方法
                    Object invoke = method.invoke(proxy, args);

                    //提交事务
                    connection.commit();

                    //方法返回
                    return invoke;
                } catch (Exception e) {
                    //TODO 集合事务注解的rollBack
                    //回滚事务
                    TransactionException transactionException = new TransactionException(e);
                    try {
                        connection.rollback();
                    } catch (SQLException sqlException) {
                        //添加SQLException到TranscationException
                        transactionException.addSuppressed(sqlException);
                    }
                    throw transactionException;

                } finally {
                    TRANSACTION_STATUS.remove();
                    if(autoCommit){
                        connection.setAutoCommit(true);
                    }
                }
            }
        }else{
            //TODO 默认事务传播行为 REQUIRED
            //当前已有事务,加入当前事务执行
            return method.invoke(proxy, args);
        }
    }

}
java
    /**
     * 使用dataSource执行数据库操作
     *
     * @param action
     * @param <T>
     * @return
     */
    public <T> T execute(ConnectionCallback<T> action) {
        
        //获取当前事务
        Connection currentConnection = TransactionalUtils.getCurrentConnection();
        if (currentConnection != null) {
            try {
                //存在事务,加入当前事务
                return action.doInConnection(currentConnection);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }

        //无事务,则从DataSource获取新连接
        try (Connection connection = dataSource.getConnection()) {
            //提供Connection,供上层代码使用
            T t = action.doInConnection(connection);
            return t;
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }