利用python获取IP资源池的方法

2023-05-16

在使用爬虫的时候,经常会遇到IP被禁止的情况,所以一般都需要一个资源池来提高降低风险

 以下代码中,基于python3.7,数据库用的是POSTGRESQL11,为了效率使用了队列,程序结构如下:

./

    bin/proxy_pools.py

    log/proxy_pools.log

IP来源于github,网址:https://raw.githubusercontent.com/fate0/proxylist/master/proxy.list
 

import requests,re,random,psycopg2,logging,threading,time
from fake_useragent import UserAgent
from os import path
from queue import Queue

class Proxy_pools:
    
    #程序路径初始化
    program_path = path.dirname(path.dirname(path.abspath(__file__)))
    
    #网址链接初始化
    PROXY_URL = "https://raw.githubusercontent.com/fate0/proxylist/master/proxy.list"
    CHECK_URL1 = "https://www.baidu.com"
    CHECK_URL2 = "https://httpbin.org/ip"
    #数据库模块初始化
    INSERT_ACTIVE = "INSERT INTO spider_ip_pools_active(TYPE, URL, STATUS) VALUES('{0}', '{1}', '{2}');"
    SELECT_ACTIVE = "select 1 from spider_ip_pools_active where TYPE='{0}' and URL='{1}';" 
    conn = psycopg2.connect(host="127.0.0.1", user="root", password="000000", dbname="mysite", port=9527)
    cur = conn.cursor()
    #日志模块初始化
    logfile = program_path + '/log/proxy_pools.log'
    logger = logging.getLogger(__name__)
    logger.setLevel(level = logging.INFO)
    handler = logging.FileHandler(logfile)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(process)d - %(message)s')
    handler.setFormatter(formatter)
    handler.setLevel(logging.INFO)
    logger.addHandler(handler)

    def getHeaders(self):
        ua = UserAgent()
        headers = {'User-Agent': ua.random}
        return headers

    def getexistsproxy(self, type, url):
        self.cur.execute(self.SELECT_ACTIVE.format(type, url))
        proxy_exists = self.cur.fetchone()
        if not proxy_exists:
            return 0
        else:
            return 1
            

    def getProxies(self):
        id = 1
        proxys_dict = {}
        headers = self.getHeaders()
        proxy = {'https':'https://182.253.67.42:8080', 'http': 'http://122.152.4.65:3128'}
        try:
            htmls = requests.get(self.PROXY_URL, headers=headers, timeout=20).text
        except:
            self.logger.warning("获取最新的proxys代理池失败")
            raise
        htmls = htmls.split('\n')[:-1]
        for html in htmls:
            try:
                proxy_dict_t = eval(html)    #验证是否可以转化为字典
            except:
                pass
            else:
                proxy_exists = self.getexistsproxy(proxy_dict_t['type'],proxy_dict_t['type'] + "://" + str(proxy_dict_t['host']) + ":" + str(proxy_dict_t['port']))
                if proxy_exists == 0:
                    proxys_dict[id] = eval(html)
                    id += 1
        self.logger.info("获取最新的proxys代理池成功, 一共获取{0}条数据".format(len(proxys_dict)))
        return proxys_dict

    def insertProxies(self, proxys_checked):
        for id in proxys_checked:
            type = proxys_checked[id][0]
            url = proxys_checked[id][1]
            status = proxys_checked[id][2]
            try:
                self.logger.info("执行sql: " + self.INSERT_ACTIVE.format(type, url, status))
                self.cur.execute(self.INSERT_ACTIVE.format(type, url, status))
            except psycopg2.errors.UniqueViolation:
                self.conn.rollback()
                self.logger.info("插入proxy失败, 失败原因: 主键冲突")
            except psycopg2.errors.InFailedSqlTransaction:
                self.conn.rollback()
                self.logger.info("插入proxy失败, 失败原因: 提交异常") 
            except:
                self.conn.rollback()
                self.logger.info("插入proxy失败, 失败原因: 未知异常")
                raise
            else:
                self.logger.info("插入proxy成功")
            finally:
                self.conn.commit()
        return None
   
    def checkUrl(self, headers, status, id, type, host, port):
        if type == 'http':
            url = "http://" + str(host) + ":" + str(port)
            proxy = {type:url}
        elif type == 'https':
            url = "https://" + str(host) + ":" + str(port)
            proxy = {type:url}
        self.logger.info("生成proxy代理成功" + ",id=" + str(id) + ",type=" + type + ",url=" + url + "...准备测试")

        try:
            res = requests.get(self.CHECK_URL1, headers=headers,timeout=10)
        except:
            self.logger.info("测试proxy代理失败" + ",id=" + str(id) + ",type=" + type + ",url=" + url + "...无法使用")
        else:
            status = 'US10'
            if res.status_code == 200 and '<!--STATUS OK-->' in res.text:
                self.logger.info("测试proxy代理成功" + ",id=" + str(id) + ",type=" + type + ",url=" + url + "...可以使用")
                proxy = (type, url, status)
                return proxy
            else:
                self.logger.info("测试proxy代理失败" + ",id=" + str(id) + ",type=" + type + ",url=" + url + "...无法使用")
                
    def main(self):
        proxys_checked = {}
        #获取proxy代理池, 及相关数据
        self.logger.info("开始获取最新的proxys代理池")
        proxys_unchecked = self.getProxies()
        status = 'US99'                             #状态默认不通过
        headers = self.getHeaders()                 #头标识
        #创建多线程队列
        self.logger.info("开始创建校验proxy可用性线程队列")
        proxys_threads = []             #线程
        que = Queue()                   #队列
        for id in proxys_unchecked:
            type = proxys_unchecked[id]['type']
            host = proxys_unchecked[id]['host']
            port = proxys_unchecked[id]['port']
            proxys_threads.append(threading.Thread(target=lambda que, headers, status, id, type, host, port: que.put(self.checkUrl(headers, status, id, type, host, port)), args=(que, headers, status, id, type, host, port)))
            self.logger.info("插入" + "[type:" + type + ";host:" + host + ";port:" + str(port) +"]校验proxy可用性队列成功")
        self.logger.info("创建校验proxy可用性线程队列成功")
        self.logger.info("准备启动校验proxy可用性线程")
        for proxys_thread in proxys_threads:
            proxys_thread.setDaemon(True)
            proxys_thread.start()
        self.logger.info("开始守候校验proxy可用性线程...")
        for proxys_thread in proxys_threads:
            proxys_thread.join(timeout=12)
        self.logger.info("准备获取校验proxy可用性队列结束...")
        id = 1
        while not que.empty():
            result = que.get()
            if result:
                proxys_checked[id] = result
                id += 1
        self.logger.info("获取校验proxy可用性队列结果成功...")
        self.logger.info("准备将proxys插入数据库...")
        self.insertProxies(proxys_checked)
        self.logger.info("proxys插入数据库结束...")
        self.logger.info("获取最新的proxys代理池进程结束")

