الدروس التطبيقية

أنشئ لوحة معلومات لتحليل المنافسين باستخدام CaptchaAI

استخرج أسعار المنافسين وقوائم منتجاتهم وصفحات ميزاتهم. خزّن البيانات التاريخية وأنشئ تقارير مقارنة منتظمة.


الهندسة المعمارية

Competitor Sites ──> CAPTCHA Solver ──> Data Extractors
                                             │
                                        SQLite Store
                                             │
                                      Dashboard Report

نماذج البيانات

# models.py
import sqlite3
from datetime import datetime
from dataclasses import dataclass
from typing import Optional


@dataclass
class CompetitorData:
    competitor: str
    metric: str
    value: str
    numeric_value: Optional[float] = None
    url: str = ""
    scraped_at: str = ""

    def __post_init__(self):
        if not self.scraped_at:
            self.scraped_at = datetime.now().isoformat()


class CompetitorDB:
    def __init__(self, path="competitor_data.db"):
        self.conn = sqlite3.connect(path)
        self._init()

    def _init(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                competitor TEXT,
                metric TEXT,
                value TEXT,
                numeric_value REAL,
                url TEXT,
                scraped_at TEXT
            )
        """)
        self.conn.commit()

    def save(self, data: CompetitorData):
        self.conn.execute(
            """INSERT INTO metrics
               (competitor, metric, value, numeric_value, url, scraped_at)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (data.competitor, data.metric, data.value,
             data.numeric_value, data.url, data.scraped_at),
        )
        self.conn.commit()

    def get_history(self, competitor, metric, limit=30):
        cursor = self.conn.execute(
            """SELECT value, numeric_value, scraped_at
               FROM metrics
               WHERE competitor = ? AND metric = ?
               ORDER BY scraped_at DESC LIMIT ?""",
            (competitor, metric, limit),
        )
        return cursor.fetchall()

    def latest_comparison(self, metric):
        cursor = self.conn.execute(
            """SELECT competitor, value, numeric_value, MAX(scraped_at) as latest
               FROM metrics WHERE metric = ?
               GROUP BY competitor ORDER BY numeric_value""",
            (metric,),
        )
        return cursor.fetchall()

حلّال CAPTCHA

# solver.py
import requests
import time
import re
import os


class CaptchaSolver:
    def __init__(self):
        self.api_key = os.environ["CAPTCHAAI_API_KEY"]

    def solve_if_needed(self, session, url, html):
        if "data-sitekey" not in html:
            return html

        match = re.search(r'data-sitekey="([^"]+)"', html)
        if not match:
            return html

        sitekey = match.group(1)
        resp = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key,
            "method": "userrecaptcha",
            "googlekey": sitekey,
            "pageurl": url,
            "json": 1,
        }, timeout=30)
        task_id = resp.json()["request"]

        time.sleep(15)
        for _ in range(24):
            resp = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get",
                "id": task_id, "json": 1,
            }, timeout=15)
            data = resp.json()
            if data.get("status") == 1:
                post_resp = session.post(url, data={
                    "g-recaptcha-response": data["request"],
                }, timeout=30)
                return post_resp.text
            if data["request"] != "CAPCHA_NOT_READY":
                raise RuntimeError(data["request"])
            time.sleep(5)

        raise TimeoutError("CAPTCHA solve timeout")

مكشطة المنافس

# scraper.py
import requests
import re
from bs4 import BeautifulSoup
from solver import CaptchaSolver
from models import CompetitorData


class CompetitorScraper:
    def __init__(self):
        self.solver = CaptchaSolver()
        self.session = requests.Session()
        self.session.headers["User-Agent"] = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36"
        )

    def scrape_pricing(self, competitor_name, url, plan_selector, price_selector):
        html = self._fetch(url)
        soup = BeautifulSoup(html, "html.parser")
        plans = soup.select(plan_selector)
        data = []

        for plan in plans:
            name_el = plan.select_one("h3, h2, .plan-name")
            price_el = plan.select_one(price_selector)

            if not name_el or not price_el:
                continue

            price_text = price_el.get_text(strip=True)
            match = re.search(r'[\d,.]+', price_text)
            numeric = float(match.group().replace(",", "")) if match else None

            data.append(CompetitorData(
                competitor=competitor_name,
                metric=f"price_{name_el.get_text(strip=True).lower().replace(' ', '_')}",
                value=price_text,
                numeric_value=numeric,
                url=url,
            ))

        return data

    def scrape_features(self, competitor_name, url, feature_list_selector):
        html = self._fetch(url)
        soup = BeautifulSoup(html, "html.parser")
        features = soup.select(f"{feature_list_selector} li")

        return [
            CompetitorData(
                competitor=competitor_name,
                metric="feature",
                value=f.get_text(strip=True),
                url=url,
            )
            for f in features if f.get_text(strip=True)
        ]

    def scrape_product_count(self, competitor_name, url, count_selector):
        html = self._fetch(url)
        soup = BeautifulSoup(html, "html.parser")
        el = soup.select_one(count_selector)

        if el:
            text = el.get_text(strip=True)
            match = re.search(r'[\d,]+', text)
            if match:
                count = int(match.group().replace(",", ""))
                return CompetitorData(
                    competitor=competitor_name,
                    metric="product_count",
                    value=text,
                    numeric_value=count,
                    url=url,
                )
        return None

    def _fetch(self, url):
        resp = self.session.get(url, timeout=20)
        return self.solver.solve_if_needed(self.session, url, resp.text)

