HAProxy 自动化扫描与负载均衡
环境准备
安装haproxy
HAProxy是一个使用C语言编写的自由及开放源代码软件,其提供高可用性、负载均衡,以及基于TCP和HTTP的应用程序代理。
# 安装
sudo apt update
sudo apt install haproxy -y
# 启用
sudo systemctl enable haproxy
sudo systemctl start haproxy
Python 安装 requests 库
pip install requests
文件部署
创建工作目录:
mkdir ~/proxy-finder
cd ~/proxy-finder
创建文件:
/home/your_user/proxy-finder/
├── config.ini
├── scanner.py
└── update.sh
赋予执行权限:
chmod +x update.sh
config.ini
脚本的配置文件
[network]
# 要扫描的网络段
network = 10.16.0.0/17,10.17.0.0/17
; network = 10.16.0.0/17
# 要扫描的端口列表,用逗号分隔
ports = 7890,7897
# 端口扫描超时时间(秒)
timeout = 1
[proxy_test]
# 用于测速的大文件URL
download_test_url = https://huggingface.co/microsoft/VibeVoice-1.5B/resolve/main/model-00001-of-00003.safetensors
# 下载测试的数据块大小(字节)
download_chunk_size_bytes = 52428800
# 测速超时时间(秒)
test_timeout_seconds = 20
# 测试时的并发数
test_max_workers = 50
[haproxy]
# HAProxy 配置文件路径
config_path = /etc/haproxy/haproxy.cfg
# HAProxy 前端监听端口
frontend_port = 9527
# 要使用的最快代理数量
top_k_proxies = 20
[scanning]
# 扫描时的最大并发数
max_workers = 1000
# 进度报告间隔
progress_interval = 1000
scanner.py
扫描:在指定的 IP 网段中,批量寻找并发现潜在的代理服务器。
测试:验证这些代理是否可用,并测试其真实速度,然后按速度从快到慢排序。
配置:自动将最快的一批代理写入 HAProxy 配置文件,创建一个统一、高速且能自动切换故障节点的代理入口。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import threading
import time
import requests
import ipaddress
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
import configparser
import os
class ProxyScanner:
    """Discover proxies on local networks, benchmark them and emit an HAProxy config.

    The workflow is driven entirely by an INI file with the sections
    ``[network]``, ``[proxy_test]``, ``[haproxy]`` and ``[scanning]``:

    1. ``scan_network`` TCP-connects to every host/port pair and records open ones.
    2. ``test_all_proxies`` downloads a test file through each candidate and
       ranks the working ones by measured throughput.
    3. ``generate_haproxy_config`` writes the fastest candidates as HAProxy
       backend servers.

    Attributes:
        config: the loaded ``configparser.ConfigParser``.
        open_ports: ``"ip:port"`` strings for every open port found by the scan.
        working_proxies: dicts ``{'proxy': "ip:port", 'speed': float}``, sorted
            fastest first once ``test_all_proxies`` has run.
        lock: guards ``open_ports`` / ``working_proxies`` against concurrent appends.
    """

    def __init__(self, config_file="config.ini"):
        """Initialise the scanner from the given INI configuration file."""
        self.config = self._load_config(config_file)
        self.open_ports = []
        self.working_proxies = []
        self.lock = threading.Lock()

    def _load_config(self, config_file):
        """Load and validate the configuration file.

        Prints an error and exits the process with status 1 when the file is
        missing, cannot be parsed, or lacks one of the required sections.

        Returns:
            The populated ``configparser.ConfigParser``.
        """
        if not os.path.exists(config_file):
            print(f"错误:配置文件 '{config_file}' 不存在")
            sys.exit(1)

        config = configparser.ConfigParser()
        try:
            config.read(config_file, encoding='utf-8')
        except (configparser.Error, UnicodeDecodeError) as e:
            print(f"错误:读取配置文件 '{config_file}' 失败: {e}")
            sys.exit(1)

        # Every later stage assumes these sections exist, so fail early.
        for section in ('network', 'proxy_test', 'haproxy', 'scanning'):
            if not config.has_section(section):
                print(f"错误:配置文件缺少必需的节 '[{section}]'")
                sys.exit(1)

        print(f"配置文件 '{config_file}' 加载成功")
        return config

    def _get_ports_list(self):
        """Return the ports from ``[network] ports`` as a list of ints.

        Empty entries (for example from a trailing comma) are ignored.

        Raises:
            ValueError: if an entry is not an integer.
        """
        ports_str = self.config.get('network', 'ports')
        return [int(item) for item in (raw.strip() for raw in ports_str.split(',')) if item]

    def scan_port(self, ip, port):
        """Try a TCP connect to ``ip:port`` and record it when the port is open.

        Returns:
            True when the connection succeeded, False otherwise (including on
            any socket error, which is expected for unreachable hosts).
        """
        try:
            timeout = self.config.getfloat('network', 'timeout')
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                if sock.connect_ex((str(ip), port)) == 0:
                    with self.lock:
                        proxy_url = f"{ip}:{port}"
                        self.open_ports.append(proxy_url)
                        print(f"[+] 发现开放端口: {proxy_url}")
                    return True
        except (OSError, OverflowError, ValueError):
            # Best effort: an unreachable host or an out-of-range port simply
            # counts as "closed".
            pass
        return False

    def scan_network(self):
        """Scan every configured network for the configured ports.

        ``[network] network`` may hold several comma-separated CIDR blocks;
        invalid blocks are reported and skipped. Exits the process with
        status 1 when no block yields any host.

        Returns:
            ``self.open_ports`` (list of ``"ip:port"`` strings).
        """
        network_str = self.config.get('network', 'network')
        ports = self._get_ports_list()
        # Validate the timeout once here; inside the workers a bad value would
        # be swallowed and every port would silently look closed.
        self.config.getfloat('network', 'timeout')

        networks = [net.strip() for net in network_str.split(',') if net.strip()]
        print(f"开始扫描网段: {networks} | 目标端口: {ports}")

        all_hosts = []
        for network_item in networks:
            try:
                network = ipaddress.IPv4Network(network_item, strict=False)
            except ValueError as e:
                print(f"错误:无效的网段 '{network_item}': {e}")
                continue
            hosts = list(network.hosts())
            all_hosts.extend(hosts)
            print(f"网段 {network_item}: {len(hosts)} 个IP")

        if not all_hosts:
            print("错误:没有有效的网段可扫描")
            sys.exit(1)

        # Overlapping networks would otherwise be scanned (and later listed
        # as backends) twice; dict.fromkeys keeps the first-seen order.
        all_hosts = list(dict.fromkeys(all_hosts))
        total_ips = len(all_hosts)
        print(f"总计需要扫描 {total_ips} 个IP地址")

        max_workers = self.config.getint('scanning', 'max_workers')
        # Guard against a zero/negative interval, which would break the modulo.
        progress_interval = max(1, self.config.getint('scanning', 'progress_interval'))

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(self.scan_port, ip, port)
                       for ip in all_hosts for port in ports]
            total_futures = len(futures)
            for completed, _ in enumerate(as_completed(futures), start=1):
                if completed % progress_interval == 0 or completed == total_futures:
                    print(f"已完成扫描: {completed}/{total_futures}")

        print(f"\n扫描完成!发现 {len(self.open_ports)} 个开放端口")
        return self.open_ports

    def test_proxy(self, proxy_url):
        """Measure download throughput through one proxy.

        Streams ``download_test_url`` through the proxy until either
        ``download_chunk_size_bytes`` have arrived or ``test_timeout_seconds``
        have elapsed in total. The ``timeout`` passed to ``requests`` only
        bounds individual socket operations, so the elapsed-time check is what
        keeps a slow-but-alive proxy from occupying a worker indefinitely.

        Args:
            proxy_url: ``"ip:port"`` of the candidate proxy.

        Returns:
            ``(True, speed)`` when data was received, where ``speed`` is
            bits per second divided by 1024*1024; ``(False, 0)`` on any
            network/HTTP failure or when no data arrived.
        """
        proxies = {
            'http': f'http://{proxy_url}',
            'https': f'http://{proxy_url}'
        }
        download_url = self.config.get('proxy_test', 'download_test_url')
        chunk_size = self.config.getint('proxy_test', 'download_chunk_size_bytes')
        timeout = self.config.getint('proxy_test', 'test_timeout_seconds')

        downloaded_size = 0
        start_time = time.monotonic()
        try:
            with requests.get(
                download_url,
                proxies=proxies,
                timeout=timeout,
                stream=True,
                headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
            ) as response:
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=8192):
                    downloaded_size += len(chunk)
                    if downloaded_size >= chunk_size:
                        break
                    # Enforce the overall time budget between chunks.
                    if time.monotonic() - start_time >= timeout:
                        break
        except (requests.exceptions.RequestException, OSError):
            # Most open ports are not working proxies; failure is the norm.
            return (False, 0)

        duration = time.monotonic() - start_time
        if duration <= 0 or downloaded_size == 0:
            return (False, 0)

        speed_bytes_per_sec = downloaded_size / duration
        speed_mbps = (speed_bytes_per_sec * 8) / (1024 * 1024)
        print(f"[✓] 代理可用: {proxy_url} - 速度: {speed_mbps:.2f} Mbps")
        return (True, speed_mbps)

    def test_all_proxies(self):
        """Benchmark every discovered open port and rank the working proxies.

        Returns:
            ``self.working_proxies`` sorted by descending speed, or an empty
            list when the scan found no open ports.
        """
        if not self.open_ports:
            print("没有发现开放的端口,无法测试代理")
            return []

        print(f"\n开始测试 {len(self.open_ports)} 个代理的速度...")
        max_workers = self.config.getint('proxy_test', 'test_max_workers')

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_proxy = {executor.submit(self.test_proxy, proxy): proxy
                               for proxy in self.open_ports}
            for future in as_completed(future_to_proxy):
                proxy_url = future_to_proxy[future]
                try:
                    is_working, speed = future.result()
                except Exception as e:
                    # Anything test_proxy did not classify as a network
                    # failure (e.g. a bad config value) is reported here.
                    print(f"测试 {proxy_url} 时产生意外错误: {e}")
                    continue
                if is_working:
                    with self.lock:
                        self.working_proxies.append({'proxy': proxy_url, 'speed': speed})

        self.working_proxies.sort(key=lambda x: x['speed'], reverse=True)
        print(f"\n代理测试完成!发现 {len(self.working_proxies)} 个可用代理")
        return self.working_proxies

    def generate_haproxy_config(self):
        """Write an HAProxy configuration that balances over the fastest proxies.

        Uses the first ``top_k_proxies`` entries of ``self.working_proxies``
        (expected to be sorted fastest first) as backend servers.

        Returns:
            True when the file was written, False when there are no working
            proxies. Exits the process with status 1 if the file cannot be
            written, so that a calling script sees the failure.
        """
        if not self.working_proxies:
            print("没有可用的代理,无法生成 HAProxy 配置。")
            return False

        k = self.config.getint('haproxy', 'top_k_proxies')
        frontend_port = self.config.getint('haproxy', 'frontend_port')
        config_path = self.config.get('haproxy', 'config_path')

        top_k_proxies = self.working_proxies[:k]
        print(f"将使用最快的 {len(top_k_proxies)} 个代理来生成配置...")

        # Static part of the configuration; the backend server lines are
        # appended below. The text is emitted verbatim into the config file.
        config_template = f"""
global
log /dev/log local0
chroot /var/lib/haproxy
stats socket /run/haproxy/admin.sock mode 660 level admin expose-fd listeners
stats timeout 30s
user haproxy
group haproxy
daemon
defaults
log global
mode tcp
option tcplog
option dontlognull
timeout connect 5000
timeout client 50000
timeout server 50000
# 增加重试机制
retries 3
option redispatch
frontend clash_proxy_frontend
bind *:{frontend_port}
default_backend clash_proxy_backend
listen stats
bind *:9000
mode http
stats enable
stats uri /stats
stats refresh 10s
stats admin if LOCALHOST
backend clash_proxy_backend
balance leastconn
# --- 这是修正后的、更可靠的健康检查 ---
option httpchk
http-check send meth GET uri http://www.google.com/generate_204 ver HTTP/1.1
http-check expect status 204
# --- 健康检查配置结束 ---
"""
        # One health-checked server line per selected proxy.
        backend_servers = "".join(
            f" server server{i} {proxy_info['proxy']} check inter 2s fall 3 rise 2\n"
            for i, proxy_info in enumerate(top_k_proxies, start=1)
        )
        final_config = config_template + backend_servers

        try:
            # Explicit UTF-8: the template contains non-ASCII comments and the
            # script may run under cron with a non-UTF-8 default locale.
            with open(config_path, 'w', encoding='utf-8') as f:
                f.write(final_config)
        except OSError as e:
            print(f"错误: 写入 HAProxy 配置文件 ({config_path}) 时出错: {e}")
            sys.exit(1)

        print(f"HAProxy 配置文件已成功生成到: {config_path}")
        return True

    def run(self):
        """Run the full pipeline: scan, benchmark, then write the HAProxy config."""
        print("=" * 60)
        print("代理扫描及 HAProxy 配置生成器启动")
        print("=" * 60)
        start_time = time.time()

        self.scan_network()
        if not self.open_ports:
            print("未发现任何开放端口,程序结束")
            return

        self.test_all_proxies()
        if self.working_proxies:
            if not self.generate_haproxy_config():
                print("生成 HAProxy 配置失败。")
        else:
            # Leave the existing configuration untouched when nothing works.
            print("\n未发现可用代理,不更新 HAProxy 配置")

        end_time = time.time()
        print(f"\n总耗时: {end_time - start_time:.2f} 秒")
        print("=" * 60)
