交易范围类似功能

2023-12-27

我希望设置与事务范围非常相似的东西,它在服务上创建一个版本,并将在范围结束时删除/提交。在事务范围内运行的每个 SQL 语句都会在内部查看某些连接池/事务存储,以确定其是否在范围内并做出适当的反应。调用者不需要将事务传递给每个调用。我正在寻找这个功能。

这里有更多关于它的信息:https://blogs.msdn.microsoft.com/florinlazar/2005/04/19/transaction-current-and-ambient-transactions/ https://blogs.msdn.microsoft.com/florinlazar/2005/04/19/transaction-current-and-ambient-transactions/

这是基本的一次性类:

public sealed class VersionScope : IDisposable
{
    private readonly GeodatabaseVersion _version;
    private readonly VersionManager _versionManager;

    public VersionScope(Configuration config)
    {
        _versionManager = new VersionManager(config);
        _version = _versionManager.GenerateTempVersion();
        _versionManager.Create(_version);
        _versionManager.VerifyValidVersion(_version);
        _versionManager.ServiceReconcilePull();
        _versionManager.ReconcilePull(_version);
    }

    public void Dispose()
    {
        _versionManager.Delete(_version);
    }

    public void Complete()
    {
        _versionManager.ReconcilePush(_version);
    }
}

我希望迄今为止我编写的所有代码都能够没有任何版本概念。我只想包括一个简单的

Version = GetCurrentVersionWithinScope()

在代码的最低级别。

如果内存中同时运行多个实例,那么实现此类操作的最安全方法是什么,并且使用错误版本的风险很小。

我非常天真的方法是找到进程正在运行的内存块是否有唯一标识符。然后将当前工作版本存储到全局数组或并发字典中。然后,在需要当前版本的代码中,我使用其内存标识符块,并将其映射到创建的版本。

Edit:

使用示例:

using (var scope = new VersionScope(_config))
{
    AddFeature(); // This has no concept of scope passed to it, and could error out forcing a dispose() without a complete()
    scope.Complete();
}

最直接的方法是使用ThreadStatic or ThreadLocal将当前版本存储在线程本地存储中。这样多个线程就不会互相干扰。例如假设我们版本类:

public class Version {
    public Version(int number) {
        Number = number;
    }
    public int Number { get; }

    public override string ToString() {
        return "Version " + Number;
    }
}

然后执行VersionScope可以这样:

public sealed class VersionScope : IDisposable {
    private bool _isCompleted;
    private bool _isDisposed;
    // note ThreadStatic attribute
    [ThreadStatic] private static Version _currentVersion;
    public static Version CurrentVersion => _currentVersion;

    public VersionScope(int version) {
        _currentVersion = new Version(version);
    }

    public void Dispose() {
        if (_isCompleted || _isDisposed)
            return;
        var v = _currentVersion;
        if (v != null) {
            DeleteVersion(v);
        }
        _currentVersion = null;
        _isDisposed = true;
    }

    public void Complete() {
        if (_isCompleted || _isDisposed)
            return;
        var v = _currentVersion;
        if (v != null) {
            PushVersion(v);
        }
        _currentVersion = null;
        _isCompleted = true;
    }

    private void DeleteVersion(Version version) {
        Console.WriteLine($"Version {version} deleted");
    }

    private void PushVersion(Version version) {
        Console.WriteLine($"Version {version} pushed");
    }
}

它可以工作,但它不支持嵌套作用域,这不好,因此为了解决这个问题,我们需要在启动新作用域时存储以前的作用域,并在Complete or Dispose:

public sealed class VersionScope : IDisposable {
    private bool _isCompleted;
    private bool _isDisposed;
    private static readonly ThreadLocal<VersionChain> _versions = new ThreadLocal<VersionChain>();

    public static Version CurrentVersion => _versions.Value?.Current;

    public VersionScope(int version) {
        var cur = _versions.Value;
        // remember previous versions if any
        _versions.Value = new VersionChain(new Version(version), cur);
    }

    public void Dispose() {
        if (_isCompleted || _isDisposed)
            return;
        var cur = _versions.Value;
        if (cur != null) {
            DeleteVersion(cur.Current);
            // restore previous
            _versions.Value = cur.Previous;
        }
        _isDisposed = true;
    }

    public void Complete() {
        if (_isCompleted || _isDisposed)
            return;
        var cur = _versions.Value;
        if (cur != null) {
            PushVersion(cur.Current);
            // restore previous
            _versions.Value = cur.Previous;
        }
        _isCompleted = true;
    }

    private void DeleteVersion(Version version) {
        Console.WriteLine($"Version {version} deleted");
    }

    private void PushVersion(Version version) {
        Console.WriteLine($"Version {version} pushed");
    }

    // just a class to store previous versions
    private class VersionChain {
        public VersionChain(Version current, VersionChain previous) {
            Current = current;
            Previous = previous;
        }

