# This content originally appeared on DEV Community and was authored by drake
#!/usr/bin/env python3
"""
人类光标动作链模拟的GMGN绕过器
加入最符合人类特征的鼠标轨迹和行为模拟
"""
import asyncio
import time
import json
import re
import logging
import random
import math
from datetime import datetime
from typing import Optional, Dict, List, Tuple
from enum import Enum
# 用patchright替换playwright
from patchright.async_api import async_playwright, Frame
from patchright.async_api import Error as PlaywrightError
# Configure logging once at import time and create the module-wide logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('Human Cursor Bypass')
class ChallengePlatform(Enum):
    """Cloudflare challenge platform types.

    Each member's value is the string that is searched for in the page HTML
    as ``cType: '<value>'`` when detecting which challenge is being served.
    """

    JAVASCRIPT = "non-interactive"
    MANAGED = "managed"
    INTERACTIVE = "interactive"
class HumanCursorBypass:
    """Load a GMGN URL in a headed browser while simulating human cursor motion.

    The class opens a Chromium page, replays randomized mouse trajectories
    (Bezier curves, overshoot, hesitation, micro-adjustments), attempts to
    pass a Cloudflare challenge if one is detected, and captures matching
    API responses through a response listener.
    """

    def __init__(self):
        # Seconds; used for the challenge-solving loop and (x1000) as the
        # Playwright context default timeout.
        self._timeout = 30
        # Set to True by on_response once a matching JSON response is parsed.
        self.task_holders_status = False
        # Last JSON payload captured by on_response.
        self.api_data = None
        self.success_count = 0
        self.total_attempts = 0
        # Tunables for the simulated behaviour.
        self.human_params = {
            'mouse_speed': (0.8, 2.5),      # speed range (not read anywhere in this class)
            'reaction_time': (0.8, 2.5),    # seconds to wait before pressing the button
            'movement_noise': 0.15,         # max per-point jitter, in pixels
            'overshoot_prob': 0.3,          # chance of overshooting the target
            'hesitation_prob': 0.4,         # chance of pausing after a trajectory point
            'micro_movement_prob': 0.6,     # chance of small corrective moves at the target
        }
        # User agent sent by the browser context.
        self.primary_ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"

    def _get_turnstile_frame(self, page) -> "Optional[Frame]":
        """Return the Cloudflare Turnstile frame of *page*, or None if absent."""
        return page.frame(
            url=re.compile(
                "https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
            ),
        )

    async def cookies(self, page) -> Optional[str]:
        """Return the value of the ``cf_clearance`` cookie, or None if not set."""
        for cookie in await page.context.cookies() or ():
            if cookie["name"] == "cf_clearance":
                return cookie["value"]
        return None

    async def detect_challenge(self, page) -> Optional[str]:
        """Return the challenge type found in the page HTML, or None.

        The HTML is searched for ``cType: '<value>'`` for every
        ChallengePlatform member; any error is logged and yields None.
        """
        try:
            html = await page.content()
            for platform in ChallengePlatform:
                if f"cType: '{platform.value}'" in html:
                    return platform.value
            return None
        except Exception as e:
            logger.error(f"挑战检测出错: {e}")
            return None

    async def generate_human_curve(self, start_x: float, start_y: float,
                                   end_x: float, end_y: float) -> List[Tuple[float, float, float]]:
        """Build a jittered cubic Bezier trajectory from start to end.

        Returns:
            A list of ``(x, y, delay)`` tuples, where ``delay`` is the number
            of seconds to wait after moving to that point. The first and last
            points lie on the start and end positions, up to the jitter.
        """
        dx = end_x - start_x
        dy = end_y - start_y
        distance = math.sqrt(dx ** 2 + dy ** 2)

        # Short moves use few points and are slowed by a larger delay multiplier.
        if distance < 50:
            steps = random.randint(3, 8)
            speed_factor = random.uniform(1.5, 3.0)
        elif distance < 200:
            steps = random.randint(8, 15)
            speed_factor = random.uniform(1.0, 2.0)
        else:
            steps = random.randint(15, 25)
            speed_factor = random.uniform(0.8, 1.5)

        # Bend the path by pushing both control points sideways.
        curve_intensity = random.uniform(0.1, 0.4) * distance
        angle = math.atan2(dy, dx)
        ctrl1_offset = random.uniform(-curve_intensity, curve_intensity)
        ctrl2_offset = random.uniform(-curve_intensity, curve_intensity)

        # Unit normal of the travel direction (cos a, sin a) is (-sin a, cos a).
        # The offsets must be applied along this normal; applying them along
        # (sin(a + pi/2), cos(a + pi/2)) is not perpendicular in general and
        # produces no bend at all for horizontal moves.
        normal_x = -math.sin(angle)
        normal_y = math.cos(angle)
        ctrl1_x = start_x + dx * 0.33 + normal_x * ctrl1_offset
        ctrl1_y = start_y + dy * 0.33 + normal_y * ctrl1_offset
        ctrl2_x = start_x + dx * 0.66 + normal_x * ctrl2_offset
        ctrl2_y = start_y + dy * 0.66 + normal_y * ctrl2_offset

        noise = self.human_params['movement_noise']
        points = []
        for i in range(steps):
            # steps is always >= 3, so the divisor is never zero.
            t = i / (steps - 1)
            # Cubic Bezier formula.
            x = (1-t)**3 * start_x + 3*(1-t)**2*t * ctrl1_x + 3*(1-t)*t**2 * ctrl2_x + t**3 * end_x
            y = (1-t)**3 * start_y + 3*(1-t)**2*t * ctrl1_y + 3*(1-t)*t**2 * ctrl2_y + t**3 * end_y
            # Small random jitter to imitate hand tremor.
            x += random.uniform(-noise, noise)
            y += random.uniform(-noise, noise)

            # Non-uniform timing: quick start, pause just before the target.
            if i == 0:
                delay = random.uniform(0.01, 0.03)
            elif i == steps - 1:
                delay = random.uniform(0.05, 0.15)
            else:
                progress = i / steps
                if progress < 0.3:      # acceleration phase
                    delay = random.uniform(0.02, 0.05)
                elif progress > 0.7:    # deceleration phase
                    delay = random.uniform(0.03, 0.08)
                else:                   # cruise phase
                    delay = random.uniform(0.01, 0.04)
            delay *= speed_factor
            points.append((x, y, delay))
        return points

    async def human_mouse_move(self, page, target_x: float, target_y: float):
        """Move the mouse to the target along a generated trajectory.

        Failures are logged at DEBUG level and swallowed (best effort).
        """
        try:
            # viewport_size is a property in the Playwright Python API, not a
            # coroutine; calling it raised TypeError, which the handler below
            # swallowed, so no trajectory was ever played.
            viewport = page.viewport_size
            if not viewport:
                return
            # The real cursor position cannot be queried, so assume a random
            # point in the central half of the viewport as the start.
            current_x = random.randint(viewport['width']//4, viewport['width']*3//4)
            current_y = random.randint(viewport['height']//4, viewport['height']*3//4)
            logger.debug(f"人类式移动: ({current_x}, {current_y}) -> ({target_x}, {target_y})")

            trajectory = await self.generate_human_curve(current_x, current_y, target_x, target_y)
            for x, y, delay in trajectory:
                await page.mouse.move(x, y)
                await asyncio.sleep(delay)
                # Occasionally hesitate before continuing.
                if random.random() < self.human_params['hesitation_prob']:
                    hesitation_time = random.uniform(0.1, 0.4)
                    await asyncio.sleep(hesitation_time)
                    logger.debug(f"犹豫停顿: {hesitation_time:.2f}s")

            # Fine adjustments once the target has been reached.
            await self._target_micro_adjustments(page, target_x, target_y)
        except Exception as e:
            logger.debug(f"人类鼠标移动失败: {e}")

    async def _target_micro_adjustments(self, page, target_x: float, target_y: float):
        """Play optional overshoot and micro-moves, then settle on the target."""
        try:
            # Overshoot: land up to 8 px away from the target first.
            if random.random() < self.human_params['overshoot_prob']:
                overshoot_x = target_x + random.uniform(-8, 8)
                overshoot_y = target_y + random.uniform(-8, 8)
                await page.mouse.move(overshoot_x, overshoot_y)
                await asyncio.sleep(random.uniform(0.05, 0.15))
                logger.debug("执行过冲修正")
            # One to three small corrective moves within 3 px of the target.
            if random.random() < self.human_params['micro_movement_prob']:
                for _ in range(random.randint(1, 3)):
                    micro_x = target_x + random.uniform(-3, 3)
                    micro_y = target_y + random.uniform(-3, 3)
                    await page.mouse.move(micro_x, micro_y)
                    await asyncio.sleep(random.uniform(0.02, 0.08))
            # Final exact positioning.
            await page.mouse.move(target_x, target_y)
            await asyncio.sleep(random.uniform(0.1, 0.3))
        except Exception as e:
            logger.debug(f"微调失败: {e}")

    async def human_click_sequence(self, page, x: float, y: float):
        """Move to ``(x, y)``, wait a reaction time, then press and release.

        Returns None. Errors are logged and swallowed.
        """
        try:
            await self.human_mouse_move(page, x, y)

            # Reaction time before clicking.
            reaction_time = random.uniform(*self.human_params['reaction_time'])
            await asyncio.sleep(reaction_time)
            logger.debug(f"反应时间: {reaction_time:.2f}s")

            # Last small nudge before the click (up to 2 px off the target).
            final_x = x + random.uniform(-2, 2)
            final_y = y + random.uniform(-2, 2)
            await page.mouse.move(final_x, final_y)
            await asyncio.sleep(random.uniform(0.05, 0.15))

            # Press, hold, release.
            click_duration = random.uniform(0.08, 0.25)
            await page.mouse.down()
            await asyncio.sleep(click_duration)
            await page.mouse.up()
            logger.debug(f"执行点击: ({final_x:.1f}, {final_y:.1f}), 按压时长: {click_duration:.2f}s")

            # Natural pause after the click.
            await asyncio.sleep(random.uniform(0.3, 0.8))
        except Exception as e:
            logger.error(f"人类点击序列失败: {e}")

    async def advanced_human_behavior(self, page):
        """Run the browsing, attention-shift and reading simulations in order."""
        try:
            # viewport_size is a property, not a coroutine (see human_mouse_move).
            viewport = page.viewport_size
            if not viewport:
                return
            await self._simulate_page_browsing(page, viewport)
            await self._simulate_attention_shifts(page, viewport)
            await self._simulate_reading_pattern(page, viewport)
        except Exception as e:
            logger.debug(f"高级行为模拟失败: {e}")

    async def _simulate_page_browsing(self, page, viewport):
        """Visit 2-5 random points, dwelling at each and sometimes scrolling."""
        try:
            for _ in range(random.randint(2, 5)):
                x = random.randint(100, viewport['width'] - 100)
                y = random.randint(100, viewport['height'] - 100)
                await self.human_mouse_move(page, x, y)
                await asyncio.sleep(random.uniform(0.5, 2.0))
                # Occasionally scroll up or down.
                if random.random() < 0.4:
                    scroll_amount = random.randint(-200, 200)
                    await page.mouse.wheel(0, scroll_amount)
                    await asyncio.sleep(random.uniform(0.3, 0.8))
        except Exception as e:
            logger.debug(f"页面浏览模拟失败: {e}")

    async def _simulate_attention_shifts(self, page, viewport):
        """Jump the cursor directly to 3-6 random points with short pauses."""
        try:
            for _ in range(random.randint(3, 6)):
                x = random.randint(50, viewport['width'] - 50)
                y = random.randint(50, viewport['height'] - 50)
                await page.mouse.move(x, y)
                await asyncio.sleep(random.uniform(0.1, 0.3))
        except Exception as e:
            logger.debug(f"注意力转移模拟失败: {e}")

    async def _simulate_reading_pattern(self, page, viewport):
        """Sweep the cursor left-to-right over 2-4 lines, as if reading."""
        try:
            start_x = random.randint(50, 150)
            start_y = random.randint(200, 400)
            for line in range(random.randint(2, 4)):
                y = start_y + line * random.randint(20, 40)
                for pos in range(random.randint(3, 7)):
                    x = start_x + pos * random.randint(30, 80)
                    # Stop the line before reaching the right edge.
                    if x > viewport['width'] - 100:
                        break
                    await page.mouse.move(x, y)
                    await asyncio.sleep(random.uniform(0.05, 0.2))
                # Pause between lines.
                await asyncio.sleep(random.uniform(0.2, 0.5))
        except Exception as e:
            logger.debug(f"阅读模式模拟失败: {e}")

    async def solve_challenge(self, page) -> None:
        """Loop until a ``cf_clearance`` cookie appears or the timeout elapses.

        Each iteration runs the behaviour simulation, waits for the spinner
        to hide, then clicks either the verify button or the Turnstile
        widget. Playwright errors (e.g. wait timeouts) propagate to the caller.
        """
        verify_button_pattern = re.compile(
            "Verify (I am|you are) (not a bot|(a )?human)"
        )
        verify_button = page.get_by_role("button", name=verify_button_pattern)
        challenge_spinner = page.locator("#challenge-spinner")
        challenge_stage = page.locator("#challenge-stage")

        # A monotonic deadline measures total elapsed time and is immune to
        # wall-clock changes (timedelta.seconds is only a component).
        deadline = time.monotonic() + self._timeout
        cookies = await self.cookies(page)
        challenge_type = await self.detect_challenge(page)

        while (
            cookies is None
            and challenge_type is not None
            and time.monotonic() < deadline
        ):
            await self.advanced_human_behavior(page)
            if await challenge_spinner.is_visible():
                await challenge_spinner.wait_for(state="hidden")
            turnstile_frame = self._get_turnstile_frame(page)
            if await verify_button.is_visible():
                # Click a random point inside the central part of the button.
                button_box = await verify_button.bounding_box()
                if button_box:
                    click_x = button_box['x'] + button_box['width'] * random.uniform(0.3, 0.7)
                    click_y = button_box['y'] + button_box['height'] * random.uniform(0.3, 0.7)
                    await self.human_click_sequence(page, click_x, click_y)
                await challenge_stage.wait_for(state="hidden")
            elif turnstile_frame is not None:
                await self._human_turnstile_interaction(page)
                await challenge_stage.wait_for(state="hidden")
            await page.wait_for_timeout(random.randint(300, 800))
            cookies = await self.cookies(page)

    async def _human_turnstile_interaction(self, page):
        """Try the Turnstile click strategies in order until one reports success.

        NOTE(review): strategy 2 calls human_click_sequence, which returns
        None, so it can never report success and the loop always continues
        to strategy 3 after it. Confirm whether that fall-through is intended.
        """
        try:
            strategies = [
                # Strategy 1: locate a checkbox input and click it.
                lambda: self._human_click_checkbox(page),
                # Strategy 2: click a fixed page position.
                lambda: self.human_click_sequence(page, 210, 290),
                # Strategy 3: locate a Turnstile element and click it.
                lambda: self._human_click_turnstile_element(page),
            ]
            for i, strategy in enumerate(strategies):
                try:
                    success = await strategy()
                    if success:
                        logger.info(f"人类式Turnstile策略 {i+1} 成功")
                        break
                    await asyncio.sleep(random.uniform(0.5, 1.0))
                except Exception as e:
                    logger.debug(f"人类式Turnstile策略 {i+1} 失败: {e}")
                    continue
        except Exception as e:
            logger.error(f"人类式Turnstile交互失败: {e}")

    async def _human_click_checkbox(self, page) -> bool:
        """Click a visible ``input[type="checkbox"]``; return True if clicked."""
        try:
            checkbox = await page.query_selector('input[type="checkbox"]')
            if checkbox and await checkbox.is_visible():
                box = await checkbox.bounding_box()
                if box:
                    click_x = box['x'] + box['width'] * random.uniform(0.2, 0.8)
                    click_y = box['y'] + box['height'] * random.uniform(0.2, 0.8)
                    await self.human_click_sequence(page, click_x, click_y)
                    return True
        except Exception as e:
            # Catch Exception rather than using a bare except, so task
            # cancellation and KeyboardInterrupt are not swallowed.
            logger.debug(f"checkbox点击失败: {e}")
        return False

    async def _human_click_turnstile_element(self, page) -> bool:
        """Click the first visible element matching a Turnstile selector."""
        selectors = ['.cf-turnstile', '[data-sitekey]', 'iframe[src*="turnstile"]']
        for selector in selectors:
            try:
                element = await page.query_selector(selector)
                if element and await element.is_visible():
                    box = await element.bounding_box()
                    if box:
                        click_x = box['x'] + box['width'] * random.uniform(0.3, 0.7)
                        click_y = box['y'] + box['height'] * random.uniform(0.3, 0.7)
                        await self.human_click_sequence(page, click_x, click_y)
                        return True
            except Exception as e:
                # Narrowed from a bare except; try the next selector.
                logger.debug(f"turnstile元素点击失败 [{selector}]: {e}")
                continue
        return False

    async def detect(self, page):
        """Solve the Cloudflare challenge on *page* if one is present.

        Does nothing when a ``cf_clearance`` cookie already exists or no
        challenge is detected. Playwright errors raised while solving are
        logged, not re-raised.
        """
        if await self.cookies(page) is not None:
            logger.info("已有cf_clearance cookie,无需解决挑战")
            return

        challenge_platform = await self.detect_challenge(page)
        if challenge_platform is None:
            logger.info("未检测到Cloudflare挑战")
            return

        logger.info(f"解决Cloudflare挑战 [{challenge_platform}]...")
        try:
            await self.solve_challenge(page)
            logger.info("Cloudflare挑战解决完成")
        except PlaywrightError as err:
            logger.error(f"解决挑战时出错: {err}")

    async def on_response(self, response):
        """Response listener: store the JSON body of matching API responses.

        On a successful parse, sets ``api_data`` to the decoded payload and
        ``task_holders_status`` to True. Non-OK responses are ignored.
        """
        if not response.ok:
            return
        try:
            url = response.url
            if any(pattern in url for pattern in [
                'smartmoney', 'walletstat', 'token_holders',
                'defi/quotation', 'api/v1', '/vas/api/'
            ]):
                logger.info(f"拦截到API响应: {url}")
                try:
                    response_text = await response.text()
                    if response_text:
                        data = json.loads(response_text)
                        logger.info(f"✅ 成功获取API数据: {len(response_text)} 字符")
                        self.api_data = data
                        self.task_holders_status = True
                        # Log a short summary of the payload.
                        if isinstance(data, dict):
                            logger.info(f"📊 数据字段: {list(data.keys())[:5]}")
                            if 'code' in data:
                                logger.info(f"📋 响应代码: {data['code']}")
                except json.JSONDecodeError:
                    logger.warning("响应不是有效JSON")
                except Exception as e:
                    logger.error(f"处理响应数据时出错: {e}")
        except Exception as e:
            logger.error(f"响应拦截器出错: {e}")

    async def bypass_gmgn_api(self, url: str) -> dict:
        """Open *url* in a headed browser and return the captured API data.

        Returns:
            A dict with keys ``url``, ``success``, ``data`` and ``error``.
            Exceptions are caught and reported through ``error``.
        """
        self.total_attempts += 1
        result = {
            'url': url,
            'success': False,
            'data': None,
            'error': None
        }
        try:
            async with async_playwright() as p:
                launch_data = {
                    "headless": False,  # run with a visible browser window
                    "args": [
                        '--no-sandbox',
                        '--disable-setuid-sandbox'
                    ]
                }
                browser = await p.chromium.launch(**launch_data)
                try:
                    context = await browser.new_context(user_agent=self.primary_ua)
                    context.set_default_timeout(self._timeout * 1000)
                    page = await context.new_page()
                    page.on('response', self.on_response)

                    # Reset the capture state for this attempt.
                    self.task_holders_status = False
                    self.api_data = None

                    logger.info(f"访问URL(人类行为模式): {url}")
                    await page.goto(url)
                    await asyncio.sleep(10)
                    await self.detect(page)
                    await asyncio.sleep(5)

                    if self.task_holders_status and self.api_data:
                        logger.info("✅ 人类光标动作链绕过成功!")
                        result.update({
                            'success': True,
                            'data': self.api_data
                        })
                        self.success_count += 1
                    else:
                        logger.warning("未能获取到API数据")
                        result['error'] = "未能获取到API数据"
                    await context.close()
                finally:
                    # Close the browser even when navigation or solving
                    # raised, so a failed attempt does not leak it.
                    await browser.close()
        except Exception as e:
            logger.error(f"人类行为绕过过程中出错: {e}")
            result['error'] = str(e)
        return result

    def get_success_rate(self) -> float:
        """Return the success rate as a percentage (0.0 when nothing was attempted)."""
        if self.total_attempts == 0:
            return 0.0
        return (self.success_count / self.total_attempts) * 100
async def test_human_cursor_bypass():
    """Run one bypass attempt against a sample GMGN URL and print a report.

    Returns:
        True when the attempt reported success, False otherwise.
    """
    banner = "=" * 80
    logger.info("🚀 测试人类光标动作链GMGN绕过器")
    logger.info(banner)

    test_url = "https://gmgn.ai/defi/quotation/v1/smartmoney/bsc/walletstat/0x433c51ecf15f2e8ac9bbaa16bd4c848fe6f4b64a?device_id=d70f0ce8-9ed6-4ad6-bf0e-526716c10343&client_id=gmgn_web_20250709-940-10907f4&from_app=gmgn&app_ver=20250709-940-10907f4&tz_name=Asia%2FShanghai&tz_offset=28800&app_lang=en-US&fp_did=c7b93352f7582ce7103c87a70be152a0&os=web&token_address=0x0f7895dab3f8a7f9cc438fa76e7a793e2bd50968&period=1d"
    bypasser = HumanCursorBypass()
    outcome = await bypasser.bypass_gmgn_api(test_url)

    print("\n" + banner)
    print("📊 人类光标动作链测试结果")
    print(banner)

    # Failure path first, so the success report reads straight through.
    if not outcome['success']:
        print("❌ 测试失败")
        print(f"❗ 错误: {outcome['error']}")
        print(f"📈 成功率: {bypasser.get_success_rate():.1f}%")
        return False

    print("✅ 测试成功!")
    print("🖱 使用了真实人类光标轨迹模拟")
    print("🎯 包含贝塞尔曲线、过冲、犹豫、微调等人类特征")

    payload = outcome['data']
    if isinstance(payload, dict):
        print(f"📊 数据字段: {list(payload.keys())[:5]}")
        # Show key fields: scalars are printed as-is, anything else by type.
        for key in ('code', 'msg', 'data'):
            if key not in payload:
                continue
            value = payload[key]
            shown = value if isinstance(value, (str, int, float, bool)) else type(value)
            print(f"📋 {key}: {shown}")

    print(f"\n🎯 成功率: {bypasser.get_success_rate():.1f}%")
    print("🎉 人类光标动作链绕过验证成功!")
    print("💪 现在具备最真实的人类行为模拟!")
    return True
# Script entry point: run the test coroutine and map its result to the
# process exit status (0 on success, 1 on failure, interrupt or error).
if __name__ == "__main__":
    try:
        success = asyncio.run(test_human_cursor_bypass())
    except KeyboardInterrupt:
        print("\n⏹ 测试被中断")
        raise SystemExit(1) from None
    except Exception as e:
        print(f"\n💥 测试异常: {e}")
        raise SystemExit(1) from None
    # Raise SystemExit directly: the exit() builtin is injected by the site
    # module for interactive use and is not guaranteed to exist in programs.
    raise SystemExit(0 if success else 1)
# This content originally appeared on DEV Community and was authored by drake