Asynchronous Usage#
FolioClient provides full async/await support for high-performance applications. This is especially useful when making many concurrent API calls or when integrating with async frameworks.
Basic Async Operations#
All synchronous methods have async counterparts:
import asyncio
from folioclient import FolioClient
async def main():
client = FolioClient(
folio_url="https://your-folio-instance.com",
tenant="your_tenant",
username="your_username",
password="your_password"
)
try:
# Async API calls
users = await client.folio_get_async("/users", "users")
groups = await client.folio_get_async("/groups", "usergroups")
print(f"Found {len(users)} users and {len(groups)} groups")
finally:
await client.async_close()
asyncio.run(main())
Async Method Reference#
Sync Method |
Async Equivalent |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
Async Pagination#
Process large datasets asynchronously:
async def process_all_users():
async with FolioClient(...)as client:
# Async iteration over all users
async for user in client.folio_get_all_async("/users", "users"):
print(f"Processing user: {user['username']}")
# Perform async operations on each user
await process_user_data(user)
asyncio.run(process_all_users())
Concurrent Operations#
Make multiple API calls concurrently for better performance:
import asyncio
from folioclient import FolioClient
async def fetch_multiple_resources():
async with FolioClient(...) as client:
# Run multiple requests concurrently
users_task = client.folio_get_async("/users", "users")
groups_task = client.folio_get_async("/groups", "usergroups")
locations_task = client.folio_get_async("/locations", "locations")
# Wait for all to complete
users, groups, locations = await asyncio.gather(
users_task,
groups_task,
locations_task
)
print(f"Loaded {len(users)} users, {len(groups)} groups, {len(locations)} locations")
asyncio.run(fetch_multiple_resources())
Batch Processing#
Process data in batches with controlled concurrency:
import asyncio
from asyncio import Semaphore
async def process_users_in_batches():
async with client = FolioClient(...) as client
semaphore = Semaphore(10) # Limit to 10 concurrent operations
async def process_user_with_limit(user):
async with semaphore:
# Process one user with concurrency limit
return await process_user_details(client, user)
# Get all users
users = await client.folio_get_async("/users", "users")
# Process in batches with concurrency control
tasks = [process_user_with_limit(user) for user in users]
results = await asyncio.gather(*tasks)
print(f"Processed {len(results)} users")
async def process_user_details(client, user):
# Fetch additional details for each user
user_id = user['id']
loans = await client.folio_get_async(
"/circulation/loans",
"loans",
query=f"userId=={user_id}"
)
return {'user': user, 'loan_count': len(loans)}
asyncio.run(process_users_in_batches())
Error Handling in Async Code#
FolioClient’s automatic retry logic works seamlessly with async operations:
import asyncio
import os
import httpx
from folioclient import FolioClient
from folioclient.exceptions import FolioClientClosed
async def robust_async_operations():
# Configure retries for async operations
os.environ['FOLIOCLIENT_MAX_SERVER_ERROR_RETRIES'] = '3'
os.environ['FOLIOCLIENT_MAX_AUTH_ERROR_RETRIES'] = '2'
async with FolioClient(...) as client:
# Async operations get automatic retry behavior
tasks = []
for endpoint in ["/users", "/groups", "/locations"]:
task = safe_fetch(client, endpoint)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {len(result)} items")
async def safe_fetch(client, endpoint):
try:
# This call includes automatic retry logic for:
# - Server errors (502, 503, 504)
# - Connection errors
# - Auth errors (403) with re-authentication
return await client.folio_get_async(endpoint, endpoint.split('/')[-1])
except httpx.HTTPStatusError as e:
print(f"HTTP error on {endpoint}: {e.response.status_code}")
raise
except FolioClientClosed:
print(f"Client closed during {endpoint} request")
raise
except Exception as e:
print(f"Unexpected error on {endpoint}: {e}")
raise
asyncio.run(robust_async_operations())
Async Retry Benefits#
The retry system provides special benefits for async operations:
Non-blocking retries - Other coroutines continue running during retry delays
Concurrent retry isolation - Each async operation retries independently
Shared connection pooling - Retries reuse connection pools efficiently
Graceful degradation - Failed operations don’t block successful ones
async def concurrent_with_retries():
client = FolioClient(...)
# Configure fast retries for development
os.environ['FOLIOCLIENT_SERVER_ERROR_RETRY_DELAY'] = '1.0'
# Multiple concurrent operations, each with independent retry logic
async def fetch_data(endpoint, key):
# Each call gets its own retry behavior
return await client.folio_get_async(endpoint, key)
# All operations run concurrently, retries don't block each other
users, groups, locations = await asyncio.gather(
fetch_data("/users", "users"),
fetch_data("/groups", "usergroups"),
fetch_data("/locations", "locations")
)
return users, groups, locations
Context Managers with Async#
Use async context managers for proper resource cleanup:
async def using_async_context_manager():
# Usage
async with FolioClient(folio_url="...", tenant="...",
username="...", password="...") as client:
users = await client.folio_get_async("/users", "users")
# Client automatically closed
Integration with Web Frameworks#
FastAPI Integration#
from fastapi import FastAPI, HTTPException
from folioclient import FolioClient
import httpx
app = FastAPI()
# Global client instance (in production, use dependency injection)
folio_client = FolioClient(...)
@app.get("/api/users")
async def get_users(limit: int = 100):
try:
users = await folio_client.folio_get_async(
"/users", "users", limit=limit
)
return {"users": users, "count": len(users)}
except httpx.HTTPStatusError as e:
raise HTTPException(
status_code=e.response.status_code,
detail=f"FOLIO API error: {e}"
)
@app.on_event("shutdown")
async def shutdown():
await folio_client.async_close()
Aiohttp Integration#
from aiohttp import web
from folioclient import FolioClient
async def get_users(request):
client = request.app['folio_client']
try:
users = await client.folio_get_async("/users", "users")
return web.json_response({"users": users})
except Exception as e:
return web.json_response(
{"error": str(e)},
status=500
)
async def create_app():
app = web.Application()
# Initialize FolioClient
app['folio_client'] = FolioClient(...)
# Add routes
app.router.add_get('/users', get_users)
return app
async def cleanup(app):
await app['folio_client'].close()
if __name__ == '__main__':
app = create_app()
app.on_cleanup.append(cleanup)
web.run_app(app, host='localhost', port=8080)
Performance Considerations#
Async Best Practices#
Use semaphores to limit concurrency and avoid overwhelming FOLIO
Batch operations when possible
Handle exceptions at appropriate levels
Always close clients when done
Use connection pooling for multiple clients
Use a context manager to ensure a persistent httpx.AsyncClient session and proper resource clean-up
# Good async pattern
async def efficient_processing():
semaphore = asyncio.Semaphore(5) # Limit concurrent requests
client = FolioClient(...)
async def process_with_limit(item):
async with semaphore:
return await process_item(client, item)
try:
items = await client.folio_get_async("/items", "items")
tasks = [process_with_limit(item) for item in items[:100]]
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"Processed: {len(successful)}, Failed: {len(failed)}")
finally:
await client.async_close()
Testing Async Code#
Use pytest-asyncio for testing async functionality:
import pytest
from unittest.mock import AsyncMock, patch
from folioclient import FolioClient
@pytest.mark.asyncio
async def test_async_folio_get():
with patch.object(FolioClient, 'folio_get_async', new_callable=AsyncMock) as mock_get:
mock_get.return_value = [{"id": "1", "username": "test"}]
client = FolioClient(...)
users = await client.folio_get_async("/users", "users")
assert len(users) == 1
assert users[0]["username"] == "test"
mock_get.assert_called_once_with("/users", "users")
Debugging Async Issues#
Enable asyncio debug mode during development:
import asyncio
import logging
# Enable asyncio debug mode
asyncio.get_event_loop().set_debug(True)
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
# Your async code here
async def debug_example():
client = FolioClient(...)
try:
users = await client.folio_get_async("/users", "users")
finally:
await client.async_close()
asyncio.run(debug_example())