تهدر مجموعات العمال الثابتة الأموال خلال أوقات الهدوء وتخلق اختناقات أثناء فترات الذروة. يعمل التوسع التلقائي على مطابقة عدد العمال مع الطلب الفعلي، مما يؤدي إلى تحسين التكلفة والإنتاجية.
إشارات التحجيم
| إشارة | رفع مستوى متى | خفض الحجم متى |
|---|---|---|
| عمق قائمة الانتظار | > 20 مهمة معلقة | < 5 مهام معلقة |
| استغلال العمّال | > 80% مشغول | < 20% مشغول |
| كمون الحل | P95 > 60 ثانية | P95 < 20 ثانية |
| معدل الخطأ | > 5% (بحاجة إلى عمال جدد) | مستقر <1% |
| الرصيد | N/A | الرصيد < $1 (إيقاف القياس) |
المقياس التلقائي القائم على الخيط
توسيع نطاق سلاسل العمليات ضمن عملية واحدة:
import os
import time
import threading
import requests
import json
import redis
class AutoScalingPool:
"""Dynamically scale CaptchaAI worker threads."""
def __init__(self, api_key, redis_url="redis://localhost:6379"):
self.api_key = api_key
self.redis = redis.from_url(redis_url)
self.base = "https://ocr.captchaai.com"
self.queue_key = "captcha:tasks"
self.results_key = "captcha:results"
self.min_workers = 2
self.max_workers = 20
self.workers = []
self.active_count = 0
self.lock = threading.Lock()
self.running = True
def start(self):
"""Start the pool with minimum workers."""
for _ in range(self.min_workers):
self._add_worker()
# Start scaler in background
scaler = threading.Thread(target=self._scaling_loop, daemon=True)
scaler.start()
print(f"Pool started with {self.min_workers} workers")
def _add_worker(self):
"""Add a worker thread."""
if len(self.workers) >= self.max_workers:
return
t = threading.Thread(target=self._worker_loop, daemon=True)
t.start()
self.workers.append(t)
def _remove_worker(self):
"""Signal one worker to stop (lazy removal)."""
if len(self.workers) <= self.min_workers:
return
self.workers.pop() # Thread will exit on next idle cycle
def _worker_loop(self):
"""Worker loop: fetch and process tasks."""
while self.running and threading.current_thread() in self.workers:
result = self.redis.blpop(self.queue_key, timeout=10)
if result is None:
continue
_, raw = result
task = json.loads(raw)
task_id = task["id"]
with self.lock:
self.active_count += 1
try:
token = self._solve(task["method"], task["params"])
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "success", "token": token,
}))
except Exception as e:
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "error", "error": str(e),
}))
finally:
with self.lock:
self.active_count -= 1
def _scaling_loop(self):
"""Periodically adjust worker count."""
while self.running:
time.sleep(10)
queue_depth = self.redis.llen(self.queue_key)
current = len(self.workers)
utilization = (
self.active_count / current * 100 if current > 0 else 0
)
# Scale up: queue growing and workers busy
if queue_depth > 20 and utilization > 70:
new_count = min(current + 2, self.max_workers)
while len(self.workers) < new_count:
self._add_worker()
print(f"Scaled up: {current} → {len(self.workers)} workers")
# Scale down: queue empty and workers idle
elif queue_depth < 5 and utilization < 20:
target = max(current - 1, self.min_workers)
while len(self.workers) > target:
self._remove_worker()
if len(self.workers) < current:
print(f"Scaled down: {current} → {len(self.workers)} workers")
def _solve(self, method, params, timeout=120):
data = {"key": self.api_key, "method": method, "json": 1}
data.update(params)
resp = requests.post(
f"{self.base}/in.php", data=data, timeout=30,
)
result = resp.json()
if result.get("status") != 1:
raise RuntimeError(result.get("request"))
captcha_id = result["request"]
start = time.time()
while time.time() - start < timeout:
time.sleep(5)
resp = requests.get(f"{self.base}/res.php", params={
"key": self.api_key,
"action": "get",
"id": captcha_id,
"json": 1,
}, timeout=15)
data = resp.json()
if data["request"] != "CAPCHA_NOT_READY":
if data.get("status") == 1:
return data["request"]
raise RuntimeError(data["request"])
raise TimeoutError("Solve timeout")
def stats(self):
return {
"workers": len(self.workers),
"active": self.active_count,
"queue": self.redis.llen(self.queue_key),
}
# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()
# Monitor
while True:
print(pool.stats())
time.sleep(30)
المقياس التلقائي القائم على العملية
توسيع نطاق العمليات المنفذة لعزل وحدة المعالجة المركزية:
import multiprocessing
import time
import redis
import os
class ProcessScaler:
"""Scale worker processes based on queue depth."""
def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
self.worker_fn = worker_fn
self.redis = redis.from_url(redis_url)
self.processes = []
self.min_workers = 2
self.max_workers = 16
def run(self, check_interval=15):
"""Run the scaler loop."""
# Start minimum workers
for _ in range(self.min_workers):
self._spawn()
while True:
time.sleep(check_interval)
self._cleanup_dead()
queue_depth = self.redis.llen("captcha:tasks")
current = len(self.processes)
# Scale up
if queue_depth > current * 5 and current < self.max_workers:
to_add = min(
max(1, queue_depth // 10),
self.max_workers - current,
)
for _ in range(to_add):
self._spawn()
print(f"Scaled up to {len(self.processes)} workers")
# Scale down
elif queue_depth < 3 and current > self.min_workers:
to_remove = min(2, current - self.min_workers)
for _ in range(to_remove):
p = self.processes.pop()
p.terminate()
print(f"Scaled down to {len(self.processes)} workers")
def _spawn(self):
p = multiprocessing.Process(target=self.worker_fn)
p.start()
self.processes.append(p)
def _cleanup_dead(self):
self.processes = [p for p in self.processes if p.is_alive()]
# Ensure minimum
while len(self.processes) < self.min_workers:
self._spawn()
القياس المدرك للتوازن
توقف عن التوسع عندما تنخفض الأموال:
def check_balance(api_key, min_balance=2.0):
"""Check if balance is sufficient for scaling."""
resp = requests.get("https://ocr.captchaai.com/res.php", params={
"key": api_key,
"action": "getbalance",
"json": 1,
}, timeout=15)
balance = float(resp.json()["request"])
if balance < min_balance:
print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
return False
return True
الاندماج في حلقة القياس:
# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
if check_balance(self.api_key, min_balance=2.0):
# Scale up
...
else:
print("Scaling paused — low balance")
مقارنة استراتيجيات التوسع
| استراتيجية | أفضل ل | الكمون | التعقيد |
|---|---|---|---|
| ** مجمّع مسارات التنفيذ ** | I/O-bound (استدعاءات واجهة برمجة التطبيقات) | منخفض | منخفض |
| ** تجمع العمليات ** | المعالجة المسبقة المرتبطة بوحدة المعالجة المركزية | متوسط | متوسط |
| كوبيرنتس HPA | عمليات النشر السحابية الأصلية | أعلى | عالية |
| كدا | القياس القائم على الحدث | متوسط | متوسط |
استكشاف الأخطاء وإصلاحها
| المشكلة | السبب | الإجراء |
|---|---|---|
| يستمر العمال في التوسع | قائمة الانتظار لا تستنزف أبدًا | تحقق مما إذا كان العمال يقومون بالمعالجة بالفعل |
| تقليص الحجم بشكل متسارع | عتبة منخفضة | ارفع تأخير التقليص إلى 30 ثانية أو أكثر |
| عمليات الزومبي | لم يتم تنظيف العمليات | استخدم _cleanup_dead() بانتظام |
| الرصيد يستنزف بسرعة | عدد كبير جدًا من العمال | أضف التحقق من الرصيد إلى منطق القياس |
الأسئلة الشائعة
ما هي النسبة الصحيحة للعامل إلى قائمة الانتظار؟
استهدف عاملًا واحدًا لكل 5 إلى 10 مهام في قائمة الانتظار. يقوم كل عامل بمعالجة ما يقرب من 3-6 اختبارات CAPTCHA في الدقيقة حسب النوع.
هل يجب علي استخدام مسارات التنفيذ أو العمليات؟
خيوط للاتصال النقي بواجهة برمجة التطبيقات (CaptchaAI هو I/O-bound). العمليات عندما تقوم أيضًا بالمعالجة المسبقة للصور أو العمليات الحسابية الثقيلة جنبًا إلى جنب مع الحل.
ما مدى السرعة التي يجب أن أقوم بتوسيع نطاقها؟
قم بالتوسيع بسرعة (كل 10 إلى 15 ثانية من الفحص)، وقم بالتخفيض ببطء (انتظر 30 إلى 60 ثانية من الحمل المنخفض). وهذا يمنع التذبذب المتكرر بين حالتي التوسع والتقليص.
أدلة ذات صلة
للتوسع الذكي، احصل على مفتاح CaptchaAI اليوم.
النقاشات (0)
شارك في النقاش
سجّل الدخول لمشاركة رأيك.
تسجيل الدخوللا توجد تعليقات بعد.