Files
configure_nginx_manager/letsencrypt_regru_api.py

1485 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Скрипт для создания и обновления SSL сертификата Let's Encrypt
с использованием DNS-валидации через API reg.ru
Автор: Фофанов Дмитрий
Дата: 27.10.2025
Описание:
Этот скрипт автоматизирует процесс получения и обновления SSL сертификатов
Let's Encrypt для доменов, зарегистрированных на reg.ru, используя DNS-01 challenge.
Скрипт напрямую работает с API reg.ru для управления DNS записями.
Требования:
- Python 3.6+
- requests
- certbot
- cryptography
Установка зависимостей:
pip install requests certbot cryptography
"""
import os
import sys
import json
import time
import logging
import argparse
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
try:
import requests
except ImportError:
print("ОШИБКА: Необходимо установить модуль 'requests'")
print("Выполните: pip install requests")
sys.exit(1)
try:
from cryptography import x509
from cryptography.x509.oid import NameOID, ExtensionOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
except ImportError:
print("ОШИБКА: Необходимо установить модуль 'cryptography'")
print("Выполните: pip install cryptography")
sys.exit(1)
# ==============================================================================
# КОНФИГУРАЦИЯ
# ==============================================================================
# Настройки по умолчанию
DEFAULT_CONFIG = {
# Учетные данные API reg.ru
"regru_username": "your_username",
"regru_password": "your_password",
# Параметры домена
"domain": "dfv24.com",
"wildcard": True, # Создавать wildcard сертификат (*.domain.com)
# Email для уведомлений Let's Encrypt
"email": "admin@dfv24.com",
# Директории
"cert_dir": "/etc/letsencrypt/live",
"log_file": "/var/log/letsencrypt_regru.log",
# Параметры DNS
"dns_propagation_wait": 60, # Время ожидания распространения DNS (секунды)
"dns_check_attempts": 10, # Количество попыток проверки DNS
"dns_check_interval": 10, # Интервал между проверками DNS (секунды)
# Параметры обновления сертификата
"renewal_days": 30, # За сколько дней до истечения обновлять (по умолчанию 30)
# Настройки Nginx Proxy Manager
"npm_enabled": False, # Включить интеграцию с NPM
"npm_host": "http://192.168.10.14:81", # Адрес NPM
"npm_email": "admin@example.com", # Email для входа в NPM
"npm_password": "changeme", # Пароль NPM
}
# API endpoints для reg.ru
REGRU_API_URL = "https://api.reg.ru/api/regru2"
# ==============================================================================
# НАСТРОЙКА ЛОГИРОВАНИЯ
# ==============================================================================
def setup_logging(log_file: str, verbose: bool = False) -> logging.Logger:
"""
Настройка системы логирования
Args:
log_file: Путь к файлу лога
verbose: Режим подробного вывода
Returns:
Logger объект
"""
log_level = logging.DEBUG if verbose else logging.INFO
# Создаем директорию для логов, если не существует
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
# Настройка форматирования
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Создаем logger
logger = logging.getLogger('LetsEncrypt_RegRU')
logger.setLevel(log_level)
# Обработчик для файла
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter)
# Обработчик для консоли
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
# Добавляем обработчики
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
# ==============================================================================
# КЛАСС ДЛЯ РАБОТЫ С API REG.RU
# ==============================================================================
class RegRuAPI:
"""Класс для работы с API reg.ru"""
def __init__(self, username: str, password: str, logger: logging.Logger):
"""
Инициализация API клиента
Args:
username: Имя пользователя reg.ru
password: Пароль reg.ru
logger: Logger объект
"""
self.username = username
self.password = password
self.logger = logger
self.session = requests.Session()
def _make_request(self, method: str, params: Dict) -> Dict:
"""
Выполнение запроса к API reg.ru
Args:
method: Название метода API
params: Параметры запроса
Returns:
Ответ API в формате dict
"""
url = f"{REGRU_API_URL}/{method}"
# Добавляем учетные данные к параметрам
params.update({
"username": self.username,
"password": self.password,
"output_format": "json"
})
try:
self.logger.debug(f"Отправка запроса к API: {method}")
response = self.session.post(url, data=params)
response.raise_for_status()
result = response.json()
if result.get("result") == "success":
self.logger.debug(f"Запрос {method} выполнен успешно")
return result
else:
error_msg = result.get("error_text", "Неизвестная ошибка")
self.logger.error(f"Ошибка API: {error_msg}")
raise Exception(f"API Error: {error_msg}")
except requests.exceptions.RequestException as e:
self.logger.error(f"Ошибка HTTP запроса: {e}")
raise
def get_zone_records(self, domain: str) -> List[Dict]:
"""
Получение DNS записей домена
Args:
domain: Доменное имя
Returns:
Список DNS записей
"""
self.logger.info(f"Получение DNS записей для домена: {domain}")
params = {
"domain": domain,
}
result = self._make_request("zone/get_resource_records", params)
if "answer" in result and "records" in result["answer"]:
records = result["answer"]["records"]
self.logger.info(f"Получено {len(records)} DNS записей")
return records
else:
self.logger.warning("DNS записи не найдены")
return []
def add_txt_record(self, domain: str, subdomain: str, txt_value: str) -> bool:
"""
Добавление TXT записи для DNS валидации
Args:
domain: Основной домен
subdomain: Поддомен (например, _acme-challenge)
txt_value: Значение TXT записи
Returns:
True если успешно, False в противном случае
"""
self.logger.info(f"Добавление TXT записи: {subdomain}.{domain} = {txt_value}")
params = {
"domain": domain,
"subdomain": subdomain,
"text": txt_value,
"output_content_type": "plain"
}
try:
self._make_request("zone/add_txt", params)
self.logger.info("TXT запись успешно добавлена")
return True
except Exception as e:
self.logger.error(f"Не удалось добавить TXT запись: {e}")
return False
def remove_txt_record(self, domain: str, subdomain: str, txt_value: str) -> bool:
"""
Удаление TXT записи
Args:
domain: Основной домен
subdomain: Поддомен
txt_value: Значение TXT записи
Returns:
True если успешно, False в противном случае
"""
self.logger.info(f"Удаление TXT записи: {subdomain}.{domain}")
# Сначала получаем список всех записей
records = self.get_zone_records(domain)
# Ищем нужную TXT запись
record_id = None
for record in records:
if (record.get("rectype") == "TXT" and
record.get("subdomain") == subdomain and
record.get("text") == txt_value):
record_id = record.get("id")
break
if not record_id:
self.logger.warning("TXT запись для удаления не найдена")
return False
params = {
"domain": domain,
"record_id": record_id
}
try:
self._make_request("zone/remove_record", params)
self.logger.info("TXT запись успешно удалена")
return True
except Exception as e:
self.logger.error(f"Не удалось удалить TXT запись: {e}")
return False
# ==============================================================================
# КЛАСС ДЛЯ РАБОТЫ С NGINX PROXY MANAGER
# ==============================================================================
class NginxProxyManagerAPI:
"""Класс для работы с API Nginx Proxy Manager"""
def __init__(self, host: str, email: str, password: str, logger: logging.Logger):
"""
Инициализация API клиента NPM
Args:
host: URL адрес NPM (например, http://192.168.10.14:81)
email: Email для входа
password: Пароль
logger: Logger объект
"""
self.host = host.rstrip('/')
self.email = email
self.password = password
self.logger = logger
self.session = requests.Session()
self.token = None
def login(self) -> bool:
"""
Авторизация в Nginx Proxy Manager
Returns:
True если успешно
"""
url = f"{self.host}/api/tokens"
payload = {
"identity": self.email,
"secret": self.password
}
try:
self.logger.info("Авторизация в Nginx Proxy Manager...")
response = self.session.post(url, json=payload, timeout=10)
response.raise_for_status()
data = response.json()
self.token = data.get("token")
if self.token:
# Устанавливаем токен в заголовки для последующих запросов
self.session.headers.update({
"Authorization": f"Bearer {self.token}"
})
self.logger.info("Авторизация в NPM успешна")
return True
else:
self.logger.error("Токен не получен при авторизации")
return False
except requests.exceptions.RequestException as e:
self.logger.error(f"Ошибка при авторизации в NPM: {e}")
return False
def get_certificates(self) -> List[Dict]:
"""
Получение списка сертификатов
Returns:
Список сертификатов
"""
url = f"{self.host}/api/nginx/certificates"
try:
self.logger.debug("Получение списка сертификатов из NPM...")
response = self.session.get(url, timeout=10)
response.raise_for_status()
certificates = response.json()
self.logger.debug(f"Получено {len(certificates)} сертификатов")
return certificates
except requests.exceptions.RequestException as e:
self.logger.error(f"Ошибка при получении списка сертификатов: {e}")
return []
def find_certificate_by_domain(self, domain: str) -> Optional[Dict]:
"""
Поиск сертификата по домену
Args:
domain: Доменное имя
Returns:
Данные сертификата или None
"""
certificates = self.get_certificates()
for cert in certificates:
domains = cert.get("domain_names", [])
if domain in domains or f"*.{domain}" in domains:
self.logger.debug(f"Найден существующий сертификат для {domain}")
return cert
return None
def upload_certificate(self, domain: str, cert_path: str, key_path: str,
chain_path: Optional[str] = None) -> Optional[Dict]:
"""
Загрузка нового сертификата в NPM
Args:
domain: Основной домен
cert_path: Путь к файлу сертификата
key_path: Путь к приватному ключу
chain_path: Путь к цепочке сертификатов (опционально)
Returns:
Данные созданного сертификата или None
"""
url = f"{self.host}/api/nginx/certificates"
try:
# Читаем файлы сертификатов
with open(cert_path, 'r') as f:
certificate = f.read()
with open(key_path, 'r') as f:
certificate_key = f.read()
# Если есть цепочка, объединяем с сертификатом
if chain_path and os.path.exists(chain_path):
with open(chain_path, 'r') as f:
chain = f.read()
# NPM ожидает fullchain (cert + chain)
certificate = certificate + "\n" + chain
# Формируем payload для API
payload = {
"provider": "other",
"nice_name": f"{domain} - Let's Encrypt",
"domain_names": [domain],
"certificate": certificate,
"certificate_key": certificate_key,
"meta": {
"letsencrypt_agree": True,
"dns_challenge": True,
"dns_provider": "reg_ru"
}
}
self.logger.info(f"Загрузка сертификата для {domain} в NPM...")
response = self.session.post(url, json=payload, timeout=30)
response.raise_for_status()
result = response.json()
cert_id = result.get("id")
if cert_id:
self.logger.info(f"Сертификат успешно загружен в NPM (ID: {cert_id})")
return result
else:
self.logger.error("Не удалось получить ID созданного сертификата")
return None
except FileNotFoundError as e:
self.logger.error(f"Файл сертификата не найден: {e}")
return None
except requests.exceptions.RequestException as e:
self.logger.error(f"Ошибка при загрузке сертификата в NPM: {e}")
if hasattr(e.response, 'text'):
self.logger.error(f"Ответ сервера: {e.response.text}")
return None
def update_certificate(self, cert_id: int, cert_path: str, key_path: str,
chain_path: Optional[str] = None) -> bool:
"""
Обновление существующего сертификата
Args:
cert_id: ID сертификата в NPM
cert_path: Путь к файлу сертификата
key_path: Путь к приватному ключу
chain_path: Путь к цепочке сертификатов (опционально)
Returns:
True если успешно
"""
url = f"{self.host}/api/nginx/certificates/{cert_id}"
try:
# Читаем файлы сертификатов
with open(cert_path, 'r') as f:
certificate = f.read()
with open(key_path, 'r') as f:
certificate_key = f.read()
# Если есть цепочка, объединяем с сертификатом
if chain_path and os.path.exists(chain_path):
with open(chain_path, 'r') as f:
chain = f.read()
certificate = certificate + "\n" + chain
# Формируем payload для обновления
payload = {
"certificate": certificate,
"certificate_key": certificate_key
}
self.logger.info(f"Обновление сертификата ID {cert_id} в NPM...")
response = self.session.put(url, json=payload, timeout=30)
response.raise_for_status()
self.logger.info("Сертификат успешно обновлен в NPM")
return True
except FileNotFoundError as e:
self.logger.error(f"Файл сертификата не найден: {e}")
return False
except requests.exceptions.RequestException as e:
self.logger.error(f"Ошибка при обновлении сертификата в NPM: {e}")
if hasattr(e.response, 'text'):
self.logger.error(f"Ответ сервера: {e.response.text}")
return False
def sync_certificate(self, domain: str, cert_dir: str) -> bool:
"""
Синхронизация сертификата с NPM (создание или обновление)
Args:
domain: Доменное имя
cert_dir: Директория с сертификатами Let's Encrypt
Returns:
True если успешно
"""
# Пути к файлам сертификата
cert_path = os.path.join(cert_dir, "cert.pem")
key_path = os.path.join(cert_dir, "privkey.pem")
chain_path = os.path.join(cert_dir, "chain.pem")
fullchain_path = os.path.join(cert_dir, "fullchain.pem")
# Проверяем наличие файлов
if not os.path.exists(cert_path) or not os.path.exists(key_path):
self.logger.error(f"Файлы сертификата не найдены в {cert_dir}")
return False
# Авторизуемся в NPM
if not self.login():
return False
# Проверяем, существует ли уже сертификат для этого домена
existing_cert = self.find_certificate_by_domain(domain)
# Используем fullchain если доступен, иначе cert + chain
if os.path.exists(fullchain_path):
final_cert_path = fullchain_path
final_chain_path = None
else:
final_cert_path = cert_path
final_chain_path = chain_path if os.path.exists(chain_path) else None
if existing_cert:
# Обновляем существующий сертификат
cert_id = existing_cert.get("id")
self.logger.info(f"Обновление существующего сертификата (ID: {cert_id})")
return self.update_certificate(cert_id, final_cert_path, key_path, final_chain_path)
else:
# Создаем новый сертификат
self.logger.info("Создание нового сертификата в NPM")
result = self.upload_certificate(domain, final_cert_path, key_path, final_chain_path)
return result is not None
# ==============================================================================
# КЛАСС ДЛЯ ГЕНЕРАЦИИ ТЕСТОВЫХ СЕРТИФИКАТОВ
# ==============================================================================
class TestCertificateGenerator:
"""Класс для генерации самоподписанных тестовых SSL сертификатов"""
def __init__(self, logger: logging.Logger):
"""
Инициализация генератора тестовых сертификатов
Args:
logger: Logger объект
"""
self.logger = logger
def generate_self_signed_certificate(
self,
domain: str,
wildcard: bool = False,
output_dir: str = "/etc/letsencrypt/live",
validity_days: int = 90
) -> bool:
"""
Генерация самоподписанного SSL сертификата для тестирования
Args:
domain: Основной домен
wildcard: Создать wildcard сертификат
output_dir: Директория для сохранения сертификата
validity_days: Срок действия сертификата в днях (по умолчанию 90)
Returns:
True если сертификат создан успешно, False в противном случае
"""
try:
self.logger.info("=" * 80)
self.logger.info("ГЕНЕРАЦИЯ ТЕСТОВОГО САМОПОДПИСАННОГО СЕРТИФИКАТА")
self.logger.info("=" * 80)
self.logger.info(f"Домен: {domain}")
self.logger.info(f"Wildcard: {wildcard}")
self.logger.info(f"Срок действия: {validity_days} дней")
self.logger.info("⚠️ ВНИМАНИЕ: Это тестовый сертификат, не для production!")
# Создаем директорию для сертификата
cert_dir = os.path.join(output_dir, domain)
os.makedirs(cert_dir, exist_ok=True)
self.logger.info(f"Директория: {cert_dir}")
# Генерируем приватный ключ
self.logger.info("Генерация приватного ключа RSA 2048 бит...")
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# Сохраняем приватный ключ
key_path = os.path.join(cert_dir, "privkey.pem")
with open(key_path, "wb") as f:
f.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
os.chmod(key_path, 0o600)
self.logger.info(f"✓ Приватный ключ сохранен: {key_path}")
# Подготовка данных для сертификата
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "RU"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Moscow"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Moscow"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Test Certificate"),
x509.NameAttribute(NameOID.COMMON_NAME, domain),
])
# Создаем список альтернативных имен (SAN)
san_list = [x509.DNSName(domain)]
if wildcard:
san_list.append(x509.DNSName(f"*.{domain}"))
# Генерируем сертификат
self.logger.info("Генерация самоподписанного сертификата...")
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.utcnow())
.not_valid_after(datetime.utcnow() + timedelta(days=validity_days))
.add_extension(
x509.SubjectAlternativeName(san_list),
critical=False,
)
.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
)
.add_extension(
x509.KeyUsage(
digital_signature=True,
key_encipherment=True,
content_commitment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
critical=True,
)
.add_extension(
x509.ExtendedKeyUsage([
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH,
]),
critical=False,
)
.sign(private_key, hashes.SHA256(), default_backend())
)
# Сохраняем сертификат
cert_path = os.path.join(cert_dir, "cert.pem")
with open(cert_path, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
self.logger.info(f"✓ Сертификат сохранен: {cert_path}")
# Создаем fullchain (для самоподписанного это просто копия cert)
fullchain_path = os.path.join(cert_dir, "fullchain.pem")
with open(fullchain_path, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
self.logger.info(f"✓ Fullchain сохранен: {fullchain_path}")
# Создаем chain.pem (пустой для самоподписанного)
chain_path = os.path.join(cert_dir, "chain.pem")
with open(chain_path, "w") as f:
f.write("")
self.logger.info(f"✓ Chain файл создан: {chain_path}")
# Выводим информацию о сертификате
self.logger.info("")
self.logger.info("=" * 80)
self.logger.info("ИНФОРМАЦИЯ О СЕРТИФИКАТЕ")
self.logger.info("=" * 80)
self.logger.info(f"Домен: {domain}")
if wildcard:
self.logger.info(f"Wildcard: *.{domain}")
self.logger.info(f"Действителен с: {cert.not_valid_before}")
self.logger.info(f"Действителен до: {cert.not_valid_after}")
self.logger.info(f"Серийный номер: {cert.serial_number}")
self.logger.info("")
self.logger.info("📁 Файлы сертификата:")
self.logger.info(f" • Приватный ключ: {key_path}")
self.logger.info(f" • Сертификат: {cert_path}")
self.logger.info(f" • Fullchain: {fullchain_path}")
self.logger.info(f" • Chain: {chain_path}")
self.logger.info("")
self.logger.info("⚠️ ВНИМАНИЕ:")
self.logger.info(" Это самоподписанный тестовый сертификат!")
self.logger.info(" Браузеры будут показывать предупреждение о безопасности.")
self.logger.info(" Используйте ТОЛЬКО для тестирования и разработки!")
self.logger.info(" Для production используйте настоящие Let's Encrypt сертификаты.")
self.logger.info("=" * 80)
return True
except Exception as e:
self.logger.error(f"Ошибка при генерации тестового сертификата: {e}")
import traceback
self.logger.error(traceback.format_exc())
return False
# ==============================================================================
# КЛАСС ДЛЯ РАБОТЫ С CERTBOT
# ==============================================================================
class LetsEncryptManager:
"""Класс для управления сертификатами Let's Encrypt"""
def __init__(self, config: Dict, api: RegRuAPI, logger: logging.Logger):
"""
Инициализация менеджера сертификатов
Args:
config: Конфигурация
api: API клиент reg.ru
logger: Logger объект
"""
self.config = config
self.api = api
self.logger = logger
self.domain = config["domain"]
self.email = config["email"]
self.cert_dir = os.path.join(config["cert_dir"], self.domain)
def check_certbot_installed(self) -> bool:
"""
Проверка установки certbot
Returns:
True если certbot установлен
"""
try:
result = subprocess.run(
["certbot", "--version"],
capture_output=True,
text=True,
check=True
)
self.logger.debug(f"Certbot установлен: {result.stdout.strip()}")
return True
except (subprocess.CalledProcessError, FileNotFoundError):
self.logger.error("Certbot не установлен!")
return False
def check_certificate_expiry(self) -> Optional[int]:
"""
Проверка срока действия сертификата
Returns:
Количество дней до истечения или None если сертификат не найден
"""
cert_file = os.path.join(self.cert_dir, "cert.pem")
if not os.path.exists(cert_file):
self.logger.info("Сертификат не найден")
return None
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
with open(cert_file, "rb") as f:
cert_data = f.read()
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
expiry_date = cert.not_valid_after
days_left = (expiry_date - datetime.now()).days
self.logger.info(f"Сертификат истекает: {expiry_date.strftime('%Y-%m-%d')}")
self.logger.info(f"Осталось дней: {days_left}")
return days_left
except Exception as e:
self.logger.error(f"Ошибка при проверке сертификата: {e}")
return None
def dns_challenge_hook(self, validation_domain: str, validation_token: str) -> bool:
"""
Обработчик DNS challenge - добавление TXT записи
Args:
validation_domain: Домен для валидации (_acme-challenge.domain.com)
validation_token: Токен валидации
Returns:
True если успешно
"""
self.logger.info("=== DNS Challenge: Добавление TXT записи ===")
# Извлекаем поддомен из validation_domain
# Формат: _acme-challenge.domain.com или _acme-challenge
parts = validation_domain.replace(f".{self.domain}", "").split(".")
subdomain = parts[0] if parts else "_acme-challenge"
# Добавляем TXT запись
success = self.api.add_txt_record(self.domain, subdomain, validation_token)
if success:
# Ждем распространения DNS
wait_time = self.config.get("dns_propagation_wait", 60)
self.logger.info(f"Ожидание распространения DNS ({wait_time} секунд)...")
time.sleep(wait_time)
# Проверяем DNS запись
if self.verify_dns_record(subdomain, validation_token):
self.logger.info("DNS валидация готова")
return True
else:
self.logger.warning("DNS запись не распространилась вовремя, но продолжаем...")
return True
return False
def dns_cleanup_hook(self, validation_domain: str, validation_token: str) -> bool:
"""
Обработчик очистки DNS challenge - удаление TXT записи
Args:
validation_domain: Домен валидации
validation_token: Токен валидации
Returns:
True если успешно
"""
self.logger.info("=== DNS Challenge: Удаление TXT записи ===")
parts = validation_domain.replace(f".{self.domain}", "").split(".")
subdomain = parts[0] if parts else "_acme-challenge"
return self.api.remove_txt_record(self.domain, subdomain, validation_token)
def verify_dns_record(self, subdomain: str, expected_value: str) -> bool:
"""
Проверка наличия DNS записи
Args:
subdomain: Поддомен
expected_value: Ожидаемое значение TXT записи
Returns:
True если запись найдена
"""
import socket
full_domain = f"{subdomain}.{self.domain}"
attempts = self.config.get("dns_check_attempts", 10)
interval = self.config.get("dns_check_interval", 10)
self.logger.info(f"Проверка DNS записи для {full_domain}")
for attempt in range(attempts):
try:
# Используем nslookup или dig через subprocess
result = subprocess.run(
["nslookup", "-type=TXT", full_domain],
capture_output=True,
text=True,
timeout=10
)
if expected_value in result.stdout:
self.logger.info(f"DNS запись найдена (попытка {attempt + 1})")
return True
except Exception as e:
self.logger.debug(f"Попытка {attempt + 1}: DNS запись не найдена - {e}")
if attempt < attempts - 1:
time.sleep(interval)
self.logger.warning("DNS запись не найдена после всех попыток")
return False
def obtain_certificate(self) -> bool:
"""
Получение нового сертификата
Returns:
True если успешно
"""
self.logger.info("=== Запрос нового SSL сертификата ===")
# Формируем список доменов
domains = [self.domain]
if self.config.get("wildcard", False):
domains.append(f"*.{self.domain}")
domain_args = []
for d in domains:
domain_args.extend(["-d", d])
# Команда certbot
cmd = [
"certbot", "certonly",
"--manual",
"--preferred-challenges", "dns",
"--manual-auth-hook", f"{sys.executable} {os.path.abspath(__file__)} --auth-hook",
"--manual-cleanup-hook", f"{sys.executable} {os.path.abspath(__file__)} --cleanup-hook",
"--email", self.email,
"--agree-tos",
"--non-interactive",
"--expand",
] + domain_args
self.logger.info(f"Выполнение команды: {' '.join(cmd)}")
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
self.logger.info("Сертификат успешно получен!")
self.logger.debug(result.stdout)
return True
except subprocess.CalledProcessError as e:
self.logger.error(f"Ошибка при получении сертификата: {e}")
self.logger.error(e.stderr)
return False
def renew_certificate(self) -> bool:
"""
Обновление существующего сертификата
Returns:
True если успешно
"""
self.logger.info("=== Обновление SSL сертификата ===")
cmd = [
"certbot", "renew",
"--manual",
"--manual-auth-hook", f"{sys.executable} {os.path.abspath(__file__)} --auth-hook",
"--manual-cleanup-hook", f"{sys.executable} {os.path.abspath(__file__)} --cleanup-hook",
"--non-interactive",
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
self.logger.info("Проверка обновления завершена")
self.logger.debug(result.stdout)
return True
except subprocess.CalledProcessError as e:
self.logger.error(f"Ошибка при обновлении: {e}")
self.logger.error(e.stderr)
return False
def display_certificate_info(self):
"""Вывод информации о сертификате"""
cert_file = os.path.join(self.cert_dir, "cert.pem")
if not os.path.exists(cert_file):
self.logger.warning("Сертификат не найден")
return
self.logger.info("=" * 60)
self.logger.info("ИНФОРМАЦИЯ О СЕРТИФИКАТЕ")
self.logger.info("=" * 60)
try:
result = subprocess.run(
["openssl", "x509", "-in", cert_file, "-text", "-noout"],
capture_output=True,
text=True,
check=True
)
# Выводим только основную информацию
for line in result.stdout.split("\n"):
if any(keyword in line for keyword in ["Subject:", "Issuer:", "Not Before", "Not After", "DNS:"]):
self.logger.info(line.strip())
self.logger.info("=" * 60)
self.logger.info("ПУТИ К ФАЙЛАМ СЕРТИФИКАТА:")
self.logger.info(f" Сертификат: {self.cert_dir}/cert.pem")
self.logger.info(f" Приватный ключ: {self.cert_dir}/privkey.pem")
self.logger.info(f" Цепочка: {self.cert_dir}/chain.pem")
self.logger.info(f" Полная цепочка: {self.cert_dir}/fullchain.pem")
self.logger.info("=" * 60)
except Exception as e:
self.logger.error(f"Ошибка при чтении сертификата: {e}")
def sync_with_npm(self, npm_api: NginxProxyManagerAPI) -> bool:
"""
Синхронизация сертификата с Nginx Proxy Manager
Args:
npm_api: API клиент NPM
Returns:
True если успешно
"""
self.logger.info("=== Синхронизация сертификата с Nginx Proxy Manager ===")
# Проверяем наличие сертификата
if not os.path.exists(self.cert_dir):
self.logger.error(f"Директория сертификата не найдена: {self.cert_dir}")
return False
# Синхронизируем сертификат
return npm_api.sync_certificate(self.domain, self.cert_dir)
# ==============================================================================
# ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ
# ==============================================================================
def reload_webserver(logger: logging.Logger):
"""
Перезагрузка веб-сервера
Args:
logger: Logger объект
"""
logger.info("Перезагрузка веб-сервера...")
# Проверяем какие сервисы активны
services = ["nginx", "apache2", "httpd"]
for service in services:
try:
# Проверяем статус
result = subprocess.run(
["systemctl", "is-active", service],
capture_output=True,
text=True
)
if result.stdout.strip() == "active":
# Перезагружаем
subprocess.run(
["systemctl", "reload", service],
check=True
)
logger.info(f"Сервис {service} перезагружен")
return
except Exception as e:
logger.debug(f"Сервис {service} не активен или ошибка: {e}")
logger.warning("Активный веб-сервер не найден")
def load_config(config_file: Optional[str] = None) -> Dict:
"""
Загрузка конфигурации из файла или использование значений по умолчанию
Args:
config_file: Путь к файлу конфигурации (JSON)
Returns:
Словарь с конфигурацией
"""
config = DEFAULT_CONFIG.copy()
if config_file and os.path.exists(config_file):
try:
with open(config_file, 'r', encoding='utf-8') as f:
user_config = json.load(f)
config.update(user_config)
except Exception as e:
print(f"ОШИБКА: Не удалось загрузить конфигурацию из {config_file}: {e}")
sys.exit(1)
return config
def create_sample_config(output_file: str):
"""
Создание примера файла конфигурации
Args:
output_file: Путь к выходному файлу
"""
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(DEFAULT_CONFIG, f, indent=4, ensure_ascii=False)
print(f"Пример конфигурации создан: {output_file}")
print("Отредактируйте файл и укажите ваши учетные данные")
# ==============================================================================
# ОСНОВНАЯ ФУНКЦИЯ
# ==============================================================================
def main():
"""Основная функция скрипта"""
# Парсинг аргументов командной строки
parser = argparse.ArgumentParser(
description="Автоматическое управление SSL сертификатами Let's Encrypt через API reg.ru"
)
parser.add_argument(
"-c", "--config",
help="Путь к файлу конфигурации (JSON)",
default=None
)
parser.add_argument(
"--create-config",
help="Создать пример файла конфигурации",
metavar="FILE"
)
parser.add_argument(
"--obtain",
help="Получить новый сертификат",
action="store_true"
)
parser.add_argument(
"--renew",
help="Обновить существующий сертификат",
action="store_true"
)
parser.add_argument(
"--check",
help="Проверить срок действия сертификата",
action="store_true"
)
parser.add_argument(
"--auth-hook",
help="Внутренний хук для DNS аутентификации (используется certbot)",
action="store_true"
)
parser.add_argument(
"--cleanup-hook",
help="Внутренний хук для очистки DNS (используется certbot)",
action="store_true"
)
parser.add_argument(
"-v", "--verbose",
help="Подробный вывод",
action="store_true"
)
parser.add_argument(
"--auto",
help="Автоматический режим: проверка и обновление при необходимости",
action="store_true"
)
parser.add_argument(
"--test-cert",
help="Создать самоподписанный тестовый сертификат (для разработки и тестирования)",
action="store_true"
)
args = parser.parse_args()
# Создание примера конфигурации
if args.create_config:
create_sample_config(args.create_config)
return 0
# Загрузка конфигурации
config = load_config(args.config)
# Настройка логирования
logger = setup_logging(config["log_file"], args.verbose)
# Генерация тестового сертификата
if args.test_cert:
logger.info("=" * 80)
logger.info("РЕЖИМ: Генерация тестового самоподписанного сертификата")
logger.info("=" * 80)
test_gen = TestCertificateGenerator(logger)
success = test_gen.generate_self_signed_certificate(
domain=config["domain"],
wildcard=config.get("wildcard", False),
output_dir=config["cert_dir"],
validity_days=90
)
if success:
# Опционально загружаем в NPM
if config.get("npm_enabled", False):
logger.info("")
logger.info("=" * 80)
logger.info("ЗАГРУЗКА ТЕСТОВОГО СЕРТИФИКАТА В NGINX PROXY MANAGER")
logger.info("=" * 80)
npm_api = NginxProxyManagerAPI(
config["npm_host"],
config["npm_email"],
config["npm_password"],
logger
)
if npm_api.login():
cert_dir = os.path.join(config["cert_dir"], config["domain"])
cert_path = os.path.join(cert_dir, "fullchain.pem")
key_path = os.path.join(cert_dir, "privkey.pem")
# Проверяем существующий сертификат
existing = npm_api.find_certificate_by_domain(config["domain"])
if existing:
# Обновляем существующий
cert_id = existing.get("id")
logger.info(f"Обновление существующего сертификата в NPM (ID: {cert_id})")
if npm_api.update_certificate(cert_id, cert_path, key_path):
logger.info("✅ Тестовый сертификат успешно обновлен в NPM")
else:
logger.warning("⚠️ Не удалось обновить сертификат в NPM")
else:
# Создаем новый
logger.info("Загрузка нового тестового сертификата в NPM")
if npm_api.upload_certificate(config["domain"], cert_path, key_path):
logger.info("✅ Тестовый сертификат успешно загружен в NPM")
else:
logger.warning("⚠️ Не удалось загрузить сертификат в NPM")
else:
logger.error("Не удалось подключиться к Nginx Proxy Manager")
logger.info("")
logger.info("=" * 80)
logger.info("✅ ТЕСТОВЫЙ СЕРТИФИКАТ УСПЕШНО СОЗДАН")
logger.info("=" * 80)
return 0
else:
logger.error("Не удалось создать тестовый сертификат")
return 1
# Обработка хуков для certbot
if args.auth_hook:
# Certbot передает домен и токен через переменные окружения
domain = os.environ.get("CERTBOT_DOMAIN")
token = os.environ.get("CERTBOT_VALIDATION")
if domain and token:
api = RegRuAPI(config["regru_username"], config["regru_password"], logger)
manager = LetsEncryptManager(config, api, logger)
success = manager.dns_challenge_hook(domain, token)
return 0 if success else 1
else:
logger.error("CERTBOT_DOMAIN или CERTBOT_VALIDATION не установлены")
return 1
if args.cleanup_hook:
domain = os.environ.get("CERTBOT_DOMAIN")
token = os.environ.get("CERTBOT_VALIDATION")
if domain and token:
api = RegRuAPI(config["regru_username"], config["regru_password"], logger)
manager = LetsEncryptManager(config, api, logger)
success = manager.dns_cleanup_hook(domain, token)
return 0 if success else 1
else:
logger.error("CERTBOT_DOMAIN или CERTBOT_VALIDATION не установлены")
return 1
# Проверка прав root
if os.geteuid() != 0:
logger.error("Скрипт должен быть запущен от имени root (sudo)")
return 1
# Инициализация API и менеджера
api = RegRuAPI(config["regru_username"], config["regru_password"], logger)
manager = LetsEncryptManager(config, api, logger)
# Проверка certbot
if not manager.check_certbot_installed():
logger.error("Установите certbot: apt-get install certbot")
return 1
logger.info("=" * 60)
logger.info("СКРИПТ УПРАВЛЕНИЯ SSL СЕРТИФИКАТАМИ LET'S ENCRYPT")
logger.info("=" * 60)
# Выполнение действий
if args.check:
# Только проверка срока действия
days_left = manager.check_certificate_expiry()
if days_left is None:
logger.info("Сертификат не найден. Требуется создание нового.")
return 2
elif days_left < 30:
logger.warning(f"Сертификат истекает через {days_left} дней. Требуется обновление!")
return 1
else:
logger.info(f"Сертификат действителен ({days_left} дней)")
return 0
elif args.obtain:
# Принудительное получение нового сертификата
success = manager.obtain_certificate()
if success:
manager.display_certificate_info()
reload_webserver(logger)
# Синхронизация с Nginx Proxy Manager
if config.get("npm_enabled", False):
npm_api = NginxProxyManagerAPI(
config["npm_host"],
config["npm_email"],
config["npm_password"],
logger
)
if manager.sync_with_npm(npm_api):
logger.info("Сертификат успешно добавлен в Nginx Proxy Manager")
else:
logger.warning("Не удалось синхронизировать сертификат с NPM")
logger.info("Новый сертификат успешно создан")
return 0
else:
logger.error("Не удалось получить сертификат")
return 1
elif args.renew:
# Обновление существующего сертификата
success = manager.renew_certificate()
if success:
manager.display_certificate_info()
reload_webserver(logger)
# Синхронизация с Nginx Proxy Manager
if config.get("npm_enabled", False):
npm_api = NginxProxyManagerAPI(
config["npm_host"],
config["npm_email"],
config["npm_password"],
logger
)
if manager.sync_with_npm(npm_api):
logger.info("Сертификат успешно обновлен в Nginx Proxy Manager")
else:
logger.warning("Не удалось синхронизировать сертификат с NPM")
logger.info("Сертификат успешно обновлен")
return 0
else:
logger.error("Не удалось обновить сертификат")
return 1
else:
# Автоматический режим: проверка и обновление при необходимости
logger.info("=" * 60)
logger.info("АВТОМАТИЧЕСКАЯ ПРОВЕРКА И ОБНОВЛЕНИЕ СЕРТИФИКАТА")
logger.info("=" * 60)
# Получаем порог для обновления из конфигурации
renewal_days = config.get("renewal_days", 30)
logger.info(f"Порог обновления: {renewal_days} дней до истечения")
# Проверяем срок действия сертификата
days_left = manager.check_certificate_expiry()
if days_left is None:
# Сертификат не существует - создаем новый
logger.info("=" * 60)
logger.info("СТАТУС: Сертификат не найден")
logger.info("ДЕЙСТВИЕ: Создание нового сертификата")
logger.info("=" * 60)
success = manager.obtain_certificate()
action = "создан"
elif days_left < renewal_days:
# Сертификат скоро истекает - обновляем
logger.info("=" * 60)
logger.info(f"СТАТУС: Сертификат истекает через {days_left} дней")
logger.info(f"ДЕЙСТВИЕ: Обновление сертификата (порог: {renewal_days} дней)")
logger.info("=" * 60)
success = manager.renew_certificate()
action = "обновлен"
else:
# Сертификат действителен - ничего не делаем
logger.info("=" * 60)
logger.info(f"СТАТУС: Сертификат действителен ({days_left} дней)")
logger.info("ДЕЙСТВИЕ: Обновление не требуется")
logger.info("=" * 60)
manager.display_certificate_info()
# Проверяем синхронизацию с NPM даже если сертификат действителен
if config.get("npm_enabled", False):
logger.info("Проверка синхронизации с Nginx Proxy Manager...")
npm_api = NginxProxyManagerAPI(
config["npm_host"],
config["npm_email"],
config["npm_password"],
logger
)
existing_cert = npm_api.login() and npm_api.find_certificate_by_domain(manager.domain)
if existing_cert:
logger.info(f"Сертификат найден в NPM (ID: {existing_cert.get('id')})")
else:
logger.info("Сертификат не найден в NPM. Синхронизация...")
if manager.sync_with_npm(npm_api):
logger.info("Сертификат успешно синхронизирован с NPM")
return 0
# Если был создан или обновлен сертификат
if success:
logger.info("=" * 60)
logger.info(f"РЕЗУЛЬТАТ: Сертификат успешно {action}")
logger.info("=" * 60)
manager.display_certificate_info()
reload_webserver(logger)
# Синхронизация с Nginx Proxy Manager
if config.get("npm_enabled", False):
logger.info("=" * 60)
logger.info("СИНХРОНИЗАЦИЯ С NGINX PROXY MANAGER")
logger.info("=" * 60)
npm_api = NginxProxyManagerAPI(
config["npm_host"],
config["npm_email"],
config["npm_password"],
logger
)
if manager.sync_with_npm(npm_api):
logger.info(f"✅ Сертификат успешно {action} в Nginx Proxy Manager")
else:
logger.warning("⚠️ Не удалось синхронизировать сертификат с NPM")
logger.info("=" * 60)
logger.info("ОПЕРАЦИЯ ЗАВЕРШЕНА УСПЕШНО")
logger.info("=" * 60)
return 0
else:
logger.error("Операция завершилась с ошибкой")
return 1
if __name__ == "__main__":
sys.exit(main())