在线程环境中,分布式事务如何处理与同一数据库的多个连接?

2024-01-17

我正在尝试确定分布式事务中多个数据库连接的行为。

我有一个长时间运行的进程,它会产生一系列线程,然后每个线程负责管理其数据库连接等。所有这些都在事务范围内运行,并且每个线程都通过DependentTransaction object.

当我并行处理这个过程时,我遇到了一些问题,即似乎存在某种阻止查询在事务上同时执行的块。

我想知道事务协调器如何处理从多个连接到同一数据库的查询,以及是否建议跨线程传递连接对象?

我读到 MS SQL 只允许每个事务有一个连接,但我显然能够在同一事务中创建和初始化到同一数据库的多个连接。在打开连接时,如果没有出现“事务上下文正在被另一个会话使用”异常,我根本无法并行执行线程。结果是连接必须等待执行而不是同时运行,最终代码运行完成,但由于此锁定问题,线程化应用程序没有任何净收益。

代码看起来像这样。

    Sub StartThreads()
        Using Scope As New TransactionScope
            Dim TL(100) As Tasks.Task
            Dim dTx As DependentTransaction
            For i As Int32 = 0 To 100
                Dim A(1) As Object
                dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
                'A(0) = some_other_data
                A(1) = dTx 'the Dependent Transaction

                TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
            Next

            Tasks.Task.WaitAll(TL) 'Wait for threads to finish

            Scope.Complete()
        End Using
    End Sub
    Dim TransLock As New Object
    Sub ProcessData(ByVal A As Object)
        Dim DTX As DependentTransaction = A(1)
        Dim Trans As Transactions.TransactionScope
        Dim I As Int32
        Do While True
            Try
                SyncLock (TransLock)
                    Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
                End SyncLock
                Exit Do
            Catch ex As TransactionAbortedException
                If ex.ToString.Contains("Failure while attempting to promote transaction") Then
                ElseIf ex.Message = "The transaction has aborted." Then
                    Throw New Exception(ex.ToString)
                    Exit Sub
                End If
                I += 1
                If I > 5 Then
                    Throw New Exception(ex.ToString)
                End If
            Catch ex As Exception

            End Try
            Thread.Sleep(10)
        Loop
        Using Trans
            Using DALS As New DAC.DALScope
                Do While True
                    Try
                        SyncLock (TransLock)
                            'This opens two connection to the same DB for later use.
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection)
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
                        End SyncLock
                        Exit Do
                    Catch ex As Exception
                        'This is usually where I find the bottleneck
                        '"Transaction context in use by another session" is the exception that I get
                        Thread.Sleep(100)
                    End Try
                Loop

                '*****************
                'Do some work here
                '*****************

                Trans.Complete()
            End Using
        End Using
        DTX.Complete()
    End Sub

EDIT

我的测试最终表明这是不可能完成的。即使有多个连接或使用相同的连接,事务中的所有请求或问题也会按顺序处理。

也许他们将来会改变这种行为。


首先,您必须将您在这里或那里读到的有关 SQL Server 事务的内容分为两种不同的情况:本地和分布式。

本地SQL事务:

  • SQL Server 只允许在每个本地事务上执行一个请求。
  • 默认情况下,只有一个会话可以注册本地事务。使用 sp_getbindtoken 和 sp_bindsession 可以在本地事务中注册多个会话。会话仍然仅限于在任何时间只有一个执行请求。
  • 通过多个活动结果集 (MARS),一个会话可以执行多个请求。所有请求都必须注册到同一个本地事务中。

分布式事务:

  • 多个会话可以将其本地事务注册到单个分布式事务中。
  • 每个会话仍然注册本地事务,受到上述本地事务的所有限制
  • 分布式事务中注册的本地事务受到分布式事务协调的两阶段提交
  • 注册分布式事务的实例上的所有本地事务仍然独立的本地事务,主要意味着它们具有冲突的锁命名空间。

因此,当客户端创建 .Net TransactionScope 并在此事务范围下在同一服务器上执行多个请求时,这些请求都是注册在分布式事务中的本地事务。一个简单的例子:

