翼度科技»论坛 编程开发 python 查看内容

怎么用Python写一个浏览器集群框架

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
这是做什么用的

框架用途

在采集大量新闻网站时,不可避免的遇到动态加载的网站,这给配模版的人增加了很大难度。本来配静态网站只需要两个技能点:xpath和正则,如果是动态网站的还得抓包,遇到加密的还得js逆向。
所以就需要用浏览器渲染这些动态网站,来减少了配模板的工作难度和技能要求。动态加载的网站在新闻网站里占比很低,需要的硬件资源相对于一个人工来说更便宜。
实现方式

采集框架使用浏览器渲染有两种方式,一种是直接集成到框架,类似GerapyPyppeteer,这个项目你看下源代码就会发现写的很粗糙,它把浏览器放在_process_request方法里启动,然后采集完一个链接再关闭浏览器,大部分时间都浪费在浏览器的启动和关闭上,而且采集多个链接会打开多个浏览器抢占资源。
另一种则是将浏览器渲染独立成一个服务,类似scrapy-splash,这种方式比直接集成要好,本来就是两个不同的功能,实际就应该解耦成两个单独的模块。不过听前辈说这东西不太好用,会有内存泄漏的情况,我就没测试它。
自己实现

原理:在自动化浏览器中嵌入http服务实现http控制浏览器。这里我选择aiohttp+pyppeteer。之前看到有大佬使用go的rod来做,奈何自己不会go语言,还是用Python比较顺手。
后面会考虑用playwright重写一遍,pyppeteer的github说此仓库不常维护了,建议使用playwright。
开始写代码

web服务
  1. from aiohttp import web
  2. app = web.Application()
  3. app.router.add_view('/render.html', RenderHtmlView)
  4. app.router.add_view('/render.png', RenderPngView)
  5. app.router.add_view('/render.jpeg', RenderJpegView)
  6. app.router.add_view('/render.json', RenderJsonView)
复制代码
然后在RenderHtmlView类中写/render.html请求的逻辑。/render.json是用于获取网页的某个ajax接口响应内容。有些情况网页可能不方便解析,想拿到接口的json响应数据。
初始化浏览器

浏览器只需要初始化一次,所以启动放到on_startup,关闭放到on_cleanup
  1. c = LaunchChrome()
  2. app.on_startup.append(c.on_startup_tasks)
  3. app.on_cleanup.append(c.on_cleanup_tasks)
复制代码
其中on_startup_tasks和on_cleanup_tasks方法如下:
  1. async def on_startup_tasks(self, app: web.Application) -> None:
  2.                 page_count = 4
  3.                 await asyncio.create_task(self._launch())
  4.                 app["browser"] = self.browser
  5.                 tasks = [asyncio.create_task(self.launch_tab()) for _ in range(page_count-1)]
  6.                 await asyncio.gather(*tasks)
  7.                 queue = asyncio.Queue(maxsize=page_count+1)
  8.                 for i in await self.browser.pages():
  9.                                 await queue.put(i)
  10.                 app["pages_queue"] = queue
  11.                 app["screenshot_lock"] = asyncio.Lock()
  12. async def on_cleanup_tasks(self, app: web.Application) -> None:
  13.                 await self.browser.close()
复制代码
page_count为初始化的标签页数,这种常量一般定义到配置文件里,这里我图方便就不写配置文件了。
首先初始化所有的标签页放到队列里,然后存放在app这个对象里,这个对象可以在RenderHtmlView类里通过self.request.app访问到, 到时候就能控制使用哪个标签页来访问链接
我还初始化了一个协程锁,后面在RenderPngView类里截图的时候会用到,因为多标签不能同时截图,需要加锁。
超时停止页面继续加载
  1. async def _goto(self, page: Optional[Page], options: AjaxPostData) -> Dict:
  2.                 try:
  3.                                 await page.goto(options.url,
  4.                                                 waitUntil=options.wait_util, timeout=options.timeout*1000)
  5.                 except PPTimeoutError:
  6.                                 #await page.evaluate('() => window.stop()')
  7.                                 await page._client.send("Page.stopLoading")
  8.                 finally:
  9.                                 page.remove_all_listeners("request")
复制代码
有时间页面明明加载出来了,但还在转圈,因为某个图片或css等资源访问不到,强制停止加载也不会影响到网页的内容。
Page.stopLoading和window.stop()都可以停止页面继续加载,忘了之前为什么选择前者了
定义请求参数
  1. class HtmlPostData(BaseModel):
  2.     url: str
  3.     timeout: float = 30
  4.     wait_util: str = "domcontentloaded"
  5.     wait: float = 0   
  6.     js_name: str = ""
  7.     filters: List[str] = []
  8.     images: bool = 0  
  9.     forbidden_content_types: List[str] = ["image", "media"]
  10.     cache: bool = 1
  11.     cookie: bool = 0
  12.     text: bool = 1
  13.                 headers: bool = 1