def main():
    """Command-line entry point.

    The optional first command-line argument is the path of the configuration
    file; without it ``config.ini`` in the current directory is used. Any
    failure, including Ctrl-C, ends the process with exit status 1.
    """
    try:
        cli_args = sys.argv[1:]
        chosen_config = cli_args[0] if cli_args else "config.ini"
        ProxyScanner(chosen_config).run()
    except KeyboardInterrupt:
        print("\n\n用户中断程序")
        sys.exit(1)
    except Exception as exc:
        # Top-level boundary: report the problem and signal failure to the caller.
        print(f"\n程序执行出错: {exc}")
        sys.exit(1)
if __name__ == "__main__":
    main()
update.sh
运行 Python 脚本来扫描并直接生成新的 HAProxy 配置文件
验证新生成的配置文件的语法是否正确
重载 HAProxy 服务
#!/bin/bash
# Refresh the HAProxy proxy pool:
#   1. run scanner.py, which rewrites the HAProxy configuration file,
#   2. validate the new file with `haproxy -c`,
#   3. reload the HAProxy service.
# The previous configuration is backed up first and restored if the scanner
# or the validation fails, so a broken file is never left in place.

# Abort on the first failing command.
set -e

# --- Paths, resolved relative to this script so cron can call it from anywhere ---
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_FILE="${SCRIPT_DIR}/config.ini"
SCANNER_SCRIPT="${SCRIPT_DIR}/scanner.py"