class Program
    {
        static string sqlBatch = @"
set nocount on;
declare @i int;
set @i = 0;
while @i < 100000
begin
    insert into test (a) values (replicate('a',100));
    set @i = @i+1;
end";

        static void Main(string[] args)
        {
            try
            {
                TransactionOptions to = new TransactionOptions();
                to.IsolationLevel = IsolationLevel.ReadCommitted;
                using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
                {
                    using (SqlConnection connA = new SqlConnection(Settings.Default.connString))
                    {
                        connA.Open();
                        using (SqlConnection connB = new SqlConnection(Settings.Default.connString))
                        {
                            connB.Open();

                            SqlCommand cmdA = new SqlCommand(sqlBatch, connA);
                            SqlCommand cmdB = new SqlCommand(sqlBatch, connB);

                            IAsyncResult arA = cmdA.BeginExecuteNonQuery();
                            IAsyncResult arB = cmdB.BeginExecuteNonQuery();

                            WaitHandle.WaitAll(new WaitHandle[] { arA.AsyncWaitHandle, arB.AsyncWaitHandle });

                            cmdA.EndExecuteNonQuery(arA);
                            cmdB.EndExecuteNonQuery(arB);
                        }
                    }
                    scp.Complete();
                }
            }
            catch (Exception e)
            {
                Console.Error.Write(e);
            }
        }
    }

创建一个虚拟测试表:

create table test (id int not null identity(1,1) primary key, a varchar(100));

并运行我的示例中的代码。您将看到两个请求并行执行,每个请求在表中插入 100k 行,然后在事务范围完成时提交两个请求。因此,您看到的问题与 SQL Server 或 TransactionScope 无关,它们可以轻松处理您描述的场景。此外,代码非常简单和直接,不需要创建依赖事务、进行克隆或提升事务。

Updated

