Practical code examples for working with the Avala REST API.
Python and TypeScript SDKs are coming soon. The examples below call the REST API directly using the Python requests library. SDK examples with avala.Client and @avala/sdk will be added once the SDKs are published.

Setup

Configure your environment with your Avala API key before running any examples.
import requests
import os

API_KEY = os.environ["AVALA_API_KEY"]
BASE_URL = "https://server.avala.ai/api/v1"
headers = {"X-Avala-Api-Key": API_KEY}

Dataset Management

List Datasets

Retrieve all datasets accessible to your account. List endpoints return paginated results; see the Pagination section below for walking every page.
response = requests.get(f"{BASE_URL}/datasets/", headers=headers)
data = response.json()
for ds in data["results"]:
    print(f"{ds['name']} ({ds['item_count']} items)")

Get Dataset Details

Fetch details for a specific dataset by owner and slug.
response = requests.get(
    f"{BASE_URL}/datasets/acme-ai/training-v2/",
    headers=headers
)
dataset = response.json()
print(f"Name: {dataset['name']}")
print(f"Items: {dataset['item_count']}")
print(f"Data type: {dataset['data_type']}")

Project Workflows

List Projects

Retrieve projects, optionally filtering by status.
response = requests.get(
    f"{BASE_URL}/projects/",
    headers=headers,
    params={"status": "ACTIVE"}
)
projects = response.json()["results"]
for proj in projects:
    print(f"{proj['name']} - {proj['task_type']}")

Get Project Metrics

Check the progress and quality metrics for a project.
response = requests.get(
    f"{BASE_URL}/projects/proj-uuid-001/metrics/",
    headers=headers
)
metrics = response.json()
total = metrics["total_tasks"]
# Guard against a project with no tasks yet
completion = (metrics["completed_tasks"] / total) * 100 if total else 0.0
print(f"Progress: {completion:.1f}%")
print(f"Acceptance rate: {metrics['acceptance_rate'] * 100:.1f}%")

Export Pipeline

Create Export, Poll for Completion, and Download

A complete workflow for exporting annotation data from a project.
import time

response = requests.post(
    f"{BASE_URL}/projects/proj-uuid-001/exports/",
    headers=headers,
    json={"format": "coco"}
)
export = response.json()
export_uid = export["uid"]

deadline = time.monotonic() + 600  # give up after ten minutes
while True:
    status_response = requests.get(
        f"{BASE_URL}/exports/{export_uid}/",
        headers=headers
    )
    export = status_response.json()

    if export["status"] == "completed":
        print(f"Download URL: {export['download_url']}")
        break
    elif export["status"] == "failed":
        raise Exception(f"Export failed: {export.get('error')}")
    elif time.monotonic() > deadline:
        raise TimeoutError("Export did not complete within ten minutes")

    time.sleep(5)
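The last step of the workflow is the download itself. A minimal sketch that streams the finished export to disk; the local filename is arbitrary, and it assumes download_url is a plain HTTPS link that needs no additional auth headers:

download = requests.get(export["download_url"], stream=True)
download.raise_for_status()
with open("export_coco.json", "wb") as f:  # hypothetical local filename
    for chunk in download.iter_content(chunk_size=8192):
        f.write(chunk)
print("Export saved to export_coco.json")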

Annotation Retrieval

Get Task Results

Retrieve annotation results for completed tasks.
Coming Soon — The Tasks list/get API endpoints are under active development. Task results are currently managed through Mission Control and the sessions/work-unit system. The example below shows the planned API surface.
response = requests.get(
    f"{BASE_URL}/projects/proj-uuid-001/tasks/",
    headers=headers,
    params={"status": "completed"}
)
tasks = response.json()["results"]
for task in tasks:
    print(f"Task {task['uid']}: {len(task['annotations'])} annotations")
    for ann in task["annotations"]:
        print(f"  - {ann['label']} ({ann['annotation_type']})")

Organization Management

List Members

Retrieve all members of your organization.
response = requests.get(
    f"{BASE_URL}/organizations/acme-ai/members/",
    headers=headers
)
members = response.json()["results"]
for member in members:
    print(f"{member['user']['username']} - {member['role']}")

Send Invitation

Invite a new member to your organization.
response = requests.post(
    f"{BASE_URL}/organizations/acme-ai/invitations/",
    headers=headers,
    json={
        "email": "newuser@example.com",
        "role": "annotator"
    }
)
response.raise_for_status()  # fail loudly on a rejected invitation
invitation = response.json()
print(f"Invitation sent to {invitation['email']}")

Error Handling

Robust Retry with Exponential Backoff

Handle transient errors and rate limits gracefully.
import time
from requests.exceptions import RequestException

def api_call_with_retry(method, url, max_retries=3, **kwargs):
    """Make API call with retry logic for rate limits and errors."""
    for attempt in range(max_retries):
        try:
            response = requests.request(
                method, url, headers=headers, **kwargs
            )

            if response.status_code == 429:
                retry_after = int(
                    response.headers.get("Retry-After", 2 ** attempt)
                )
                print(f"Rate limited, waiting {retry_after}s...")
                time.sleep(retry_after)
                continue

            response.raise_for_status()
            return response.json()

        except RequestException as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Request failed, retrying in {wait_time}s: {e}")
            time.sleep(wait_time)

    raise Exception("Max retries exceeded")

# Usage
datasets = api_call_with_retry("GET", f"{BASE_URL}/datasets/")
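Because extra keyword arguments pass straight through to requests, the helper also works for filtered and parameterized calls:

active_projects = api_call_with_retry(
    "GET",
    f"{BASE_URL}/projects/",
    params={"status": "ACTIVE"}
)
print(f"Active projects: {len(active_projects['results'])}")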

Pagination

Iterate Through All Results

Fetch all pages of a paginated endpoint.
def fetch_all_pages(url, params=None):
    """Fetch all results from a paginated endpoint."""
    all_results = []
    params = params or {}

    while url:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()

        all_results.extend(data["results"])
        url = data.get("next")
        params = {}  # Clear params after first request (next URL includes them)

    return all_results

# Fetch all datasets
all_datasets = fetch_all_pages(f"{BASE_URL}/datasets/")
print(f"Total datasets: {len(all_datasets)}")

# Fetch all items in a dataset
all_items = fetch_all_pages(
    f"{BASE_URL}/datasets/acme-ai/training-v2/items/",
    params={"limit": 100}
)
print(f"Total items: {len(all_items)}")

Batch Operations

Process Multiple Datasets

Perform operations across multiple datasets efficiently.
import concurrent.futures

def get_dataset_summary(dataset):
    """Fetch detailed info for a single dataset."""
    response = requests.get(
        f"{BASE_URL}/datasets/{dataset['owner']}/{dataset['slug']}/",
        headers=headers
    )
    response.raise_for_status()
    details = response.json()
    return {
        "name": details["name"],
        "item_count": details["item_count"],
        "data_type": details["data_type"],
        "created_at": details["created_at"],
    }

# Fetch all datasets
datasets_response = requests.get(
    f"{BASE_URL}/datasets/", headers=headers
)
datasets = datasets_response.json()["results"]

# Process in parallel (respect rate limits)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = {
        executor.submit(get_dataset_summary, ds): ds
        for ds in datasets
    }

    summaries = []
    for future in concurrent.futures.as_completed(futures):
        try:
            summary = future.result()
            summaries.append(summary)
            print(f"Processed: {summary['name']}")
        except Exception as e:
            ds = futures[future]
            print(f"Error processing {ds['name']}: {e}")

print(f"\nProcessed {len(summaries)} datasets")