目录
  1. 前言
  2. 获取事务
  3. 事务不是在其他事务中调用
    1. 开始事务
    2. 初始化同步
    3. 挂起事务
  4. 事务在其他事务中调用
  5. 事务的传播行为
spring事务(3) spring事务创建部分源码解析

前言

接着上一章节,我们这次阅读创建事务部分的源码,创建事务是在TransactionAspectSupport类的createTransactionIfNecessary方法中执行的。打开该方法的源码,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected TransactionInfo createTransactionIfNecessary(
PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}

TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

第一步是为事务属性设置名称,可以看到连接点标记就是被用作设置属性名称的。

第二步是获取事务。

第三步是根据给定的事务属性和事务状态准备事务信息。

我们接下来研究后面两步。

获取事务

获取事务部分是在AbstractPlatformTransactionManager类的getTransaction方法中,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
// 获取事务对象,过程如下:
// 创建一个新的DataSourceTransactionObject
// 设置其savepointAllowed属性
// 从事务同步管理器中获取其connectionHolder
Object transaction = doGetTransaction();

boolean debugEnabled = logger.isDebugEnabled();

// 如果事务属性还没有设置,则使用默认的事务定义。
// 默认的事务属性为传播行为REQUIRED、隔离级别DEFAULT、超时时长-1、只读false
// 我们可以在前面的部分知道传入definition的参数为txAttr
// 其中TransactionAttribute是TransactionDefinition的子类
if (definition == null) {
definition = new DefaultTransactionDefinition();
}

// 如果该事务是在其他事务中调用的
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}

// 如果该事物不在其他事务中调用

// 检查timeout设置
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}

// 对于传播属性为MANDATORY
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 对于传播属性为REQUIRED或REQUIRES_NEW或NESTED
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
// 对于传播行为为其他的
else {
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}

整理一下整个过程,如下:

  • 获取事务对象
    • savepointAllowed设为nestedTransactionAllowed
    • connectionHolder设置为事务同步管理器中绑定的connectionHolder,对于第一次创建的事务应为null
  • 若没有设置事务属性,设置默认事务定义,默认事务定义属性如下:
    • 传播行为REQUIRED
    • 隔离级别DEFAULT
    • 超时时长-1
    • 只读false
  • 如果事务是在其他中调用的,则处理已经存在的事务
  • 否则对于不同的事务传播行为执行不同的操作

由于讲解所有的传播行为不太现实,所以此处我们挑选默认的REQUIRED来阅读

事务不是在其他事务中调用

我们先来看事务不在其他事务中调用的情况,即该事务是当前线程的第一个事务。

摘出其源码部分,如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 从上面一段代码中摘出

// 挂起指定事务,此处由于是第一个事务,所以参数设的是null
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
// 判断是否使用一个新的事务同步
// 事务同步有三个属性SYNCHRONIZATION_ALWAYS,SYNCHRONIZATION_ON_ACTUAL_TRANSACTION和SYNCHRONIZATION_NEVER
// 默认使用SYNCHRONIZATION_ALWAYS
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

// 创建新的事务状态,属性如下:
// 事务定义: definition
// 事务对象: transaction
// 是否创建新的事务: true
// 是否创建新的同步: newSynchronization
// 挂起资源: suspendedResources
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

// 开始事务
doBegin(transaction, definition);

// 初始化同步
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
// 根据挂起资源复原
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}

在这段代码中其实最好奇两处,一处是开始事务,另一处是初始化同步。

开始事务

先来看开始事务,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;

try {
// 从数据库中获取连接并持有
// 感觉下面两个条件几乎一直都成立,总觉得不知道为什么需要有这样一个判断条件在
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}

// 设置当前持有的连接已经和事务同步
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();

// 设置连接的隔离级别为新事务的隔离级别,并保存连接原来的隔离级别,用于以后恢复
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);

// 设置连接的自动提交为false并保存连接原来的自动提交状态
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}

// 设置当前事务为激活状态
txObject.getConnectionHolder().setTransactionActive(true);

// 设置超时设置
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

// 将连接资源绑定到线程,与之前的setSynchronizedWithTransaction(true)相关联
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}

// 发生异常,释放连接
catch (Throwable ex) {
DataSourceUtils.releaseConnection(con, this.dataSource);
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}

整理一下整个开始事务,过程如下:

  • 获取数据库连接并持有
  • 保存原事务的隔离级别及自动提交状态,设置新的隔离级别以及关闭自动提交
  • 激活当前事务
  • 设置超时时长
  • 将连接资源绑定到线程上

初始化同步

1
2
3
4
5
6
7
8
9
10
11
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
(definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}

