DevOps & Scaling

التوسع التلقائي لعمّال حل CAPTCHA

تهدر مجموعات العمال الثابتة الأموال خلال أوقات الهدوء وتخلق اختناقات أثناء فترات الذروة. يعمل التوسع التلقائي على مطابقة عدد العمال مع الطلب الفعلي، مما يؤدي إلى تحسين التكلفة والإنتاجية.


إشارات التحجيم

إشارة رفع مستوى متى خفض الحجم متى
عمق قائمة الانتظار > 20 مهمة معلقة < 5 مهام معلقة
استغلال العمّال > 80% مشغول < 20% مشغول
كمون الحل P95 > 60 ثانية P95 < 20 ثانية
معدل الخطأ > 5% (بحاجة إلى عمال جدد) مستقر <1%
الرصيد N/A الرصيد < $1 (إيقاف القياس)

المقياس التلقائي القائم على الخيط

توسيع نطاق سلاسل العمليات ضمن عملية واحدة:

import os
import time
import threading
import requests
import json
import redis


class AutoScalingPool:
    """Dynamically scale CaptchaAI worker threads."""

    def __init__(self, api_key, redis_url="redis://localhost:6379"):
        self.api_key = api_key
        self.redis = redis.from_url(redis_url)
        self.base = "https://ocr.captchaai.com"
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"

        self.min_workers = 2
        self.max_workers = 20
        self.workers = []
        self.active_count = 0
        self.lock = threading.Lock()
        self.running = True

    def start(self):
        """Start the pool with minimum workers."""
        for _ in range(self.min_workers):
            self._add_worker()

        # Start scaler in background
        scaler = threading.Thread(target=self._scaling_loop, daemon=True)
        scaler.start()
        print(f"Pool started with {self.min_workers} workers")

    def _add_worker(self):
        """Add a worker thread."""
        if len(self.workers) >= self.max_workers:
            return
        t = threading.Thread(target=self._worker_loop, daemon=True)
        t.start()
        self.workers.append(t)

    def _remove_worker(self):
        """Signal one worker to stop (lazy removal)."""
        if len(self.workers) <= self.min_workers:
            return
        self.workers.pop()  # Thread will exit on next idle cycle

    def _worker_loop(self):
        """Worker loop: fetch and process tasks."""
        while self.running and threading.current_thread() in self.workers:
            result = self.redis.blpop(self.queue_key, timeout=10)
            if result is None:
                continue

            _, raw = result
            task = json.loads(raw)
            task_id = task["id"]

            with self.lock:
                self.active_count += 1

            try:
                token = self._solve(task["method"], task["params"])
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "success", "token": token,
                }))
            except Exception as e:
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "error", "error": str(e),
                }))
            finally:
                with self.lock:
                    self.active_count -= 1

    def _scaling_loop(self):
        """Periodically adjust worker count."""
        while self.running:
            time.sleep(10)

            queue_depth = self.redis.llen(self.queue_key)
            current = len(self.workers)
            utilization = (
                self.active_count / current * 100 if current > 0 else 0
            )

            # Scale up: queue growing and workers busy
            if queue_depth > 20 and utilization > 70:
                new_count = min(current + 2, self.max_workers)
                while len(self.workers) < new_count:
                    self._add_worker()
                print(f"Scaled up: {current} → {len(self.workers)} workers")

            # Scale down: queue empty and workers idle
            elif queue_depth < 5 and utilization < 20:
                target = max(current - 1, self.min_workers)
                while len(self.workers) > target:
                    self._remove_worker()
                if len(self.workers) < current:
                    print(f"Scaled down: {current} → {len(self.workers)} workers")

    def _solve(self, method, params, timeout=120):
        data = {"key": self.api_key, "method": method, "json": 1}
        data.update(params)

        resp = requests.post(
            f"{self.base}/in.php", data=data, timeout=30,
        )
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])

        raise TimeoutError("Solve timeout")

    def stats(self):
        return {
            "workers": len(self.workers),
            "active": self.active_count,
            "queue": self.redis.llen(self.queue_key),
        }


# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()

# Monitor
while True:
    print(pool.stats())
    time.sleep(30)

المقياس التلقائي القائم على العملية

توسيع نطاق العمليات المنفذة لعزل وحدة المعالجة المركزية:

import multiprocessing
import time
import redis
import os