        public Version Current { get; }
        public VersionChain Previous { get; }
    }
}

这已经是你可以使用的东西了。示例用法(我使用单线程,但如果有多个线程分别执行此操作 - 它们不会相互干扰):

static void Main(string[] args) {
    PrintCurrentVersion(); // no version
    using (var s1 = new VersionScope(1)) {
        PrintCurrentVersion(); // version 1
        s1.Complete();
        PrintCurrentVersion(); // no version, 1 is already completed
        using (var s2 = new VersionScope(2)) {
            using (var s3 = new VersionScope(3)) {
                PrintCurrentVersion(); // version 3
            } // version 3 deleted
            PrintCurrentVersion(); // back to version 2
            s2.Complete();
        }
        PrintCurrentVersion(); // no version, all completed or deleted
    }
    Console.ReadKey();
}

private static void PrintCurrentVersion() {
    Console.WriteLine("Current version: " + VersionScope.CurrentVersion);
}

然而,当您使用异步调用时,这将不起作用,因为ThreadLocal绑定到一个线程,但异步方法可以跨多个线程。然而,有一个类似的构造名为AsyncLocal,该值将流经异步调用。所以我们可以添加构造函数参数VersionScope指示我们是否需要异步流。事务范围以类似的方式工作 - 有TransactionScopeAsyncFlowOption你进入TransactionScope构造函数指示它是否将流经异步调用。

修改后的版本如下所示:

public sealed class VersionScope : IDisposable {
    private bool _isCompleted;
    private bool _isDisposed;
    private readonly bool _asyncFlow;
    // thread local versions
    private static readonly ThreadLocal<VersionChain> _tlVersions = new ThreadLocal<VersionChain>();
    // async local versions
    private static readonly AsyncLocal<VersionChain> _alVersions = new AsyncLocal<VersionChain>();
    // to get current version, first check async local storage, then thread local
    public static Version CurrentVersion => _alVersions.Value?.Current ?? _tlVersions.Value?.Current;
    // helper method
    private VersionChain CurrentVersionChain => _asyncFlow ? _alVersions.Value : _tlVersions.Value;

    public VersionScope(int version, bool asyncFlow = false) {
        _asyncFlow = asyncFlow;

        var cur = CurrentVersionChain;
        // remember previous versions if any
        if (asyncFlow) {
            _alVersions.Value = new VersionChain(new Version(version), cur);
        }
        else {
            _tlVersions.Value = new VersionChain(new Version(version), cur);
        }
    }

    public void Dispose() {
        if (_isCompleted || _isDisposed)
            return;
        var cur = CurrentVersionChain;
        if (cur != null) {
            DeleteVersion(cur.Current);
            // restore previous
            if (_asyncFlow) {
                _alVersions.Value = cur.Previous;
            }
            else {
                _tlVersions.Value = cur.Previous;
            }
        }
        _isDisposed = true;
    }

    public void Complete() {
        if (_isCompleted || _isDisposed)
            return;
        var cur = CurrentVersionChain;
        if (cur != null) {
            PushVersion(cur.Current);
            // restore previous
            if (_asyncFlow) {
                _alVersions.Value = cur.Previous;
            }
            else {
                _tlVersions.Value = cur.Previous;
            }
        }
        _isCompleted = true;
    }

    private void DeleteVersion(Version version) {
        Console.WriteLine($"Version {version} deleted");
    }

    private void PushVersion(Version version) {
        Console.WriteLine($"Version {version} pushed");
    }

    // just a class to store previous versions
    private class VersionChain {
        public VersionChain(Version current, VersionChain previous) {
            Current = current;
            Previous = previous;
        }

        public Version Current { get; }
        public VersionChain Previous { get; }
    }
}

异步流范围的示例用法:

static void Main(string[] args) {
    Test();
    Console.ReadKey();
}

static async void Test() {
    PrintCurrentVersion(); // no version
    using (var s1 = new VersionScope(1, asyncFlow: true)) {
        await Task.Delay(100);
        PrintCurrentVersion(); // version 1
        await Task.Delay(100);
        s1.Complete();
        await Task.Delay(100);
        PrintCurrentVersion(); // no version, 1 is already completed
        using (var s2 = new VersionScope(2, asyncFlow: true)) {
            using (var s3 = new VersionScope(3, asyncFlow: true)) {
                PrintCurrentVersion(); // version 3
            } // version 3 deleted
            await Task.Delay(100);
            PrintCurrentVersion(); // back to version 2
            s2.Complete();
        }
        await Task.Delay(100);
        PrintCurrentVersion(); // no version, all completed or deleted
    }
}

private static void PrintCurrentVersion() {
    Console.WriteLine("Current version: " + VersionScope.CurrentVersion);
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

交易范围类似功能 的相关文章