832 lines
31 KiB
Python
832 lines
31 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
Скрипт для создания и обновления SSL сертификата Let's Encrypt
|
||
с использованием DNS-валидации через API reg.ru
|
||
|
||
Автор: GitHub Copilot
|
||
Дата: 27.10.2025
|
||
|
||
Описание:
|
||
Этот скрипт автоматизирует процесс получения и обновления SSL сертификатов
|
||
Let's Encrypt для доменов, зарегистрированных на reg.ru, используя DNS-01 challenge.
|
||
Скрипт напрямую работает с API reg.ru для управления DNS записями.
|
||
|
||
Требования:
|
||
- Python 3.6+
|
||
- requests
|
||
- certbot
|
||
- cryptography
|
||
|
||
Установка зависимостей:
|
||
pip install requests certbot cryptography
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import time
|
||
import logging
|
||
import argparse
|
||
import subprocess
|
||
from datetime import datetime, timedelta
|
||
from typing import Dict, List, Optional, Tuple
|
||
|
||
try:
|
||
import requests
|
||
except ImportError:
|
||
print("ОШИБКА: Необходимо установить модуль 'requests'")
|
||
print("Выполните: pip install requests")
|
||
sys.exit(1)
|
||
|
||
# ==============================================================================
|
||
# КОНФИГУРАЦИЯ
|
||
# ==============================================================================
|
||
|
||
# Настройки по умолчанию
|
||
DEFAULT_CONFIG = {
|
||
# Учетные данные API reg.ru
|
||
"regru_username": "your_username",
|
||
"regru_password": "your_password",
|
||
|
||
# Параметры домена
|
||
"domain": "dfv24.com",
|
||
"wildcard": True, # Создавать wildcard сертификат (*.domain.com)
|
||
|
||
# Email для уведомлений Let's Encrypt
|
||
"email": "admin@dfv24.com",
|
||
|
||
# Директории
|
||
"cert_dir": "/etc/letsencrypt/live",
|
||
"log_file": "/var/log/letsencrypt_regru.log",
|
||
|
||
# Параметры DNS
|
||
"dns_propagation_wait": 60, # Время ожидания распространения DNS (секунды)
|
||
"dns_check_attempts": 10, # Количество попыток проверки DNS
|
||
"dns_check_interval": 10, # Интервал между проверками DNS (секунды)
|
||
}
|
||
|
||
# API endpoints для reg.ru
|
||
REGRU_API_URL = "https://api.reg.ru/api/regru2"
|
||
|
||
# ==============================================================================
|
||
# НАСТРОЙКА ЛОГИРОВАНИЯ
|
||
# ==============================================================================
|
||
|
||
def setup_logging(log_file: str, verbose: bool = False) -> logging.Logger:
|
||
"""
|
||
Настройка системы логирования
|
||
|
||
Args:
|
||
log_file: Путь к файлу лога
|
||
verbose: Режим подробного вывода
|
||
|
||
Returns:
|
||
Logger объект
|
||
"""
|
||
log_level = logging.DEBUG if verbose else logging.INFO
|
||
|
||
# Создаем директорию для логов, если не существует
|
||
log_dir = os.path.dirname(log_file)
|
||
if log_dir and not os.path.exists(log_dir):
|
||
os.makedirs(log_dir, exist_ok=True)
|
||
|
||
# Настройка форматирования
|
||
formatter = logging.Formatter(
|
||
'%(asctime)s - %(levelname)s - %(message)s',
|
||
datefmt='%Y-%m-%d %H:%M:%S'
|
||
)
|
||
|
||
# Создаем logger
|
||
logger = logging.getLogger('LetsEncrypt_RegRU')
|
||
logger.setLevel(log_level)
|
||
|
||
# Обработчик для файла
|
||
file_handler = logging.FileHandler(log_file, encoding='utf-8')
|
||
file_handler.setLevel(log_level)
|
||
file_handler.setFormatter(formatter)
|
||
|
||
# Обработчик для консоли
|
||
console_handler = logging.StreamHandler(sys.stdout)
|
||
console_handler.setLevel(log_level)
|
||
console_handler.setFormatter(formatter)
|
||
|
||
# Добавляем обработчики
|
||
logger.addHandler(file_handler)
|
||
logger.addHandler(console_handler)
|
||
|
||
return logger
|
||
|
||
|
||
# ==============================================================================
|
||
# КЛАСС ДЛЯ РАБОТЫ С API REG.RU
|
||
# ==============================================================================
|
||
|
||
class RegRuAPI:
|
||
"""Класс для работы с API reg.ru"""
|
||
|
||
def __init__(self, username: str, password: str, logger: logging.Logger):
|
||
"""
|
||
Инициализация API клиента
|
||
|
||
Args:
|
||
username: Имя пользователя reg.ru
|
||
password: Пароль reg.ru
|
||
logger: Logger объект
|
||
"""
|
||
self.username = username
|
||
self.password = password
|
||
self.logger = logger
|
||
self.session = requests.Session()
|
||
|
||
def _make_request(self, method: str, params: Dict) -> Dict:
|
||
"""
|
||
Выполнение запроса к API reg.ru
|
||
|
||
Args:
|
||
method: Название метода API
|
||
params: Параметры запроса
|
||
|
||
Returns:
|
||
Ответ API в формате dict
|
||
"""
|
||
url = f"{REGRU_API_URL}/{method}"
|
||
|
||
# Добавляем учетные данные к параметрам
|
||
params.update({
|
||
"username": self.username,
|
||
"password": self.password,
|
||
"output_format": "json"
|
||
})
|
||
|
||
try:
|
||
self.logger.debug(f"Отправка запроса к API: {method}")
|
||
response = self.session.post(url, data=params)
|
||
response.raise_for_status()
|
||
|
||
result = response.json()
|
||
|
||
if result.get("result") == "success":
|
||
self.logger.debug(f"Запрос {method} выполнен успешно")
|
||
return result
|
||
else:
|
||
error_msg = result.get("error_text", "Неизвестная ошибка")
|
||
self.logger.error(f"Ошибка API: {error_msg}")
|
||
raise Exception(f"API Error: {error_msg}")
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
self.logger.error(f"Ошибка HTTP запроса: {e}")
|
||
raise
|
||
|
||
def get_zone_records(self, domain: str) -> List[Dict]:
|
||
"""
|
||
Получение DNS записей домена
|
||
|
||
Args:
|
||
domain: Доменное имя
|
||
|
||
Returns:
|
||
Список DNS записей
|
||
"""
|
||
self.logger.info(f"Получение DNS записей для домена: {domain}")
|
||
|
||
params = {
|
||
"domain": domain,
|
||
}
|
||
|
||
result = self._make_request("zone/get_resource_records", params)
|
||
|
||
if "answer" in result and "records" in result["answer"]:
|
||
records = result["answer"]["records"]
|
||
self.logger.info(f"Получено {len(records)} DNS записей")
|
||
return records
|
||
else:
|
||
self.logger.warning("DNS записи не найдены")
|
||
return []
|
||
|
||
def add_txt_record(self, domain: str, subdomain: str, txt_value: str) -> bool:
|
||
"""
|
||
Добавление TXT записи для DNS валидации
|
||
|
||
Args:
|
||
domain: Основной домен
|
||
subdomain: Поддомен (например, _acme-challenge)
|
||
txt_value: Значение TXT записи
|
||
|
||
Returns:
|
||
True если успешно, False в противном случае
|
||
"""
|
||
self.logger.info(f"Добавление TXT записи: {subdomain}.{domain} = {txt_value}")
|
||
|
||
params = {
|
||
"domain": domain,
|
||
"subdomain": subdomain,
|
||
"text": txt_value,
|
||
"output_content_type": "plain"
|
||
}
|
||
|
||
try:
|
||
self._make_request("zone/add_txt", params)
|
||
self.logger.info("TXT запись успешно добавлена")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Не удалось добавить TXT запись: {e}")
|
||
return False
|
||
|
||
def remove_txt_record(self, domain: str, subdomain: str, txt_value: str) -> bool:
|
||
"""
|
||
Удаление TXT записи
|
||
|
||
Args:
|
||
domain: Основной домен
|
||
subdomain: Поддомен
|
||
txt_value: Значение TXT записи
|
||
|
||
Returns:
|
||
True если успешно, False в противном случае
|
||
"""
|
||
self.logger.info(f"Удаление TXT записи: {subdomain}.{domain}")
|
||
|
||
# Сначала получаем список всех записей
|
||
records = self.get_zone_records(domain)
|
||
|
||
# Ищем нужную TXT запись
|
||
record_id = None
|
||
for record in records:
|
||
if (record.get("rectype") == "TXT" and
|
||
record.get("subdomain") == subdomain and
|
||
record.get("text") == txt_value):
|
||
record_id = record.get("id")
|
||
break
|
||
|
||
if not record_id:
|
||
self.logger.warning("TXT запись для удаления не найдена")
|
||
return False
|
||
|
||
params = {
|
||
"domain": domain,
|
||
"record_id": record_id
|
||
}
|
||
|
||
try:
|
||
self._make_request("zone/remove_record", params)
|
||
self.logger.info("TXT запись успешно удалена")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Не удалось удалить TXT запись: {e}")
|
||
return False
|
||
|
||
|
||
# ==============================================================================
|
||
# КЛАСС ДЛЯ РАБОТЫ С CERTBOT
|
||
# ==============================================================================
|
||
|
||
class LetsEncryptManager:
|
||
"""Класс для управления сертификатами Let's Encrypt"""
|
||
|
||
def __init__(self, config: Dict, api: RegRuAPI, logger: logging.Logger):
|
||
"""
|
||
Инициализация менеджера сертификатов
|
||
|
||
Args:
|
||
config: Конфигурация
|
||
api: API клиент reg.ru
|
||
logger: Logger объект
|
||
"""
|
||
self.config = config
|
||
self.api = api
|
||
self.logger = logger
|
||
self.domain = config["domain"]
|
||
self.email = config["email"]
|
||
self.cert_dir = os.path.join(config["cert_dir"], self.domain)
|
||
|
||
def check_certbot_installed(self) -> bool:
|
||
"""
|
||
Проверка установки certbot
|
||
|
||
Returns:
|
||
True если certbot установлен
|
||
"""
|
||
try:
|
||
result = subprocess.run(
|
||
["certbot", "--version"],
|
||
capture_output=True,
|
||
text=True,
|
||
check=True
|
||
)
|
||
self.logger.debug(f"Certbot установлен: {result.stdout.strip()}")
|
||
return True
|
||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||
self.logger.error("Certbot не установлен!")
|
||
return False
|
||
|
||
def check_certificate_expiry(self) -> Optional[int]:
|
||
"""
|
||
Проверка срока действия сертификата
|
||
|
||
Returns:
|
||
Количество дней до истечения или None если сертификат не найден
|
||
"""
|
||
cert_file = os.path.join(self.cert_dir, "cert.pem")
|
||
|
||
if not os.path.exists(cert_file):
|
||
self.logger.info("Сертификат не найден")
|
||
return None
|
||
|
||
try:
|
||
from cryptography import x509
|
||
from cryptography.hazmat.backends import default_backend
|
||
|
||
with open(cert_file, "rb") as f:
|
||
cert_data = f.read()
|
||
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
|
||
|
||
expiry_date = cert.not_valid_after
|
||
days_left = (expiry_date - datetime.now()).days
|
||
|
||
self.logger.info(f"Сертификат истекает: {expiry_date.strftime('%Y-%m-%d')}")
|
||
self.logger.info(f"Осталось дней: {days_left}")
|
||
|
||
return days_left
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка при проверке сертификата: {e}")
|
||
return None
|
||
|
||
def dns_challenge_hook(self, validation_domain: str, validation_token: str) -> bool:
|
||
"""
|
||
Обработчик DNS challenge - добавление TXT записи
|
||
|
||
Args:
|
||
validation_domain: Домен для валидации (_acme-challenge.domain.com)
|
||
validation_token: Токен валидации
|
||
|
||
Returns:
|
||
True если успешно
|
||
"""
|
||
self.logger.info("=== DNS Challenge: Добавление TXT записи ===")
|
||
|
||
# Извлекаем поддомен из validation_domain
|
||
# Формат: _acme-challenge.domain.com или _acme-challenge
|
||
parts = validation_domain.replace(f".{self.domain}", "").split(".")
|
||
subdomain = parts[0] if parts else "_acme-challenge"
|
||
|
||
# Добавляем TXT запись
|
||
success = self.api.add_txt_record(self.domain, subdomain, validation_token)
|
||
|
||
if success:
|
||
# Ждем распространения DNS
|
||
wait_time = self.config.get("dns_propagation_wait", 60)
|
||
self.logger.info(f"Ожидание распространения DNS ({wait_time} секунд)...")
|
||
time.sleep(wait_time)
|
||
|
||
# Проверяем DNS запись
|
||
if self.verify_dns_record(subdomain, validation_token):
|
||
self.logger.info("DNS валидация готова")
|
||
return True
|
||
else:
|
||
self.logger.warning("DNS запись не распространилась вовремя, но продолжаем...")
|
||
return True
|
||
|
||
return False
|
||
|
||
def dns_cleanup_hook(self, validation_domain: str, validation_token: str) -> bool:
|
||
"""
|
||
Обработчик очистки DNS challenge - удаление TXT записи
|
||
|
||
Args:
|
||
validation_domain: Домен валидации
|
||
validation_token: Токен валидации
|
||
|
||
Returns:
|
||
True если успешно
|
||
"""
|
||
self.logger.info("=== DNS Challenge: Удаление TXT записи ===")
|
||
|
||
parts = validation_domain.replace(f".{self.domain}", "").split(".")
|
||
subdomain = parts[0] if parts else "_acme-challenge"
|
||
|
||
return self.api.remove_txt_record(self.domain, subdomain, validation_token)
|
||
|
||
def verify_dns_record(self, subdomain: str, expected_value: str) -> bool:
|
||
"""
|
||
Проверка наличия DNS записи
|
||
|
||
Args:
|
||
subdomain: Поддомен
|
||
expected_value: Ожидаемое значение TXT записи
|
||
|
||
Returns:
|
||
True если запись найдена
|
||
"""
|
||
import socket
|
||
|
||
full_domain = f"{subdomain}.{self.domain}"
|
||
attempts = self.config.get("dns_check_attempts", 10)
|
||
interval = self.config.get("dns_check_interval", 10)
|
||
|
||
self.logger.info(f"Проверка DNS записи для {full_domain}")
|
||
|
||
for attempt in range(attempts):
|
||
try:
|
||
# Используем nslookup или dig через subprocess
|
||
result = subprocess.run(
|
||
["nslookup", "-type=TXT", full_domain],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=10
|
||
)
|
||
|
||
if expected_value in result.stdout:
|
||
self.logger.info(f"DNS запись найдена (попытка {attempt + 1})")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.debug(f"Попытка {attempt + 1}: DNS запись не найдена - {e}")
|
||
|
||
if attempt < attempts - 1:
|
||
time.sleep(interval)
|
||
|
||
self.logger.warning("DNS запись не найдена после всех попыток")
|
||
return False
|
||
|
||
def obtain_certificate(self) -> bool:
|
||
"""
|
||
Получение нового сертификата
|
||
|
||
Returns:
|
||
True если успешно
|
||
"""
|
||
self.logger.info("=== Запрос нового SSL сертификата ===")
|
||
|
||
# Формируем список доменов
|
||
domains = [self.domain]
|
||
if self.config.get("wildcard", False):
|
||
domains.append(f"*.{self.domain}")
|
||
|
||
domain_args = []
|
||
for d in domains:
|
||
domain_args.extend(["-d", d])
|
||
|
||
# Команда certbot
|
||
cmd = [
|
||
"certbot", "certonly",
|
||
"--manual",
|
||
"--preferred-challenges", "dns",
|
||
"--manual-auth-hook", f"{sys.executable} {os.path.abspath(__file__)} --auth-hook",
|
||
"--manual-cleanup-hook", f"{sys.executable} {os.path.abspath(__file__)} --cleanup-hook",
|
||
"--email", self.email,
|
||
"--agree-tos",
|
||
"--non-interactive",
|
||
"--expand",
|
||
] + domain_args
|
||
|
||
self.logger.info(f"Выполнение команды: {' '.join(cmd)}")
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True
|
||
)
|
||
|
||
self.logger.info("Сертификат успешно получен!")
|
||
self.logger.debug(result.stdout)
|
||
return True
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
self.logger.error(f"Ошибка при получении сертификата: {e}")
|
||
self.logger.error(e.stderr)
|
||
return False
|
||
|
||
def renew_certificate(self) -> bool:
|
||
"""
|
||
Обновление существующего сертификата
|
||
|
||
Returns:
|
||
True если успешно
|
||
"""
|
||
self.logger.info("=== Обновление SSL сертификата ===")
|
||
|
||
cmd = [
|
||
"certbot", "renew",
|
||
"--manual",
|
||
"--manual-auth-hook", f"{sys.executable} {os.path.abspath(__file__)} --auth-hook",
|
||
"--manual-cleanup-hook", f"{sys.executable} {os.path.abspath(__file__)} --cleanup-hook",
|
||
"--non-interactive",
|
||
]
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True
|
||
)
|
||
|
||
self.logger.info("Проверка обновления завершена")
|
||
self.logger.debug(result.stdout)
|
||
return True
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
self.logger.error(f"Ошибка при обновлении: {e}")
|
||
self.logger.error(e.stderr)
|
||
return False
|
||
|
||
def display_certificate_info(self):
|
||
"""Вывод информации о сертификате"""
|
||
cert_file = os.path.join(self.cert_dir, "cert.pem")
|
||
|
||
if not os.path.exists(cert_file):
|
||
self.logger.warning("Сертификат не найден")
|
||
return
|
||
|
||
self.logger.info("=" * 60)
|
||
self.logger.info("ИНФОРМАЦИЯ О СЕРТИФИКАТЕ")
|
||
self.logger.info("=" * 60)
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
["openssl", "x509", "-in", cert_file, "-text", "-noout"],
|
||
capture_output=True,
|
||
text=True,
|
||
check=True
|
||
)
|
||
|
||
# Выводим только основную информацию
|
||
for line in result.stdout.split("\n"):
|
||
if any(keyword in line for keyword in ["Subject:", "Issuer:", "Not Before", "Not After", "DNS:"]):
|
||
self.logger.info(line.strip())
|
||
|
||
self.logger.info("=" * 60)
|
||
self.logger.info("ПУТИ К ФАЙЛАМ СЕРТИФИКАТА:")
|
||
self.logger.info(f" Сертификат: {self.cert_dir}/cert.pem")
|
||
self.logger.info(f" Приватный ключ: {self.cert_dir}/privkey.pem")
|
||
self.logger.info(f" Цепочка: {self.cert_dir}/chain.pem")
|
||
self.logger.info(f" Полная цепочка: {self.cert_dir}/fullchain.pem")
|
||
self.logger.info("=" * 60)
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка при чтении сертификата: {e}")
|
||
|
||
|
||
# ==============================================================================
|
||
# ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ
|
||
# ==============================================================================
|
||
|
||
def reload_webserver(logger: logging.Logger):
|
||
"""
|
||
Перезагрузка веб-сервера
|
||
|
||
Args:
|
||
logger: Logger объект
|
||
"""
|
||
logger.info("Перезагрузка веб-сервера...")
|
||
|
||
# Проверяем какие сервисы активны
|
||
services = ["nginx", "apache2", "httpd"]
|
||
|
||
for service in services:
|
||
try:
|
||
# Проверяем статус
|
||
result = subprocess.run(
|
||
["systemctl", "is-active", service],
|
||
capture_output=True,
|
||
text=True
|
||
)
|
||
|
||
if result.stdout.strip() == "active":
|
||
# Перезагружаем
|
||
subprocess.run(
|
||
["systemctl", "reload", service],
|
||
check=True
|
||
)
|
||
logger.info(f"Сервис {service} перезагружен")
|
||
return
|
||
|
||
except Exception as e:
|
||
logger.debug(f"Сервис {service} не активен или ошибка: {e}")
|
||
|
||
logger.warning("Активный веб-сервер не найден")
|
||
|
||
|
||
def load_config(config_file: Optional[str] = None) -> Dict:
|
||
"""
|
||
Загрузка конфигурации из файла или использование значений по умолчанию
|
||
|
||
Args:
|
||
config_file: Путь к файлу конфигурации (JSON)
|
||
|
||
Returns:
|
||
Словарь с конфигурацией
|
||
"""
|
||
config = DEFAULT_CONFIG.copy()
|
||
|
||
if config_file and os.path.exists(config_file):
|
||
try:
|
||
with open(config_file, 'r', encoding='utf-8') as f:
|
||
user_config = json.load(f)
|
||
config.update(user_config)
|
||
except Exception as e:
|
||
print(f"ОШИБКА: Не удалось загрузить конфигурацию из {config_file}: {e}")
|
||
sys.exit(1)
|
||
|
||
return config
|
||
|
||
|
||
def create_sample_config(output_file: str):
|
||
"""
|
||
Создание примера файла конфигурации
|
||
|
||
Args:
|
||
output_file: Путь к выходному файлу
|
||
"""
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
json.dump(DEFAULT_CONFIG, f, indent=4, ensure_ascii=False)
|
||
|
||
print(f"Пример конфигурации создан: {output_file}")
|
||
print("Отредактируйте файл и укажите ваши учетные данные")
|
||
|
||
|
||
# ==============================================================================
|
||
# ОСНОВНАЯ ФУНКЦИЯ
|
||
# ==============================================================================
|
||
|
||
def main():
|
||
"""Основная функция скрипта"""
|
||
|
||
# Парсинг аргументов командной строки
|
||
parser = argparse.ArgumentParser(
|
||
description="Автоматическое управление SSL сертификатами Let's Encrypt через API reg.ru"
|
||
)
|
||
parser.add_argument(
|
||
"-c", "--config",
|
||
help="Путь к файлу конфигурации (JSON)",
|
||
default=None
|
||
)
|
||
parser.add_argument(
|
||
"--create-config",
|
||
help="Создать пример файла конфигурации",
|
||
metavar="FILE"
|
||
)
|
||
parser.add_argument(
|
||
"--obtain",
|
||
help="Получить новый сертификат",
|
||
action="store_true"
|
||
)
|
||
parser.add_argument(
|
||
"--renew",
|
||
help="Обновить существующий сертификат",
|
||
action="store_true"
|
||
)
|
||
parser.add_argument(
|
||
"--check",
|
||
help="Проверить срок действия сертификата",
|
||
action="store_true"
|
||
)
|
||
parser.add_argument(
|
||
"--auth-hook",
|
||
help="Внутренний хук для DNS аутентификации (используется certbot)",
|
||
action="store_true"
|
||
)
|
||
parser.add_argument(
|
||
"--cleanup-hook",
|
||
help="Внутренний хук для очистки DNS (используется certbot)",
|
||
action="store_true"
|
||
)
|
||
parser.add_argument(
|
||
"-v", "--verbose",
|
||
help="Подробный вывод",
|
||
action="store_true"
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# Создание примера конфигурации
|
||
if args.create_config:
|
||
create_sample_config(args.create_config)
|
||
return 0
|
||
|
||
# Загрузка конфигурации
|
||
config = load_config(args.config)
|
||
|
||
# Настройка логирования
|
||
logger = setup_logging(config["log_file"], args.verbose)
|
||
|
||
# Обработка хуков для certbot
|
||
if args.auth_hook:
|
||
# Certbot передает домен и токен через переменные окружения
|
||
domain = os.environ.get("CERTBOT_DOMAIN")
|
||
token = os.environ.get("CERTBOT_VALIDATION")
|
||
|
||
if domain and token:
|
||
api = RegRuAPI(config["regru_username"], config["regru_password"], logger)
|
||
manager = LetsEncryptManager(config, api, logger)
|
||
success = manager.dns_challenge_hook(domain, token)
|
||
return 0 if success else 1
|
||
else:
|
||
logger.error("CERTBOT_DOMAIN или CERTBOT_VALIDATION не установлены")
|
||
return 1
|
||
|
||
if args.cleanup_hook:
|
||
domain = os.environ.get("CERTBOT_DOMAIN")
|
||
token = os.environ.get("CERTBOT_VALIDATION")
|
||
|
||
if domain and token:
|
||
api = RegRuAPI(config["regru_username"], config["regru_password"], logger)
|
||
manager = LetsEncryptManager(config, api, logger)
|
||
success = manager.dns_cleanup_hook(domain, token)
|
||
return 0 if success else 1
|
||
else:
|
||
logger.error("CERTBOT_DOMAIN или CERTBOT_VALIDATION не установлены")
|
||
return 1
|
||
|
||
# Проверка прав root
|
||
if os.geteuid() != 0:
|
||
logger.error("Скрипт должен быть запущен от имени root (sudo)")
|
||
return 1
|
||
|
||
# Инициализация API и менеджера
|
||
api = RegRuAPI(config["regru_username"], config["regru_password"], logger)
|
||
manager = LetsEncryptManager(config, api, logger)
|
||
|
||
# Проверка certbot
|
||
if not manager.check_certbot_installed():
|
||
logger.error("Установите certbot: apt-get install certbot")
|
||
return 1
|
||
|
||
logger.info("=" * 60)
|
||
logger.info("СКРИПТ УПРАВЛЕНИЯ SSL СЕРТИФИКАТАМИ LET'S ENCRYPT")
|
||
logger.info("=" * 60)
|
||
|
||
# Выполнение действий
|
||
if args.check:
|
||
# Только проверка срока действия
|
||
days_left = manager.check_certificate_expiry()
|
||
if days_left is None:
|
||
logger.info("Сертификат не найден. Требуется создание нового.")
|
||
return 2
|
||
elif days_left < 30:
|
||
logger.warning(f"Сертификат истекает через {days_left} дней. Требуется обновление!")
|
||
return 1
|
||
else:
|
||
logger.info(f"Сертификат действителен ({days_left} дней)")
|
||
return 0
|
||
|
||
elif args.obtain:
|
||
# Принудительное получение нового сертификата
|
||
success = manager.obtain_certificate()
|
||
if success:
|
||
manager.display_certificate_info()
|
||
reload_webserver(logger)
|
||
logger.info("Новый сертификат успешно создан")
|
||
return 0
|
||
else:
|
||
logger.error("Не удалось получить сертификат")
|
||
return 1
|
||
|
||
elif args.renew:
|
||
# Обновление существующего сертификата
|
||
success = manager.renew_certificate()
|
||
if success:
|
||
manager.display_certificate_info()
|
||
reload_webserver(logger)
|
||
logger.info("Сертификат успешно обновлен")
|
||
return 0
|
||
else:
|
||
logger.error("Не удалось обновить сертификат")
|
||
return 1
|
||
|
||
else:
|
||
# Автоматический режим: проверка и обновление при необходимости
|
||
days_left = manager.check_certificate_expiry()
|
||
|
||
if days_left is None:
|
||
# Сертификат не существует
|
||
logger.info("Сертификат не найден. Создание нового...")
|
||
success = manager.obtain_certificate()
|
||
elif days_left < 30:
|
||
# Сертификат скоро истекает
|
||
logger.info(f"Сертификат истекает через {days_left} дней. Обновление...")
|
||
success = manager.renew_certificate()
|
||
else:
|
||
# Сертификат действителен
|
||
logger.info(f"Сертификат действителен ({days_left} дней). Обновление не требуется.")
|
||
manager.display_certificate_info()
|
||
return 0
|
||
|
||
if success:
|
||
manager.display_certificate_info()
|
||
reload_webserver(logger)
|
||
logger.info("Операция завершена успешно")
|
||
return 0
|
||
else:
|
||
logger.error("Операция завершилась с ошибкой")
|
||
return 1
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|