"""Тесты для utils/rate_limiter.py — проверка логики токен-бакета.""" import asyncio import time from utils.rate_limiter import RateLimiter async def test_initial_tokens_full() -> None: """Бакет заполнен до burst при создании.""" limiter = RateLimiter(rate=2.0, burst=5) assert limiter.tokens == 5.0 async def test_acquire_consumes_token() -> None: """acquire() уменьшает количество токенов.""" limiter = RateLimiter(rate=1.0, burst=3) await limiter.acquire() assert limiter.tokens == 2.0 async def test_acquire_waits_when_empty() -> None: """acquire() ждёт, когда токены закончились.""" limiter = RateLimiter(rate=10.0, burst=1) # 10 токенов/сек await limiter.acquire() # бакет пуст start = time.monotonic() await limiter.acquire() # должен ждать ~0.1 сек elapsed = time.monotonic() - start assert elapsed >= 0.05 # допускаем погрешность async def test_burst_cap() -> None: """Токены не превышают burst после долгого простоя.""" limiter = RateLimiter(rate=100.0, burst=3) await asyncio.sleep(0.1) # теоретически +10 токенов, но cap = 3 async with limiter.lock: limiter._refill() assert limiter.tokens == 3.0 async def test_multiple_acquire() -> None: """Можно забрать несколько токенов за раз.""" limiter = RateLimiter(rate=1.0, burst=10) await limiter.acquire(token=5) assert limiter.tokens == 5.0