Administrator
发布于 2025-10-09 / 10 阅读
0

HAProxy 自动化扫描与负载均衡

HAProxy 自动化扫描与负载均衡

环境准备

安装haproxy

HAProxy是一个使用C语言编写的自由及开放源代码软件,其提供高可用性、负载均衡,以及基于TCP和HTTP的应用程序代理。


# 安装
sudo apt update sudo apt install haproxy -y
# 启用
sudo systemctl enable haproxy 
sudo systemctl start haproxy

Python安装 requests 库

pip install requests

文件部署

  1. 创建工作目录

    mkdir ~/proxy-finder
    cd ~/proxy-finder
  2. 创建文件

    /home/your_user/proxy-finder/
    ├── config.ini
    ├── scanner.py
    └── update.sh
  3. 赋予执行权限

    chmod +x update.sh

config.ini

脚本的配置文件

[network]
# 要扫描的网络段
network = 10.16.0.0/17,10.17.0.0/17
; network = 10.16.0.0/17
# 要扫描的端口列表,用逗号分隔
ports = 7890,7897
# 端口扫描超时时间(秒)
timeout = 1
​
[proxy_test]
# 用于测速的大文件URL
download_test_url = https://huggingface.co/microsoft/VibeVoice-1.5B/resolve/main/model-00001-of-00003.safetensors
# 下载测试的数据块大小(字节)
download_chunk_size_bytes = 52428800
# 测速超时时间(秒)
test_timeout_seconds = 20
# 测试时的并发数
test_max_workers = 50
​
[haproxy]
# HAProxy 配置文件路径
config_path = /etc/haproxy/haproxy.cfg
# HAProxy 前端监听端口
frontend_port = 9527
# 要使用的最快代理数量
top_k_proxies = 20
​
[scanning]
# 扫描时的最大并发数
max_workers = 1000
# 进度报告间隔
progress_interval = 1000

scanner.py

  1. 扫描:在指定的 IP 网段中,批量寻找并发现潜在的代理服务器。

  2. 测试:验证这些代理是否可用,并测试其真实速度,然后按速度从快到慢排序。

  3. 配置:自动将最快的一批代理写入 HAProxy 配置文件,创建一个统一、高速且能自动切换故障节点的代理入口。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
​
import socket
import threading
import time
import requests
import ipaddress
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
import configparser
import os
​
class ProxyScanner:
    def __init__(self, config_file="config.ini"):
        """从配置文件初始化"""
        self.config = self._load_config(config_file)
        self.open_ports = []
        self.working_proxies = []
        self.lock = threading.Lock()
    
    def _load_config(self, config_file):
        """加载配置文件"""
        if not os.path.exists(config_file):
            print(f"错误:配置文件 '{config_file}' 不存在")
            sys.exit(1)
        
        config = configparser.ConfigParser()
        try:
            config.read(config_file, encoding='utf-8')
            
            # 验证必需的配置节是否存在
            required_sections = ['network', 'proxy_test', 'haproxy', 'scanning']
            for section in required_sections:
                if not config.has_section(section):
                    print(f"错误:配置文件缺少必需的节 '[{section}]'")
                    sys.exit(1)
            
            print(f"配置文件 '{config_file}' 加载成功")
            return config
            
        except Exception as e:
            print(f"错误:读取配置文件 '{config_file}' 失败: {e}")
            sys.exit(1)
    
    def _get_ports_list(self):
        """从配置中获取端口列表"""
        ports_str = self.config.get('network', 'ports')
        return [int(port.strip()) for port in ports_str.split(',')]
    
    def scan_port(self, ip, port):
        """扫描单个IP的指定端口"""
        try:
            timeout = self.config.getfloat('network', 'timeout')
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                result = sock.connect_ex((str(ip), port))
                if result == 0:
                    with self.lock:
                        proxy_url = f"{ip}:{port}"
                        self.open_ports.append(proxy_url)
                        print(f"[+] 发现开放端口: {proxy_url}")
                    return True
        except Exception:
            pass
        return False
    
    def scan_network(self):
        """扫描多个网段"""
        network_str = self.config.get('network', 'network')
        ports = self._get_ports_list()
        
        # 支持多个网段,用逗号分隔
        networks = [net.strip() for net in network_str.split(',')]
        
        print(f"开始扫描网段: {networks} | 目标端口: {ports}")
        
        all_hosts = []
        total_ips = 0
        
        # 解析所有网段
        for network_item in networks:
            try:
                network = ipaddress.IPv4Network(network_item, strict=False)
                hosts = list(network.hosts())
                all_hosts.extend(hosts)
                total_ips += len(hosts)
                print(f"网段 {network_item}: {len(hosts)} 个IP")
            except ValueError as e:
                print(f"错误:无效的网段 '{network_item}': {e}")
                continue
        
        if not all_hosts:
            print("错误:没有有效的网段可扫描")
            sys.exit(1)
        
        print(f"总计需要扫描 {total_ips} 个IP地址")
        
        max_workers = self.config.getint('scanning', 'max_workers')
        progress_interval = self.config.getint('scanning', 'progress_interval')
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(self.scan_port, ip, port) 
                    for ip in all_hosts for port in ports]
            
            completed = 0
            total_futures = len(futures)
            for future in as_completed(futures):
                completed += 1
                if completed % progress_interval == 0 or completed == total_futures:
                    print(f"已完成扫描: {completed}/{total_futures}")
        
        print(f"\n扫描完成!发现 {len(self.open_ports)} 个开放端口")
        return self.open_ports