if __name__ == "__main__":
    a = Proxy_pools()
    a.main()

写的一般般,其实还有很多地方可以优化一下,不过既然程序可以跑起来,就懒得搞了,如果你看了之后有更好的想法,请分享给我,谢谢~~

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

利用python获取IP资源池的方法 的相关文章

随机推荐

  • Ubuntu 升级cmake 版本

    PS 在编译一些包时需要更高的版本 xff0c 需要升级 cmake 千万别执行下面的命令 xff0c 这样会把之前用 cmake 编译好的包都给卸载掉 xff0c 包括ros sudo apt get autoremove cmake 比
  • 视觉slam十四讲(ch6) Ubuntu18.04安装 g2o库 报错error: FixedArray ... has no member named ‘fill’

    ps 再学习14讲第二版的时候 xff0c 运行g2o 报错 error FixedArray aka class ceres internal FixedArray lt double 6 gt has no member named f
  • 无人驾驶学习笔记-NDT 配准

    目录 1 NDT 的算法处理流程 2 NDT 公式推导 3 NDT 实例 3 1 常规NDT的位姿估计 3 2 front end node 1 ROS常规初始化 2 初始化操作 xff1a 读取传感器数据 获取lidar to imu变换
  • KD 树原理以及在三维激光点云中的应用

    目录 1 介绍 2 原理 2 1 数据结构 2 2 构建KD树 2 3 实例 3 程序示例 4 参考链接 1 介绍 kd tree简称k维树 xff0c 是一种空间划分的数据结构 常被用于高维空间中的搜索 xff0c 比如范围搜索和最近邻搜
  • slam 基础知识整理之- 最小二乘问题的引出与求解方法

    目录 1 最小二乘引出 2 线性最小二乘 及 求解方法 3 非线性最小二乘 编辑 3 1 求解思路 3 2 常用四种方法 3 3 四种方法总结 4 参考链接 在SLAM的过程中 xff0c 我们可以构建机器人状态过程 通过对其概率的计算 x
  • 无人驾驶学习笔记 - LOAM 算法论文核心关键点总结

    目录 1 框架 2 特征点提取 3 点云去畸变 4 帧间匹配 特征关联与损失函数计算 a 线特征 b 面特征 5 运动估计 6 建图 7 姿态融合 8 LOAM 优劣势 9 参考连接 1 框架 loam框架核心是两部分 xff0c 高频率的
  • 动态窗口法的理解和一些细节

    机器人局部路径规划 动态窗口法 动态窗口法 xff08 Dynamic Window Approach xff0c DWA xff09 是一类经典的机器人局部路径规划算法 它的过程主要分为两部分 xff1a 速度空间 v
  • 无人驾驶学习笔记 - A-LOAM 算法代码解析总结

    目录 1 概述 2 scanRegistration cpp 2 1 代码注释 2 1 1 主函数 2 1 2 removeClosedPointCloud xff08 雷达周边过近点移除 xff09 2 1 3 laserCloudHan
  • 无人驾驶学习笔记-LeGO-LOAM 算法源码学习总结

    目录 1 概述 2 lego loam的贡献 3 系统框图 4 ros graph中的节点关系表 5 lego loam 的文件系统架构 6 各部分方法原理及代码注释 6 1 点云投影与目标分割 1 总结概述 2 代码注释 2 1 copy
  • Boost 中 signal2 用法

    boost 函数与回调 xff08 三 xff09 signals2
  • 树莓派学习笔记

    文章目录 树莓派基础入门笔记无显示屏使用方式基础教程5 树莓派文件传输 配置编译环境使用U盘直接传输使用vnc传输文件FTP文件传输协议Python配置编译环境C C 43 43 配置编译环境Linux常用终端命令nano和vi编辑器的使用
  • 22.IO与显示器

    README 1 本文内容总结自 B站 操作系统 哈工大李治军老师 xff0c 内容非常棒 xff0c 墙裂推荐 xff1b 2 显示器是输入型外设 xff1b 3 本章主要内容是讲 显示器是如何被驱动的 xff1b 或操作系统是如何让用户
  • BGP协议基础配置—学习

    BGP重要概念 IGP是运行在AS内部的路由协议 xff0c 主要有RIP OSPF及IS IS xff0c 着重于发现和计算路由 EGP是运行在AS之间的路由协议 xff0c 通常是BGP xff0c 它是实现路由控制和选择最好的路由协议
  • STM32 Not a genuine ST Device! Abort connection 错误解决方案

    STM32 Not a genuine ST Device Abort connection 错误解决方案 网上解决方案晶振设置不匹配导致Connect setting to with Pre reset降低MAX Clock 我自己的解决
  • VMware的.vmdk文件只赠不减的处理方法

    VMware虚拟机的虚拟磁盘的大小会随着使用时间不断变大 xff0c 而且只赠不减 即使在虚拟系统中删除了磁盘中的文件 xff0c 虚拟磁盘的大小仍然不会变小 释放空闲磁盘的方法如下 xff1a VMWare Tools中的 Shrink功
  • 使用码云(Gitee)进行代码管理,以及VsCode关联Git

    一 安装git Git的下载 安装与配置 git 简明指南 二 注册码云 1 xff09 注册码云账号 xff1a 码云官网 2 xff09 绑定邮箱 xff1a 右上角 头像 设置 邮箱管理 三 本地项目与码云关联 1 本地项目上传至码云
  • 删掉带页眉的空白页结果把所有页眉都删掉解决办法

    点击视图 大纲 会发现在之前的操作中有两个分节符 xff0c 把上面那个删掉即可 参考链接https zhidao baidu com question 105591450 html 小问题也蛮耗时的
  • ROS之多个订阅数据同步

    做传感器数据融合时 xff0c 常常会需要用到多个数据 xff0c 即需要同时订阅多个话题 那么 xff0c 如何同步这些传感器数据的时间辍 xff0c 并将它们放入一个回调函数中进行处理呢 xff1f 参考文档 xff1a http wi
  • C++中使用strtok函数分割字符串String

    C 43 43 中使用strtok函数分割字符串String string str getline cin str vector lt string gt vec char p 61 strtok char str c str 34 34
  • 利用python获取IP资源池的方法

    在使用爬虫的时候 xff0c 经常会遇到IP被禁止的情况 xff0c 所以一般都需要一个资源池来提高降低风险 以下代码中 xff0c 基于python3 7 xff0c 数据库用的是POSTGRESQL11 xff0c 为了效率使用了队列