使用显式线程和相关事务:

 private class ThreadState
    {
        public DependentTransaction Transaction {get; set;}
        public EventWaitHandle Done {get; set;}
        public SqlConnection Connection { get; set; }
    }
    static void Main(string[] args)
    {
        try
        {
            TransactionOptions to = new TransactionOptions();
            to.IsolationLevel = IsolationLevel.ReadCommitted;
            using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
            {
                ThreadState stateA = new ThreadState 
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateA.Connection.Open();
                ThreadState stateB = new ThreadState
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateB.Connection.Open();

                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);

                WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });

                scp.Complete();

                //TODO: dispose the open connections
            }

        }
        catch (Exception e)
        {
            Console.Error.Write(e);
        }
    }

    private static void Worker(object args)
    {
        Debug.Assert(args is ThreadState);
        ThreadState state = (ThreadState) args;
        try
        {
            using (TransactionScope scp = new TransactionScope(state.Transaction))
            {
                SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
                cmd.ExecuteNonQuery();
                scp.Complete();
            }
            state.Transaction.Complete();
        }
        catch (Exception e)
        {
            Console.Error.WriteLine(e);
            state.Transaction.Rollback();
        }
        finally
        {
            state.Done.Set();
        }

    }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在线程环境中,分布式事务如何处理与同一数据库的多个连接? 的相关文章

  • 在 std::thread 创建的线程中调用 pthread_sigmask 是一个好习惯吗?

    1 我是 std thread 的新手 我想知道调用是否是一个好的做法pthread sigmask 阻止某些信号特别的线程创建者std thread 我不希望新线程接收SIGTERM SIGHUP等信号 因为主进程已经安装了这些信号的处理
  • T-SQL:用最新的非空值替换 NULL 的最佳方法?

    假设我有这张表 id value 1 5 2 4 3 1 4 NULL 5 NULL 6 14 7 NULL 8 0 9 3 10 NULL 我想编写一个查询来替换任何NULL值与表中最后一个不为空的值在那一栏里 我想要这个结果 id va
  • 多线程环境下如何更好的使用ExecutorService?

    我需要创建一个库 其中包含同步和异步方法 executeSynchronous 等待直到有结果 返回结果 executeAsynchronous 立即返回一个 Future 如果需要 可以在其他事情完成后进行处理 我的图书馆的核心逻辑 客户
  • SQL 2008全文索引填充延迟

    我的经理说 在基础表数据更改后 可能需要一段时间才能更新全文搜索索引 例如 如果我有一张桌子Products有一个柱子Description我更新了该描述 然后我可能需要一些时间才能搜索该新描述 真的吗 这需要多长时间 SQL 2008 对
  • 检查两个“select”是否相等

    有没有办法检查两个 非平凡的 选择是否等效 最初我希望两个选择之间有形式上的等价 但是答案在证明 sql 查询等价性 https stackoverflow com questions 56895 proving sql query equ
  • 如何在其他核心上运行每个线程?

    我有一个 udp 服务器接收数据并计算它 每个角色我都有两个线程 我的CPU是8个多核 我以不同的速度发送数据 但最多我只使用了 cpu 两核 50 的 14 如果我发送更多的数据值 我的缓冲区将填满并且不会使用更多的CPU 为什么每个核心
  • 可升级读锁的优点?

    我想知道使用可升级读锁与执行这些步骤相比有什么优势 获取读锁 检查条件以查看是否需要进行写锁定 释放读锁 采取写锁定 执行更新 释放写锁 与获取可升级读锁相比 执行上述步骤的一个明显缺点是 步骤 3 和步骤 4 之间存在一个时间窗口 其中另
  • 使用 AJAX 或多线程加速页面加载

    我的页面有 5 个部分 每个部分大约需要 1 秒来渲染 Page Load RenderSection1 1 sec RenderSection2 1 sec RenderSection3 1 sec RenderSection4 1 se
  • 如何搜索例程的内容/(SP-触发函数)

    我需要在数据库内所有例程的例程主体 存储过程 函数 触发器 中搜索文本 我该怎么做 Thanks SELECT OBJECT NAME object id FROM sys sql modules WHERE definition LIKE
  • 理解 C++11 中的 std::atomic::compare_exchange_weak()

    bool compare exchange weak T expected T val compare exchange weak 是 C 11 中提供的比较交换原语之一 它是weak即使对象的值等于 它也会返回 falseexpected
  • H2 用户定义的聚合函数 ListAgg 不能在第一个参数上使用 DISTINCT 或 TRIM()

    所以我有一个 DB2 生产数据库 我需要在其中使用可用的函数 ListAgg 我希望使用 H2 的单元测试能够正确测试此功能 不幸的是H2不直接支持ListAgg 但是 我可以创建一个用户定义的聚合函数 import java sql Co
  • 如何通过 SQL 表关联 SQL 中的实体

    我是数据库设计的初学者 我需要为项目创建数据库 我可以用面向对象的术语解释我想要做什么 值得庆幸的是 数据库专家会很友善地向我解释如何在数据库方面处理这个问题 我想创建一个与位置实体 州 城市 有关系的用户 ID 名称 实体 所以在编程语言
  • SQL Server 连接其他表中不存在的位置

    Service Asset AssetService Id Name Id Name AssetId ServiceId
  • 如何处理来自单独线程的窗口消息?

    我希望启动一个单独的线程来处理窗口消息 通过阻塞 GetMessage 循环 但之后仍然在初始线程中创建窗口 在单独的线程中 一旦启动 我就会调用PeekMessage使用 PM NOREMOVE 确保消息队列存在 有必要吗 然后 Atta
  • 删除数据库中的行后如何重新排序ID

    我正在使用 C 来制作具有 sql 数据库的程序 在数据库中我有一个名为Workers 它有一个自动增量和主键ID column 当我删除一条记录时 ID 之间会出现间隙 删除记录后如何重新排序 ID UPDATE 我要做的就是找到记录后将
  • 日期语句之间的 JPQL SELECT [关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 我想将此 SQL 语句转换为等效的 JPQL SELECT FROM events WHERE events date BETWE
  • MYSQL从每个类别中随机选择一条记录

    我有一个数据库Items表看起来像这样 id name category int 有几十万条记录 每个item可以是 7 种不同的之一categories 对应于categories table id category 我想要一个从每个类别
  • 在 DataView 的 RowFilter 中选择 DISTINCT

    我试图根据与另一个表的关系缩小 DataView 中的行范围 我使用的 RowFilter 如下 dv new DataView myDS myTable id IN SELECT DISTINCT parentID FROM myOthe
  • 如何获取自定义订单的结果? [关闭]

    Closed 这个问题需要细节或清晰度 help closed questions 目前不接受答案 代替ASC or DESC 我希望我的查询结果采用特定的自定义顺序 例如 如果我想要的结果不是 A B C D 而是 P A L H 该怎么
  • SQL 更新 - 更新选定的行

    我正在使用 SQL Server 2008 我有一个名为MYTABLE有两列 ID STATUS 我想编写一个存储过程来返回其记录STATUS是 0 但是这个存储过程必须更新STATUS返回行数为 1 如何在单个查询中执行此选择和更新操作

随机推荐

  • tinymce鼠标贴不起作用

    我想在tinymce中启用鼠标粘贴 当我点击粘贴时 它显示错误 剪切 粘贴 复制在 Firefox 中被禁用 我在他们的论坛上搜索了这个 http www tinymce com forum viewtopic php id 20637 h
  • 在函数内分配内存后使用双指针

    我正在使用 C 中的双指针 想知道是否创建一个初始化表的函数 当我尝试使用 InitStringTable 分配的内存时 它会在返回 main 时崩溃 我相信一个简单的解决方法是使 strTable 成为全局的 然后我相信它可以 但我不想这
  • OpenCV-Python cv2.CV_CAP_PROP_POS_FRAMES 错误

    目前 我使用的是opencv 3 1 0 在执行以下代码时遇到以下错误 post frame cap get cv2 CV CAP PROP POS FRAMES 我收到以下错误消息 文件 videoOperation py 第 37 行
  • jQuery 模态和深度链接

    我目前有一个画廊 当您单击缩略图时 它会打开一个模式弹出窗口 我想做的是能够专门为模式生成一个唯一的链接 即 www mywebite com link1 它通过 ajax 加载其内容 如果有人要发送这个独特的模式链接并将其发送给某人 然后
  • NSDate 格式化程序

    不幸的是 我尝试将字符串转换为 NSDATE 但没有成功 2010 年 10 月 22 日星期五 11 26 45 美国东部时间 我知道格式化选项 http sree cc objective c nsdate format string
  • 与实体对象一起使用时,ResponseBuilder 不起作用

    我正在尝试使用responsebuilder 创建响应 当我在实体中传递字符串时 它工作正常 但是当我传递一些错误类时 它不起作用 这是代码 1 工作正常 Response status 400 entity test build 2 不工
  • 日期时间数据类型在soap php中不起作用

    这是我的代码 c new soapclient http www redbus in WS2 BookingService asmx wsdl array authentication gt array LoginID gt x Passw
  • jQuery Mobile:将数据从一个页面发送到另一页面

    我有一个问题 我需要将数据 ID 从列表发送到另一个页面 这是html代码 div div h1 Players App h1 div div ul ul div div
  • 如何以编程方式重置表单?

    我想在 JQuery 单击事件函数中重置表单 怎么做 最简单的是 form selector here 0 reset 但也请参阅 使用 jQuery 重置多阶段表单 https stackoverflow com questions 68
  • 如何根据日期是否大于今天的日期来呈现 JSF 组件?

    我从服务器端获取一个日期 以及如何在我的 xhtml 代码中比较它 这样如果它小于今天的日期 我将渲染面板 否则不会 你可以在你的bean中有一个方法 class MyBean public boolean isDateBigger dat
  • 将正则表达式模式从 Sub 传递到 Excel VBA 中的函数

    我试图将正则表达式模式传递给 Excel VBA 中的函数 但该模式似乎没有任何效果 我插入了 msgbox es 来查看字符串的样子 结果没问题 这是我正在使用的代码 Sub clean COP names Dim strSheet As
  • 一个 git 命令显示目录中所有文件的状态

    git ls files v o 显示未跟踪的文件 git ls files v 显示跟踪的文件 必须有一种更简单的方法来显示当前目录中所有文件的状态 如果有 259 个 可能是隐藏的 文件 那么我想查看 259 行 每行的状态如下 匹配头
  • Whoosh (Python) 在哪里物理存储索引内容?

    我开始研究内容索引的实现 并且正在查看 Whoosh https pypi python org pypi Whoosh https pypi python org pypi Whoosh 我很想知道 Whoosh 将其内容物理存储在哪里
  • 如何使用 DataAnnotation 验证下拉列表?

    我需要您的帮助 我在使用 AdataAnnotation 进行验证时遇到问题 我正在尝试使用它验证下拉列表 但它有一些问题 这是我的代码 查看侧面 using Html BeginForm addNewProject Activities
  • Oracle 19c Open_cursor 超出问题

    我们在 Oracle 10g 和 19c 中存在相同的存储过程 具有相同的数据集和设置 该过程执行大量数据获取和操作 当我们使用相同的数据集 假设 10000 条记录 执行时 它在 10g 中运行良好 时间更少 但在 19c 中需要很多时间
  • 我在哪里可以找到更新的实时汇率?

    如何将实时货币汇率链接到我的 iPhone 应用程序 首先 谁知道有哪些网站可以查询汇率 其次 如何将其链接到我的应用程序 我想做这个应用程序所做的事情 http the dream co uk currencee http the dre
  • Kendo 网格、ko 绑定和行索引访问

    我有一个 ko 视图模型 我使用 knockout kendo js 将其绑定到 KendoGrid 我使用 rowTemplate 因为我需要在某些列中使用一些自定义功能 图标 链接等 我需要根据行号执行一些自定义功能 当直接绑定 ko
  • EPERM:不允许操作 - IIS 上的 NPM Angular 7(后端为 .Net Core 2.1)

    大家好 我用 Visual Studio 2017 制作了一个 Angular 7 应用程序 所以我在 Windows 10 上安装了带有 IIS 的 AWS 机器 当我加载应用程序时 我收到此错误 AggregateException 发
  • 如何为 Web API 指定不同的 AADInstance?

    我正在致力于将 Web Api 与 azure China Active Directory 集成并部署到 azure China 环境 Azure 中国的端点与常规 Azure 环境完全不同 我想知道如何指定AADInstancehttp
  • 在线程环境中,分布式事务如何处理与同一数据库的多个连接?

    我正在尝试确定分布式事务中多个数据库连接的行为 我有一个长时间运行的进程 它会产生一系列线程 然后每个线程负责管理其数据库连接等 所有这些都在事务范围内运行 并且每个线程都通过DependentTransaction object 当我并行