Taobao Detects Selenium? Give pyppeteer a Try!

金穗_ec4b · 2022-10-17




We all know Selenium handles dynamically loaded pages well and gives you true WYSIWYG (What You See Is What You Get) scraping. But a small number of sites, Taobao among them, don't just load content dynamically: they also detect Selenium. There is still a way out; for example, you can use the browser built into PyQt5 and inject JavaScript to scrape. That approach is clumsy, though. Is there anything simpler? There is: pyppeteer.



What is pyppeteer?


pyppeteer is an alternative to Selenium: a Python implementation of puppeteer. puppeteer is a Node.js tool from Google that lets you control the Chrome browser from JavaScript; it works very well for web scraping, and its API is complete and powerful. So is pyppeteer also a Google product? Careful here: pyppeteer is not developed by Google. It is an unofficial port of a subset of puppeteer's features, written by a Japanese engineer. Also note that pyppeteer is built on Python's async feature and runs asynchronously, which makes it more efficient than Selenium; as a consequence, it requires Python 3.5 or later.
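A one-line guard makes that version requirement explicit (a trivial sketch, not from the original post):

import sys

# pyppeteer is built on async/await, which arrived in Python 3.5.
assert sys.version_info >= (3, 5), 'pyppeteer requires Python 3.5+'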

Behind the scenes, pyppeteer also drives a Chromium browser, which is very similar to Chrome, to perform actions and render pages. So let me briefly explain how Chrome and Chromium are related.

Chromium is the fully open-source project that Google started in order to develop Chrome. The two are built from the same source code: every new Chrome feature lands in Chromium first and is only merged into Chrome once it has been tested and judged stable. Chromium therefore updates more frequently and carries more new features, but as a standalone browser its user base is much smaller.

In short, the two browsers share the same engine and essentially the same implementation; think of them as the development build and the release build, with little practical difference in functionality.


Do I need to install Chromium first?


Since pyppeteer depends on Chromium, do you need to install Chromium yourself first? Actually, no: the first time you use pyppeteer, it downloads and installs Chromium for you automatically. But don't celebrate too soon! Don't celebrate too soon! Don't celebrate too soon! (Important things are worth saying three times.) Why? More on that later; for now, just be mentally prepared.
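If you are curious where that automatic install will put Chromium, the downloader module exposes a couple of helpers you can call yourself (a small sketch; check_chromium and chromium_executable are the names in the pyppeteer version I'm reading, and may differ in other releases):

from pyppeteer.chromium_downloader import check_chromium, chromium_executable

# True once Chromium has been downloaded into pyppeteer's data directory.
print('Chromium installed:', check_chromium())
# The path where pyppeteer expects the Chromium binary to live.
print('Expected executable:', chromium_executable())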


Installing pyppeteer


Installing pyppeteer itself is trivial: a plain pip install pyppeteer does it. Once it finishes, open the interactive interpreter and import the package; if no error is raised, the installation succeeded.
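For example, a minimal sanity check:

# If this import raises nothing, the installation succeeded.
import pyppeteer

print(pyppeteer.__version__)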


Quick start


Let's take pyppeteer for a quick spin. The code is as follows:

import asyncio
from pyppeteer import launch
from pyquery import PyQuery as Pq


async def main():
    # Launch a Chromium instance (headless by default) and open a new tab.
    browser = await launch()
    page = await browser.newPage()
    await page.goto('http://quotes.toscrape.com/js/')
    # Parse the rendered HTML with pyquery and count the quote elements.
    doc = Pq(await page.content())
    print('Quotes:', doc('.quote').length)
    await browser.close()


asyncio.get_event_loop().run_until_complete(main())

The first time you run this, you'll find it hangs for a long time in the state shown below.

[Screenshot: the run stuck at the start of the Chromium download]

A while later it errors out, as shown below.

[Screenshot: the download fails with a connection error after many retries]

The gist of the error is that it retried many times but still could not establish a direct connection to the download URL. If a direct connection is impossible, we can go through an intermediary, and that intermediary is the famous SOCKS5. You can buy a SOCKS5 proxy from a paid provider (Abuyun, for instance); here I simply use a local proxy client. The next question is where in the code to set the proxy. Notice the first red line of the output:

[W:pyppeteer.chromium_downloader] start chromium download.

This line tells you the Chromium download is starting, and it comes from the module pyppeteer.chromium_downloader, so the download logic almost certainly lives there. Let's look at the implementation:

def download_zip(url: str) -> BytesIO:
    """Download data from url."""
    logger.warning('start chromium download.\n'
                   'Download may take a few minutes.')

    # disable warnings so that we don't need a cert.
    # see https://urllib3.readthedocs.io/en/latest/advanced-usage.html for more
    urllib3.disable_warnings()

    with urllib3.PoolManager() as http:
        # Get data from url.
        # set preload_content=False means using stream later.
        data = http.request('GET', url, preload_content=False)

        try:
            total_length = int(data.headers['content-length'])
        except (KeyError, ValueError, AttributeError):
            total_length = 0

        process_bar = tqdm(total=total_length)

        # 10 * 1024
        _data = BytesIO()
        for chunk in data.stream(10240):
            _data.write(chunk)
            process_bar.update(len(chunk))
        process_bar.close()

    logger.warning('\nchromium download done.')
    return _data

The function takes a URL string and returns a byte stream (a BytesIO).
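If you want to follow along in your own installation, Python can tell you where this module lives (a quick sketch; get_url() is the helper that builds the platform-specific download URL in the version I'm reading, and may differ in other releases):

import pyppeteer.chromium_downloader as cd

# The file that contains download_zip and the rest of the download logic.
print(cd.__file__)
# The exact Chromium snapshot URL pyppeteer will try to fetch on this platform.
print(cd.get_url())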

The first couple of statements in the body (ignoring comments and blank lines) just deal with warnings, so we can skip them. Reading on, we see it instantiates a urllib3.PoolManager. Could the proxy be configured through an attribute of urllib3.PoolManager? Let's go straight to the PoolManager source:

class PoolManager(RequestMethods):
    """
    Allows for arbitrary requests while transparently keeping track of
    necessary connection pools for you.

    :param num_pools:
        Number of connection pools to cache before discarding the least
        recently used pool.

    :param headers:
        Headers to include with all requests, unless other headers are given
        explicitly.

    :param \\**connection_pool_kw:
        Additional parameters are used to create fresh
        :class:`urllib3.connectionpool.ConnectionPool` instances.

    Example::

        >>> manager = PoolManager(num_pools=2)
        >>> r = manager.request('GET', 'http://google.com/')
        >>> r = manager.request('GET', 'http://google.com/mail')
        >>> r = manager.request('GET', 'http://yahoo.com/')
        >>> len(manager.pools)
        2

    """

    proxy = None

    def __init__(self, num_pools=10, headers=None, **connection_pool_kw):
        RequestMethods.__init__(self, headers)
        self.connection_pool_kw = connection_pool_kw
        self.pools = RecentlyUsedContainer(num_pools,
                                           dispose_func=lambda p: p.close())

        # Locally set the pool classes and keys so other PoolManagers can
        # override them.
        self.pool_classes_by_scheme = pool_classes_by_scheme
        self.key_fn_by_scheme = key_fn_by_scheme.copy()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.clear()
        # Return False to re-raise any potential exceptions
        return False

    def _new_pool(self, scheme, host, port, request_context=None):
        """
        Create a new :class:`ConnectionPool` based on host, port, scheme, and
        any additional pool keyword arguments.

        If ``request_context`` is provided, it is provided as keyword arguments
        to the pool class used. This method is used to actually create the
        connection pools handed out by :meth:`connection_from_url` and
        companion methods. It is intended to be overridden for customization.
        """
        pool_cls = self.pool_classes_by_scheme[scheme]
        if request_context is None:
            request_context = self.connection_pool_kw.copy()

        # Although the context has everything necessary to create the pool,
        # this function has historically only used the scheme, host, and port
        # in the positional args. When an API change is acceptable these can
        # be removed.
        for key in ('scheme', 'host', 'port'):
            request_context.pop(key, None)

        if scheme == 'http':
            for kw in SSL_KEYWORDS:
                request_context.pop(kw, None)

        return pool_cls(host, port, **request_context)

    def clear(self):
        """
        Empty our store of pools and direct them all to close.

        This will not affect in-flight connections, but they will not be
        re-used after completion.
        """
        self.pools.clear()

    def connection_from_host(self, host, port=None, scheme='http', pool_kwargs=None):
        """
        Get a :class:`ConnectionPool` based on the host, port, and scheme.

        If ``port`` isn't given, it will be derived from the ``scheme`` using
        ``urllib3.connectionpool.port_by_scheme``. If ``pool_kwargs`` is
        provided, it is merged with the instance's ``connection_pool_kw``
        variable and used to create the new connection pool, if one is
        needed.
        """

        if not host:
            raise LocationValueError("No host specified.")

        request_context = self._merge_pool_kwargs(pool_kwargs)
        request_context['scheme'] = scheme or 'http'
        if not port:
            port = port_by_scheme.get(request_context['scheme'].lower(), 80)
        request_context['port'] = port
        request_context['host'] = host

        return self.connection_from_context(request_context)

    def connection_from_context(self, request_context):
        """
        Get a :class:`ConnectionPool` based on the request context.

        ``request_context`` must at least contain the ``scheme`` key and its
        value must be a key in ``key_fn_by_scheme`` instance variable.
        """
        scheme = request_context['scheme'].lower()
        pool_key_constructor = self.key_fn_by_scheme[scheme]
        pool_key = pool_key_constructor(request_context)

        return self.connection_from_pool_key(pool_key, request_context=request_context)

    def connection_from_pool_key(self, pool_key, request_context=None):
        """
        Get a :class:`ConnectionPool` based on the provided pool key.

        ``pool_key`` should be a namedtuple that only contains immutable
        objects. At a minimum it must have the ``scheme``, ``host``, and
        ``port`` fields.
        """
        with self.pools.lock:
            # If the scheme, host, or port doesn't match existing open
            # connections, open a new ConnectionPool.
            pool = self.pools.get(pool_key)
            if pool:
                return pool

            # Make a fresh ConnectionPool of the desired type
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
            pool = self._new_pool(scheme, host, port, request_context=request_context)
            self.pools[pool_key] = pool

        return pool

    def connection_from_url(self, url, pool_kwargs=None):
        """
        Similar to :func:`urllib3.connectionpool.connection_from_url`.

        If ``pool_kwargs`` is not provided and a new pool needs to be
        constructed, ``self.connection_pool_kw`` is used to initialize
        the :class:`urllib3.connectionpool.ConnectionPool`. If ``pool_kwargs``
        is provided, it is used instead. Note that if a new pool does not
        need to be created for the request, the provided ``pool_kwargs`` are
        not used.
        """
        u = parse_url(url)
        return self.connection_from_host(u.host, port=u.port, scheme=u.scheme,
                                         pool_kwargs=pool_kwargs)

    def _merge_pool_kwargs(self, override):
        """
        Merge a dictionary of override values for self.connection_pool_kw.

        This does not modify self.connection_pool_kw and returns a new dict.
        Any keys in the override dictionary with a value of ``None`` are
        removed from the merged dictionary.
        """
        base_pool_kwargs = self.connection_pool_kw.copy()
        if override:
            for key, value in override.items():
                if value is None:
                    try:
                        del base_pool_kwargs[key]
                    except KeyError:
                        pass
                else:
                    base_pool_kwargs[key] = value
        return base_pool_kwargs

    def urlopen(self, method, url, redirect=True, **kw):
        """
        Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`
        with custom cross-host redirect logic and only sends the request-uri
        portion of the ``url``.

        The given ``url`` parameter must be absolute, such that an appropriate
        :class:`urllib3.connectionpool.ConnectionPool` can be chosen for it.
        """
        u = parse_url(url)
        conn = self.connection_from_host(u.host, port=u.port, scheme=u.scheme)

        kw['assert_same_host'] = False
        kw['redirect'] = False

        if 'headers' not in kw:
            kw['headers'] = self.headers.copy()

        if self.proxy is not None and u.scheme == "http":
            response = conn.urlopen(method, url, **kw)
        else:
            response = conn.urlopen(method, u.request_uri, **kw)

        redirect_location = redirect and response.get_redirect_location()
        if not redirect_location:
            return response

        # Support relative URLs for redirecting.
        redirect_location = urljoin(url, redirect_location)

        # RFC 7231, Section 6.4.4
        if response.status == 303:
            method = 'GET'

        retries = kw.get('retries')
        if not isinstance(retries, Retry):
            retries = Retry.from_int(retries, redirect=redirect)

        # Strip headers marked as unsafe to forward to the redirected location.
        # Check remove_headers_on_redirect to avoid a potential network call within
        # conn.is_same_host() which may use socket.gethostbyname() in the future.
        if (retries.remove_headers_on_redirect
                and not conn.is_same_host(redirect_location)):
            for header in retries.remove_headers_on_redirect:
                kw['headers'].pop(header, None)

        try:
            retries = retries.increment(method, url, response=response, _pool=conn)
        except MaxRetryError:
            if retries.raise_on_redirect:
                raise
            return response

        kw['retries'] = retries
        kw['redirect'] = redirect

        log.info("Redirecting %s -> %s", url, redirect_location)
        return self.urlopen(method, redirect_location, **kw)

Notice the class attribute proxy: all we need to do is change its value. We could edit PoolManager's source directly, or edit download_zip, or step through with a debugger to find the key pieces of information (download URL, download path, install path) and then download and install by hand. I don't recommend any of those. Instead, we can write a small script that patches the attribute at runtime:

from pyppeteer.chromium_downloader import download_chromium
from urllib3 import PoolManager

# Point urllib3's PoolManager at the local SOCKS5 proxy before downloading.
PoolManager.proxy = 'socks5://localhost:1080'
download_chromium()
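As an aside, and not the author's approach: pyppeteer's downloader also reads a PYPPETEER_DOWNLOAD_HOST environment variable, so if some mirror of the Chromium snapshots is reachable from your network you can skip the proxy entirely. The mirror URL below is an assumption; substitute one that actually works for you:

import os

# Must be set before pyppeteer.chromium_downloader is imported, because that
# module builds its download URL at import time.
os.environ['PYPPETEER_DOWNLOAD_HOST'] = 'https://npm.taobao.org/mirrors'  # assumed mirror

from pyppeteer.chromium_downloader import download_chromium

download_chromium()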

Run the script and the Chromium download and install completes, as shown below.

[Screenshot: the Chromium download progress bar running to completion]

Now let's rerun the program from earlier. The result is shown below.

[Screenshot: the quick-start script now prints the scraped quote count]

All ten dynamically loaded quotes were scraped. Before we get to bypassing Taobao's detection, here is the official pyppeteer documentation: https://miyakogi.github.io/pyppeteer/reference.html. Look things up as you need them; there is no need to memorize the API.
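As a quick example of what the docs offer (my variation, not part of the original post), the quotes script can drop pyquery entirely and count elements with pyppeteer's own querySelectorAll:

import asyncio
from pyppeteer import launch


async def count_quotes():
    browser = await launch()
    page = await browser.newPage()
    await page.goto('http://quotes.toscrape.com/js/')
    # Query the rendered DOM directly instead of re-parsing the HTML.
    quotes = await page.querySelectorAll('.quote')
    print('Quotes:', len(quotes))
    await browser.close()


asyncio.get_event_loop().run_until_complete(count_quotes())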


Bypassing Taobao's detection


If you read the official docs carefully, writing code that slips past Taobao's detection isn't especially hard:

import asyncio
from pyppeteer import launch


async def main():
    width, height = 1500, 800
    # Show a real window and hide the "Chrome is being controlled..." infobar.
    browser = await launch(headless=False,
                           args=['--disable-infobars', f'--window-size={width},{height}'])
    page = await browser.newPage()
    await page.setViewport({'width': width, 'height': height})
    await page.goto('https://login.taobao.com/member/login.jhtml?redirectURL=https://www.taobao.com/')
    # Override navigator.webdriver so Taobao's check sees an ordinary browser.
    await page.evaluate('() => { Object.defineProperties(navigator, { webdriver: { get: () => false } }) }')
    await asyncio.sleep(100)


asyncio.get_event_loop().run_until_complete(main())

Running this opens a Chromium window on Taobao's login page. Try logging in and see whether it works: it does, and you can verify that yourself.
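One refinement worth knowing (my suggestion, not from the original post): evaluate() only runs after the page has loaded, so the site's own scripts get a brief window in which navigator.webdriver still looks automated. pyppeteer also provides evaluateOnNewDocument(), which registers the script to run before any page script on every new document:

import asyncio
from pyppeteer import launch


async def main():
    browser = await launch(headless=False, args=['--disable-infobars'])
    page = await browser.newPage()
    # Inject the override before navigating, so it beats the page's own scripts.
    await page.evaluateOnNewDocument(
        '() => { Object.defineProperty(navigator, "webdriver", { get: () => undefined }) }'
    )
    await page.goto('https://login.taobao.com/member/login.jhtml?redirectURL=https://www.taobao.com/')
    await asyncio.sleep(100)
    await browser.close()


asyncio.get_event_loop().run_until_complete(main())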


