الدروس التطبيقية

بناء خطوط أنابيب CAPTCHA للعميل باستخدام CaptchaAI

عندما تقوم بإدارة عملية التجريد أو الأتمتة لعدة عملاء، فإن كل مشروع يصل في النهاية إلى اختبار CAPTCHA. بدلاً من كتابة كود الحل لمرة واحدة لكل مشروع، قم ببناء مسار قابل لإعادة الاستخدام. يتجول هذا الدليل عبر الهندسة المعمارية.


هندسة خطوط الأنابيب

┌──────────────┐    ┌───────────────┐    ┌──────────────┐
│  Client A    │──▶ │               │    │              │
│  Client B    │──▶ │  Task Queue   │──▶ │  CaptchaAI   │
│  Client C    │──▶ │               │    │  API         │
└──────────────┘    └───────────────┘    └──────────────┘
                           │                    │
                           ▼                    ▼
                    ┌───────────────┐    ┌──────────────┐
                    │  Result Store │◀── │  Polling      │
                    │  (Redis/DB)   │    │  Workers      │
                    └───────────────┘    └──────────────┘

المكونات:

  1. تناول المهمة - يتلقى طلبات الحل من كاشطات العميل
  2. قائمة الانتظار — تقوم بتخزين المهام مؤقتًا، وفرض حدود التزامن لكل عميل
  3. عمال الحلول - أرسل إلى CaptchaAI واستطلع النتائج
  4. مخزن النتائج - يحتوي على الرموز المميزة التي تم حلها لاسترجاعها من قبل المستهلك

خط أنابيب بايثون

فئة الحل الأساسية

import requests
import time
from dataclasses import dataclass
from typing import Optional
from collections import deque
from threading import Lock

SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"

@dataclass
class SolveRequest:
    client_id: str
    method: str
    params: dict
    callback: Optional[callable] = None

@dataclass
class SolveResult:
    client_id: str
    task_id: str
    token: Optional[str] = None
    error: Optional[str] = None


class CaptchaPipeline:
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.queue = deque()
        self.active = {}
        self.lock = Lock()

    def enqueue(self, request: SolveRequest):
        with self.lock:
            self.queue.append(request)

    def submit_task(self, request: SolveRequest) -> Optional[str]:
        data = {
            "key": self.api_key,
            "method": request.method,
            "json": 1,
            **request.params
        }

        try:
            resp = requests.post(SUBMIT_URL, data=data, timeout=15)
            result = resp.json()

            if result.get("status") == 1:
                return result["request"]
            else:
                print(f"[{request.client_id}] Submit error: {result.get('error_text', result.get('request'))}")
                return None
        except requests.RequestException as e:
            print(f"[{request.client_id}] Network error: {e}")
            return None

    def poll_result(self, task_id: str, max_wait: int = 120) -> Optional[str]:
        elapsed = 0
        interval = 5
        while elapsed < max_wait:
            time.sleep(interval)
            elapsed += interval

            try:
                resp = requests.get(RESULT_URL, params={
                    "key": self.api_key,
                    "action": "get",
                    "id": task_id,
                    "json": 1
                }, timeout=10)
                result = resp.json()

                if result.get("status") == 1:
                    return result["request"]
                elif result.get("request") == "CAPCHA_NOT_READY":
                    continue
                else:
                    print(f"Poll error for {task_id}: {result.get('error_text', result.get('request'))}")
                    return None
            except requests.RequestException:
                continue

        return None

    def process_queue(self):
        while self.queue or self.active:
            # Fill active slots
            with self.lock:
                while self.queue and len(self.active) < self.max_concurrent:
                    request = self.queue.popleft()
                    task_id = self.submit_task(request)
                    if task_id:
                        self.active[task_id] = request

            # Poll active tasks
            completed = []
            for task_id, request in list(self.active.items()):
                token = self.poll_result(task_id, max_wait=10)
                if token:
                    result = SolveResult(
                        client_id=request.client_id,
                        task_id=task_id,
                        token=token
                    )
                    if request.callback:
                        request.callback(result)
                    completed.append(task_id)

            with self.lock:
                for task_id in completed:
                    del self.active[task_id]

استخدام متعدد العملاء

pipeline = CaptchaPipeline(api_key="YOUR_API_KEY", max_concurrent=15)

