# /opt/passwork/passwork.py import os import base64 import time import urllib3 from passwork_client import PassworkClient from passwork_client.exceptions import PassworkError, PassworkResponseError from dotenv import load_dotenv urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) ENV_PATH = "/opt/passwork/.env" def save_tokens_to_env(access_token: str, refresh_token: str, host: str, vault_id: str): """Автоматически перезаписывает обновленные токены в файл .env для следующего запуска""" try: with open(ENV_PATH, "w", encoding="utf-8") as f: f.write(f"PASSWORK_HOST={host}\n") f.write(f"PASSWORK_VAULT_ID={vault_id}\n") f.write(f"PASSWORK_ACCESS_TOKEN={access_token}\n") f.write(f"PASSWORK_REFRESH_TOKEN={refresh_token}\n") except Exception as e: print(f"⚠️ [Passwork Module] Не удалось сохранить новые токены в .env: {e}") def get_passwork_secrets(required_cards: list = None) -> dict: """ Выгружает секреты из сейфа Passwork. Принимает необязательный список обязательных карточек для защиты от параллельных коллизий API. """ secrets_pool = {} # Запускаем конвейер защиты: 3 попытки штурма API с паузой в 2 секунды for attempt in range(1, 4): secrets_pool.clear() # Принудительно загружаем самые свежие токены из файла (включая только что ротированные) load_dotenv(ENV_PATH, override=True) host = os.getenv("PASSWORK_HOST") vault_id = os.getenv("PASSWORK_VAULT_ID") access_token = os.getenv("PASSWORK_ACCESS_TOKEN") refresh_token = os.getenv("PASSWORK_REFRESH_TOKEN") if not all([host, vault_id, access_token, refresh_token]): print("❌ [Passwork Module] Ошибка: В /opt/passwork/.env заполнены не все переменные!") return {} # Инициализируем официальный SDK client = PassworkClient(host=host, verify_ssl=False) client.auto_refresh = False client.set_tokens(access_token=access_token, refresh_token=refresh_token) endpoint = f"/api/v1/items?vaultId={vault_id}" registry = None try: # Пробуем сделать прямой запрос по текущему access_token registry = client.call("GET", endpoint) except (PassworkError, PassworkResponseError) as e: error_str = str(e) # Если токен протух (прошло 2.8 часа) или сервер вернул 401 if "expired" in error_str.lower() or "не найден" in error_str.lower() or getattr(e, 'status_code', None) == 401: print(f"🔄 [Passwork v7.2.3] Попытка {attempt}: Токен истек. Ротация через SDK...") try: client.update_tokens() new_access = client.access_token new_refresh = client.refresh_token # Сохраняем новые токены на диск save_tokens_to_env(new_access, new_refresh, host, vault_id) print("✅ Токены успешно обновлены в .env. Повторяю запрос...") registry = client.call("GET", endpoint) except Exception as ref_err: print(f"❌ [Passwork Module] Ротация токенов не удалась: {ref_err}") if attempt == 3: return {} continue else: if attempt == 3: raise e continue if not registry or "items" not in registry: if attempt == 3: return {} continue cards = registry["items"] # Собираем пары Логин-Пароль for card in cards: item_id = card.get("id") if not item_id: continue try: item_info = client.call("GET", f"/api/v1/items/{item_id}") if not item_info: continue raw_password_base64 = item_info.get("passwordEncrypted") if raw_password_base64: try: clean_password = base64.b64decode(raw_password_base64).decode("utf-8") except Exception: clean_password = raw_password_base64 else: clean_password = item_info.get("password", "") clean_login = item_info.get("login", "") card_name = item_info.get("name") if card_name: secrets_pool[card_name.strip(' "\'\r\n')] = { "login": clean_login, "password": clean_password } except Exception: continue # ВАЛИДАЦИЯ ПОЛУЧЕННОГО ПАКЕТА КАРТОЧЕК: if secrets_pool: if required_cards: # Проверяем, все ли запрошенные карточки долетели из API clean_req_cards = [c.strip(' "\'\r\n') for c in required_cards if c] missing = [c for c in clean_req_cards if c not in secrets_pool] if not missing: if attempt > 1: print(f"✨ [Passwork Module] Сейф успешно стабилизирован на попытке {attempt}!") return secrets_pool else: print(f"⚠️ [Passwork Module] Конфликт параллельного запроса (Попытка {attempt}). В слепке нет: {missing}") else: # Если список не передан, отдаем то, что смогли прочитать с первой попытки return secrets_pool # Если дошли сюда и это не последняя попытка — берем тайм-аут, пропуская параллельный поток if attempt < 3: print("⏳ [Passwork Module] Засыпаем на 2 секунды для освобождения сессии API...") time.sleep(2) return secrets_pool if __name__ == "__main__": res = get_passwork_secrets() print("✅ Выгрузка пула:") print(res)