يستهدف مسار حل اختبار CAPTCHA الخاص بك معدل نجاح يصل إلى 95%. في الأسبوع الماضي كان 94.2٪. هل هذه مشكلة؟ بدون ميزانية خطأ، لا يمكنك الإجابة على هذا السؤال من الناحية الكمية. تخبرك ميزانية الخطأ بالضبط بعدد حالات الفشل التي يمكنك تحملها قبل أن تنخفض الموثوقية إلى ما دون مستوى SLO الخاص بك - وما يجب فعله عند نفاد الميزانية.
خطأ في أساسيات الميزانية
مفهوم
التعريف
مثال
SLO
معدل النجاح المستهدف
95% حلول ناجحة
ميزانية خطأ
معدل الفشل المسموح به
5% من إجمالي الحلول يمكن أن تفشل
** معدل الحرق **
مدى سرعة استهلاك الميزانية
2 × يعني استنفاد الميزانية في نصف النافذة
نافذة
فترة القياس
المتداول 24 ساعة أو 7 أيام
إذا كان SLO الخاص بك يبلغ 95% خلال فترة 24 ساعة مع 10000 حل، فإن ميزانية الخطأ الخاصة بك هي 500 فشل. بمجرد وصولك إلى 500 فشل، يجب أن تتوقف عمليات النشر الجديدة أو التغييرات المحفوفة بالمخاطر.
بايثون: خطأ في تعقب الميزانية
import time
import threading
from dataclasses import dataclass, field
from collections import deque
from enum import Enum
API_KEY = "YOUR_API_KEY"
class BudgetStatus(Enum):
HEALTHY = "healthy" # Budget > 50% remaining
WARNING = "warning" # Budget 10-50% remaining
CRITICAL = "critical" # Budget < 10% remaining
EXHAUSTED = "exhausted" # Budget depleted
@dataclass
class SLOConfig:
"""Service Level Objective configuration."""
target_success_rate: float = 0.95 # 95%
window_seconds: int = 86400 # 24 hours
warning_threshold: float = 0.50 # Alert at 50% budget
critical_threshold: float = 0.10 # Alert at 10% budget
@dataclass
class ErrorBudgetEvent:
timestamp: float
success: bool
class ErrorBudgetTracker:
"""Tracks error budget consumption for CAPTCHA solving."""
def __init__(self, config: SLOConfig = SLOConfig()):
self.config = config
self._events: deque[ErrorBudgetEvent] = deque()
self._lock = threading.Lock()
self._callbacks: dict[BudgetStatus, list[callable]] = {
status: [] for status in BudgetStatus
}
self._last_status = BudgetStatus.HEALTHY
def on_status_change(self, status: BudgetStatus, callback: callable):
"""Register a callback for status transitions."""
self._callbacks[status].append(callback)
def record(self, success: bool):
"""Record a solve attempt."""
now = time.monotonic()
event = ErrorBudgetEvent(timestamp=now, success=success)
with self._lock:
self._events.append(event)
self._prune(now)
new_status = self._compute_status()
if new_status != self._last_status:
self._last_status = new_status
for cb in self._callbacks.get(new_status, []):
try:
cb(self.get_report())
except Exception as e:
print(f"[BUDGET] Callback error: {e}")
def _prune(self, now: float):
"""Remove events outside the window."""
cutoff = now - self.config.window_seconds
while self._events and self._events[0].timestamp < cutoff:
self._events.popleft()
def _compute_status(self) -> BudgetStatus:
remaining = self.remaining_fraction
if remaining <= 0:
return BudgetStatus.EXHAUSTED
if remaining < self.config.critical_threshold:
return BudgetStatus.CRITICAL
if remaining < self.config.warning_threshold:
return BudgetStatus.WARNING
return BudgetStatus.HEALTHY
@property
def total_events(self) -> int:
with self._lock:
return len(self._events)
@property
def success_count(self) -> int:
with self._lock:
return sum(1 for e in self._events if e.success)
@property
def failure_count(self) -> int:
with self._lock:
return sum(1 for e in self._events if not e.success)
@property
def current_success_rate(self) -> float:
total = self.total_events
return self.success_count / total if total > 0 else 1.0
@property
def error_budget_total(self) -> float:
"""Total allowed failures in the window."""
total = self.total_events
if total == 0:
return 0
return total * (1 - self.config.target_success_rate)
@property
def error_budget_remaining(self) -> float:
"""Remaining failure allowance."""
return max(0, self.error_budget_total - self.failure_count)
@property
def remaining_fraction(self) -> float:
"""Fraction of error budget remaining (0.0 to 1.0)."""
budget = self.error_budget_total
if budget <= 0:
return 1.0 if self.failure_count == 0 else 0.0
return max(0, self.error_budget_remaining / budget)
@property
def burn_rate(self) -> float:
"""How fast the budget is being consumed (1.0 = normal, 2.0 = 2× faster)."""
total = self.total_events
if total == 0:
return 0.0
expected_failures = total * (1 - self.config.target_success_rate)
if expected_failures == 0:
return 0.0
return self.failure_count / expected_failures
def get_report(self) -> dict:
return {
"status": self._last_status.value,
"slo_target": self.config.target_success_rate,
"current_rate": round(self.current_success_rate, 4),
"total_events": self.total_events,
"successes": self.success_count,
"failures": self.failure_count,
"budget_total": round(self.error_budget_total, 1),
"budget_remaining": round(self.error_budget_remaining, 1),
"budget_remaining_pct": round(self.remaining_fraction * 100, 1),
"burn_rate": round(self.burn_rate, 2),
}
# --- Integration with solver ---
budget = ErrorBudgetTracker(SLOConfig(
target_success_rate=0.95,
window_seconds=3600, # 1-hour window for demo
))
# Register alerts
budget.on_status_change(BudgetStatus.WARNING, lambda r:
print(f"[ALERT] Budget warning: {r['budget_remaining_pct']}% remaining"))
budget.on_status_change(BudgetStatus.CRITICAL, lambda r:
print(f"[ALERT] Budget critical: {r['budget_remaining_pct']}% remaining"))
budget.on_status_change(BudgetStatus.EXHAUSTED, lambda r:
print(f"[ALERT] Budget EXHAUSTED — throttle new requests"))
def solve_with_budget(params: dict) -> str:
"""Solve CAPTCHA while tracking error budget."""
import requests
if budget._last_status == BudgetStatus.EXHAUSTED:
raise RuntimeError("Error budget exhausted — solving paused")
try:
submit_params = {**params, "key": API_KEY, "json": 1}
resp = requests.post(
"https://ocr.captchaai.com/in.php", data=submit_params, timeout=30
).json()
if resp.get("status") != 1:
budget.record(False)
raise RuntimeError(f"Submit: {resp.get('request')}")
task_id = resp["request"]
start = time.monotonic()
while time.monotonic() - start < 180:
time.sleep(5)
poll = requests.get("https://ocr.captchaai.com/res.php", params={
"key": API_KEY, "action": "get", "id": task_id, "json": 1,
}, timeout=15).json()
if poll.get("request") == "CAPCHA_NOT_READY":
continue
if poll.get("status") == 1:
budget.record(True)
return poll["request"]
budget.record(False)
raise RuntimeError(f"Solve: {poll.get('request')}")
budget.record(False)
raise RuntimeError("Timeout")
except Exception:
budget.record(False)
raise
# Usage
for i in range(100):
try:
token = solve_with_budget({
"method": "turnstile",
"sitekey": "0x4XXXXXXXXXXXXXXXXX",
"pageurl": "https://example.com",
})
except RuntimeError as e:
if "exhausted" in str(e):
print(f"Stopped at iteration {i}")
break
print(budget.get_report())
دليل تشغيلي لـ التوسع التلقائي لعمّال حل CAPTCHA يغطّي قرارات البنية، والاعتمادية، والمراقبة، وأنماط الأتمتة المناسبة لتشغيل Captcha AI في بيئات الإنتاج.
دليل تشغيلي لـ التوسع التلقائي لعمّال حل CAPTCHA يغطّي قرارات البنية، والاعتمادية، والمراقبة، وأنماط الأتمتة ا...
Apr 19, 2026
DevOps والتوسع
دليل تشغيلي لـ معالجة نتائج CAPTCHA بنمط حدثي باستخدام AWS SNS وCaptcha AI يغطّي قرارات البنية، والاعتمادية، والمراقبة، وأنماط الأتمتة المناسبة لتشغيل Captcha A...
دليل تشغيلي لـ معالجة نتائج CAPTCHA بنمط حدثي باستخدام AWS SNS وCaptcha AI يغطّي قرارات البنية، والاعتمادية، و...
Apr 20, 2026
الدروس التطبيقية
شرح خطوة بخطوة لـ إنشاء قائمة انتظار حل اختبار CAPTCHA في Python باستخدام Captcha AI مع أمثلة مباشرة قابلة لإعادة الاستخدام ومسار واضح لتطبيقه باستخدام Captcha...
شرح خطوة بخطوة لـ إنشاء قائمة انتظار حل اختبار CAPTCHA في Python باستخدام Captcha AI مع أمثلة مباشرة قابلة لإع...
Apr 29, 2026
DevOps والتوسع
دليل تشغيلي لـ نشر عمّال Captcha AI باستخدام Ansible يغطّي قرارات البنية، والاعتمادية، والمراقبة، وأنماط الأتمتة المناسبة لتشغيل Captcha AI في بيئات الإنتاج.
دليل تشغيلي لـ النشر باللونين الأزرق والأخضر للبنية الأساسية لحل اختبار CAPTCHA يغطّي قرارات البنية، والاعتماد...
Apr 25, 2026
DevOps والتوسع
دليل تشغيلي لـ Azure Functions + Captcha AI: تكامل سحابي بدون خوادم يغطّي قرارات البنية، والاعتمادية، والمراقبة، وأنماط الأتمتة المناسبة لتشغيل Captcha AI في بي...