​
    def test_proxy(self, proxy_url):
        """测试代理的速度,并返回 (是否可用, 速度 Mbps)"""
        proxies = {
            'http': f'http://{proxy_url}',
            'https': f'http://{proxy_url}'
        }
        
        try:
            download_url = self.config.get('proxy_test', 'download_test_url')
            chunk_size = self.config.getint('proxy_test', 'download_chunk_size_bytes')
            timeout = self.config.getint('proxy_test', 'test_timeout_seconds')
            
            start_time = time.time()
            downloaded_size = 0
            
            with requests.get(
                download_url,
                proxies=proxies,
                timeout=timeout,
                stream=True,
                headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
            ) as response:
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        downloaded_size += len(chunk)
                        if downloaded_size >= chunk_size:
                            break
            
            end_time = time.time()
            duration = end_time - start_time
            
            if duration == 0:
                return (False, 0)
            
            speed_bytes_per_sec = downloaded_size / duration
            speed_mbps = (speed_bytes_per_sec * 8) / (1024 * 1024)
            
            print(f"[✓] 代理可用: {proxy_url} - 速度: {speed_mbps:.2f} Mbps")
            return (True, speed_mbps)
            
        except requests.exceptions.RequestException:
            pass
        except Exception:
            pass
        
        return (False, 0)
    
    def test_all_proxies(self):
        """测试所有发现的代理并按速度排序"""
        if not self.open_ports:
            print("没有发现开放的端口,无法测试代理")
            return []
        
        print(f"\n开始测试 {len(self.open_ports)} 个代理的速度...")
        
        max_workers = self.config.getint('proxy_test', 'test_max_workers')
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_proxy = {executor.submit(self.test_proxy, proxy): proxy 
                             for proxy in self.open_ports}
            
            for future in as_completed(future_to_proxy):
                proxy_url = future_to_proxy[future]
                try:
                    is_working, speed = future.result()
                    if is_working:
                        with self.lock:
                            self.working_proxies.append({'proxy': proxy_url, 'speed': speed})
                except Exception as e:
                    print(f"测试 {proxy_url} 时产生意外错误: {e}")
        
        if self.working_proxies:
            self.working_proxies.sort(key=lambda x: x['speed'], reverse=True)
        
        print(f"\n代理测试完成!发现 {len(self.working_proxies)} 个可用代理")
        return self.working_proxies
    
    def generate_haproxy_config(self):
            """根据配置生成 HAProxy 配置文件"""
            if not self.working_proxies:
                print("没有可用的代理,无法生成 HAProxy 配置。")
                return False
            
            k = self.config.getint('haproxy', 'top_k_proxies')
            frontend_port = self.config.getint('haproxy', 'frontend_port')
            config_path = self.config.get('haproxy', 'config_path')
            
            top_k_proxies = self.working_proxies[:k]
            print(f"将使用最快的 {len(top_k_proxies)} 个代理来生成配置...")
            
            # 这是修正后的模板,拥有了正确的均衡算法和健康检查逻辑
            config_template = f"""
global
    log /dev/log local0
    chroot /var/lib/haproxy
    stats socket /run/haproxy/admin.sock mode 660 level admin expose-fd listeners
    stats timeout 30s
    user haproxy
    group haproxy
    daemon
​
defaults
    log global
    mode tcp
    option tcplog
    option dontlognull
    timeout connect 5000
    timeout client 50000
    timeout server 50000
    # 增加重试机制
    retries 3
    option redispatch
​
frontend clash_proxy_frontend
    bind *:{frontend_port}
    default_backend clash_proxy_backend
    
listen stats
    bind *:9000
    mode http
    stats enable
    stats uri /stats
    stats refresh 10s
    stats admin if LOCALHOST
​
backend clash_proxy_backend
    balance leastconn
    
    # --- 这是修正后的、更可靠的健康检查 ---
    option httpchk
    http-check send meth GET uri http://www.google.com/generate_204 ver HTTP/1.1
    http-check expect status 204
    # --- 健康检查配置结束 ---
​
"""
            
            backend_servers = ""
            for i, proxy_info in enumerate(top_k_proxies):
                proxy_ip_port = proxy_info['proxy']
                # 这是修正后的循环,为每一个动态生成的服务器都加上了稳定性参数
                backend_servers += f"    server server{i+1} {proxy_ip_port} check inter 2s fall 3 rise 2\n"
            final_config = config_template + backend_servers
            
            try:
                with open(config_path, 'w') as f:
                    f.write(final_config)
                print(f"HAProxy 配置文件已成功生成到: {config_path}")
                return True
            except Exception as e:
                print(f"错误: 写入 HAProxy 配置文件 ({config_path}) 时出错: {e}")
                sys.exit(1)
    
    def run(self):
        """运行完整的扫描、测试和配置生成流程"""
        print("=" * 60)
        print("代理扫描及 HAProxy 配置生成器启动")
        print("=" * 60)
        
        start_time = time.time()
        
        self.scan_network()
        
        if not self.open_ports:
            print("未发现任何开放端口,程序结束")
            return
        
        self.test_all_proxies()
        
        if self.working_proxies:
            if not self.generate_haproxy_config():
                print("生成 HAProxy 配置失败。")
        else:
            print("\n未发现可用代理,不更新 HAProxy 配置")
        
        end_time = time.time()
        print(f"\n总耗时: {end_time - start_time:.2f} 秒")
        print("=" * 60)
