عند إجراء عملية تجريف على نطاق واسع، فإنك تحتاج إلى أكثر من حل اختبار CAPTCHA مرة واحدة. يقوم نظام قائمة الانتظار بفصل إرسال اختبار CAPTCHA عن استرجاع النتائج، مما يتيح الحل المتوازي عبر مئات المهام.
لماذا استخدام قائمة الانتظار؟
حل اختبار CAPTCHA بطلب واحد يضيع وقت الانتظار. نظام قائمة الانتظار:
- يرسل جميع اختبارات CAPTCHA على الفور
- يستقصي معرفات المهام المتعددة بالتوازي
- يتم حل المحاولات الفاشلة تلقائيًا
- يتحكم في التزامن لاحترام حدود معدل API
- يوفر تتبع التقدم وعمليات الاسترجاعات
قائمة انتظار المعالجة الأساسية
import time
import threading
import requests
from queue import Queue, Empty
API_KEY = "YOUR_API_KEY"
class CaptchaQueue:
"""Thread-based CAPTCHA solving queue."""
def __init__(self, api_key, max_workers=10):
self.api_key = api_key
self.task_queue = Queue()
self.result_queue = Queue()
self.max_workers = max_workers
self.workers = []
def submit(self, method, callback=None, **params):
"""Add a CAPTCHA task to the queue."""
task = {
"method": method,
"params": params,
"callback": callback,
}
self.task_queue.put(task)
def start(self):
"""Start worker threads."""
for _ in range(self.max_workers):
t = threading.Thread(target=self._worker, daemon=True)
t.start()
self.workers.append(t)
def wait(self):
"""Wait for all tasks to complete."""
self.task_queue.join()
def get_results(self):
"""Get all available results."""
results = []
while not self.result_queue.empty():
try:
results.append(self.result_queue.get_nowait())
except Empty:
break
return results
def _worker(self):
while True:
try:
task = self.task_queue.get(timeout=1)
except Empty:
continue
try:
result = self._solve(task["method"], **task["params"])
entry = {"status": "solved", "result": result, "task": task}
self.result_queue.put(entry)
if task["callback"]:
task["callback"](result)
except Exception as e:
entry = {"status": "error", "error": str(e), "task": task}
self.result_queue.put(entry)
finally:
self.task_queue.task_done()
def _solve(self, method, **params):
submit = requests.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}, timeout=30).json()
if submit.get("status") != 1:
raise Exception(f"Submit error: {submit.get('request')}")
task_id = submit["request"]
for _ in range(30):
time.sleep(5)
result = requests.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}, timeout=30).json()
if result.get("status") == 1:
return result["request"]
if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
raise Exception("CAPTCHA unsolvable")
raise TimeoutError("Solve timed out")
# Usage
queue = CaptchaQueue(API_KEY, max_workers=5)
queue.start()
# Submit multiple CAPTCHAs
urls_and_sitekeys = [
("https://example.com/page1", "SITEKEY_1"),
("https://example.com/page2", "SITEKEY_2"),
("https://example.com/page3", "SITEKEY_3"),
]
for url, sitekey in urls_and_sitekeys:
queue.submit("userrecaptcha", googlekey=sitekey, pageurl=url)
queue.wait()
results = queue.get_results()
print(f"Solved {len(results)} CAPTCHAs")
for r in results:
print(f" {r['status']}: {r.get('result', r.get('error', ''))[:50]}")
قائمة انتظار غير متزامنة مع غير متزامن
import asyncio
import aiohttp
API_KEY = "YOUR_API_KEY"
class AsyncCaptchaQueue:
"""Async CAPTCHA solving queue with concurrency control."""
def __init__(self, api_key, max_concurrent=10):
self.api_key = api_key
self.semaphore = asyncio.Semaphore(max_concurrent)
self.results = []
async def solve_batch(self, tasks):
"""Solve a batch of CAPTCHA tasks concurrently."""
coros = [self._solve_task(task) for task in tasks]
self.results = await asyncio.gather(*coros, return_exceptions=True)
return self.results
async def _solve_task(self, task):
async with self.semaphore:
return await self._solve(task["method"], **task["params"])
async def _solve(self, method, **params):
async with aiohttp.ClientSession() as session:
# Submit
async with session.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}) as resp:
data = await resp.json(content_type=None)
if data.get("status") != 1:
raise Exception(f"Submit error: {data.get('request')}")
task_id = data["request"]
# Poll
for _ in range(30):
await asyncio.sleep(5)
async with session.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}) as resp:
result = await resp.json(content_type=None)
if result.get("status") == 1:
return result["request"]
if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
raise Exception("CAPTCHA unsolvable")
raise TimeoutError("Solve timed out")
# Usage
async def main():
queue = AsyncCaptchaQueue(API_KEY, max_concurrent=5)
tasks = [
{"method": "userrecaptcha", "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}}
for i in range(10)
]
results = await queue.solve_batch(tasks)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i}: ERROR — {result}")
else:
print(f"Task {i}: {result[:50]}...")
asyncio.run(main())
نمط المنتج والمستهلك
بالنسبة لأحمال عمل التجريد المستمرة حيث يتم اكتشاف الصفحات ديناميكيًا:
import asyncio
import aiohttp
API_KEY = "YOUR_API_KEY"
class ProducerConsumerQueue:
"""Continuous CAPTCHA solving with producer-consumer pattern."""
def __init__(self, api_key, queue_size=100, num_consumers=5):
self.api_key = api_key
self.queue = asyncio.Queue(maxsize=queue_size)
self.num_consumers = num_consumers
self.solved_count = 0
self.error_count = 0
self.running = True
async def produce(self, tasks):
"""Producer: feed CAPTCHA tasks into the queue."""
for task in tasks:
await self.queue.put(task)
# Signal consumers to stop
for _ in range(self.num_consumers):
await self.queue.put(None)
async def consume(self, result_handler):
"""Consumer: solve CAPTCHAs and call result handler."""
async with aiohttp.ClientSession() as session:
while True:
task = await self.queue.get()
if task is None:
self.queue.task_done()
break
try:
result = await self._solve(session, task["method"], **task["params"])
self.solved_count += 1
if result_handler:
await result_handler(task, result)
except Exception as e:
self.error_count += 1
print(f"Error: {e}")
finally:
self.queue.task_done()
async def run(self, tasks, result_handler=None):
"""Run the producer-consumer pipeline."""
# Start producer
producer = asyncio.create_task(self.produce(tasks))
# Start consumers
consumers = [
asyncio.create_task(self.consume(result_handler))
for _ in range(self.num_consumers)
]
# Wait for everything to finish
await producer
await asyncio.gather(*consumers)
print(f"Complete: {self.solved_count} solved, {self.error_count} errors")
async def _solve(self, session, method, **params):
async with session.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}) as resp:
data = await resp.json(content_type=None)
if data.get("status") != 1:
raise Exception(f"Submit: {data.get('request')}")
task_id = data["request"]
for _ in range(30):
await asyncio.sleep(5)
async with session.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}) as resp:
result = await resp.json(content_type=None)
if result.get("status") == 1:
return result["request"]
raise TimeoutError("Timed out")
# Usage
async def handle_result(task, token):
url = task["params"]["pageurl"]
print(f"Solved for {url}: {token[:30]}...")
async def main():
queue = ProducerConsumerQueue(API_KEY, num_consumers=5)
tasks = [
{"method": "userrecaptcha", "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}}
for i in range(20)
]
await queue.run(tasks, result_handler=handle_result)
asyncio.run(main())
قائمة الانتظار ذات الأولوية
عندما تكون بعض اختبارات CAPTCHA أكثر أهمية من غيرها:
import asyncio
from dataclasses import dataclass, field
API_KEY = "YOUR_API_KEY"
@dataclass(order=True)
class PriorityTask:
priority: int
task: dict = field(compare=False)
class PriorityCaptchaQueue:
"""CAPTCHA queue with priority levels."""
def __init__(self, api_key, num_workers=5):
self.api_key = api_key
self.queue = asyncio.PriorityQueue()
self.num_workers = num_workers
self.results = {}
async def submit(self, task_id, method, priority=5, **params):
"""Submit with priority (lower number = higher priority)."""
await self.queue.put(PriorityTask(
priority=priority,
task={"id": task_id, "method": method, "params": params},
))
async def process(self):
"""Process all queued tasks by priority."""
workers = [asyncio.create_task(self._worker()) for _ in range(self.num_workers)]
# Wait for queue to drain
await self.queue.join()
# Cancel workers
for w in workers:
w.cancel()
return self.results
async def _worker(self):
import aiohttp
async with aiohttp.ClientSession() as session:
while True:
item = await self.queue.get()
task = item.task
try:
result = await self._solve(session, task["method"], **task["params"])
self.results[task["id"]] = {"status": "solved", "token": result}
except Exception as e:
self.results[task["id"]] = {"status": "error", "error": str(e)}
finally:
self.queue.task_done()
async def _solve(self, session, method, **params):
import aiohttp
async with session.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}) as resp:
data = await resp.json(content_type=None)
if data.get("status") != 1:
raise Exception(data.get("request"))
task_id = data["request"]
for _ in range(30):
await asyncio.sleep(5)
async with session.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}) as resp:
result = await resp.json(content_type=None)
if result.get("status") == 1:
return result["request"]
raise TimeoutError()
# Usage
async def main():
pq = PriorityCaptchaQueue(API_KEY, num_workers=3)
# High priority — checkout pages
await pq.submit("checkout_1", "turnstile", priority=1, sitekey="KEY", pageurl="https://shop.com/checkout")
# Normal priority — product pages
for i in range(5):
await pq.submit(f"product_{i}", "userrecaptcha", priority=5, googlekey="KEY", pageurl=f"https://shop.com/p/{i}")
# Low priority — info pages
for i in range(3):
await pq.submit(f"info_{i}", "userrecaptcha", priority=10, googlekey="KEY", pageurl=f"https://shop.com/info/{i}")
results = await pq.process()
for task_id, result in results.items():
print(f"{task_id}: {result['status']}")
asyncio.run(main())
الرصد والإبلاغ
import time
from dataclasses import dataclass, field
@dataclass
class QueueMetrics:
submitted: int = 0
solved: int = 0
failed: int = 0
total_solve_time: float = 0.0
start_time: float = field(default_factory=time.time)
@property
def avg_solve_time(self):
return self.total_solve_time / self.solved if self.solved else 0
@property
def success_rate(self):
total = self.solved + self.failed
return (self.solved / total * 100) if total else 0
@property
def throughput(self):
elapsed = time.time() - self.start_time
return self.solved / elapsed * 60 if elapsed > 0 else 0
def report(self):
return (
f"Submitted: {self.submitted} | "
f"Solved: {self.solved} | "
f"Failed: {self.failed} | "
f"Avg time: {self.avg_solve_time:.1f}s | "
f"Success: {self.success_rate:.1f}% | "
f"Throughput: {self.throughput:.0f}/min"
)
استكشاف الأخطاء وإصلاحها
| أعراض | السبب | إصلاح |
|---|---|---|
| تتزايد قائمة الانتظار ولكن المهام لا تكتمل | عدد كبير جدًا من العمال يطغى على واجهة برمجة التطبيقات (API). | تقليل max_workers / max_concurrent |
ERROR_NO_SLOT_AVAILABLE |
تم الوصول إلى حد التزامن لواجهة برمجة التطبيقات (API). | أضف تأخيرًا بين عمليات الإرسال |
| المهام عالقة في قائمة الانتظار | ماتت خيوط العامل عند الاستثناء | لف حلقة العامل في محاولة /except |
| الذاكرة تنمو مع مرور الوقت | النتائج لا تستهلك | اتصل بـ get_results() بشكل دوري |
| كتل قائمة الانتظار غير المتزامنة | await مفقود |
تأكد من انتظار جميع المكالمات غير المتزامنة |
الأسئلة المتداولة
كم عدد الحلول المتزامنة التي يمكنني تشغيلها؟
يعالج CaptchaAI التزامن على جانب الخادم. ابدأ بـ 10 عمال متزامنين وقم بالزيادة بناءً على حدود خطتك. تحقق من ERROR_NO_SLOT_AVAILABLE لمعرفة متى يجب الاختناق.
هل يجب علي استخدام الترابط أو عدم التزامن؟
استخدم asyncio للمشاريع الجديدة - فهو يتعامل مع حل I/O-bound CAPTCHA بشكل أكثر كفاءة. استخدم الترابط في حالة الدمج في التعليمات البرمجية المتزامنة الموجودة.
كيف أتعامل مع حدود معدل API؟
استخدم إشارة (غير متزامنة) أو قائمة انتظار محدودة (مؤشر ترابط) لتحديد الطلبات المتزامنة. أضف مهلة قصيرة بين عمليات الإرسال إذا قمت بالضغط على ERROR_NO_SLOT_AVAILABLE.
ملخص
تعمل قائمة انتظار حل اختبار CAPTCHA على فصل الإرسال عن الاقتراع، مما يتيح الحل المتوازي معCaptchaAI. اختر الترابط للتعليمات البرمجية المتزامنة، وعدم المزامنة لـ Python الحديثة، والمنتج والمستهلك لأحمال العمل المستمرة.
مقالات ذات صلة
- بايثون Playwright Captchaai الدليل الكامل
- بناء خطوط أنابيب العميل Captcha Captchaai
- بناء الأتمتة المسؤولة Captchaai