class ProcessScaler:
    """Scale worker processes based on queue depth."""

    def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
        self.worker_fn = worker_fn
        self.redis = redis.from_url(redis_url)
        self.processes = []
        self.min_workers = 2
        self.max_workers = 16

    def run(self, check_interval=15):
        """Run the scaler loop."""
        # Start minimum workers
        for _ in range(self.min_workers):
            self._spawn()

        while True:
            time.sleep(check_interval)
            self._cleanup_dead()

            queue_depth = self.redis.llen("captcha:tasks")
            current = len(self.processes)

            # Scale up
            if queue_depth > current * 5 and current < self.max_workers:
                to_add = min(
                    max(1, queue_depth // 10),
                    self.max_workers - current,
                )
                for _ in range(to_add):
                    self._spawn()
                print(f"Scaled up to {len(self.processes)} workers")

            # Scale down
            elif queue_depth < 3 and current > self.min_workers:
                to_remove = min(2, current - self.min_workers)
                for _ in range(to_remove):
                    p = self.processes.pop()
                    p.terminate()
                print(f"Scaled down to {len(self.processes)} workers")

    def _spawn(self):
        p = multiprocessing.Process(target=self.worker_fn)
        p.start()
        self.processes.append(p)

    def _cleanup_dead(self):
        self.processes = [p for p in self.processes if p.is_alive()]
        # Ensure minimum
        while len(self.processes) < self.min_workers:
            self._spawn()

القياس المدرك للتوازن

توقف عن التوسع عندما تنخفض الأموال:

def check_balance(api_key, min_balance=2.0):
    """Check if balance is sufficient for scaling."""
    resp = requests.get("https://ocr.captchaai.com/res.php", params={
        "key": api_key,
        "action": "getbalance",
        "json": 1,
    }, timeout=15)
    balance = float(resp.json()["request"])

    if balance < min_balance:
        print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
        return False
    return True

الاندماج في حلقة القياس:

# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
    if check_balance(self.api_key, min_balance=2.0):
        # Scale up
        ...
    else:
        print("Scaling paused — low balance")

مقارنة استراتيجيات التوسع

استراتيجية أفضل ل الكمون التعقيد
** مجمّع مسارات التنفيذ ** I/O-bound (استدعاءات واجهة برمجة التطبيقات) منخفض منخفض
** تجمع العمليات ** المعالجة المسبقة المرتبطة بوحدة المعالجة المركزية متوسط متوسط
كوبيرنتس HPA عمليات النشر السحابية الأصلية أعلى عالية
كدا القياس القائم على الحدث متوسط متوسط

استكشاف الأخطاء وإصلاحها

المشكلة السبب الإجراء
يستمر العمال في التوسع قائمة الانتظار لا تستنزف أبدًا تحقق مما إذا كان العمال يقومون بالمعالجة بالفعل
تقليص الحجم بشكل متسارع عتبة منخفضة ارفع تأخير التقليص إلى 30 ثانية أو أكثر
عمليات الزومبي لم يتم تنظيف العمليات استخدم _cleanup_dead() بانتظام
الرصيد يستنزف بسرعة عدد كبير جدًا من العمال أضف التحقق من الرصيد إلى منطق القياس

الأسئلة الشائعة

ما هي النسبة الصحيحة للعامل إلى قائمة الانتظار؟

استهدف عاملًا واحدًا لكل 5 إلى 10 مهام في قائمة الانتظار. يقوم كل عامل بمعالجة ما يقرب من 3-6 اختبارات CAPTCHA في الدقيقة حسب النوع.

هل يجب علي استخدام مسارات التنفيذ أو العمليات؟

خيوط للاتصال النقي بواجهة برمجة التطبيقات (CaptchaAI هو I/O-bound). العمليات عندما تقوم أيضًا بالمعالجة المسبقة للصور أو العمليات الحسابية الثقيلة جنبًا إلى جنب مع الحل.

ما مدى السرعة التي يجب أن أقوم بتوسيع نطاقها؟

قم بالتوسيع بسرعة (كل 10 إلى 15 ثانية من الفحص)، وقم بالتخفيض ببطء (انتظر 30 إلى 60 ثانية من الحمل المنخفض). وهذا يمنع التذبذب المتكرر بين حالتي التوسع والتقليص.


أدلة ذات صلة


للتوسع الذكي، احصل على مفتاح CaptchaAI اليوم.

النقاشات (0)

لا توجد تعليقات بعد.

مقالات ذات صلة

DevOps & Scaling النشر باللونين الأزرق والأخضر للبنية الأساسية لحل اختبار CAPTCHA
دليل تشغيلي لـ النشر باللونين الأزرق والأخضر للبنية الأساسية لحل اختبار CAPTCHA يغطّي قرارات البنية، والاعتمادية، والمراقبة، وأنماط الأتمتة المناسبة لتشغيل Capt...

دليل تشغيلي لـ النشر باللونين الأزرق والأخضر للبنية الأساسية لحل اختبار CAPTCHA يغطّي قرارات البنية، والاعتماد...

Apr 25, 2026
DevOps & Scaling حل CAPTCHA بدون خوادم باستخدام AWS Lambda وCaptchaAI
دليل تشغيلي لـ حل CAPTCHA بدون خوادم باستخدام AWS Lambda وCaptcha AI يغطّي قرارات البنية، والاعتمادية، والمراقبة، وأنماط الأتمتة المناسبة لتشغيل Captcha AI في ب...

دليل تشغيلي لـ حل CAPTCHA بدون خوادم باستخدام AWS Lambda وCaptcha AI يغطّي قرارات البنية، والاعتمادية، والمراق...

Apr 20, 2026
DevOps & Scaling نشر عمّال CaptchaAI باستخدام Ansible
دليل تشغيلي لـ نشر عمّال Captcha AI باستخدام Ansible يغطّي قرارات البنية، والاعتمادية، والمراقبة، وأنماط الأتمتة المناسبة لتشغيل Captcha AI في بيئات الإنتاج.

دليل تشغيلي لـ نشر عمّال Captcha AI باستخدام Ansible يغطّي قرارات البنية، والاعتمادية، والمراقبة، وأنماط الأتم...

Apr 17, 2026