使用线程池限制最大线程数 - 尝试读取或写入受保护的内存错误

2023-12-15

我正在使用一些scrapingNoseratio 的代码在这里找到https://stackoverflow.com/a/22262976/3499115。他编写它是为了抓取 url 列表,但我正在使用它,但是在我正在使用的另一个网络爬虫 MVC 控制器中一次仅呈现一个 url。每次找到特定类型的链接时,我都会调用此代码,并且多次执行此操作似乎会导致内存不足。也许解决方案是使用线程池并限制最大线程数,但我该如何对此代码执行此操作?以下是调用网络浏览器代码的网络爬虫代码:

public static HtmlDocument renderJavascript(string url)
    {
        HtmlDocument doc = new HtmlDocument();
        // using webBrowserScraper
        try
        {
            WebBrowserExt.SetFeatureBrowserEmulation(); // enable HTML5

            var cts = new CancellationTokenSource((int)TimeSpan.FromMinutes(3).TotalMilliseconds);
            var task = WebBrowserScraper.ScrapeSitesAsync(
                url,
                cts.Token);

            task.Wait();

            //Console.WriteLine("Press Enter to exit...");
            //Console.ReadLine();
            doc.LoadHtml(task.Result);
            return doc;
        }
        catch (Exception ex)
        {
            while (ex is AggregateException && ex.InnerException != null)
                ex = ex.InnerException;
            Console.WriteLine(ex.Message);
            //Environment.Exit(-1);
        }
        return null;
    }