复制代码

  • url: 访问的链接
  • timeout: 超时时间
  • wait_util: 页面加载完成的标识,一般都是domcontentloaded,只有截图的时候会选择networkidle2,让网页加载全一点。更多的选项的选项请看:Puppeteer waitUntil Options
  • wait: 页面加载完成后等待的时间,有时候还得等页面的某个元素加载完成
  • js_name: 预留的参数,用于在页面访问前加载js,目前就只有一个js(stealth.min.js)用于去浏览器特征
  • filters: 过滤的请求列表,  支持正则。比如有些css请求你不想让他加载
  • images: 是否加载图片
  • forbidden_content_types: 禁止加载的资源类型,默认是图片和视频。所有的类型见: resourcetype
  • cache: 是否启用缓存
  • cookie: 是否在返回结果里包含cookie
  • text: 是否在返回结果里包含html
  • headers: 是否在返回结果里包含headers
图片的参数
  1. class PngPostData(HtmlPostData):
  2.     render_all: int = 0
  3.     text: bool = 0
  4.     images: bool = 1
  5.     forbidden_content_types: List[str] = []
  6.     wait_util: str = "networkidle2"
复制代码
参数和html的基本一样,增加了一个render_all用于是否截取整个页面。截图的时候一般是需要加载图片的,所以就启用了图片加载
怎么使用

多个标签同时采集

默认是启动了四个标签页,这四个标签页可以同时访问不同链接。如果标签页过多可能会影响性能,不过开了二三十个应该没什么问题
请求例子如下:
  1. import sys
  2. import asyncio
  3. import aiohttp
  4. if sys.platform == 'win32':
  5.     asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
  6. async def get_sign(session, delay):
  7.     url = f"http://www.httpbin.org/delay/{delay}"
  8.     api = f'http://127.0.0.1:8080/render.html?url={url}'
  9.     async with session.get(api) as resp:
  10.         data = await resp.json()
  11.         print(url, data.get("status"))
  12.         return data
  13. async def main():
  14.     headers = {
  15.         "Content-Type": "application/json",
  16.         'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
  17.     }
  18.     loop = asyncio.get_event_loop()
  19.     t = loop.time()
  20.     async with aiohttp.ClientSession(headers=headers) as session:
  21.         tasks = [asyncio.create_task(get_sign(session, i)) for i in range(1, 5)]
  22.         await asyncio.gather(*tasks)
  23.     print("耗时: ", loop.time()-t)
  24.         
  25. if __name__ == "__main__":
  26.     asyncio.run(main())
复制代码
http://www.httpbin.org/delay后面跟的数字是多少,网站就会多少秒后返回。所以如果同步运行的话至少需要1+2+3+4秒,而多标签页异步运行的话至少需要4秒
结果如图,四个链接只用了4秒多点:

拦截指定ajax请求的响应
  1. import json
  2. import sys
  3. import asyncio
  4. import aiohttp
  5. if sys.platform == 'win32':
  6.     asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
  7. async def get_sign(session, url):
  8.     api = f'http://127.0.0.1:8080/render.json'
  9.     data = {
  10.         "url": url,
  11.         "xhr": "/api/", # 拦截接口包含/api/的响应并返回
  12.         "cache": 0,
  13.         "filters": [".png", ".jpg"]
  14.     }
  15.     async with session.post(api, data=json.dumps(data)) as resp:
  16.         data = await resp.json()
  17.         print(url, data)
  18.         return data
  19. async def main():
  20.     urls = ["https://spa1.scrape.center/"]
  21.     headers = {
  22.         "Content-Type": "application/json",
  23.         'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
  24.     }
  25.     loop = asyncio.get_event_loop()
  26.     t = loop.time()
  27.     async with aiohttp.ClientSession(headers=headers) as session:
  28.         tasks = [asyncio.create_task(get_sign(session, url)) for url in urls]
  29.         await asyncio.gather(*tasks)
  30.     print(loop.time()-t)
  31.         
  32. if __name__ == "__main__":
  33.     asyncio.run(main())
复制代码
请求https://spa1.scrape.center/这个网站并获取ajax链接中包含/api/的接口响应数据,结果如图:

请求一个网站用时21秒,这是因为网站一直在转圈,其实要的数据已经加载完成了,可能是一些图标或者css还在请求。
超时强制返回

