الدروس التطبيقية

إلغاء تكرار اختبار CAPTCHA حل الطلبات باستخدام قفل قاعدة البيانات

عند قيام عدة عاملين أو إعادة المحاولة بإرسال نفس اختبار CAPTCHA لحله، فإنك تدفع مقابل كل نسخة مكررة. تلتقط طبقة إلغاء البيانات المكررة الطلبات المتطابقة وترجع نفس النتيجة - مما يوفر أرصدة واجهة برمجة التطبيقات (API) ويقلل زمن الوصول.

كيف تحدث التكرارات

السيناريو السبب النفايات
أعد المحاولة قبل وصول النتيجة منطق إعادة المحاولة العدواني تكلفة 2-5x لكل اختبار CAPTCHA
عمال متعددون، نفس الهدف لا يوجد تنسيق بين العمال الحلول الضائعة الموازية
إعادة تشغيل تحديث الصفحة إعادة محاولة الواجهة الأمامية عند انتهاء المهلة حل إضافي لكل تحديث
تم إعادة تشغيل رسالة قائمة الانتظار ضمان التسليم مرة واحدة على الأقل حل مكرر لكل اعادتها

تصميم مفتاح إلغاء البيانات المكررة

قم بإنشاء مفتاح فريد من معلمات الطلب:

import hashlib


def dedup_key(method, sitekey, pageurl):
    """Generate a deduplication key for a CAPTCHA solve request."""
    raw = f"{method}:{sitekey}:{pageurl}"
    return f"captcha:dedup:{hashlib.sha256(raw.encode()).hexdigest()[:16]}"

التكوين الرئيسي:

نوع التحقق المكونات الرئيسية
reCAPTCHA v2 method + sitekey + pageurl
reCAPTCHA v3 method + sitekey + pageurl + action
hCaptcha method + sitekey + pageurl
Turnstile method + sitekey + pageurl
صورة التحقق method + تجزئة body (محتوى الصورة)

إلغاء البيانات المكررة المستندة إلى Redis

تنفيذ بايثون

import os
import time
import json
import hashlib
import redis
import requests

r = redis.Redis(
    host=os.environ.get("REDIS_HOST", "localhost"),
    port=int(os.environ.get("REDIS_PORT", 6379)),
    decode_responses=True
)

API_KEY = os.environ["CAPTCHAAI_API_KEY"]

# Dedup window: how long to consider a request "in progress"
DEDUP_TTL = 180  # seconds


def dedup_key(method, sitekey, pageurl, extra=""):
    raw = f"{method}:{sitekey}:{pageurl}:{extra}"
    return f"captcha:dedup:{hashlib.sha256(raw.encode()).hexdigest()[:16]}"


def solve_with_dedup(sitekey, pageurl, method="userrecaptcha"):
    key = dedup_key(method, sitekey, pageurl)

    # Check if this request is already being solved
    existing = r.get(key)
    if existing:
        state = json.loads(existing)
        if state["status"] == "solving":
            # Wait for the result
            return wait_for_result(key)
        elif state["status"] == "solved":
            return {"solution": state["solution"], "source": "dedup_cache"}
        elif state["status"] == "error":
            pass  # Allow retry on error

    # Mark as solving
    r.set(key, json.dumps({"status": "solving", "started": time.time()}), ex=DEDUP_TTL)

    # Submit to CaptchaAI
    resp = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": method,
        "googlekey": sitekey,
        "pageurl": pageurl,
        "json": 1
    })
    data = resp.json()

    if data.get("status") != 1:
        r.set(key, json.dumps({"status": "error", "error": data.get("request")}), ex=30)
        return {"error": data.get("request")}

    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY, "action": "get",
            "id": captcha_id, "json": 1
        }).json()

        if result.get("status") == 1:
            solution = result["request"]
            # Cache the result for other workers (short TTL since tokens expire)
            r.set(key, json.dumps({
                "status": "solved",
                "solution": solution,
                "solved_at": time.time()
            }), ex=60)  # Cache result for 60 seconds
            return {"solution": solution, "source": "api"}

        if result.get("request") != "CAPCHA_NOT_READY":
            r.set(key, json.dumps({
                "status": "error", "error": result.get("request")
            }), ex=30)
            return {"error": result.get("request")}

    r.set(key, json.dumps({"status": "error", "error": "TIMEOUT"}), ex=30)
    return {"error": "TIMEOUT"}


def wait_for_result(key, timeout=120):
    """Wait for another worker to finish solving."""
    start = time.time()
    while time.time() - start < timeout:
        data = r.get(key)
        if data:
            state = json.loads(data)
            if state["status"] == "solved":
                return {"solution": state["solution"], "source": "dedup_wait"}
            if state["status"] == "error":
                return {"error": state.get("error", "UNKNOWN")}
        time.sleep(2)
    return {"error": "DEDUP_WAIT_TIMEOUT"}

تنفيذ JavaScript

const Redis = require("ioredis");
const axios = require("axios");
const crypto = require("crypto");

const redis = new Redis(process.env.REDIS_URL || "redis://localhost:6379");
const API_KEY = process.env.CAPTCHAAI_API_KEY;
const DEDUP_TTL = 180;

function dedupKey(method, sitekey, pageurl) {
  const raw = `${method}:${sitekey}:${pageurl}`;
  const hash = crypto.createHash("sha256").update(raw).digest("hex").slice(0, 16);
  return `captcha:dedup:${hash}`;
}

