前言 接着上一章节,我们这次阅读创建事务部分的源码,创建事务是在TransactionAspectSupport类的createTransactionIfNecessary方法中执行的。 打开该方法的源码,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 protected TransactionInfo createTransactionIfNecessary ( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { if (txAttr != null && txAttr.getName() == null ) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName () { return joinpointIdentification; } }; } TransactionStatus status = null ; if (txAttr != null ) { if (tm != null ) { status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured" ); } } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
第一步是为事务属性设置名称,可以看到连接点标记就是被用作设置属性名称的。
第二步是获取事务。
第三步是根据给定的事务属性和事务状态准备事务信息。
我们接下来研究后面两步。
获取事务 获取事务部分是在AbstractPlatformTransactionManager类的getTransaction方法中,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public final TransactionStatus getTransaction (TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); if (definition == null ) { definition = new DefaultTransactionDefinition(); } if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); } if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout" , definition.getTimeout()); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'" ); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null ); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true , newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null , suspendedResources); throw ex; } catch (Error err) { resume(null , suspendedResources); throw err; } } else { boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null , true , newSynchronization, debugEnabled, null ); } }
整理一下整个过程,如下:
获取事务对象
savepointAllowed设为nestedTransactionAllowed
connectionHolder设置为事务同步管理器中绑定的connectionHolder,对于第一次创建的事务应为null
若没有设置事务属性,设置默认事务定义,默认事务定义属性如下:
传播行为REQUIRED
隔离级别DEFAULT
超时时长-1
只读false
如果事务是在其他中调用的,则处理已经存在的事务
否则对于不同的事务传播行为执行不同的操作
由于讲解所有的传播行为不太现实,所以此处我们挑选默认的REQUIRED来阅读
事务不是在其他事务中调用 我们先来看事务不在其他事务中调用的情况,即该事务是当前线程的第一个事务。
摘出其源码部分,如下,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 SuspendedResourcesHolder suspendedResources = suspend(null ); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true , newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null , suspendedResources); throw ex; } catch (Error err) { resume(null , suspendedResources); throw err; }
在这段代码中其实最好奇两处,一处是开始事务,另一处是初始化同步。
开始事务 先来看开始事务,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 protected void doBegin (Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null ; try { if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = this .dataSource.getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction" ); } txObject.setConnectionHolder(new ConnectionHolder(newCon), true ); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true ); con = txObject.getConnectionHolder().getConnection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true ); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit" ); } con.setAutoCommit(false ); } txObject.getConnectionHolder().setTransactionActive(true ); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { DataSourceUtils.releaseConnection(con, this .dataSource); throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction" , ex); } }
整理一下整个开始事务,过程如下:
获取数据库连接并持有
保存原事务的隔离级别及自动提交状态,设置新的隔离级别以及关闭自动提交
激活当前事务
设置超时时长
将连接资源绑定到线程上
初始化同步 1 2 3 4 5 6 7 8 9 10 11 protected void prepareSynchronization (DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) ? definition.getIsolationLevel() : null ); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
我们可以看到此处使用了TransactionSynchronizationManager设置了几个属性:
actualTransactionActive:设置激活的是否是一个真实的事务,我们可以在前面的获取事务部分看到else中的传播行为中事务状态中事务对象设置为空,也就是说那并没有创建一个真实的事务,即创建了一个空事务
currentTransactionIsolationLevel:当前事务的隔离级别
currentTransactionReadOnly:当前事务是否只读
currentTransactionName:当前事务的名称
最后还做了一个激活同步的操作。
那么我们此处又要好奇了这个事务同步管理器(TransactionSynchronizationManager)到底是一个怎样的东西,见spring事务同步管理器 。
那么看到这里,那我们来不妨来猜测一下事务的挂起会有哪些操作?
可以大胆地做出如下的猜测:
将事务同步管理器中的属性储存下来
将原事务存储下来
挂起事务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 protected final SuspendedResourcesHolder suspend (Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null ; if (transaction != null ) { suspendedResources = doSuspend(transaction); } String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null ); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false ); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null ); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false ); return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException ex) { doResumeSynchronization(suspendedSynchronizations); throw ex; } catch (Error err) { doResumeSynchronization(suspendedSynchronizations); throw err; } } else if (transaction != null ) { Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { return null ; } }
如果事务同步管理器已经激活了同步的情况下:
第一步是挂起同步,
1 2 3 4 5 6 7 8 9 private List<TransactionSynchronization> doSuspendSynchronization () { List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations(); for (TransactionSynchronization synchronization : suspendedSynchronizations) { synchronization.suspend(); } TransactionSynchronizationManager.clearSynchronization(); return suspendedSynchronizations; }
我们可以知道这段代码做了如下几件事:
获取同步管理器中的所有同步并保存到suspendedSynchronizations中
将所有同步全部挂起
清空同步管理器中所有同步
第二步是挂起事务,
1 2 3 4 5 6 7 protected Object doSuspend (Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; txObject.setConnectionHolder(null ); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(this .dataSource); return conHolder; }
这个过程就是将connectionHolder与事务以及线程解绑。
第三步是保存并重置事务同步管理器中的各项属性。
第四步也是最后一步是将挂起的同步、前一个事务对象以及事务同步管理器中各项属性包装起来返回
对于事务同步管理器没有激活同步且仍然存在事务的情况,那么就只有挂起事务并返回挂起资源这两步。
总结一下,可以发现整个挂起事务的流程与我们之前猜测的并没有太大的出入。
事务在其他事务中调用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 private TransactionStatus handleExistingTransaction ( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'" ); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction" ); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null , false , newSynchronization, debugEnabled, suspendedResources); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]" ); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true , newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } catch (Error beginErr) { resumeAfterBeginException(transaction, suspendedResources, beginErr); throw beginErr; } } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'" ); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]" ); } if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false , false , debugEnabled, null ); status.createAndHoldSavepoint(); return status; } else { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true , newSynchronization, debugEnabled, null ); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } if (debugEnabled) { logger.debug("Participating in existing transaction" ); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)" )); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is" ); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false , newSynchronization, debugEnabled, null ); }
如果大致浏览一下,我们可以发现这部分与上一段中的处理没有太大不同,大致流程都是:
如果我们仔细看一下,就可以发现最重要的不同其实是事务状态(TransactionStatus)的各项属性的不同。
我们取三个典型的来做比较,
1 2 3 4 5 6 7 8 9 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null , false , newSynchronization, debugEnabled, suspendedResources); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true , newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } catch (Error beginErr) { resumeAfterBeginException(transaction, suspendedResources, beginErr); throw beginErr; } }
1 2 3 4 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);return prepareTransactionStatus(definition, transaction, false , newSynchronization, debugEnabled, null );
因为NOT_SUPPORTED不支持事务,所以可以看到其中第二个参数为null,表明当前不使用事务。
对于第三个参数,true表示新开的一个事务,false反之。可以看到只有REQUIRES_NEW设置为true。
对于第四个参数,表示是否是新的同步,其中NESTED中使用的是false。而NOT_SUPPORTED也与其他的不同,因为NOT_SUPPORTED是使用非事务方式(也可以说是空事务)运行,所以只有SYNCHRONIZATION_ALWAYS的状态下才会使用同步。
在此顺便讲一下三种事务同步属性:
SYNCHRONIZATION_ALWAYS,表示总是使用同步
SYNCHRONIZATION_ON_ACTUAL_TRANSACTION,表示只有对真实事务才会使用同步,也就是说对于非事务方式不会使用同步
SYNCHRONIZATION_NEVER,表示不使用同步
事务的传播行为 可以看到整个事务创建过程中主要是围绕着传播行为来进行不同的操作的。
前面的代码部分也说了其实我们最终都是通过TransactionStatus中的各项属性以及是否进行了挂起事务、开始事务等操作来划分不同的传播行为的。
那我们根据上面的源码对传播行为做一个总结,如下表:
传播行为
不在其他事务中
在其他事务中
REQUIRED
新起一个事务
加入已有事务
SUPPORTS
以非事务方式运行
加入已有事务
MANDATORY
Error
加入已有事务
REQUIRES_NEW
新起一个事务
挂起原事务,新起一个事务
NOT_SUPPORTED
以非事务方式运行
挂起原事务,自己以非事务方式运行
NEVER
以非事务方式运行
Error
NESTED
新起一个事务
运行在嵌套的事务中