المرجع

أنماط معمارية لحل CAPTCHA على نطاق واسع

أنماط لحل 1000 إلى 100000+ اختبار CAPTCHA في الساعة بشكل موثوق.

إذا كان فريقك لا يزال في نطاق بضع مئات من المهام يوميًا، فغالبًا أنك لا تحتاج إلى هذه الدرجة من التعقيد بعد. قيمة هذا الدليل تظهر عندما تبدأ طوابير الحل بالتراكم، أو عندما يصبح وقت الانتظار وعدم استقرار واجهة API جزءًا من المشكلة التشغيلية نفسها.

كيف تختار النمط قبل البناء؟

السؤال العملي النمط الأقرب السبب
هل الحجم ما زال متوسطًا ويمكن تشغيله داخل عملية واحدة؟ تجمع العمال أقل تعقيدًا وأسهل في التشغيل
هل تحتاج ضغطًا خلفيًا وفصلًا واضحًا بين الإرسال والاستطلاع؟ خط أنابيب قائم على قائمة انتظار يمنع اختناق الطلبات ويراقب المراحل بوضوح
هل المشكلة الأساسية هي تدهور API أو ارتفاع الأخطاء؟ قاطع الدائرة يحمي الأنظمة التابعة من الانهيار المتسلسل
هل تحتاج استجابة فورية من رموز محلولة مسبقًا؟ مخزن مؤقت للرموز يقلل زمن الانتظار في المسارات الحساسة
هل يتوقف العمل بالكامل عندما يتعطل موفّر واحد؟ تجاوز فشل متعدد المزوّدين يضيف استمرارية تشغيل بدل الاعتماد على نقطة واحدة

النمط 1: تجمع العمال البسيط

الأفضل لـ: 100-1000 حل/hour.

┌──────────┐     ┌──────────────┐     ┌────────────┐
│  Scraper  │────▶│  Thread Pool │────▶│ CaptchaAI  │
│  Tasks    │     │  (5-20       │     │   API      │
│           │◀────│   workers)   │◀────│            │
└──────────┘     └──────────────┘     └────────────┘
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import requests


class SimpleWorkerPool:
    def __init__(self, api_key, max_workers=10):
        self.api_key = api_key
        self.max_workers = max_workers
        self.base = "https://ocr.captchaai.com"

    def _solve_one(self, params):
        params["key"] = self.api_key
        params["json"] = 1

        resp = requests.post(f"{self.base}/in.php", data=params).json()
        if resp["status"] != 1:
            return {"error": resp["request"]}

        task_id = resp["request"]
        time.sleep(10)

        for _ in range(60):
            result = requests.get(
                f"{self.base}/res.php",
                params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
            ).json()

            if result["request"] == "CAPCHA_NOT_READY":
                time.sleep(5)
                continue
            if result["status"] == 1:
                return {"token": result["request"]}
            return {"error": result["request"]}

        return {"error": "timeout"}

    def solve_batch(self, tasks):
        """tasks: list of (identifier, params) tuples."""
        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = {
                pool.submit(self._solve_one, params): ident
                for ident, params in tasks
            }
            for future in as_completed(futures):
                ident = futures[future]
                try:
                    results[ident] = future.result()
                except Exception as e:
                    results[ident] = {"error": str(e)}
        return results

النمط 2: خط الأنابيب القائم على قائمة الانتظار

الأفضل لـ: 1000-10000 حل /hour مع التحكم في الضغط الخلفي.

┌────────┐     ┌───────────┐     ┌──────────┐     ┌───────────┐     ┌────────┐
│Producer│────▶│ Submit    │────▶│ Pending  │────▶│  Poll     │────▶│Results │
│        │     │ Queue     │     │ Queue    │     │  Workers  │     │ Queue  │
└────────┘     └───────────┘     └──────────┘     └───────────┘     └────────┘
import queue
import threading
import time
import requests


