استخرج أسعار المنافسين وقوائم منتجاتهم وصفحات ميزاتهم. خزّن البيانات التاريخية وأنشئ تقارير مقارنة منتظمة.
الهندسة المعمارية
Competitor Sites ──> CAPTCHA Solver ──> Data Extractors
│
SQLite Store
│
Dashboard Report
نماذج البيانات
# models.py
import sqlite3
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
@dataclass
class CompetitorData:
competitor: str
metric: str
value: str
numeric_value: Optional[float] = None
url: str = ""
scraped_at: str = ""
def __post_init__(self):
if not self.scraped_at:
self.scraped_at = datetime.now().isoformat()
class CompetitorDB:
def __init__(self, path="competitor_data.db"):
self.conn = sqlite3.connect(path)
self._init()
def _init(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
competitor TEXT,
metric TEXT,
value TEXT,
numeric_value REAL,
url TEXT,
scraped_at TEXT
)
""")
self.conn.commit()
def save(self, data: CompetitorData):
self.conn.execute(
"""INSERT INTO metrics
(competitor, metric, value, numeric_value, url, scraped_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(data.competitor, data.metric, data.value,
data.numeric_value, data.url, data.scraped_at),
)
self.conn.commit()
def get_history(self, competitor, metric, limit=30):
cursor = self.conn.execute(
"""SELECT value, numeric_value, scraped_at
FROM metrics
WHERE competitor = ? AND metric = ?
ORDER BY scraped_at DESC LIMIT ?""",
(competitor, metric, limit),
)
return cursor.fetchall()
def latest_comparison(self, metric):
cursor = self.conn.execute(
"""SELECT competitor, value, numeric_value, MAX(scraped_at) as latest
FROM metrics WHERE metric = ?
GROUP BY competitor ORDER BY numeric_value""",
(metric,),
)
return cursor.fetchall()
حلّال CAPTCHA
# solver.py
import requests
import time
import re
import os
class CaptchaSolver:
def __init__(self):
self.api_key = os.environ["CAPTCHAAI_API_KEY"]
def solve_if_needed(self, session, url, html):
if "data-sitekey" not in html:
return html
match = re.search(r'data-sitekey="([^"]+)"', html)
if not match:
return html
sitekey = match.group(1)
resp = requests.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key,
"method": "userrecaptcha",
"googlekey": sitekey,
"pageurl": url,
"json": 1,
}, timeout=30)
task_id = resp.json()["request"]
time.sleep(15)
for _ in range(24):
resp = requests.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get",
"id": task_id, "json": 1,
}, timeout=15)
data = resp.json()
if data.get("status") == 1:
post_resp = session.post(url, data={
"g-recaptcha-response": data["request"],
}, timeout=30)
return post_resp.text
if data["request"] != "CAPCHA_NOT_READY":
raise RuntimeError(data["request"])
time.sleep(5)
raise TimeoutError("CAPTCHA solve timeout")
مكشطة المنافس
# scraper.py
import requests
import re
from bs4 import BeautifulSoup
from solver import CaptchaSolver
from models import CompetitorData
class CompetitorScraper:
def __init__(self):
self.solver = CaptchaSolver()
self.session = requests.Session()
self.session.headers["User-Agent"] = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36"
)
def scrape_pricing(self, competitor_name, url, plan_selector, price_selector):
html = self._fetch(url)
soup = BeautifulSoup(html, "html.parser")
plans = soup.select(plan_selector)
data = []
for plan in plans:
name_el = plan.select_one("h3, h2, .plan-name")
price_el = plan.select_one(price_selector)
if not name_el or not price_el:
continue
price_text = price_el.get_text(strip=True)
match = re.search(r'[\d,.]+', price_text)
numeric = float(match.group().replace(",", "")) if match else None
data.append(CompetitorData(
competitor=competitor_name,
metric=f"price_{name_el.get_text(strip=True).lower().replace(' ', '_')}",
value=price_text,
numeric_value=numeric,
url=url,
))
return data
def scrape_features(self, competitor_name, url, feature_list_selector):
html = self._fetch(url)
soup = BeautifulSoup(html, "html.parser")
features = soup.select(f"{feature_list_selector} li")
return [
CompetitorData(
competitor=competitor_name,
metric="feature",
value=f.get_text(strip=True),
url=url,
)
for f in features if f.get_text(strip=True)
]
def scrape_product_count(self, competitor_name, url, count_selector):
html = self._fetch(url)
soup = BeautifulSoup(html, "html.parser")
el = soup.select_one(count_selector)
if el:
text = el.get_text(strip=True)
match = re.search(r'[\d,]+', text)
if match:
count = int(match.group().replace(",", ""))
return CompetitorData(
competitor=competitor_name,
metric="product_count",
value=text,
numeric_value=count,
url=url,
)
return None
def _fetch(self, url):
resp = self.session.get(url, timeout=20)
return self.solver.solve_if_needed(self.session, url, resp.text)
مولد التقرير
# report.py
from models import CompetitorDB
def generate_report(db: CompetitorDB, metrics):
lines = ["=" * 60, "Competitor Analysis Report", "=" * 60, ""]
for metric in metrics:
results = db.latest_comparison(metric)
if not results:
continue
lines.append(f"--- {metric.replace('_', ' ').title()} ---")
for comp, value, numeric, ts in results:
marker = ""
if numeric is not None:
marker = f" (${numeric:,.2f})" if "price" in metric else f" ({numeric:,.0f})"
lines.append(f" {comp}: {value}{marker}")
lines.append("")
return "\n".join(lines)
def generate_trend(db: CompetitorDB, competitor, metric, periods=10):
history = db.get_history(competitor, metric, limit=periods)
if not history:
return f"No data for {competitor} — {metric}"
lines = [f"Trend: {competitor} — {metric}", "-" * 40]
for value, numeric, ts in reversed(history):
date = ts[:10]
lines.append(f" {date}: {value}")
return "\n".join(lines)
العداء الرئيسي
# main.py
import time
from models import CompetitorDB
from scraper import CompetitorScraper
from report import generate_report
COMPETITORS = [
{
"name": "Competitor A",
"pricing_url": "https://competitor-a.example.com/pricing",
"plan_selector": ".pricing-plan",
"price_selector": ".price",
},
{
"name": "Competitor B",
"pricing_url": "https://competitor-b.example.com/pricing",
"plan_selector": ".plan-card",
"price_selector": ".plan-price",
},
]
def main():
db = CompetitorDB()
scraper = CompetitorScraper()
for comp in COMPETITORS:
print(f"Scraping {comp['name']}...")
try:
pricing = scraper.scrape_pricing(
comp["name"], comp["pricing_url"],
comp["plan_selector"], comp["price_selector"],
)
for p in pricing:
db.save(p)
print(f" {p.metric}: {p.value}")
except Exception as e:
print(f" Error: {e}")
time.sleep(5)
# Generate report
metrics = ["price_basic", "price_pro", "price_enterprise", "product_count"]
report = generate_report(db, metrics)
print(report)
with open("competitor_report.txt", "w") as f:
f.write(report)
if __name__ == "__main__":
main()
استكشاف الأخطاء وإصلاحها
| المشكلة | السبب | الإجراء |
|---|---|---|
| الأسعار لم يتم استخراجها | عدم تطابق المحدد | فحص HTML للصفحة وتحديث المحددات لكل منافس |
| البيانات التاريخية مفقودة | التشغيل الأول | تتراكم البيانات؛ تشغيل يوميا لرؤية الاتجاه |
| CAPTCHA على صفحة التسعير | كشف البوت | أضف التأخيرات واستخدم ملفات تعريف الارتباط الخاصة بالجلسة |
| يظهر التقرير بيانات قديمة | تم إعادة إدخال نفس الإدخال | استخدم latest_comparison الذي يتم تجميعه حسب تاريخ MAX |
الأسئلة الشائعة
كيف يمكنني تصور الاتجاهات؟
قم بتصدير البيانات من SQLite ورسمها باستخدام matplotlib، أو قم بتوجيه مخرجات CSV إلى جداول بيانات Google للحصول على رسوم بيانية مدمجة.
هل يمكنني تتبع المقاييس غير التسعيرية؟
نعم. استخدم scrape_features لقوائم الميزات أو scrape_product_count لأحجام الكتالوج. أضف كاشطات مخصصة لأي مقياس.
كيف أحصل على تنبيهات بشأن تغيرات الأسعار؟
قارن الأسعار المسروقة اليوم مع القيم المخزنة بالأمس وأرسل تنبيهات (Slack/email) عندما يتجاوز الفرق الحد الأدنى.
أدلة ذات صلة
- تتبع المنافسين على نطاق واسع -ابدأ بـ CaptchaAI.*