مولد التقرير

# report.py
from models import CompetitorDB


def generate_report(db: CompetitorDB, metrics):
    lines = ["=" * 60, "Competitor Analysis Report", "=" * 60, ""]

    for metric in metrics:
        results = db.latest_comparison(metric)
        if not results:
            continue

        lines.append(f"--- {metric.replace('_', ' ').title()} ---")
        for comp, value, numeric, ts in results:
            marker = ""
            if numeric is not None:
                marker = f" (${numeric:,.2f})" if "price" in metric else f" ({numeric:,.0f})"
            lines.append(f"  {comp}: {value}{marker}")
        lines.append("")

    return "\n".join(lines)


def generate_trend(db: CompetitorDB, competitor, metric, periods=10):
    history = db.get_history(competitor, metric, limit=periods)
    if not history:
        return f"No data for {competitor} — {metric}"

    lines = [f"Trend: {competitor} — {metric}", "-" * 40]
    for value, numeric, ts in reversed(history):
        date = ts[:10]
        lines.append(f"  {date}: {value}")

    return "\n".join(lines)

العداء الرئيسي

# main.py
import time
from models import CompetitorDB
from scraper import CompetitorScraper
from report import generate_report

COMPETITORS = [
    {
        "name": "Competitor A",
        "pricing_url": "https://competitor-a.example.com/pricing",
        "plan_selector": ".pricing-plan",
        "price_selector": ".price",
    },
    {
        "name": "Competitor B",
        "pricing_url": "https://competitor-b.example.com/pricing",
        "plan_selector": ".plan-card",
        "price_selector": ".plan-price",
    },
]


def main():
    db = CompetitorDB()
    scraper = CompetitorScraper()

    for comp in COMPETITORS:
        print(f"Scraping {comp['name']}...")

        try:
            pricing = scraper.scrape_pricing(
                comp["name"], comp["pricing_url"],
                comp["plan_selector"], comp["price_selector"],
            )
            for p in pricing:
                db.save(p)
                print(f"  {p.metric}: {p.value}")
        except Exception as e:
            print(f"  Error: {e}")

        time.sleep(5)

    # Generate report
    metrics = ["price_basic", "price_pro", "price_enterprise", "product_count"]
    report = generate_report(db, metrics)
    print(report)

    with open("competitor_report.txt", "w") as f:
        f.write(report)


if __name__ == "__main__":
    main()

استكشاف الأخطاء وإصلاحها

المشكلة السبب الإجراء
الأسعار لم يتم استخراجها عدم تطابق المحدد فحص HTML للصفحة وتحديث المحددات لكل منافس
البيانات التاريخية مفقودة التشغيل الأول تتراكم البيانات؛ تشغيل يوميا لرؤية الاتجاه
CAPTCHA على صفحة التسعير كشف البوت أضف التأخيرات واستخدم ملفات تعريف الارتباط الخاصة بالجلسة
يظهر التقرير بيانات قديمة تم إعادة إدخال نفس الإدخال استخدم latest_comparison الذي يتم تجميعه حسب تاريخ MAX

الأسئلة الشائعة

كيف يمكنني تصور الاتجاهات؟

قم بتصدير البيانات من SQLite ورسمها باستخدام matplotlib، أو قم بتوجيه مخرجات CSV إلى جداول بيانات Google للحصول على رسوم بيانية مدمجة.

هل يمكنني تتبع المقاييس غير التسعيرية؟

نعم. استخدم scrape_features لقوائم الميزات أو scrape_product_count لأحجام الكتالوج. أضف كاشطات مخصصة لأي مقياس.

كيف أحصل على تنبيهات بشأن تغيرات الأسعار؟

قارن الأسعار المسروقة اليوم مع القيم المخزنة بالأمس وأرسل تنبيهات (Slack/email) عندما يتجاوز الفرق الحد الأدنى.


أدلة ذات صلة


التعليقات غير مفعّلة لهذا المقال.