# The configuration file is mandatory.
if [[ ! -f "${CONFIG_FILE}" ]]; then
    echo "错误:配置文件 ${CONFIG_FILE} 不存在!"
    exit 1
fi

# Read the HAProxy config path from config.ini. The path is passed as an
# argument rather than pasted into the Python source, so quotes or other
# special characters in it cannot break the snippet.
HAPROXY_CONFIG=$(python3 -c '
import configparser
import sys
config = configparser.ConfigParser()
config.read(sys.argv[1], encoding="utf-8")
print(config.get("haproxy", "config_path"))
' "${CONFIG_FILE}")

# Keep a copy of the current configuration so it can be put back on failure.
BACKUP_CONFIG=""
if [[ -f "${HAPROXY_CONFIG}" ]]; then
    BACKUP_CONFIG="$(mktemp)"
    cp -p "${HAPROXY_CONFIG}" "${BACKUP_CONFIG}"
fi

# Restore the previous configuration, if one was saved.
restore_backup() {
    if [[ -n "${BACKUP_CONFIG}" && -f "${BACKUP_CONFIG}" ]]; then
        cp -p "${BACKUP_CONFIG}" "${HAPROXY_CONFIG}"
        echo "已恢复之前的 HAProxy 配置文件。"
    fi
}

# Remove the temporary backup on every exit path.
cleanup() {
    if [[ -n "${BACKUP_CONFIG}" ]]; then
        rm -f "${BACKUP_CONFIG}"
    fi
}
trap cleanup EXIT

echo "============================================="
echo "开始更新 Clash 代理池 - $(date)"
echo "使用配置文件: ${CONFIG_FILE}"
echo "============================================="

# 1. Run the scanner, which writes the new HAProxy configuration directly.
echo "[Step 1/3] 正在运行代理扫描器以生成新配置..."
if ! python3 "${SCANNER_SCRIPT}" "${CONFIG_FILE}"; then
    echo "错误:代理扫描器执行失败!"
    restore_backup
    exit 1
fi

# 2. Check the syntax of the freshly written configuration.
echo "[Step 2/3] 正在验证新的 HAProxy 配置文件 (${HAPROXY_CONFIG})..."
if ! haproxy -c -f "${HAPROXY_CONFIG}"; then
    echo "错误:新生成的 HAProxy 配置文件无效!放弃本次更新。请检查 ${SCANNER_SCRIPT} 的输出。"
    restore_backup
    exit 1
fi
echo "配置文件验证通过。"

# 3. Reload HAProxy so it picks up the new backend list.
echo "[Step 3/3] 正在重载 HAProxy 服务..."
if ! sudo systemctl reload haproxy; then
    echo "错误:HAProxy 重载失败!请使用 'sudo journalctl -u haproxy' 检查日志。"
    exit 1
fi
echo "HAProxy 已成功使用新的代理列表重载!"
echo "更新完成。"
运行
手动执行
# 切换到工作目录
cd ~/proxy-finder
# 执行更新脚本
# 由于脚本内部会重载haproxy服务,因此需要sudo权限
sudo ./update.sh
客户端配置
# HAPROXY_IP 替换为你的服务器IP
# HAPROXY_PORT 替换为配置文件中的端口
export http_proxy="http://<HAPROXY_IP>:<HAPROXY_PORT>"
export https_proxy="http://<HAPROXY_IP>:<HAPROXY_PORT>"
# 测试一下
curl www.google.com
自动化定时更新
编辑crontab:
sudo crontab -e
添加定时任务:
# 每天凌晨 02:15 运行代理更新脚本
15 02 * * * /home/your_user/proxy-finder/update.sh >> /var/log/update_clash_proxies.log 2>&1
注意:/home/your_user/proxy-finder/update.sh 必须使用绝对路径。
监控与验证
HAProxy 状态页面: 脚本生成的 haproxy.cfg 默认启用了一个统计页面。通过浏览器访问 http://<HAProxy_IP>:9000/stats,你可以看到:
Frontend (clash_proxy_frontend): 前端监听状态。
Backend (clash_proxy_backend): 后端代理服务器列表。
状态 (Status):每个后端代理的状态会实时显示。UP 表示健康,DOWN 表示健康检查失败已被禁用。
会话 (Session):可以看到流量在各个代理上的分布情况。
检查日志:
系统日志
sudo journalctl -u haproxy -f
脚本日志
tail -f /var/log/update_clash_proxies.log