博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
异步代理池2--正确实现并发
阅读量:6540 次
发布时间:2019-06-24

本文共 5591 字,大约阅读时间需要 18 分钟。

 

并发访问网站的例子

下面就是一个并发访问proxypool中实现的服务器的例子,以这个例子来说明如何实现并发。

import aiohttpimport asyncioasync def localserver(semaphore):    async with semaphore:               async with aiohttp.ClientSession() as session:            async with session.get('http://127.0.0.1:8088', timeout=5) as resp:                print('hello')            await asyncio.sleep(3) # 模拟网络延迟async def coro():    semaphore = asyncio.Semaphore(5) # 限制并发量为5    to_get = [localserver(semaphore) for _ in range(20)] # 同时建立20个协程    await asyncio.wait(to_get) # 等待所有协程结束loop = asyncio.get_event_loop()loop.run_until_complete(coro())print(result)loop.close()

运行上面的代码,可以在终端看到每隔3秒就打印出5个"hello",下面是服务器的日志:

2017-06-01 14:45:35,375  DEBUG                                 server started at http://127.0.0.1:8088...2017-06-01 14:45:44,851  DEBUG    127.0.0.1:35698       GET    requested index page2017-06-01 14:45:44,853  DEBUG    127.0.0.1:35700       GET    requested index page2017-06-01 14:45:44,855  DEBUG    127.0.0.1:35702       GET    requested index page2017-06-01 14:45:44,858  DEBUG    127.0.0.1:35704       GET    requested index page2017-06-01 14:45:44,876  DEBUG    127.0.0.1:35706       GET    requested index page2017-06-01 14:45:47,864  DEBUG    127.0.0.1:35710       GET    requested index page......2017-06-01 14:45:50,912  DEBUG    127.0.0.1:35732       GET    requested index page2017-06-01 14:45:53,887  DEBUG    127.0.0.1:35734       GET    requested index page2017-06-01 14:45:53,919  DEBUG    127.0.0.1:35736       GET    requested index page2017-06-01 14:45:53,924  DEBUG    127.0.0.1:35738       GET    requested index page2017-06-01 14:45:53,925  DEBUG    127.0.0.1:35740       GET    requested index page2017-06-01 14:45:53,929  DEBUG    127.0.0.1:35742       GET    requested index page

可以在 14:45:44 时有5个几乎同时到达的请求,之后间隔3秒会就会有5个并发请求到达,20个请求一共耗时9秒左右。

并发访问网站一定要限流,这里是通过asyncio.Semaphore将并发请求数量控制在5个。

通过上面的例子可以看出实现并发的关键就在于同时建立多个协程,然后通过asyncio.wait方法等待它们结束,各个协程之间的调度交给事件循环完成。

改造 proxypool 以实现并发

主要修改的是proxy_crawler.pyproxy_validator.py2个模块。

并发地爬取

因为每个网站的规则都不同,要实现并发爬取所有的代理网站,需要修改协程间传递的数据,为它们添加上各自对应的规则,这样最终页面解析函数就可以使用对应的规则来解析爬取到的页面内容了,使用一个命名元组来包装这2种数据:

Result = namedtuple('Result', 'content rule')

content字段是url和爬取到的页面,rule字段则是对应的规则。

下面是支持并发的proxy_crawler的启动函数:

async def start(self):    to_crawl = [self._crawler(rule) for rule in self._rules] # 协程数等于规则数    await asyncio.wait(to_crawl)

现在可以并发地爬取所有的代理网站,而对于单个网站来说爬取过程依旧是顺序的(爬取页面的page_download函数的基本逻辑没变),因为爬取时没有使用代理,并发访问可能会被封IP。如果想要实现对单个代理网站的并发爬取,参考上面的例子也很容易实现。

并发地验证

之前实现的proxypool中最耗时的部分就是验证了,如果代理无效,需要等待其超时才能判断其无效,而免费的代理中绝大多数都是无效的,顺序验证就会非常耗时。

下面是支持并发的proxy_validator的启动函数:

async def start(self, proxies=None):    if proxies is not None:        to_validate = [self.validate_many(proxies) for _ in range(50)] # 建立 50 个协程,在爬取过程中验证代理    else:        proxies = await self._get_proxies()# 从代理池中获取若干代理,返回一个asyncio.Queue 对象        to_validate = [self.validate_one(proxies) for _ in range(proxies.qsize())] # 协程数等于队列的长度,定期验证代理池中的代理    await asyncio.wait(to_validate)

