144 lines
7.0 KiB
Python
144 lines
7.0 KiB
Python
# /opt/passwork/passwork.py
|
||
import os
|
||
import base64
|
||
import time
|
||
import urllib3
|
||
from passwork_client import PassworkClient
|
||
from passwork_client.exceptions import PassworkError, PassworkResponseError
|
||
from dotenv import load_dotenv
|
||
|
||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||
ENV_PATH = "/opt/passwork/.env"
|
||
|
||
def save_tokens_to_env(access_token: str, refresh_token: str, host: str, vault_id: str):
|
||
"""Автоматически перезаписывает обновленные токены в файл .env для следующего запуска"""
|
||
try:
|
||
with open(ENV_PATH, "w", encoding="utf-8") as f:
|
||
f.write(f"PASSWORK_HOST={host}\n")
|
||
f.write(f"PASSWORK_VAULT_ID={vault_id}\n")
|
||
f.write(f"PASSWORK_ACCESS_TOKEN={access_token}\n")
|
||
f.write(f"PASSWORK_REFRESH_TOKEN={refresh_token}\n")
|
||
except Exception as e:
|
||
print(f"⚠️ [Passwork Module] Не удалось сохранить новые токены в .env: {e}")
|
||
|
||
def get_passwork_secrets(required_cards: list = None) -> dict:
|
||
"""
|
||
Выгружает секреты из сейфа Passwork.
|
||
Принимает необязательный список обязательных карточек для защиты от параллельных коллизий API.
|
||
"""
|
||
secrets_pool = {}
|
||
|
||
# Запускаем конвейер защиты: 3 попытки штурма API с паузой в 2 секунды
|
||
for attempt in range(1, 4):
|
||
secrets_pool.clear()
|
||
|
||
# Принудительно загружаем самые свежие токены из файла (включая только что ротированные)
|
||
load_dotenv(ENV_PATH, override=True)
|
||
|
||
host = os.getenv("PASSWORK_HOST")
|
||
vault_id = os.getenv("PASSWORK_VAULT_ID")
|
||
access_token = os.getenv("PASSWORK_ACCESS_TOKEN")
|
||
refresh_token = os.getenv("PASSWORK_REFRESH_TOKEN")
|
||
|
||
if not all([host, vault_id, access_token, refresh_token]):
|
||
print("❌ [Passwork Module] Ошибка: В /opt/passwork/.env заполнены не все переменные!")
|
||
return {}
|
||
|
||
# Инициализируем официальный SDK
|
||
client = PassworkClient(host=host, verify_ssl=False)
|
||
client.auto_refresh = False
|
||
client.set_tokens(access_token=access_token, refresh_token=refresh_token)
|
||
|
||
endpoint = f"/api/v1/items?vaultId={vault_id}"
|
||
registry = None
|
||
|
||
try:
|
||
# Пробуем сделать прямой запрос по текущему access_token
|
||
registry = client.call("GET", endpoint)
|
||
except (PassworkError, PassworkResponseError) as e:
|
||
error_str = str(e)
|
||
|
||
# Если токен протух (прошло 2.8 часа) или сервер вернул 401
|
||
if "expired" in error_str.lower() or "не найден" in error_str.lower() or getattr(e, 'status_code', None) == 401:
|
||
print(f"🔄 [Passwork v7.2.3] Попытка {attempt}: Токен истек. Ротация через SDK...")
|
||
try:
|
||
client.update_tokens()
|
||
new_access = client.access_token
|
||
new_refresh = client.refresh_token
|
||
|
||
# Сохраняем новые токены на диск
|
||
save_tokens_to_env(new_access, new_refresh, host, vault_id)
|
||
|
||
print("✅ Токены успешно обновлены в .env. Повторяю запрос...")
|
||
registry = client.call("GET", endpoint)
|
||
except Exception as ref_err:
|
||
print(f"❌ [Passwork Module] Ротация токенов не удалась: {ref_err}")
|
||
if attempt == 3: return {}
|
||
continue
|
||
else:
|
||
if attempt == 3: raise e
|
||
continue
|
||
|
||
if not registry or "items" not in registry:
|
||
if attempt == 3: return {}
|
||
continue
|
||
|
||
cards = registry["items"]
|
||
|
||
# Собираем пары Логин-Пароль
|
||
for card in cards:
|
||
item_id = card.get("id")
|
||
if not item_id: continue
|
||
|
||
try:
|
||
item_info = client.call("GET", f"/api/v1/items/{item_id}")
|
||
if not item_info: continue
|
||
|
||
raw_password_base64 = item_info.get("passwordEncrypted")
|
||
if raw_password_base64:
|
||
try:
|
||
clean_password = base64.b64decode(raw_password_base64).decode("utf-8")
|
||
except Exception:
|
||
clean_password = raw_password_base64
|
||
else:
|
||
clean_password = item_info.get("password", "")
|
||
|
||
clean_login = item_info.get("login", "")
|
||
card_name = item_info.get("name")
|
||
|
||
if card_name:
|
||
secrets_pool[card_name.strip(' "\'\r\n')] = {
|
||
"login": clean_login,
|
||
"password": clean_password
|
||
}
|
||
except Exception:
|
||
continue
|
||
|
||
# ВАЛИДАЦИЯ ПОЛУЧЕННОГО ПАКЕТА КАРТОЧЕК:
|
||
if secrets_pool:
|
||
if required_cards:
|
||
# Проверяем, все ли запрошенные карточки долетели из API
|
||
clean_req_cards = [c.strip(' "\'\r\n') for c in required_cards if c]
|
||
missing = [c for c in clean_req_cards if c not in secrets_pool]
|
||
|
||
if not missing:
|
||
if attempt > 1:
|
||
print(f"✨ [Passwork Module] Сейф успешно стабилизирован на попытке {attempt}!")
|
||
return secrets_pool
|
||
else:
|
||
print(f"⚠️ [Passwork Module] Конфликт параллельного запроса (Попытка {attempt}). В слепке нет: {missing}")
|
||
else:
|
||
# Если список не передан, отдаем то, что смогли прочитать с первой попытки
|
||
return secrets_pool
|
||
|
||
# Если дошли сюда и это не последняя попытка — берем тайм-аут, пропуская параллельный поток
|
||
if attempt < 3:
|
||
print("⏳ [Passwork Module] Засыпаем на 2 секунды для освобождения сессии API...")
|
||
time.sleep(2)
|
||
|
||
return secrets_pool
|
||
|
||
if __name__ == "__main__":
|
||
res = get_passwork_secrets()
|
||
print("✅ Выгрузка пула:")
|
||
print(res) |