Practical code examples for working with the Avala REST API.
Python and TypeScript SDKs are coming soon. Until they ship, the examples below call the REST API directly using requests in Python; the same calls translate directly to fetch in TypeScript. SDK examples with avala.Client and @avala/sdk will be added once the SDKs are published.
Setup
Configure your environment with your Avala API key before running any examples.
import os

import requests

API_KEY = os.environ["AVALA_API_KEY"]
BASE_URL = "https://server.avala.ai/api/v1"
headers = {"X-Avala-Api-Key": API_KEY}
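As an optional sanity check, you can hit the datasets endpoint used in the next section and confirm the key authenticates; a 200 response means you are ready to run the rest of the examples.

# Optional sanity check: a 200 response means the API key is valid
response = requests.get(f"{BASE_URL}/datasets/", headers=headers)
print(f"Auth check: HTTP {response.status_code}")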
Dataset Management
List Datasets
Retrieve all datasets accessible to your account.
response = requests.get(f"{BASE_URL}/datasets/", headers=headers)
data = response.json()
for ds in data["results"]:
    print(f"{ds['name']} ({ds['item_count']} items)")
Get Dataset Details
Fetch details for a specific dataset by owner and slug.
response = requests.get(
    f"{BASE_URL}/datasets/acme-ai/training-v2/",
    headers=headers
)
dataset = response.json()
print(f"Name: {dataset['name']}")
print(f"Items: {dataset['item_count']}")
print(f"Data type: {dataset['data_type']}")
Project Workflows
List Projects
Retrieve projects, optionally filtering by status.
response = requests.get(
    f"{BASE_URL}/projects/",
    headers=headers,
    params={"status": "ACTIVE"}
)
projects = response.json()["results"]
for proj in projects:
    print(f"{proj['name']} - {proj['task_type']}")
Get Project Metrics
Check the progress and quality metrics for a project.
response = requests.get(
    f"{BASE_URL}/projects/proj-uuid-001/metrics/",
    headers=headers
)
metrics = response.json()
# Guard against projects that have no tasks yet
total = metrics["total_tasks"]
completion = (metrics["completed_tasks"] / total) * 100 if total else 0.0
print(f"Progress: {completion:.1f}%")
print(f"Acceptance rate: {metrics['acceptance_rate'] * 100:.1f}%")
Export Pipeline
Create Export, Poll for Completion, and Download
A complete workflow for exporting annotation data from a project.
import time
response = requests.post(
    f"{BASE_URL}/projects/proj-uuid-001/exports/",
    headers=headers,
    json={"format": "coco"}
)
export = response.json()
export_uid = export["uid"]
while True:
    status_response = requests.get(
        f"{BASE_URL}/exports/{export_uid}/",
        headers=headers
    )
    export = status_response.json()

    if export["status"] == "completed":
        print(f"Download URL: {export['download_url']}")
        break
    elif export["status"] == "failed":
        raise Exception(f"Export failed: {export.get('error')}")

    time.sleep(5)
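To finish the workflow, stream the completed export to disk. A minimal sketch, assuming download_url is a pre-signed link that needs no extra auth header; the local filename is arbitrary:

# Stream the finished export to a local file
# (assumes download_url is pre-signed; adjust the filename as needed)
download = requests.get(export["download_url"], stream=True)
download.raise_for_status()
with open("export_coco.json", "wb") as f:
    for chunk in download.iter_content(chunk_size=8192):
        f.write(chunk)
print("Saved export to export_coco.json")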
Annotation Retrieval
Get Task Results
Retrieve annotation results for completed tasks.
Coming Soon — The Tasks list/get API endpoints are under active development. Task results are currently managed through Mission Control and the sessions/work-unit system. The example below shows the planned API surface.
response = requests.get(
    f"{BASE_URL}/projects/proj-uuid-001/tasks/",
    headers=headers,
    params={"status": "completed"}
)
tasks = response.json()["results"]

for task in tasks:
    print(f"Task {task['uid']}: {len(task['annotations'])} annotations")
    for ann in task["annotations"]:
        print(f"  - {ann['label']} ({ann['annotation_type']})")
Organization Management
List Members
Retrieve all members of your organization.
response = requests.get(
    f"{BASE_URL}/organizations/acme-ai/members/",
    headers=headers
)
members = response.json()["results"]

for member in members:
    print(f"{member['user']['username']} - {member['role']}")
Send Invitation
Invite a new member to your organization.
response = requests.post(
    f"{BASE_URL}/organizations/acme-ai/invitations/",
    headers=headers,
    json={
        "email": "newuser@example.com",
        "role": "annotator"
    }
)
invitation = response.json()
print(f"Invitation sent to {invitation['email']}")
Error Handling
Robust Retry with Exponential Backoff
Handle transient errors and rate limits gracefully.
import time
from requests.exceptions import RequestException
def api_call_with_retry(method, url, max_retries=3, **kwargs):
    """Make an API call with retry logic for rate limits and transient errors."""
    for attempt in range(max_retries):
        try:
            response = requests.request(
                method, url, headers=headers, **kwargs
            )
            if response.status_code == 429:
                # Honor the server's Retry-After header, falling back
                # to exponential backoff when it is absent
                retry_after = int(
                    response.headers.get("Retry-After", 2 ** attempt)
                )
                print(f"Rate limited, waiting {retry_after}s...")
                time.sleep(retry_after)
                continue
            response.raise_for_status()
            return response.json()
        except RequestException as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Request failed, retrying in {wait_time}s: {e}")
            time.sleep(wait_time)
    raise Exception("Max retries exceeded")
# Usage
datasets = api_call_with_retry("GET", f"{BASE_URL}/datasets/")
Pagination
Iterate Through All Results
Fetch all pages of a paginated endpoint.
def fetch_all_pages(url, params=None):
    """Fetch all results from a paginated endpoint."""
    all_results = []
    params = params or {}
    while url:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        all_results.extend(data["results"])
        url = data.get("next")
        params = {}  # Clear params after the first request (the next URL includes them)
    return all_results
# Fetch all datasets
all_datasets = fetch_all_pages(f"{BASE_URL}/datasets/")
print(f"Total datasets: {len(all_datasets)}")
# Fetch all items in a dataset
all_items = fetch_all_pages(
    f"{BASE_URL}/datasets/acme-ai/training-v2/items/",
    params={"limit": 100}
)
print(f"Total items: {len(all_items)}")
Batch Operations
Process Multiple Datasets
Perform operations across multiple datasets efficiently.
import concurrent.futures
def get_dataset_summary(dataset):
    """Fetch detailed info for a single dataset."""
    response = requests.get(
        f"{BASE_URL}/datasets/{dataset['owner']}/{dataset['slug']}/",
        headers=headers
    )
    response.raise_for_status()
    details = response.json()
    return {
        "name": details["name"],
        "item_count": details["item_count"],
        "data_type": details["data_type"],
        "created_at": details["created_at"],
    }
# Fetch all datasets
datasets_response = requests.get(
    f"{BASE_URL}/datasets/", headers=headers
)
datasets = datasets_response.json()["results"]
# Process in parallel (respect rate limits)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = {
        executor.submit(get_dataset_summary, ds): ds
        for ds in datasets
    }
    summaries = []
    for future in concurrent.futures.as_completed(futures):
        try:
            summary = future.result()
            summaries.append(summary)
            print(f"Processed: {summary['name']}")
        except Exception as e:
            ds = futures[future]
            print(f"Error processing {ds['name']}: {e}")
print(f"\nProcessed {len(summaries)} datasets")