这部分相较之前的版本变化较大,除了为了支持并发而做的修改外,还进行了一点优化,重用了验证代理的代码,现在爬取代理时的验证和对代理池中的代理的定期验证都使用相同的验证代码。

防止日志阻塞事件循环

因为默认日志是输出到文件的,而asyncio包目前没有提供异步文件系统API,为了不让日志的I/O操作阻塞事件循环,通过调用run_in_executor方法,把日志操作发给asyncio的事件循环背后维护着的ThreadPoolExecutor 对象执行。

我定义了一个logger的代理,由于logger被托管到另一个线程中执行,会丢失当前的上下文信息,如果需要记录,可以使用traceback库获取它们并作为日志的msgexc_infostack_info都设置为False,这样就不需要修改现有的代码了:

import loggingimport logging.configimport yamlfrom pathlib import Pathfrom functools import wrapsPROJECT_ROOT = Path(__file__).parentdef _log_async(func):    """Send func to be executed by ThreadPoolExecutor of event loop."""    @wraps(func)    def wrapper(*args, **kwargs):        loop = asyncio.get_event_loop()        return loop.run_in_executor(None, partial(func, *args, **kwargs)) # run_in_executor 本身不支持关键字参数,logger是有关键字参数(如 'extra')的,使用 'functools.partial'    return wrapperclass _LoggerAsync:    """Logger's async proxy.    Logging were executed in a thread pool executor to avoid blocking the event loop.    """    def __init__(self, *, is_server=False):        logging.config.dictConfig(            yaml.load(open(str(PROJECT_ROOT / 'logging.yaml'), 'r')))  # load config from YAML file        if is_server:            self._logger = logging.getLogger('server_logger')        elif VERBOSE:            self._logger = logging.getLogger('console_logger')  # output to both stdout and file        else:            self._logger = logging.getLogger('file_logger')    def __getattr__(self, name):        if hasattr(self._logger, name):            return getattr(self._logger, name)        else:            msg = 'logger object has no attribute {!r}'            raise AttributeError(msg.format(name))    @_log_async    def debug(self, msg, *args, **kwargs):        self._logger.debug(msg, *args, exc_info=False, stack_info=False, **kwargs)    @_log_async    def info(self, msg, *args, **kwargs):        self._logger.info(msg, *args, exc_info=False, stack_info=False, **kwargs)    @_log_async    def warning(self, msg, *args, **kwargs):        self._logger.warning(msg, *args, exc_info=False, stack_info=False, **kwargs)    @_log_async    def error(self, msg, *args, **kwargs):        self._logger.error(msg, *args, exc_info=False, stack_info=False, **kwargs)    @_log_async    def exception(self, msg, *args, exc_info=True, **kwargs):        self._logger.exception(msg, *args, exc_info=False, stack_info=False, **kwargs)     @_log_async    def critical(self, msg, *args, **kwargs):        self._logger.critical(msg, *args, exc_info=False, stack_info=False, **kwargs)logger = _LoggerAsync()

转载于:https://www.cnblogs.com/trunkslisa/p/9830633.html

你可能感兴趣的文章
PHP中的一些新特性
查看>>
I.MX6 Android mmm convenient to use
查看>>
55、Android网络图片 加载缓存处理库的使用
查看>>
OC基础--OC中的类方法和对象方法
查看>>
ubuntu samba服务器多用户配置【转】
查看>>
母线的种类与作用是什么(转)
查看>>
【Xamarin 挖墙脚系列:IOS 开发界面的3种方式】
查看>>
Atitit.工作流系统的本质是dsl 图形化的dsl 4gl
查看>>
I.MX6 Android USB Touch eGTouchA.ini文件存放
查看>>
4-5-创建索引表-串-第4章-《数据结构》课本源码-严蔚敏吴伟民版
查看>>
java 操作 RabbitMQ 发送、接受消息
查看>>
go run main.go undefined? golang main包那点事
查看>>
从零开始写一个npm包,一键生成react组件(偷懒==提高效率)
查看>>
Volley(二)—— 基本Request对象 & RequestQueue&请求取消
查看>>
2017中国系统架构师大会“盛装”来袭
查看>>
中国最强的人工智能学术会议来了
查看>>
Metasploit的射频收发器功能 | Metasploit’s RF Transceiver Capabilities
查看>>
主库 归档 删除策略
查看>>
《Linux从入门到精通(第2版)》——导读
查看>>
路过下载攻击利用旧版 Android 漏洞安装勒索软件
查看>>