# Client A — reCAPTCHA v2
pipeline.enqueue(SolveRequest(
    client_id="client_a",
    method="userrecaptcha",
    params={
        "googlekey": "6Le-SITEKEY-A",
        "pageurl": "https://client-a-target.com/form"
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))

# Client B — Turnstile
pipeline.enqueue(SolveRequest(
    client_id="client_b",
    method="turnstile",
    params={
        "sitekey": "0x4AAAA-SITEKEY-B",
        "pageurl": "https://client-b-target.com/login"
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))

pipeline.process_queue()

خط أنابيب Node.js

const axios = require("axios");

const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";

class CaptchaPipeline {
  constructor(apiKey, maxConcurrent = 10) {
    this.apiKey = apiKey;
    this.maxConcurrent = maxConcurrent;
    this.queue = [];
    this.activeCount = 0;
  }

  enqueue(clientId, method, params) {
    return new Promise((resolve, reject) => {
      this.queue.push({ clientId, method, params, resolve, reject });
      this._processNext();
    });
  }

  async _processNext() {
    if (this.activeCount >= this.maxConcurrent || this.queue.length === 0) return;

    this.activeCount++;
    const task = this.queue.shift();

    try {
      const token = await this._solve(task);
      task.resolve({ clientId: task.clientId, token });
    } catch (err) {
      task.reject(err);
    } finally {
      this.activeCount--;
      this._processNext();
    }
  }

  async _solve(task) {
    const submitResp = await axios.post(SUBMIT_URL, null, {
      params: {
        key: this.apiKey,
        method: task.method,
        json: 1,
        ...task.params,
      },
      timeout: 15000,
    });

    if (submitResp.data.status !== 1) {
      throw new Error(submitResp.data.error_text || submitResp.data.request);
    }

    const taskId = submitResp.data.request;
    return this._poll(taskId);
  }

  async _poll(taskId, maxWait = 120000) {
    const interval = 5000;
    let elapsed = 0;

    while (elapsed < maxWait) {
      await new Promise((r) => setTimeout(r, interval));
      elapsed += interval;

      try {
        const resp = await axios.get(RESULT_URL, {
          params: {
            key: this.apiKey,
            action: "get",
            id: taskId,
            json: 1,
          },
          timeout: 10000,
        });

        if (resp.data.status === 1) return resp.data.request;
        if (resp.data.request !== "CAPCHA_NOT_READY") {
          throw new Error(resp.data.error_text || resp.data.request);
        }
      } catch (err) {
        if (err.response) throw err;
      }
    }

    throw new Error(`Timeout waiting for task ${taskId}`);
  }
}

// Usage
(async () => {
  const pipeline = new CaptchaPipeline("YOUR_API_KEY", 15);

  const results = await Promise.allSettled([
    pipeline.enqueue("client_a", "userrecaptcha", {
      googlekey: "6Le-SITEKEY-A",
      pageurl: "https://client-a-target.com/form",
    }),
    pipeline.enqueue("client_b", "turnstile", {
      sitekey: "0x4AAAA-SITEKEY-B",
      pageurl: "https://client-b-target.com/login",
    }),
  ]);

  results.forEach((r) => {
    if (r.status === "fulfilled") {
      console.log(`[${r.value.clientId}] Token: ${r.value.token.slice(0, 40)}...`);
    } else {
      console.error(`Failed: ${r.reason.message}`);
    }
  });
})();

التكوين لكل عميل

تتبع الإعدادات لكل عميل مثل الوكيل وتفضيلات الحل وحدود الأسعار:

CLIENT_CONFIG = {
    "client_a": {
        "proxy": "host:port:user:pass",
        "proxytype": "HTTP",
        "max_concurrent": 5,
        "default_method": "userrecaptcha"
    },
    "client_b": {
        "proxy": None,
        "proxytype": None,
        "max_concurrent": 10,
        "default_method": "turnstile"
    }
}

def build_params(client_id, params):
    config = CLIENT_CONFIG.get(client_id, {})
    if config.get("proxy"):
        params["proxy"] = config["proxy"]
        params["proxytype"] = config["proxytype"]
    return params

استراتيجية التعامل مع الأخطاء

خطأ الاستجابة
ERROR_ZERO_BALANCE إيقاف قائمة الانتظار، تنبيه جميع العملاء
ERROR_NO_SLOT_AVAILABLE إعادة ترتيب المهمة مع التأخير
ERROR_WRONG_CAPTCHA_ID تجاهل، سجل خطأ
ERROR_CAPTCHA_UNSOLVABLE أعد المحاولة مرة واحدة، ثم فشل
مهلة الشبكة إعادة المحاولة مع التراجع (بحد أقصى 3 محاولات)

استكشاف الأخطاء وإصلاحها

مشكلة السبب إصلاح
قائمة الانتظار تنمو بلا حدود الفتحات النشطة ممتلئة زيادة max_concurrent أو إضافة العمال
رد الاتصال لا يطلق النار فشلت المهمة بصمت تحقق من إرجاع الخطأ في حلقة الاستقصاء
الرموز المختلطة بين العملاء مخزن النتائج المشتركة النتائج الرئيسية بواسطة client_id + task_id
أخطاء حد المعدل (429) هناك عدد كبير جدًا من عمليات الإرسال المتزامنة انخفاض التزامن، إضافة تأخير الإرسال

الأسئلة الشائعة

ما هو عدد المهام المتزامنة التي يجب أن أقوم بتشغيلها لكل عميل؟

ابدأ بـ 5-10. راقب أوقات الحل ومعدلات الخطأ، ثم اضبطها. يدعم CaptchaAI التزامن العالي ولكن قد يكون تجمع الوكيل الخاص بك هو عنق الزجاجة.

هل يجب علي استخدام مفتاح API منفصل لكل عميل؟

أنه يبسط الفواتير. استخدم المعلمة CaptchaAI soft_id إذا كنت بحاجة إلى التتبع تحت مفتاح واحد.

كيف أتعامل مع الطوابير الليلية؟

استمر في قائمة الانتظار (Redis أو قاعدة البيانات). عند إعادة التشغيل، أعد تحميل المهام المعلقة واستأنف المعالجة.


قم ببناء خط أنابيب CAPTCHA الخاص بك باستخدام CaptchaAI

ابدأ في بناء خطوط أنابيب العميل علىcaptchaai.com.


أدلة ذات صلة

التعليقات غير مفعّلة لهذا المقال.