Files
2026-06-27 00:55:18 +07:00

144 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# /opt/passwork/passwork.py
import os
import base64
import time
import urllib3
from passwork_client import PassworkClient
from passwork_client.exceptions import PassworkError, PassworkResponseError
from dotenv import load_dotenv
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
ENV_PATH = "/opt/passwork/.env"
def save_tokens_to_env(access_token: str, refresh_token: str, host: str, vault_id: str):
"""Автоматически перезаписывает обновленные токены в файл .env для следующего запуска"""
try:
with open(ENV_PATH, "w", encoding="utf-8") as f:
f.write(f"PASSWORK_HOST={host}\n")
f.write(f"PASSWORK_VAULT_ID={vault_id}\n")
f.write(f"PASSWORK_ACCESS_TOKEN={access_token}\n")
f.write(f"PASSWORK_REFRESH_TOKEN={refresh_token}\n")
except Exception as e:
print(f"⚠️ [Passwork Module] Не удалось сохранить новые токены в .env: {e}")
def get_passwork_secrets(required_cards: list = None) -> dict:
"""
Выгружает секреты из сейфа Passwork.
Принимает необязательный список обязательных карточек для защиты от параллельных коллизий API.
"""
secrets_pool = {}
# Запускаем конвейер защиты: 3 попытки штурма API с паузой в 2 секунды
for attempt in range(1, 4):
secrets_pool.clear()
# Принудительно загружаем самые свежие токены из файла (включая только что ротированные)
load_dotenv(ENV_PATH, override=True)
host = os.getenv("PASSWORK_HOST")
vault_id = os.getenv("PASSWORK_VAULT_ID")
access_token = os.getenv("PASSWORK_ACCESS_TOKEN")
refresh_token = os.getenv("PASSWORK_REFRESH_TOKEN")
if not all([host, vault_id, access_token, refresh_token]):
print("❌ [Passwork Module] Ошибка: В /opt/passwork/.env заполнены не все переменные!")
return {}
# Инициализируем официальный SDK
client = PassworkClient(host=host, verify_ssl=False)
client.auto_refresh = False
client.set_tokens(access_token=access_token, refresh_token=refresh_token)
endpoint = f"/api/v1/items?vaultId={vault_id}"
registry = None
try:
# Пробуем сделать прямой запрос по текущему access_token
registry = client.call("GET", endpoint)
except (PassworkError, PassworkResponseError) as e:
error_str = str(e)
# Если токен протух (прошло 2.8 часа) или сервер вернул 401
if "expired" in error_str.lower() or "не найден" in error_str.lower() or getattr(e, 'status_code', None) == 401:
print(f"🔄 [Passwork v7.2.3] Попытка {attempt}: Токен истек. Ротация через SDK...")
try:
client.update_tokens()
new_access = client.access_token
new_refresh = client.refresh_token
# Сохраняем новые токены на диск
save_tokens_to_env(new_access, new_refresh, host, vault_id)
print("✅ Токены успешно обновлены в .env. Повторяю запрос...")
registry = client.call("GET", endpoint)
except Exception as ref_err:
print(f"❌ [Passwork Module] Ротация токенов не удалась: {ref_err}")
if attempt == 3: return {}
continue
else:
if attempt == 3: raise e
continue
if not registry or "items" not in registry:
if attempt == 3: return {}
continue
cards = registry["items"]
# Собираем пары Логин-Пароль
for card in cards:
item_id = card.get("id")
if not item_id: continue
try:
item_info = client.call("GET", f"/api/v1/items/{item_id}")
if not item_info: continue
raw_password_base64 = item_info.get("passwordEncrypted")
if raw_password_base64:
try:
clean_password = base64.b64decode(raw_password_base64).decode("utf-8")
except Exception:
clean_password = raw_password_base64
else:
clean_password = item_info.get("password", "")
clean_login = item_info.get("login", "")
card_name = item_info.get("name")
if card_name:
secrets_pool[card_name.strip(' "\'\r\n')] = {
"login": clean_login,
"password": clean_password
}
except Exception:
continue
# ВАЛИДАЦИЯ ПОЛУЧЕННОГО ПАКЕТА КАРТОЧЕК:
if secrets_pool:
if required_cards:
# Проверяем, все ли запрошенные карточки долетели из API
clean_req_cards = [c.strip(' "\'\r\n') for c in required_cards if c]
missing = [c for c in clean_req_cards if c not in secrets_pool]
if not missing:
if attempt > 1:
print(f"✨ [Passwork Module] Сейф успешно стабилизирован на попытке {attempt}!")
return secrets_pool
else:
print(f"⚠️ [Passwork Module] Конфликт параллельного запроса (Попытка {attempt}). В слепке нет: {missing}")
else:
# Если список не передан, отдаем то, что смогли прочитать с первой попытки
return secrets_pool
# Если дошли сюда и это не последняя попытка — берем тайм-аут, пропуская параллельный поток
if attempt < 3:
print("⏳ [Passwork Module] Засыпаем на 2 секунды для освобождения сессии API...")
time.sleep(2)
return secrets_pool
if __name__ == "__main__":
res = get_passwork_secrets()
print("✅ Выгрузка пула:")
print(res)