​
def main():
    """主函数"""
    try:
        # 配置文件路径可以通过命令行参数传递
        config_file = sys.argv[1] if len(sys.argv) > 1 else "config.ini"
        scanner = ProxyScanner(config_file)
        scanner.run()
        
    except KeyboardInterrupt:
        print("\n\n用户中断程序")
        sys.exit(1)
    except Exception as e:
        print(f"\n程序执行出错: {e}")
        sys.exit(1)
​
if __name__ == "__main__":
    main()

update.sh

  1. 运行 Python 脚本来扫描并直接生成新的 HAProxy 配置文件

  2. 验证新生成的配置文件的语法是否正确

  3. 重载 HAProxy 服务

#!/bin/bash
# 脚本出错时立即退出
set -e
​
# --- 配置文件路径 ---
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_FILE="${SCRIPT_DIR}/config.ini"
SCANNER_SCRIPT="${SCRIPT_DIR}/scanner.py"
​
# 检查配置文件是否存在
if [[ ! -f "${CONFIG_FILE}" ]]; then
    echo "错误:配置文件 ${CONFIG_FILE} 不存在!"
    exit 1
fi
​
# 从配置文件读取 HAProxy 配置路径
HAPROXY_CONFIG=$(python3 -c "
import configparser
config = configparser.ConfigParser()
config.read('${CONFIG_FILE}')
print(config.get('haproxy', 'config_path'))
")
​
echo "============================================="
echo "开始更新 Clash 代理池 - $(date)"
echo "使用配置文件: ${CONFIG_FILE}"
echo "============================================="
​
# 1. 运行 Python 脚本来扫描并直接生成新的 HAProxy 配置文件
echo "[Step 1/3] 正在运行代理扫描器以生成新配置..."
if ! python3 "${SCANNER_SCRIPT}" "${CONFIG_FILE}"; then
    echo "错误:代理扫描器执行失败!"
    exit 1
fi
​
# 2. 验证新生成的配置文件的语法是否正确
echo "[Step 2/3] 正在验证新的 HAProxy 配置文件 (${HAPROXY_CONFIG})..."
if ! haproxy -c -f "${HAPROXY_CONFIG}"; then
    echo "错误:新生成的 HAProxy 配置文件无效!放弃本次更新。请检查 ${SCANNER_SCRIPT} 的输出。"
    exit 1
fi
echo "配置文件验证通过。"
​
# 3. 重载 HAProxy 服务
echo "[Step 3/3] 正在重载 HAProxy 服务..."
if ! sudo systemctl reload haproxy; then
    echo "错误:HAProxy 重载失败!请使用 'sudo journalctl -u haproxy' 检查日志。"
    exit 1
fi
​
echo "HAProxy 已成功使用新的代理列表重载!"
echo "更新完成。"

运行

手动执行

# 切换到工作目录
cd ~/proxy-finder
​
# 执行更新脚本
# 由于脚本内部会重载haproxy服务,因此需要sudo权限
sudo ./update.sh

客户端配置

# HAPROXY_IP 替换为你的服务器IP
# HAPROXY_PORT 替换为配置文件中的端口
export http_proxy="http://<HAPROXY_IP>:HAPROXY_PORT"
export https_proxy="http://<HAPROXY_IP>:HAPROXY_PORT"
​
# 测试一下
curl www.google.com

自动化定时更新

  1. 编辑crontab

    sudo crontab -e
  2. 添加定时任务:

    # 每天凌晨 02:15 运行代理更新脚本
    15 02 * * * /home/your_user/proxy-finder/update.sh >> /var/log/update_clash_proxies.log 2>&1

/home/your_user/proxy-finder/update.sh: 必须使用绝对路径

监控与验证

HAProxy 状态页面: 脚本生成的 haproxy.cfg 默认启用了一个统计页面。通过浏览器访问 http://<HAProxy_IP>:9000/stats,你可以看到:

  • Frontend (clash_proxy_frontend): 前端监听状态。

  • Backend (clash_proxy_backend): 后端代理服务器列表。

  • 状态 (Status):每个后端代理的状态会实时显示。UP 表示健康,DOWN 表示健康检查失败已被禁用。

  • 会话 (Session):可以看到流量在各个代理上的分布情况。

检查日志

系统日志

sudo journalctl -u haproxy -f

脚本日志

tail -f /var/log/proxy_updater.log