class QueuePipeline:
    def __init__(self, api_key, submit_workers=5, poll_workers=10):
        self.api_key = api_key
        self.base = "https://ocr.captchaai.com"
        self.submit_queue = queue.Queue(maxsize=100)
        self.pending_queue = queue.Queue()
        self.results = {}
        self.results_lock = threading.Lock()
        self._running = False
        self.submit_workers = submit_workers
        self.poll_workers = poll_workers

    def start(self):
        self._running = True
        for _ in range(self.submit_workers):
            threading.Thread(target=self._submit_worker, daemon=True).start()
        for _ in range(self.poll_workers):
            threading.Thread(target=self._poll_worker, daemon=True).start()

    def stop(self):
        self._running = False

    def add(self, ident, params):
        self.submit_queue.put((ident, params))

    def get_result(self, ident, timeout=300):
        deadline = time.time() + timeout
        while time.time() < deadline:
            with self.results_lock:
                if ident in self.results:
                    return self.results.pop(ident)
            time.sleep(1)
        return {"error": "timeout"}

    def _submit_worker(self):
        while self._running:
            try:
                ident, params = self.submit_queue.get(timeout=1)
            except queue.Empty:
                continue
            params["key"] = self.api_key
            params["json"] = 1
            try:
                resp = requests.post(f"{self.base}/in.php", data=params).json()
                if resp["status"] == 1:
                    self.pending_queue.put((ident, resp["request"], time.time()))
                else:
                    with self.results_lock:
                        self.results[ident] = {"error": resp["request"]}
            except Exception as e:
                with self.results_lock:
                    self.results[ident] = {"error": str(e)}

    def _poll_worker(self):
        while self._running:
            try:
                ident, task_id, submitted_at = self.pending_queue.get(timeout=1)
            except queue.Empty:
                continue

            # Wait at least 10s from submission
            wait = 10 - (time.time() - submitted_at)
            if wait > 0:
                time.sleep(wait)

            try:
                resp = requests.get(
                    f"{self.base}/res.php",
                    params={"key": self.api_key, "action": "get", "id": task_id, "json": 1},
                ).json()

                if resp["request"] == "CAPCHA_NOT_READY":
                    self.pending_queue.put((ident, task_id, submitted_at))
                    time.sleep(3)
                elif resp["status"] == 1:
                    with self.results_lock:
                        self.results[ident] = {"token": resp["request"]}
                else:
                    with self.results_lock:
                        self.results[ident] = {"error": resp["request"]}
            except Exception:
                self.pending_queue.put((ident, task_id, submitted_at))
                time.sleep(5)

** الاستخدام: **

pipeline = QueuePipeline("YOUR_API_KEY")
pipeline.start()

# Add CAPTCHAs
pipeline.add("page_1", {"method": "turnstile", "sitekey": "KEY", "pageurl": "URL"})
pipeline.add("page_2", {"method": "userrecaptcha", "googlekey": "KEY", "pageurl": "URL"})

# Get results
result1 = pipeline.get_result("page_1")
result2 = pipeline.get_result("page_2")

pipeline.stop()

النمط 3: قواطع دوائر

يمنع حالات الفشل المتتالية عندما تتدهور واجهة برمجة التطبيقات (API) للحل.

import time
import threading


class CircuitBreaker:
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing — reject requests
    HALF_OPEN = "half_open"  # Testing recovery

    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = self.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.lock = threading.Lock()

    def can_proceed(self):
        with self.lock:
            if self.state == self.CLOSED:
                return True
            if self.state == self.OPEN:
                if time.time() - self.last_failure_time > self.reset_timeout:
                    self.state = self.HALF_OPEN
                    return True
                return False
            # HALF_OPEN: allow one request
            return True

    def record_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = self.CLOSED

    def record_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = self.OPEN


class ResilientSolver:
    def __init__(self, api_key):
        self.api_key = api_key
        self.breaker = CircuitBreaker(failure_threshold=5, reset_timeout=60)

    def solve(self, params):
        if not self.breaker.can_proceed():
            raise Exception("Circuit open — API degraded, try later")

        try:
            result = self._do_solve(params)
            self.breaker.record_success()
            return result
        except Exception as e:
            self.breaker.record_failure()
            raise

    def _do_solve(self, params):
        # Standard solve logic
        pass

النمط 4: الحل المسبق باستخدام المخزن المؤقت للرمز المميز

احتفظ بمخزن مؤقت من الرموز المميزة التي تم حلها مسبقًا للاستخدام الفوري.

import queue
import threading
import time