加上timeout参数后,即使页面未加载完成也会强制停止并返回数据。如果这个时候已经拦截到了ajax请求会返回ajax响应内容,不然就是返回空
不过好像因为有缓存,现在时间不到1秒就返回了

截图
  1. import json
  2. import sys
  3. import asyncio
  4. import base64
  5. import aiohttp
  6. if sys.platform == 'win32':
  7.     asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
  8. async def get_sign(session, url, name):
  9.     api = f'http://127.0.0.1:8080/render.png'
  10.     data = {
  11.         "url": url,
  12.         #"render_all": 1,
  13.         "images": 1,
  14.         "cache": 1,
  15.         "wait": 1
  16.     }
  17.     async with session.post(api, data=json.dumps(data)) as resp:
  18.         data = await resp.json()
  19.         if data.get('image'):
  20.             image_bytes = base64.b64decode(data["image"])
  21.             with open(name, 'wb') as f:
  22.                 f.write(image_bytes)
  23.             print(url, name, len(image_bytes))
  24.         return data
  25. async def main():
  26.     urls = [
  27.         "https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&tn=44004473_102_oem_dg&wd=%E5%9B%BE%E7%89%87&rn=50",
  28.         "https://www.toutiao.com/article/7145668657396564518/",
  29.         "https://new.qq.com/rain/a/NEW2022092100053400",
  30.         "https://new.qq.com/rain/a/DSG2022092100053300"
  31.     ]
  32.     headers = {
  33.         "Content-Type": "application/json",
  34.         'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
  35.     }
  36.     loop = asyncio.get_event_loop()
  37.     t = loop.time()
  38.     async with aiohttp.ClientSession(headers=headers) as session:
  39.         tasks = [asyncio.create_task(get_sign(session, url, f"{n}.png")) for n,url in enumerate(urls)]
  40.         await asyncio.gather(*tasks)
  41.     print(loop.time()-t)
  42. if __name__ == "__main__":
  43.     asyncio.run(main())
复制代码
集成到scrapy
  1. import json
  2. import logging
  3. from scrapy.exceptions import NotConfigured
  4. logger = logging.getLogger(__name__)
  5. class BrowserMiddleware(object):
  6.     def __init__(self, browser_base_url: str):
  7.         self.browser_base_url = browser_base_url
  8.         self.logger = logger
  9.         
  10.     @classmethod
  11.     def from_crawler(cls, crawler):
  12.         s = crawler.settings
  13.         browser_base_url = s.get('PYPPETEER_CLUSTER_URL')
  14.         if not browser_base_url:
  15.             raise NotConfigured
  16.         o = cls(browser_base_url)
  17.         return o
  18.    
  19.     def process_request(self, request, spider):
  20.         if "browser_options" not in request.meta or request.method != "GET":
  21.             return
  22.         browser_options = request.meta["browser_options"]
  23.         url = request.url
  24.         browser_options["url"] = url
  25.         uri = browser_options.get('browser_uri', "/render.html")
  26.         browser_url = self.browser_base_url.rstrip('/') + '/' + uri.lstrip('/')
  27.         new_request = request.replace(
  28.             url=browser_url,
  29.             method='POST',
  30.             body=json.dumps(browser_options)
  31.         )
  32.         new_request.meta["ori_url"] = url
  33.         return new_request
  34.     def process_response(self, request, response, spider):
  35.         if "browser_options" not in request.meta or "ori_url" not in request.meta:
  36.             return response
  37.         try:
  38.             datas = json.loads(response.text)
  39.         except json.decoder.JSONDecodeError:
  40.             return response.replace(url=url, status=500)
  41.         datas = self.deal_datas(datas)
  42.         url = request.meta["ori_url"]
  43.         new_response = response.replace(url=url, **datas)
  44.         return new_response
  45.    
  46.     def deal_datas(self, datas: dict) -> dict:
  47.         status = datas["status"]
  48.         text: str = datas.get('text') or datas.get('content')
  49.         headers = datas.get('headers')
  50.         response = {
  51.             "status": status,
  52.             "headers": headers,
  53.             "body": text.encode()
  54.         }
  55.         return response            
复制代码
开始想用aiohttp来请求,后面想了下,其实都要替换请求和响应,为什么不直接用scrapy的下载器
完整源代码

现在还只是个半成品玩具,还没有用于实际生产中,集群打包也没做。有兴趣的话可以自己完善一下
如果感兴趣的人比较多,后面也会系统的完善一下,打包成docker和发布第三方库到pypi
github:https://github.com/kanadeblisst00/browser_cluster

来源:https://www.cnblogs.com/kanadeblisst/p/17792187.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
来自手机

举报 回复 使用道具