Downloader 下载器
下载器列表
maize 内置了一些常用的下载器:
- `maize.AioHttpDownloader`: 基于 aiohttp 封装的下载器
- `maize.HTTPXDownloader`: 基于 httpx 封装的下载器
- `maize.downloader.playwright_downloader.PlaywrightDownloader`: 基于 playwright 封装的下载器
自定义下载器
如果在使用过程中有特殊需求,需要自定义下载器,可以继承 `BaseDownloader` 来实现。
import typing
from maize import BaseDownloader, Request, Response
if typing.TYPE_CHECKING:
from maize.core.crawler import Crawler
class CustomDownloader(BaseDownloader):
    """Skeleton of a user-defined downloader built on ``BaseDownloader``.

    Fill in the hooks below with your own transport logic. The download path
    should be asynchronous; otherwise maize cannot reach its full throughput.
    """

    def __init__(self, crawler: "Crawler"):
        super().__init__(crawler)

    async def open(self):
        await super().open()
        # Initialise your custom downloader here.

    async def close(self):
        await super().close()
        # Shut down your custom downloader here.

    async def download(
        self, request: Request
    ) -> typing.Optional[typing.Union[Response, Request]]:
        """
        Implement the custom download logic.

        :param request: the request to fetch.
        :return: a ``Response`` on success. You may also return a new
            ``Request`` to have it retried, as the built-in
            ``AioHttpDownloader`` does with the value of
            ``self._download_retry(request, exc)``. Return ``None`` to give
            up on the request.
        """

    @staticmethod
    def structure_response(
        request: Request, response: typing.Any, body: bytes
    ) -> Response:
        """
        Build a maize ``Response`` from the raw result of your transport.

        :param request: the request that was sent.
        :param response: your transport's own response object; adapt it to
            whatever shape you need.
        :param body: the response body.
        :return: the constructed ``Response``.
        """

    async def process_error_request(self, request: Request):
        """
        Handle a request that has exceeded the maximum number of retries.

        Implementing this method is optional; if you do not implement it,
        requests that exceed the maximum number of retries are discarded.

        :param request: the request that ran out of retries.
        :return: None
        """
