反反爬浏览器自动化patchright



This content originally appeared on DEV Community and was authored by drake

import asyncio
import json
import re
import logging
from enum import Enum
from datetime import datetime
from typing import Optional
from traceback import format_exc
# 用patchright替换playwright
from patchright.async_api import async_playwright, Frame
from patchright.async_api import Error as PlaywrightError

# Root logging setup: INFO and above, each record rendered as
# "<time> - <logger name> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Named logger for this script's own messages.
logger = logging.getLogger('GMGN Holders Tag')

class ChallengePlatform(Enum):
    """
    Kinds of Cloudflare challenge a page can present.

    Each member's value is the string that is searched for in the page HTML
    as ``cType: '<value>'`` when detecting which challenge is active.
    """

    # Definition order is also the order in which members are iterated.
    JAVASCRIPT = "non-interactive"
    MANAGED = "managed"
    INTERACTIVE = "interactive"

class PumpRanks:
    """
    Open the GMGN page in a headed browser and print pump-rank entries.

    The browser is driven through patchright. Responses from the pump-ranks
    endpoint are intercepted and every entry of their ``data.completeds``
    list is printed. If the page shows a Cloudflare challenge, an attempt is
    made to solve it first.
    """

    spider_name = 'gmgn_tags'

    def __init__(self, timeout: float = 30):
        """
        Parameters
        ----------
        timeout : float
            Seconds allowed for solving a Cloudflare challenge. The same
            value (converted to milliseconds) is used as the browser
            context's default timeout.
        """
        self._timeout = timeout

    @staticmethod
    def _extract_completeds(raw) -> Optional[list]:
        """
        Pull the ``data.completeds`` entries out of a raw JSON body.

        Parameters
        ----------
        raw : bytes or str
            Undecoded response body.

        Returns
        -------
        Optional[list]
            The entries under ``data.completeds`` (empty when that key holds
            null), or None when the body is not valid JSON or does not have
            that structure.
        """
        try:
            completeds = json.loads(raw)['data']['completeds']
            return [] if completeds is None else list(completeds)
        except (ValueError, KeyError, TypeError):
            # ValueError covers JSON and UTF-8 decode failures; KeyError and
            # TypeError cover a payload with a different shape.
            return None

    async def on_response(self, response):
        """
        Response listener: print each pump-rank entry from matching responses.

        Only successful responses whose URL contains
        ``/v1/rank/sol/pump_ranks/1h`` are processed. The original note says
        the payload layout is described by ``gmgn.json`` (not visible here).
        Unreadable or malformed bodies are logged and skipped so that the
        listener never raises into the browser event loop.
        """
        if not response.ok or '/v1/rank/sol/pump_ranks/1h' not in response.url:
            return

        try:
            raw = await response.body()
        except PlaywrightError as err:
            logger.warning("Could not read response body of %s: %s", response.url, err)
            return

        completeds = self._extract_completeds(raw)
        if completeds is None:
            logger.warning("Unexpected pump_ranks payload from %s", response.url)
            return
        for entry in completeds:
            print(entry)

    def _get_turnstile_frame(self, page) -> "Optional[Frame]":
        """
        Get the Cloudflare turnstile frame.

        Returns
        -------
        Optional[Frame]
            The frame whose URL matches the Cloudflare turnstile pattern, or
            None if the page has no such frame.
        """
        return page.frame(
            url=re.compile(
                "https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
            ),
        )

    async def cookies(self, page) -> Optional[str]:
        """
        Return the ``cf_clearance`` cookie value from the page's context.

        Returns
        -------
        Optional[str]
            The cookie value, or None when the context has no such cookie.
        """
        for cookie in await page.context.cookies() or []:
            if cookie["name"] == "cf_clearance":
                return cookie["value"]
        return None

    async def detect_challenge(self, page) -> Optional[str]:
        """
        Detect the Cloudflare challenge platform on the current page.

        Returns
        -------
        Optional[str]
            The ``value`` of the first ChallengePlatform member whose
            ``cType: '<value>'`` marker occurs in the page HTML, or None if
            no marker is found.
        """
        html = await page.content()
        for platform in ChallengePlatform:
            if f"cType: '{platform.value}'" in html:
                return platform.value
        return None

    async def solve_challenge(self, page) -> None:
        """
        Try to solve the Cloudflare challenge on the current page.

        Loops until the ``cf_clearance`` cookie appears, no challenge marker
        is left in the page, or ``self._timeout`` seconds have elapsed.
        Playwright errors raised by the page interactions propagate to the
        caller.
        """
        verify_button_pattern = re.compile(
            "Verify (I am|you are) (not a bot|(a )?human)"
        )

        verify_button = page.get_by_role("button", name=verify_button_pattern)
        challenge_spinner = page.locator("#challenge-spinner")
        challenge_stage = page.locator("#challenge-stage")
        start_timestamp = datetime.now()

        while (datetime.now() - start_timestamp).total_seconds() < self._timeout:
            # Both conditions must be re-read on every pass: checking them
            # only once before the loop would keep it running (and clicking)
            # until the timeout even after the challenge has been cleared.
            if await self.cookies(page) is not None:
                break
            if await self.detect_challenge(page) is None:
                break

            if await challenge_spinner.is_visible():
                await challenge_spinner.wait_for(state="hidden")

            turnstile_frame = self._get_turnstile_frame(page)

            if await verify_button.is_visible():
                await verify_button.click()
                await challenge_stage.wait_for(state="hidden")
            elif turnstile_frame is not None:
                # Fixed viewport coordinates; presumably where the turnstile
                # checkbox is rendered -- TODO confirm for other layouts.
                await page.mouse.click(210, 290)
                await challenge_stage.wait_for(state="hidden")

            await page.wait_for_timeout(250)

    async def detect(self, page):
        """
        Get past Cloudflare if the page has not been cleared yet.

        Does nothing when a ``cf_clearance`` cookie already exists. Otherwise
        detects the challenge type and tries to solve it; Playwright errors
        raised while solving are logged, not re-raised.
        """
        if await self.cookies(page) is not None:
            return

        challenge_platform = await self.detect_challenge(page)
        if challenge_platform is None:
            logger.error("No Cloudflare challenge detected.")
            return
        logger.info("Solving Cloudflare challenge [%s]...", challenge_platform)

        try:
            await self.solve_challenge(page)
        except PlaywrightError as err:
            logger.error(err)

    async def run_local(self, proxy=None):
        """
        Launch a headed Chromium, open the GMGN page and keep listening.

        Parameters
        ----------
        proxy : optional
            Passed unchanged as the ``proxy`` option of the browser launch.
        """
        async with async_playwright() as playwright:
            # The browser must be headed; per the original author a headless
            # browser does not get past Cloudflare.
            launch_data = {
                "headless": False,
                "proxy": proxy
            }

            user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
            browser = await playwright.chromium.launch(**launch_data)
            try:
                context = await browser.new_context(user_agent=user_agent)
                context.set_default_timeout(self._timeout * 1000)
                page = await context.new_page()
                # Watch the response stream.
                page.on('response', self.on_response)

                url = 'https://gmgn.ai/meme/LXtw30LM?chain=sol'
                # Visit the target address.
                await page.goto(url)
                # Fixed pause before looking for a challenge; presumably gives
                # the page time to render it.
                await asyncio.sleep(10)
                await self.detect(page)
                # No timeout: waits until ``window.x > 0`` becomes true in the
                # page. Presumably this never happens and the call only keeps
                # the browser (and the response listener) alive.
                await page.wait_for_function("() => window.x > 0", timeout=0)
            finally:
                # Release the browser even when navigation or solving fails.
                await browser.close()

    async def run_aws(self):
        """
        Run on an AWS server, inside a pyvirtualdisplay ``Display``.

        Exceptions raised by the browser session are logged with their
        traceback instead of propagating. Interrupts and task cancellation
        are not intercepted.
        """
        from pyvirtualdisplay import Display
        with Display():
            try:
                await self.run_local()
            except Exception:
                logger.error(f'浏览器异常:{format_exc()}')

    def task(self):
        """
        Run the scraper in the mode selected by the module-level ``env``.

        ``env == 'local'`` runs the browser directly; any other value runs it
        inside the virtual display. ``env`` is a global that this file only
        assigns under its ``__main__`` guard, so calling this method without
        that global defined raises NameError.
        """
        if env == 'local':
            asyncio.run(self.run_local())
        else:
            asyncio.run(self.run_aws())

    def run(self):
        """Entry point: delegate to ``task``."""
        self.task()

if __name__ == '__main__':
    # `env` is the module-level global read by PumpRanks.task(): 'local' runs
    # the browser directly, any other value goes through the virtual display.
    env = 'prod'
    spider = PumpRanks()
    spider.run()


This content originally appeared on DEV Community and was authored by drake