以及网络浏览器代码(我刚刚将 ScrapeSitesAsync 函数中的参数更改为单个字符串:

using System;
using using System.Linq;
using System.Text;
using Microsoft.Win32;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace Abot.Demo
{
    public class WebBrowserScraper
    {
        // by Noseratio - https://stackoverflow.com/a/22262976/1768303

        // main logic
        public static async Task<string> ScrapeSitesAsync(string url, CancellationToken token)

        {
            using (var apartment = new MessageLoopApartment())
            {
                // create WebBrowser inside MessageLoopApartment
                var webBrowser = apartment.Invoke(() => new WebBrowser());
                try
                {
                    Console.WriteLine("WebBrowser URL:\n" + url);

                    // cancel in 30s or when the main token is signalled
                    var navigationCts = CancellationTokenSource.CreateLinkedTokenSource(token);
                    navigationCts.CancelAfter((int)TimeSpan.FromSeconds(10).TotalMilliseconds);
                    var navigationToken = navigationCts.Token;

                    // run the navigation task inside MessageLoopApartment
                    string html = await apartment.Run(() =>
                        webBrowser.NavigateAsync(url, navigationToken), navigationToken);

                    Console.WriteLine("Scrape complete for URL:\n" + url);
                    return html;
            }
            finally
            {
                // dispose of WebBrowser inside MessageLoopApartment
                apartment.Invoke(() => webBrowser.Dispose());
            }
        }
    }
}

/// <summary>
/// WebBrowserExt - WebBrowser extensions
/// by Noseratio - https://stackoverflow.com/a/22262976/1768303
/// </summary>
public static class WebBrowserExt
{
    const int POLL_DELAY = 500;

    // navigate and download 
    public static async Task<string> NavigateAsync(this WebBrowser webBrowser, string url, CancellationToken token)
    {
        // navigate and await DocumentCompleted
        var tcs = new TaskCompletionSource<bool>();
        WebBrowserDocumentCompletedEventHandler handler = (s, arg) =>
            tcs.TrySetResult(true);

        using (token.Register(
            () => { webBrowser.Stop(); tcs.TrySetCanceled(); }, 
            useSynchronizationContext: true))
        {
            webBrowser.DocumentCompleted += handler;
            try
            {
                webBrowser.Navigate(url);
                await tcs.Task; // wait for DocumentCompleted
            }
            finally
            {
                webBrowser.DocumentCompleted -= handler;
            }
        }

        // get the root element
        var documentElement = webBrowser.Document.GetElementsByTagName("html")[0];

        // poll the current HTML for changes asynchronosly
        var html = documentElement.OuterHtml;
        while (true)
        {
            // wait asynchronously, this will throw if cancellation requested
            await Task.Delay(POLL_DELAY, token);

            // continue polling if the WebBrowser is still busy
            if (webBrowser.IsBusy)
                continue;

            var htmlNow = documentElement.OuterHtml;
            if (html == htmlNow)
                break; // no changes detected, end the poll loop

            html = htmlNow;
        }

        // consider the page fully rendered 
        token.ThrowIfCancellationRequested();
        return html;
    }

    // enable HTML5 (assuming we're running IE10+)
    // more info: https://stackoverflow.com/a/18333982/1768303
    public static void SetFeatureBrowserEmulation()
    {
        if (System.ComponentModel.LicenseManager.UsageMode != System.ComponentModel.LicenseUsageMode.Runtime)
            return;
        var appName = System.IO.Path.GetFileName(System.Diagnostics.Process.GetCurrentProcess().MainModule.FileName);
        Registry.SetValue(@"HKEY_CURRENT_USER\Software\Microsoft\Internet Explorer\Main\FeatureControl\FEATURE_BROWSER_EMULATION",
            appName, 10000, RegistryValueKind.DWord);
    }
}

/// <summary>
/// MessageLoopApartment
/// STA thread with message pump for serial execution of tasks
/// by Noseratio - https://stackoverflow.com/a/22262976/1768303
/// </summary>
public class MessageLoopApartment : IDisposable
{
    Thread _thread; // the STA thread

    TaskScheduler _taskScheduler; // the STA thread's task scheduler

    public TaskScheduler TaskScheduler { get { return _taskScheduler; } }

    /// <summary>MessageLoopApartment constructor</summary>
    public MessageLoopApartment()
    {
        var tcs = new TaskCompletionSource<TaskScheduler>();

        // start an STA thread and gets a task scheduler
        _thread = new Thread(startArg =>
        {
            EventHandler idleHandler = null;

            idleHandler = (s, e) =>
            {
                // handle Application.Idle just once
                Application.Idle -= idleHandler;
                // return the task scheduler
                tcs.SetResult(TaskScheduler.FromCurrentSynchronizationContext());
            };

            // handle Application.Idle just once
            // to make sure we're inside the message loop
            // and SynchronizationContext has been correctly installed
            Application.Idle += idleHandler;
            Application.Run();
        });

        _thread.SetApartmentState(ApartmentState.STA);
        _thread.IsBackground = true;
        _thread.Start();
        _taskScheduler = tcs.Task.Result;
    }

    /// <summary>shutdown the STA thread</summary>
    public void Dispose()
    {
        if (_taskScheduler != null)
        {
            var taskScheduler = _taskScheduler;
            _taskScheduler = null;

            // execute Application.ExitThread() on the STA thread
            Task.Factory.StartNew(
                () => Application.ExitThread(),
                CancellationToken.None,
                TaskCreationOptions.None,
                taskScheduler).Wait();

            _thread.Join();
            _thread = null;
        }
    }

    /// <summary>Task.Factory.StartNew wrappers</summary>
    public void Invoke(Action action)
    {
        Task.Factory.StartNew(action,
            CancellationToken.None, TaskCreationOptions.None, _taskScheduler).Wait();
    }

    public TResult Invoke<TResult>(Func<TResult> action)
    {
        return Task.Factory.StartNew(action,
            CancellationToken.None, TaskCreationOptions.None, _taskScheduler).Result;
    }

    public Task Run(Action action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler);
    }

    public Task<TResult> Run<TResult>(Func<TResult> action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler);
    }

    public Task Run(Func<Task> action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token)
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }
}

}


一种解决方案是使用SemaphoreSlim维持有限的资源池WebBrowser反对scrape并行网站。为所有人共享公共消息循环也是有意义的WebBrowser实例。