示例
您也可以参考 `maize.AioHttpDownloader` 的源码实现。注意:您的下载器需要以异步方式实现,否则无法充分发挥 maize 的性能。
import typing
from aiohttp import BaseConnector
from aiohttp import BasicAuth
from aiohttp import ClientResponse
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from aiohttp import TCPConnector
from aiohttp import TraceConfig
from aiohttp import TraceRequestStartParams
from maize.downloader.base_downloader import BaseDownloader
from maize.common.http.request import Request
from maize.common.http import Response
if typing.TYPE_CHECKING:
from maize.core.crawler import Crawler
class AioHttpDownloader(BaseDownloader):
    """Downloader backed by aiohttp.

    Settings read in ``open``: ``REQUEST_TIMEOUT``, ``VERIFY_SSL``,
    ``USE_SESSION``, ``PROXY_TUNNEL``, ``PROXY_TUNNEL_USERNAME`` and
    ``PROXY_TUNNEL_PASSWORD``. When ``USE_SESSION`` is enabled, one shared
    ``ClientSession`` serves every request. Otherwise a new session (with its
    own connector) is created and closed for each request.
    """

    def __init__(self, crawler: "Crawler"):
        super().__init__(crawler)
        self.session: typing.Optional[ClientSession] = None
        self.connector: typing.Optional[BaseConnector] = None
        self._verify_ssl: typing.Optional[bool] = None
        self._timeout: typing.Optional[ClientTimeout] = None
        self._use_session: typing.Optional[bool] = None
        self.trace_config: typing.Optional[TraceConfig] = None
        self.proxy_tunnel: typing.Optional[str] = None
        self.proxy_auth: typing.Optional[BasicAuth] = None

    async def open(self):
        """Read settings and, in session mode, create the shared session."""
        await super().open()
        settings = self.crawler.settings
        self._timeout = ClientTimeout(total=settings.getint("REQUEST_TIMEOUT"))
        self._verify_ssl = settings.getbool("VERIFY_SSL")
        self._use_session = settings.getbool("USE_SESSION")
        self.proxy_tunnel = settings.get("PROXY_TUNNEL")
        proxy_tunnel_username = settings.get("PROXY_TUNNEL_USERNAME")
        proxy_tunnel_password = settings.get("PROXY_TUNNEL_PASSWORD")
        if proxy_tunnel_username and proxy_tunnel_password:
            self.proxy_auth = BasicAuth(proxy_tunnel_username, proxy_tunnel_password)
        self.trace_config = TraceConfig()
        # Log every outgoing request (see ``request_start``).
        self.trace_config.on_request_start.append(self.request_start)
        if self._use_session:
            # The shared connector is only used by the long-lived session, so
            # it is only created in session mode.
            self.connector = self._make_connector()
            self.session = ClientSession(
                connector=self.connector,
                timeout=self._timeout,
                trace_configs=[self.trace_config],
            )

    def _make_connector(self) -> TCPConnector:
        """Build a ``TCPConnector`` that honours the ``VERIFY_SSL`` setting."""
        # ``verify_ssl=`` is deprecated in aiohttp 3.x. ``ssl=False`` is the
        # documented way to skip certificate validation; omitting the argument
        # keeps aiohttp's default verification.
        if self._verify_ssl:
            return TCPConnector()
        return TCPConnector(ssl=False)

    async def download(
        self, request: Request
    ) -> typing.Optional[typing.Union[Response, Request]]:
        """Fetch ``request``.

        :param request: the request to send.
        :return: a ``Response`` on success. If the attempt failed and
            ``_download_retry`` supplies a request to retry, that request is
            returned. Otherwise the error is logged and ``None`` is returned.
        """
        try:
            if self._use_session:
                response, body = await self._fetch(self.session, request)
            else:
                # The session owns this connector and closes it on exit.
                async with ClientSession(
                    connector=self._make_connector(),
                    timeout=self._timeout,
                    trace_configs=[self.trace_config],
                ) as session:
                    response, body = await self._fetch(session, request)
        except Exception as e:
            if new_request := await self._download_retry(request, e):
                return new_request
            self.logger.error(f"Error during request: {e}")
            return None
        return self.structure_response(request, response, body)

    async def _fetch(
        self, session: ClientSession, request: Request
    ) -> typing.Tuple[ClientResponse, bytes]:
        """Send ``request`` on ``session`` and read the full response body.

        The response is entered as an async context manager so that its
        connection is released even if reading the body raises.
        """
        response = await self.send_request(session, request)
        async with response:
            body = await response.content.read()
        return response, body

    @staticmethod
    def structure_response(
        request: Request, response: ClientResponse, body: bytes
    ) -> Response:
        """Convert an aiohttp response plus its body into a maize ``Response``."""
        return Response(
            url=request.url,
            # NOTE(review): converting the header multidict to a plain dict
            # presumably keeps a single value per repeated header name
            # (e.g. several Set-Cookie) — confirm this is acceptable.
            headers=dict(response.headers),
            status=response.status,
            body=body,
            request=request,
        )

    async def send_request(
        self, session: ClientSession, request: Request
    ) -> ClientResponse:
        """Issue ``request`` on ``session``.

        Per-request proxy settings take precedence over the tunnel proxy
        configured in ``open``.
        """
        if request.proxy_username and request.proxy_password:
            proxy_auth = BasicAuth(request.proxy_username, request.proxy_password)
        else:
            proxy_auth = self.proxy_auth
        return await session.request(
            method=request.method,
            url=request.url,
            params=request.params,
            data=request.data,
            headers=request.headers,
            cookies=request.cookies,
            proxy=request.proxy or self.proxy_tunnel,
            proxy_auth=proxy_auth,
        )

    async def request_start(
        self, _session, _trace_config_ctx, params: TraceRequestStartParams
    ):
        """Trace hook: log the URL and method of each outgoing request."""
        self.logger.debug(
            f"request downloading: {params.url}, method: {params.method}"
        )

    async def close(self):
        """Close the shared session, then the connector it was built on."""
        await super().close()
        # The session owns the connector, so close the session first. The
        # explicit connector close afterwards is then just a safety net.
        if self.session is not None:
            await self.session.close()
        if self.connector is not None:
            await self.connector.close()