async function solveWithDedup(sitekey, pageurl, method = "userrecaptcha") {
  const key = dedupKey(method, sitekey, pageurl);

  // Check existing
  const existing = await redis.get(key);
  if (existing) {
    const state = JSON.parse(existing);
    if (state.status === "solving") return await waitForResult(key);
    if (state.status === "solved") return { solution: state.solution, source: "dedup_cache" };
  }

  // Mark as solving
  await redis.set(key, JSON.stringify({ status: "solving", started: Date.now() }), "EX", DEDUP_TTL);

  // Submit
  const submit = await axios.post("https://ocr.captchaai.com/in.php", null, {
    params: { key: API_KEY, method, googlekey: sitekey, pageurl, json: 1 },
  });

  if (submit.data.status !== 1) {
    await redis.set(key, JSON.stringify({ status: "error", error: submit.data.request }), "EX", 30);
    return { error: submit.data.request };
  }

  const captchaId = submit.data.request;

  for (let i = 0; i < 60; i++) {
    await new Promise((r) => setTimeout(r, 5000));
    const poll = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
    });

    if (poll.data.status === 1) {
      await redis.set(key, JSON.stringify({ status: "solved", solution: poll.data.request }), "EX", 60);
      return { solution: poll.data.request, source: "api" };
    }
    if (poll.data.request !== "CAPCHA_NOT_READY") {
      await redis.set(key, JSON.stringify({ status: "error", error: poll.data.request }), "EX", 30);
      return { error: poll.data.request };
    }
  }

  await redis.set(key, JSON.stringify({ status: "error", error: "TIMEOUT" }), "EX", 30);
  return { error: "TIMEOUT" };
}

async function waitForResult(key, timeout = 120000) {
  const start = Date.now();
  while (Date.now() - start < timeout) {
    const data = await redis.get(key);
    if (data) {
      const state = JSON.parse(data);
      if (state.status === "solved") return { solution: state.solution, source: "dedup_wait" };
      if (state.status === "error") return { error: state.error };
    }
    await new Promise((r) => setTimeout(r, 2000));
  }
  return { error: "DEDUP_WAIT_TIMEOUT" };
}

بديل قفل قاعدة البيانات

بالنسبة للإلغاء المستند إلى PostgreSQL بدون Redis:

import psycopg2


def solve_with_pg_dedup(conn, sitekey, pageurl):
    """Use PostgreSQL advisory locks for deduplication."""
    # Generate a numeric lock key from the dedup key
    lock_id = hash(f"{sitekey}:{pageurl}") & 0x7FFFFFFF

    cursor = conn.cursor()

    # Try to acquire advisory lock (non-blocking)
    cursor.execute("SELECT pg_try_advisory_lock(%s)", (lock_id,))
    acquired = cursor.fetchone()[0]

    if not acquired:
        # Another worker is solving — wait for result
        cursor.execute("SELECT pg_advisory_lock(%s)", (lock_id,))
        # Lock acquired means other worker finished — check cache
        cursor.execute(
            "SELECT solution FROM captcha_cache "
            "WHERE sitekey = %s AND pageurl = %s "
            "AND created_at > NOW() - INTERVAL '60 seconds'",
            (sitekey, pageurl)
        )
        row = cursor.fetchone()
        cursor.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
        if row:
            return {"solution": row[0], "source": "pg_cache"}
        return {"error": "NO_CACHED_RESULT"}

    try:
        # Solve the CAPTCHA
        solution = solve_via_api(sitekey, pageurl)
        if solution:
            cursor.execute(
                "INSERT INTO captcha_cache (sitekey, pageurl, solution) "
                "VALUES (%s, %s, %s)",
                (sitekey, pageurl, solution)
            )
            conn.commit()
        return {"solution": solution} if solution else {"error": "SOLVE_FAILED"}
    finally:
        cursor.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))

مقاييس فعالية الحذف

تتبع وفورات إلغاء البيانات المكررة:

def track_dedup_stats(source):
    """Increment counters for dedup tracking."""
    today = time.strftime("%Y-%m-%d")
    r.hincrby(f"dedup:stats:{today}", source, 1)
    r.expire(f"dedup:stats:{today}", 7 * 86400)


def get_dedup_report():
    today = time.strftime("%Y-%m-%d")
    stats = r.hgetall(f"dedup:stats:{today}")
    total = sum(int(v) for v in stats.values())
    saved = int(stats.get("dedup_cache", 0)) + int(stats.get("dedup_wait", 0))
    return {
        "total_requests": total,
        "deduplicated": saved,
        "savings_pct": f"{saved / total * 100:.1f}%" if total else "0%",
        "breakdown": stats
    }

استكشاف الأخطاء وإصلاحها

المشكلة السبب الإجراء
يُنشأ الرمز المميز لكن الجهة المستهدفة ترفضه مفتاح الموقع أو الصفحة أو سياق الجلسة لا يتطابق التقط المعلمات من جديد وأعد استخدام الرمز داخل جلسة HTTP أو المتصفح نفسها
تنتهي عملية الاستطلاع بمهلة الفاصل الزمني أو وقت الانتظار أو معالجة الأخطاء صارمة أكثر من اللازم استطلع كل 5 إلى 10 ثوانٍ وافصل بين انتهاء المهلة والأخطاء الفعلية وسجّل السبب
ينجح المثال محليًا لكنه يفشل داخل سير العمل رد النداء أو حقل النموذج أو حقن الرمز مفقود في السلسلة الفعلية تحقق من المسار الكامل بين أداة الحل والطلب النهائي إلى الموقع المستهدف

الخطوات التالية

أدلة ذات صلة

التعليقات غير مفعّلة لهذا المقال.