以下是它的实现方式,基于我的控制台网络scraper code你链接了。新的部分是WebBrowserPool类(警告:仅进行了轻微测试):

using Microsoft.Win32;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace AsyncWebBrowserScraper
{
    class Program
    {
        // by Noseratio - https://stackoverflow.com/a/23819021/1768303

        // test: web-scrape a list of URLs
        static async Task ScrapeSitesAsync(string[] urls, CancellationToken token)
        {
            using (var pool = new WebBrowserPool(maxParallel: 2, token: token))
            {
                // cancel each site in 30s or when the main token is signalled
                var timeout = (int)TimeSpan.FromSeconds(30).TotalMilliseconds;

                var results = urls.ToDictionary(
                    url => url, url => pool.ScrapeSiteAsync(url, timeout));

                await Task.WhenAll(results.Values);

                foreach (var url in results.Keys)
                {
                    Console.WriteLine("URL:\n" + url);

                    string html = results[url].Result;

                    Console.WriteLine("HTML:\n" + html);
                }
            }
        }

        // entry point
        static void Main(string[] args)
        {
            try
            {
                WebBrowserExt.SetFeatureBrowserEmulation(); // enable HTML5

                var cts = new CancellationTokenSource((int)TimeSpan.FromMinutes(3).TotalMilliseconds);

                var task = ScrapeSitesAsync(
                    new[] { "http://example.com", "http://example.org", "http://example.net", "http://www.bing.com", "http://www.google.com" },
                    cts.Token);

                task.Wait();

                Console.WriteLine("Press Enter to exit...");
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                while (ex is AggregateException && ex.InnerException != null)
                    ex = ex.InnerException;
                Console.WriteLine(ex.Message);
                Environment.Exit(-1);
            }
        }
    }

    /// <summary>
    /// WebBrowserPool the pool of WebBrowser objects sharing the same message loop
    /// </summary>
    public class WebBrowserPool : IDisposable
    {
        MessageLoopApartment _apartment; // a WinFroms STA thread with message loop
        readonly SemaphoreSlim _semaphore; // regulate available browsers
        readonly Queue<WebBrowser> _browsers; // the pool of available browsers
        readonly HashSet<Task> _pendingTasks; // keep track of pending tasks for proper cancellation
        readonly CancellationTokenSource _cts; // global cancellation (for Dispose)

        public WebBrowserPool(int maxParallel, CancellationToken token)
        {
            if (maxParallel < 1)
                throw new ArgumentException("maxParallel");

            _cts = CancellationTokenSource.CreateLinkedTokenSource(token);
            _apartment = new MessageLoopApartment();
            _semaphore = new SemaphoreSlim(maxParallel);
            _browsers = new Queue<WebBrowser>();
            _pendingTasks = new HashSet<Task>();

            // init the pool of WebBrowser objects
            _apartment.Invoke(() =>
            {
                while (--maxParallel >= 0)
                    _browsers.Enqueue(new WebBrowser());
            });
        }

        // Navigate to a site and get a snapshot of its DOM HTML
        public async Task<string> ScrapeSiteAsync(string url, int timeout, CancellationToken token = default(CancellationToken))
        {
            var navigationCts = CancellationTokenSource.CreateLinkedTokenSource(token, _cts.Token);
            var combinedToken = navigationCts.Token;

            // we have a limited number of WebBrowser objects available, so await the semaphore
            await _semaphore.WaitAsync(combinedToken);
            try
            {
                if (timeout != Timeout.Infinite)
                    navigationCts.CancelAfter(timeout);

                // run the main logic on the STA thread
                return await _apartment.Run(async () =>
                {
                    // acquire the 1st available WebBrowser from the pool
                    var webBrowser = _browsers.Dequeue();
                    try
                    {
                        var task = webBrowser.NavigateAsync(url, combinedToken);
                        _pendingTasks.Add(task); // register the pending task
                        try
                        {
                            return await task;
                        }
                        finally
                        {
                            // unregister the completed task
                            _pendingTasks.Remove(task);
                        }
                    }
                    finally
                    {
                        // return the WebBrowser to the pool
                        _browsers.Enqueue(webBrowser);
                    }
                }, combinedToken);
            }
            finally
            {
                _semaphore.Release();
            }
        }

        // Dispose of WebBrowserPool
        public void Dispose()
        {
            if (_apartment == null)
                throw new ObjectDisposedException(this.GetType().Name);

            // cancel and wait for all pending tasks
            _cts.Cancel();
            var task = _apartment.Run(() => Task.WhenAll(_pendingTasks.ToArray()));
            try
            {
                task.Wait();
            }
            catch
            {
                if (!task.IsCanceled)
                    throw;
            }

            // dispose of WebBrowser objects
            _apartment.Run(() =>
            {
                while (_browsers.Any())
                    _browsers.Dequeue().Dispose();
            });

            _apartment.Dispose();
            _apartment = null;
        }
    }

    /// <summary>
    /// WebBrowserExt - WebBrowser extensions
    /// by Noseratio - https://stackoverflow.com/a/22262976/1768303
    /// </summary>
    public static class WebBrowserExt
    {
        const int POLL_DELAY = 500;

        // navigate and download 
        public static async Task<string> NavigateAsync(this WebBrowser webBrowser, string url, CancellationToken token)
        {
            // navigate and await DocumentCompleted
            var tcs = new TaskCompletionSource<bool>();
            WebBrowserDocumentCompletedEventHandler handler = (s, arg) =>
                tcs.TrySetResult(true);

            using (token.Register(
                () => { webBrowser.Stop(); tcs.TrySetCanceled(); },
                useSynchronizationContext: true))
            {
                webBrowser.DocumentCompleted += handler;
                try
                {
                    webBrowser.Navigate(url);
                    await tcs.Task; // wait for DocumentCompleted
                }
                finally
                {
                    webBrowser.DocumentCompleted -= handler;
                }
            }

            // get the root element
            var documentElement = webBrowser.Document.GetElementsByTagName("html")[0];

            // poll the current HTML for changes asynchronosly
            var html = documentElement.OuterHtml;
            while (true)
            {
                // wait asynchronously, this will throw if cancellation requested
                await Task.Delay(POLL_DELAY, token);

                // continue polling if the WebBrowser is still busy
                if (webBrowser.IsBusy)
                    continue;

                var htmlNow = documentElement.OuterHtml;
                if (html == htmlNow)
                    break; // no changes detected, end the poll loop

                html = htmlNow;
            }

            // consider the page fully rendered 
            token.ThrowIfCancellationRequested();
            return html;
        }

        // enable HTML5 (assuming we're running IE10+)
        // more info: https://stackoverflow.com/a/18333982/1768303
        public static void SetFeatureBrowserEmulation()
        {
            if (System.ComponentModel.LicenseManager.UsageMode != System.ComponentModel.LicenseUsageMode.Runtime)
                return;
            var appName = System.IO.Path.GetFileName(System.Diagnostics.Process.GetCurrentProcess().MainModule.FileName);
            Registry.SetValue(@"HKEY_CURRENT_USER\Software\Microsoft\Internet Explorer\Main\FeatureControl\FEATURE_BROWSER_EMULATION",
                appName, 10000, RegistryValueKind.DWord);
        }
    }

    /// <summary>
    /// MessageLoopApartment
    /// STA thread with message pump for serial execution of tasks
    /// by Noseratio - https://stackoverflow.com/a/22262976/1768303
    /// </summary>
    public class MessageLoopApartment : IDisposable
    {
        Thread _thread; // the STA thread

        TaskScheduler _taskScheduler; // the STA thread's task scheduler

        public TaskScheduler TaskScheduler { get { return _taskScheduler; } }

        /// <summary>MessageLoopApartment constructor</summary>
        public MessageLoopApartment()
        {
            var tcs = new TaskCompletionSource<TaskScheduler>();

            // start an STA thread and gets a task scheduler
            _thread = new Thread(startArg =>
            {
                EventHandler idleHandler = null;

                idleHandler = (s, e) =>
                {
                    // handle Application.Idle just once
                    Application.Idle -= idleHandler;
                    // return the task scheduler
                    tcs.SetResult(TaskScheduler.FromCurrentSynchronizationContext());
                };

                // handle Application.Idle just once
                // to make sure we're inside the message loop
                // and SynchronizationContext has been correctly installed
                Application.Idle += idleHandler;
                Application.Run();
            });

            _thread.SetApartmentState(ApartmentState.STA);
            _thread.IsBackground = true;
            _thread.Start();
            _taskScheduler = tcs.Task.Result;
        }

        /// <summary>shutdown the STA thread</summary>
        public void Dispose()
        {
            if (_taskScheduler != null)
            {
                var taskScheduler = _taskScheduler;
                _taskScheduler = null;

                // execute Application.ExitThread() on the STA thread
                Task.Factory.StartNew(
                    () => Application.ExitThread(),
                    CancellationToken.None,
                    TaskCreationOptions.None,
                    taskScheduler).Wait();

                _thread.Join();
                _thread = null;
            }
        }

        /// <summary>Task.Factory.StartNew wrappers</summary>
        public void Invoke(Action action)
        {
            Task.Factory.StartNew(action,
                CancellationToken.None, TaskCreationOptions.None, _taskScheduler).Wait();
        }

        public TResult Invoke<TResult>(Func<TResult> action)
        {
            return Task.Factory.StartNew(action,
                CancellationToken.None, TaskCreationOptions.None, _taskScheduler).Result;
        }

        public Task Run(Action action, CancellationToken token = default(CancellationToken))
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler);
        }

        public Task<TResult> Run<TResult>(Func<TResult> action, CancellationToken token = default(CancellationToken))
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler);
        }

        public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
        }

        public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token = default(CancellationToken))
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
        }
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用线程池限制最大线程数 - 尝试读取或写入受保护的内存错误 的相关文章

  • 为什么 int8_t 和用户通过 cin 输入显示奇怪的结果[重复]

    这个问题在这里已经有答案了 一小段代码让我发疯 但希望你能阻止我跳出窗外 看这里 include
  • 如何让 Swagger 插件在自托管服务堆栈中工作

    我已经用 github 上提供的示例重新提出了这个问题 并为任何想要自己运行代码的人提供了一个下拉框下载链接 Swagger 无法在自托管 ServiceStack 服务上工作 https stackoverflow com questio
  • 在 LINQ 中按 Id 连接多表和分组

    我想按categoryId显示列表产品的名称组 这是我的代码 我想要我的视图显示结果 Desktop PC HP Red PC Dell Yellow PC Asus Red SmartPhone Lumia 720 Blue 我的组模型
  • 如何在加载.NET WinForm应用程序user.config文件时捕获异常?

    有时 在使用默认配置系统的 NET 2 0 WinForm 桌面应用程序中 user config文件将被损坏并且无法再加载 当配置系统尝试加载它时 它会抛出一个System Xml XmlException 抛开 为什么文件首先被损坏 的
  • 在 C 中匹配二进制模式

    我目前正在开发一个 C 程序 需要解析一些定制的数据结构 幸运的是我知道它们是如何构造的 但是我不确定如何在 C 中实现我的解析器 每个结构的长度都是 32 位 并且每个结构都可以通过其二进制签名来识别 举个例子 有两个我感兴趣的特定结构
  • 使用 LINQ2SQL 在 ASP.NET MVC 中的各种模型存储库之间共享数据上下文

    我的应用程序中有 2 个存储库 每个存储库都有自己的数据上下文对象 最终结果是我尝试将从一个存储库检索到的对象附加到从另一个存储库检索到的对象 这会导致异常 Use 构造函数注入将 DataContext 注入每个存储库 public cl
  • 复制目录内容

    我想将目录 tmp1 的内容复制到另一个目录 tmp2 tmp1 可能包含文件和其他目录 我想使用C C 复制tmp1的内容 包括模式 如果 tmp1 包含目录树 我想递归复制它们 最简单的解决方案是什么 我找到了一个解决方案来打开目录并读
  • 使用 Newtonsoft 和 C# 反序列化嵌套 JSON

    我正在尝试解析来自 Rest API 的 Json 响应 我可以获得很好的响应并创建了一些类模型 我正在使用 Newtonsoft 的 Json Net 我的响应中不断收到空值 并且不确定我的模型设置是否正确或缺少某些内容 例如 我想要获取
  • 单个对象的 Monogame XNA 变换矩阵?

    我读过一些解释 XNA Monogame 变换矩阵的教程 问题是这些矩阵应用于 SpriteBatch Begin matrix 这意味着所有 Draw 代码都将被转换 如何将变换矩阵应用于单个可绘制对象 就我而言 我想转换滚动背景 使其自
  • java.io.Serialized 在 C/C++ 中的等价物是什么?

    C C 的等价物是什么java io Serialized https docs oracle com javase 7 docs api java io Serializable html 有对序列化库的引用 用 C 序列化数据结构 ht
  • 在 C 中初始化变量

    我知道有时如果你不初始化int 如果打印整数 您将得到一个随机数 但将所有内容初始化为零似乎有点愚蠢 我问这个问题是因为我正在评论我的 C 项目 而且我对缩进非常直接 并且它可以完全编译 90 90 谢谢 Stackoverflow 但我想
  • DbContext 和 ObjectContext 有什么区别

    From MSDN 表示工作单元和存储库模式的组合 使您能够查询数据库并将更改分组在一起 然后将这些更改作为一个单元写回存储 DbContext在概念上类似于ObjectContext 我虽然DbContext只处理与数据库的连接以及针对数
  • 为什么 std::strstream 被弃用?

    我最近发现std strstream已被弃用 取而代之的是std stringstream 我已经有一段时间没有使用它了 但它做了我当时需要做的事情 所以很惊讶听到它的弃用 我的问题是为什么做出这个决定 有什么好处std stringstr
  • 等待进程释放文件

    我如何等待文件空闲以便ss Save 可以用新的覆盖它吗 如果我紧密地运行两次 左右 我会得到一个generic GDI error
  • CMake 无法确定目标的链接器语言

    首先 我查看了this https stackoverflow com questions 11801186 cmake unable to determine linker language with c发帖并找不到解决我的问题的方法 我
  • 使用管道时,如果子进程数量大于处理器数量,进程是否会被阻塞?

    当子进程数量很大时 我的程序停止运行 我不知道问题是什么 但我猜子进程在运行时以某种方式被阻止 下面是该程序的主要工作流程 void function int process num int i initial variables for
  • Cmake 链接共享库:包含库中的头文件时“没有这样的文件或目录”

    我正在学习使用 CMake 构建库 构建库的代码结构如下 include Test hpp ITest hpp interface src Test cpp ITest cpp 在 CMakeLists txt 中 我用来构建库的句子是 f
  • 将 MQTTNet 服务器与 MQTT.js 客户端结合使用

    我已经启动了一个 MQTT 服务器 就像this https github com chkr1011 MQTTnet tree master例子 该代码托管在 ASP Net Core 2 0 应用程序中 但我尝试过控制台应用程序 但没有成
  • 为什么 gcc 抱怨“错误:模板参数 '0' 的类型 'intT' 取决于模板参数”?

    我的编译器是gcc 4 9 0 以下代码无法编译 template
  • 无法接收 UDP Windows RT

    我正在为 Windows 8 RT 编写一个 Windows Store Metro Modern RT 应用程序 需要在端口 49030 上接收 UDP 数据包 但我似乎无法接收任何数据包 我已按照使用教程进行操作DatagramSock

随机推荐