class TokenBuffer:
    def __init__(self, solver, params, buffer_size=5, ttl_seconds=90):
        self.solver = solver
        self.params = params
        self.buffer = queue.Queue(maxsize=buffer_size)
        self.ttl = ttl_seconds
        self.buffer_size = buffer_size
        self._running = False

    def start(self):
        self._running = True
        threading.Thread(target=self._fill_loop, daemon=True).start()

    def stop(self):
        self._running = False

    def get_token(self, timeout=30):
        """Get a pre-solved token. Returns None if buffer empty."""
        try:
            token, created_at = self.buffer.get(timeout=timeout)
            if time.time() - created_at > self.ttl:
                # Token expired, try next
                return self.get_token(timeout=timeout)
            return token
        except queue.Empty:
            return None

    def _fill_loop(self):
        while self._running:
            if self.buffer.qsize() < self.buffer_size:
                try:
                    token = self.solver.solve(self.params)
                    self.buffer.put((token, time.time()))
                except Exception:
                    time.sleep(5)
            else:
                time.sleep(2)

النمط 5: تجاوز فشل الموفرين المتعددين

الطريق إلى موفري النسخ الاحتياطي عند فشل الأساسي.

class MultiProviderSolver:
    def __init__(self, providers):
        """providers: list of (name, solver_instance, priority) tuples."""
        self.providers = sorted(providers, key=lambda x: x[2])
        self.breakers = {name: CircuitBreaker() for name, _, _ in providers}

    def solve(self, params):
        errors = []
        for name, solver, _ in self.providers:
            if not self.breakers[name].can_proceed():
                continue
            try:
                result = solver.solve(params)
                self.breakers[name].record_success()
                return result
            except Exception as e:
                self.breakers[name].record_failure()
                errors.append(f"{name}: {e}")
        raise Exception(f"All providers failed: {'; '.join(errors)}")

إرشادات القياس

الحجم الهندسة المعمارية العمال ملاحظات
<100/hr مكالمات مباشرة 1-3 لا حاجة لهندسة معمارية خاصة
100-1K/hr تجمع العمال 5-10 النمط 1
1K-10K/hr خط أنابيب قائمة الانتظار 10-30 النموذج 2 + قاطع الدائرة الكهربائية
10K-50K/hr طوابير موزعة 30-100 Redis/RabbitMQ، أجهزة متعددة
50 ألف+/hr متعدد الموفر 100+ النموذج 5 + قائمة الانتظار الموزعة

المراقبة على نطاق واسع

import logging
from collections import defaultdict

logger = logging.getLogger("captcha_scale")


class ScaleMetrics:
    def __init__(self):
        self.counts = defaultdict(int)
        self.times = defaultdict(list)

    def record(self, captcha_type, success, elapsed):
        key = f"{captcha_type}_{'ok' if success else 'fail'}"
        self.counts[key] += 1
        self.times[captcha_type].append(elapsed)

    def report(self):
        for captcha_type in set(k.rsplit("_", 1)[0] for k in self.counts):
            ok = self.counts.get(f"{captcha_type}_ok", 0)
            fail = self.counts.get(f"{captcha_type}_fail", 0)
            total = ok + fail
            rate = (ok / total * 100) if total else 0
            times = self.times.get(captcha_type, [])
            avg_time = sum(times) / len(times) if times else 0
            logger.info(
                f"{captcha_type}: {total} solves, {rate:.1f}% success, {avg_time:.1f}s avg"
            )

الأسئلة الشائعة

ما هو عدد الحلول المتزامنة التي يمكن لـ CaptchaAI التعامل معها؟

يمكن لـ CaptchaAI التعامل مع التزامن العالي. ابدأ بـ 10-20 طلبًا متزامنًا وقم بالزيادة بناءً على نتائجك.

هل يجب أن أستخدم المزامنة أو سلاسل المعالجة؟

تعمل مسارات التنفيذ بشكل جيد نظرًا لأن حل اختبار CAPTCHA هو I/O-bound (طلبات الشبكة + الانتظار). بالنسبة إلى Python 3.7+، يعد asyncio مع aiohttp ممتازًا أيضًا للتزامن العالي جدًا.

كيف يمكنني منع إرهاق واجهة برمجة التطبيقات (API)؟

استخدم قائمة انتظار ذات حجم محدد (الضغط الخلفي)، ومعدل محدد، وقواطع دوائر. مراقبة معدل ERROR_NO_SLOT_AVAILABLE الخاص بك.


أدلة ذات صلة


يمكنك البدء بالنمط الأبسط ثم الترقية تدريجيًا مع CaptchaAI كلما أصبح الحجم أو التفاوت في الحمل سببًا مباشرًا للمشكلات التشغيلية.

التعليقات غير مفعّلة لهذا المقال.