أنماط لحل 1000 إلى 100000+ اختبار CAPTCHA في الساعة بشكل موثوق.
إذا كان فريقك لا يزال في نطاق بضع مئات من المهام يوميًا، فغالبًا أنك لا تحتاج إلى هذه الدرجة من التعقيد بعد. قيمة هذا الدليل تظهر عندما تبدأ طوابير الحل بالتراكم، أو عندما يصبح وقت الانتظار وعدم استقرار واجهة API جزءًا من المشكلة التشغيلية نفسها.
كيف تختار النمط قبل البناء؟
| السؤال العملي | النمط الأقرب | السبب |
|---|---|---|
| هل الحجم ما زال متوسطًا ويمكن تشغيله داخل عملية واحدة؟ | تجمع العمال | أقل تعقيدًا وأسهل في التشغيل |
| هل تحتاج ضغطًا خلفيًا وفصلًا واضحًا بين الإرسال والاستطلاع؟ | خط أنابيب قائم على قائمة انتظار | يمنع اختناق الطلبات ويراقب المراحل بوضوح |
| هل المشكلة الأساسية هي تدهور API أو ارتفاع الأخطاء؟ | قاطع الدائرة | يحمي الأنظمة التابعة من الانهيار المتسلسل |
| هل تحتاج استجابة فورية من رموز محلولة مسبقًا؟ | مخزن مؤقت للرموز | يقلل زمن الانتظار في المسارات الحساسة |
| هل يتوقف العمل بالكامل عندما يتعطل موفّر واحد؟ | تجاوز فشل متعدد المزوّدين | يضيف استمرارية تشغيل بدل الاعتماد على نقطة واحدة |
النمط 1: تجمع العمال البسيط
الأفضل لـ: 100-1000 حل/hour.
┌──────────┐ ┌──────────────┐ ┌────────────┐
│ Scraper │────▶│ Thread Pool │────▶│ CaptchaAI │
│ Tasks │ │ (5-20 │ │ API │
│ │◀────│ workers) │◀────│ │
└──────────┘ └──────────────┘ └────────────┘
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import requests
class SimpleWorkerPool:
def __init__(self, api_key, max_workers=10):
self.api_key = api_key
self.max_workers = max_workers
self.base = "https://ocr.captchaai.com"
def _solve_one(self, params):
params["key"] = self.api_key
params["json"] = 1
resp = requests.post(f"{self.base}/in.php", data=params).json()
if resp["status"] != 1:
return {"error": resp["request"]}
task_id = resp["request"]
time.sleep(10)
for _ in range(60):
result = requests.get(
f"{self.base}/res.php",
params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
).json()
if result["request"] == "CAPCHA_NOT_READY":
time.sleep(5)
continue
if result["status"] == 1:
return {"token": result["request"]}
return {"error": result["request"]}
return {"error": "timeout"}
def solve_batch(self, tasks):
"""tasks: list of (identifier, params) tuples."""
results = {}
with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
futures = {
pool.submit(self._solve_one, params): ident
for ident, params in tasks
}
for future in as_completed(futures):
ident = futures[future]
try:
results[ident] = future.result()
except Exception as e:
results[ident] = {"error": str(e)}
return results
النمط 2: خط الأنابيب القائم على قائمة الانتظار
الأفضل لـ: 1000-10000 حل /hour مع التحكم في الضغط الخلفي.
┌────────┐ ┌───────────┐ ┌──────────┐ ┌───────────┐ ┌────────┐
│Producer│────▶│ Submit │────▶│ Pending │────▶│ Poll │────▶│Results │
│ │ │ Queue │ │ Queue │ │ Workers │ │ Queue │
└────────┘ └───────────┘ └──────────┘ └───────────┘ └────────┘
import queue
import threading
import time
import requests
class QueuePipeline:
def __init__(self, api_key, submit_workers=5, poll_workers=10):
self.api_key = api_key
self.base = "https://ocr.captchaai.com"
self.submit_queue = queue.Queue(maxsize=100)
self.pending_queue = queue.Queue()
self.results = {}
self.results_lock = threading.Lock()
self._running = False
self.submit_workers = submit_workers
self.poll_workers = poll_workers
def start(self):
self._running = True
for _ in range(self.submit_workers):
threading.Thread(target=self._submit_worker, daemon=True).start()
for _ in range(self.poll_workers):
threading.Thread(target=self._poll_worker, daemon=True).start()
def stop(self):
self._running = False
def add(self, ident, params):
self.submit_queue.put((ident, params))
def get_result(self, ident, timeout=300):
deadline = time.time() + timeout
while time.time() < deadline:
with self.results_lock:
if ident in self.results:
return self.results.pop(ident)
time.sleep(1)
return {"error": "timeout"}
def _submit_worker(self):
while self._running:
try:
ident, params = self.submit_queue.get(timeout=1)
except queue.Empty:
continue
params["key"] = self.api_key
params["json"] = 1
try:
resp = requests.post(f"{self.base}/in.php", data=params).json()
if resp["status"] == 1:
self.pending_queue.put((ident, resp["request"], time.time()))
else:
with self.results_lock:
self.results[ident] = {"error": resp["request"]}
except Exception as e:
with self.results_lock:
self.results[ident] = {"error": str(e)}
def _poll_worker(self):
while self._running:
try:
ident, task_id, submitted_at = self.pending_queue.get(timeout=1)
except queue.Empty:
continue
# Wait at least 10s from submission
wait = 10 - (time.time() - submitted_at)
if wait > 0:
time.sleep(wait)
try:
resp = requests.get(
f"{self.base}/res.php",
params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
).json()
if resp["request"] == "CAPCHA_NOT_READY":
self.pending_queue.put((ident, task_id, submitted_at))
time.sleep(3)
elif resp["status"] == 1:
with self.results_lock:
self.results[ident] = {"token": resp["request"]}
else:
with self.results_lock:
self.results[ident] = {"error": resp["request"]}
except Exception:
self.pending_queue.put((ident, task_id, submitted_at))
time.sleep(5)
** الاستخدام: **
pipeline = QueuePipeline("YOUR_API_KEY")
pipeline.start()
# Add CAPTCHAs
pipeline.add("page_1", {"method": "turnstile", "sitekey": "KEY", "pageurl": "URL"})
pipeline.add("page_2", {"method": "userrecaptcha", "googlekey": "KEY", "pageurl": "URL"})
# Get results
result1 = pipeline.get_result("page_1")
result2 = pipeline.get_result("page_2")
pipeline.stop()
النمط 3: قواطع دوائر
يمنع حالات الفشل المتتالية عندما تتدهور واجهة برمجة التطبيقات (API) للحل.
import time
import threading
class CircuitBreaker:
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing — reject requests
HALF_OPEN = "half_open" # Testing recovery
def __init__(self, failure_threshold=5, reset_timeout=60):
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.state = self.CLOSED
self.failure_count = 0
self.last_failure_time = 0
self.lock = threading.Lock()
def can_proceed(self):
with self.lock:
if self.state == self.CLOSED:
return True
if self.state == self.OPEN:
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = self.HALF_OPEN
return True
return False
# HALF_OPEN: allow one request
return True
def record_success(self):
with self.lock:
self.failure_count = 0
self.state = self.CLOSED
def record_failure(self):
with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = self.OPEN
class ResilientSolver:
def __init__(self, api_key):
self.api_key = api_key
self.breaker = CircuitBreaker(failure_threshold=5, reset_timeout=60)
def solve(self, params):
if not self.breaker.can_proceed():
raise Exception("Circuit open — API degraded, try later")
try:
result = self._do_solve(params)
self.breaker.record_success()
return result
except Exception as e:
self.breaker.record_failure()
raise
def _do_solve(self, params):
# Standard solve logic
pass
النمط 4: الحل المسبق باستخدام المخزن المؤقت للرمز المميز
احتفظ بمخزن مؤقت من الرموز المميزة التي تم حلها مسبقًا للاستخدام الفوري.
import queue
import threading
import time
class TokenBuffer:
def __init__(self, solver, params, buffer_size=5, ttl_seconds=90):
self.solver = solver
self.params = params
self.buffer = queue.Queue(maxsize=buffer_size)
self.ttl = ttl_seconds
self.buffer_size = buffer_size
self._running = False
def start(self):
self._running = True
threading.Thread(target=self._fill_loop, daemon=True).start()
def stop(self):
self._running = False
def get_token(self, timeout=30):
"""Get a pre-solved token. Returns None if buffer empty."""
try:
token, created_at = self.buffer.get(timeout=timeout)
if time.time() - created_at > self.ttl:
# Token expired, try next
return self.get_token(timeout=timeout)
return token
except queue.Empty:
return None
def _fill_loop(self):
while self._running:
if self.buffer.qsize() < self.buffer_size:
try:
token = self.solver.solve(self.params)
self.buffer.put((token, time.time()))
except Exception:
time.sleep(5)
else:
time.sleep(2)
النمط 5: تجاوز فشل الموفرين المتعددين
الطريق إلى موفري النسخ الاحتياطي عند فشل الأساسي.
class MultiProviderSolver:
def __init__(self, providers):
"""providers: list of (name, solver_instance, priority) tuples."""
self.providers = sorted(providers, key=lambda x: x[2])
self.breakers = {name: CircuitBreaker() for name, _, _ in providers}
def solve(self, params):
errors = []
for name, solver, _ in self.providers:
if not self.breakers[name].can_proceed():
continue
try:
result = solver.solve(params)
self.breakers[name].record_success()
return result
except Exception as e:
self.breakers[name].record_failure()
errors.append(f"{name}: {e}")
raise Exception(f"All providers failed: {'; '.join(errors)}")
إرشادات القياس
| الحجم | الهندسة المعمارية | العمال | ملاحظات |
|---|---|---|---|
| <100/hr | مكالمات مباشرة | 1-3 | لا حاجة لهندسة معمارية خاصة |
| 100-1K/hr | تجمع العمال | 5-10 | النمط 1 |
| 1K-10K/hr | خط أنابيب قائمة الانتظار | 10-30 | النموذج 2 + قاطع الدائرة الكهربائية |
| 10K-50K/hr | طوابير موزعة | 30-100 | Redis/RabbitMQ، أجهزة متعددة |
| 50 ألف+/hr | متعدد الموفر | 100+ | النموذج 5 + قائمة الانتظار الموزعة |
المراقبة على نطاق واسع
import logging
from collections import defaultdict
logger = logging.getLogger("captcha_scale")
class ScaleMetrics:
def __init__(self):
self.counts = defaultdict(int)
self.times = defaultdict(list)
def record(self, captcha_type, success, elapsed):
key = f"{captcha_type}_{'ok' if success else 'fail'}"
self.counts[key] += 1
self.times[captcha_type].append(elapsed)
def report(self):
for captcha_type in set(k.rsplit("_", 1)[0] for k in self.counts):
ok = self.counts.get(f"{captcha_type}_ok", 0)
fail = self.counts.get(f"{captcha_type}_fail", 0)
total = ok + fail
rate = (ok / total * 100) if total else 0
times = self.times.get(captcha_type, [])
avg_time = sum(times) / len(times) if times else 0
logger.info(
f"{captcha_type}: {total} solves, {rate:.1f}% success, {avg_time:.1f}s avg"
)
الأسئلة الشائعة
ما هو عدد الحلول المتزامنة التي يمكن لـ CaptchaAI التعامل معها؟
يمكن لـ CaptchaAI التعامل مع التزامن العالي. ابدأ بـ 10-20 طلبًا متزامنًا وقم بالزيادة بناءً على نتائجك.
هل يجب أن أستخدم المزامنة أو سلاسل المعالجة؟
تعمل مسارات التنفيذ بشكل جيد نظرًا لأن حل اختبار CAPTCHA هو I/O-bound (طلبات الشبكة + الانتظار). بالنسبة إلى Python 3.7+، يعد asyncio مع aiohttp ممتازًا أيضًا للتزامن العالي جدًا.
كيف يمكنني منع إرهاق واجهة برمجة التطبيقات (API)؟
استخدم قائمة انتظار ذات حجم محدد (الضغط الخلفي)، ومعدل محدد، وقواطع دوائر. مراقبة معدل ERROR_NO_SLOT_AVAILABLE الخاص بك.
أدلة ذات صلة
- التعامل مع CAPTCHA في اختبار نقاط نهاية API لنماذج الويب
- إنتاجية حل اختبار CAPTCHA: كيفية معالجة 10000 مهمة في الساعة
يمكنك البدء بالنمط الأبسط ثم الترقية تدريجيًا مع CaptchaAI كلما أصبح الحجم أو التفاوت في الحمل سببًا مباشرًا للمشكلات التشغيلية.