我们可以看到此处使用了TransactionSynchronizationManager设置了几个属性:

  • actualTransactionActive:设置激活的是否是一个真实的事务,我们可以在前面的获取事务部分看到else中的传播行为中事务状态中事务对象设置为空,也就是说那并没有创建一个真实的事务,即创建了一个空事务
  • currentTransactionIsolationLevel:当前事务的隔离级别
  • currentTransactionReadOnly:当前事务是否只读
  • currentTransactionName:当前事务的名称

最后还做了一个激活同步的操作。

那么我们此处又要好奇了这个事务同步管理器(TransactionSynchronizationManager)到底是一个怎样的东西,见spring事务同步管理器

那么看到这里,那我们来不妨来猜测一下事务的挂起会有哪些操作?

可以大胆地做出如下的猜测:

  • 将事务同步管理器中的属性储存下来
  • 将原事务存储下来

挂起事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 挂起同步
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
catch (Error err) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw err;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}

如果事务同步管理器已经激活了同步的情况下:

第一步是挂起同步,

1
2
3
4
5
6
7
8
9
private List<TransactionSynchronization> doSuspendSynchronization() {
List<TransactionSynchronization> suspendedSynchronizations =
TransactionSynchronizationManager.getSynchronizations();
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
synchronization.suspend();
}
TransactionSynchronizationManager.clearSynchronization();
return suspendedSynchronizations;
}

我们可以知道这段代码做了如下几件事:

  • 获取同步管理器中的所有同步并保存到suspendedSynchronizations中
  • 将所有同步全部挂起
  • 清空同步管理器中所有同步

第二步是挂起事务,

1
2
3
4
5
6
7
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
ConnectionHolder conHolder = (ConnectionHolder)
TransactionSynchronizationManager.unbindResource(this.dataSource);
return conHolder;
}

这个过程就是将connectionHolder与事务以及线程解绑。

第三步是保存并重置事务同步管理器中的各项属性。

第四步也是最后一步是将挂起的同步、前一个事务对象以及事务同步管理器中各项属性包装起来返回

对于事务同步管理器没有激活同步且仍然存在事务的情况,那么就只有挂起事务并返回挂起资源这两步。

总结一下,可以发现整个挂起事务的流程与我们之前猜测的并没有太大的出入。

事务在其他事务中调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}

// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

如果大致浏览一下,我们可以发现这部分与上一段中的处理没有太大不同,大致流程都是:

  • 挂起前一个事务
  • 开始当前事务
  • 初始化同步

如果我们仔细看一下,就可以发现最重要的不同其实是事务状态(TransactionStatus)的各项属性的不同。

我们取三个典型的来做比较,

1
2
3
4
5
6
7
8
9
// NOT_SUPPORTED
// 此段代码中去除了log部分

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// REQUIRES_NEW
// 此段代码中去除了log部分

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
1
2
3
4
// REQUIRED

boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);

因为NOT_SUPPORTED不支持事务,所以可以看到其中第二个参数为null,表明当前不使用事务。

对于第三个参数,true表示新开的一个事务,false反之。可以看到只有REQUIRES_NEW设置为true。

对于第四个参数,表示是否是新的同步,其中NESTED中使用的是false。而NOT_SUPPORTED也与其他的不同,因为NOT_SUPPORTED是使用非事务方式(也可以说是空事务)运行,所以只有SYNCHRONIZATION_ALWAYS的状态下才会使用同步。

在此顺便讲一下三种事务同步属性:

  • SYNCHRONIZATION_ALWAYS,表示总是使用同步
  • SYNCHRONIZATION_ON_ACTUAL_TRANSACTION,表示只有对真实事务才会使用同步,也就是说对于非事务方式不会使用同步
  • SYNCHRONIZATION_NEVER,表示不使用同步

事务的传播行为

可以看到整个事务创建过程中主要是围绕着传播行为来进行不同的操作的。

前面的代码部分也说了其实我们最终都是通过TransactionStatus中的各项属性以及是否进行了挂起事务、开始事务等操作来划分不同的传播行为的。

那我们根据上面的源码对传播行为做一个总结,如下表:

传播行为 不在其他事务中 在其他事务中
REQUIRED 新起一个事务 加入已有事务
SUPPORTS 以非事务方式运行 加入已有事务
MANDATORY Error 加入已有事务
REQUIRES_NEW 新起一个事务 挂起原事务,新起一个事务
NOT_SUPPORTED 以非事务方式运行 挂起原事务,自己以非事务方式运行
NEVER 以非事务方式运行 Error
NESTED 新起一个事务 运行在嵌套的事务中
文章作者: 谷河
文章链接: https://www.lyytaw.com/spring/spring_tx_create_transaction/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 谷河|BLOG