Auto-generated from SDK source. Last updated: February 2026.
Client
avala.Client
Synchronous client for the Avala API.
from avala import Client
client = Client(
    api_key="avk_...",  # or set AVALA_API_KEY env var
    base_url="https://api.avala.ai/api/v1",  # default
    timeout=30.0,  # seconds
    max_retries=2,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| api_key | str \| None | AVALA_API_KEY env var | Your Avala API key. |
| base_url | str \| None | https://api.avala.ai/api/v1 | API base URL. |
| timeout | float | 30.0 | Request timeout in seconds. |
| max_retries | int | 2 | Automatic retries on transient failures (5xx, timeouts). |
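To picture what `max_retries` means in practice, here is a minimal sketch of retrying a call on transient failures. It is not the SDK's implementation: `TransientError`, `call_with_retries`, and the exponential backoff schedule are hypothetical and used only for illustration.

```python
import time


class TransientError(Exception):
    """Hypothetical stand-in for a 5xx response or a timeout."""


def call_with_retries(fn, max_retries=2, base_delay=0.5, sleep=time.sleep):
    """Call fn(); on TransientError, retry up to max_retries more times."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            # Assumed policy: exponential backoff (0.5s, 1s, 2s, ...).
            sleep(base_delay * (2 ** attempt))
```

With `max_retries=2`, a call is attempted at most three times in total.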
Resources:
| Property | Type | Description |
|---|---|---|
| client.datasets | Datasets | Dataset operations |
| client.projects | Projects | Project operations (read-only) |
| client.exports | Exports | Export operations |
| client.tasks | Tasks | Task operations (read-only) |
| client.storage_configs | StorageConfigs | Cloud storage operations |
| client.agents | Agents | Automation agent operations |
| client.inference_providers | InferenceProviders | Inference provider operations |
| client.auto_label_jobs | AutoLabelJobs | Auto-labeling job operations |
| client.quality_targets | QualityTargets | Quality target operations (project-scoped) |
| client.consensus | Consensus | Consensus scoring operations (project-scoped) |
| client.webhooks | Webhooks | Webhook subscription operations |
| client.webhook_deliveries | WebhookDeliveries | Webhook delivery log operations (read-only) |
| client.organizations | Organizations | Organization management |
| client.slices | Slices | Slice operations |
Methods:
| Method | Returns | Description |
|---|---|---|
| client.rate_limit_info | dict[str, str \| None] | Rate limit headers from the last response |
| client.close() | None | Close the HTTP transport |
The client can be used as a context manager, which closes the HTTP transport on exit:
with Client() as client:
    datasets = client.datasets.list()
avala.AsyncClient
Asynchronous client with the same interface. All resource methods are async.
import asyncio
from avala import AsyncClient
async def main():
    async with AsyncClient() as client:
        datasets = await client.datasets.list()
asyncio.run(main())
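Because every resource method on the asynchronous client is a coroutine, independent requests can run concurrently. The sketch below assumes only that `client.datasets.get(uid)` is awaitable, as stated above; `fetch_datasets` is a hypothetical helper name.

```python
import asyncio


async def fetch_datasets(client, uids):
    """Fetch several datasets concurrently; results keep the order of uids."""
    return await asyncio.gather(*(client.datasets.get(uid) for uid in uids))
```

Inside an `async with AsyncClient() as client:` block this would be called as `await fetch_datasets(client, [uid_a, uid_b])`.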
Resources
datasets
client.datasets.create()
Create a new dataset.
dataset = client.datasets.create(
    name="LiDAR Captures Q1",
    slug="lidar-captures-q1",
    data_type="lidar",
    is_sequence=True,
    visibility="private",
    provider_config={
        "provider": "aws_s3",
        "s3_bucket_name": "my-datasets",
        "s3_bucket_region": "us-east-1",
        "s3_bucket_prefix": "captures/q1/",
        "s3_access_key_id": "AKIA...",
        "s3_secret_access_key": "your-secret-key",
    },
    owner_name="my-org",
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | (required) | Display name for the dataset. |
| slug | str | (required) | URL-friendly identifier. |
| data_type | str | (required) | Data type: image, video, lidar, or mcap. |
| is_sequence | bool | False | Whether the dataset contains sequences. |
| visibility | str | "private" | "private" or "public". |
| create_metadata | bool | True | Whether to create dataset metadata. |
| provider_config | dict \| None | None | Cloud storage provider configuration. |
| owner_name | str \| None | None | Dataset owner username or email. |
Returns: Dataset
client.datasets.list()
List datasets with optional filtering and cursor-based pagination.
page = client.datasets.list(data_type="image", name="highway", limit=20)
| Parameter | Type | Default | Description |
|---|---|---|---|
| data_type | str \| None | None | Filter by data type: image, video, lidar, mcap, or splat. |
| name | str \| None | None | Filter by name (case-insensitive substring match). |
| status | str \| None | None | Filter by status: creating or created. |
| visibility | str \| None | None | Filter by visibility: private or public. |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor from a previous response. |
Returns: CursorPage[Dataset]
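Cursor-based pagination means each call returns one page plus a cursor for the next one. This reference does not list the fields of `CursorPage`, so the sketch below assumes a page exposes `items` and `next_cursor`; both attribute names, and the `iter_all` helper itself, are hypothetical.

```python
def iter_all(list_fn, **params):
    """Yield every item across pages by following the pagination cursor.

    Assumes each page has `items` and `next_cursor` attributes; the real
    CursorPage may name them differently.
    """
    cursor = None
    while True:
        page = list_fn(cursor=cursor, **params)
        yield from page.items
        cursor = page.next_cursor
        if not cursor:
            break  # no further pages
```

Under those assumptions, usage would look like `for ds in iter_all(client.datasets.list, data_type="image", limit=20): ...`.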
client.datasets.get()
Get a single dataset by UID.
dataset = client.datasets.get("550e8400-e29b-41d4-a716-446655440000")
| Parameter | Type | Description |
|---|---|---|
| uid | str | Dataset unique identifier. |
Returns: Dataset
client.datasets.list_items()
List items in a dataset. Uses the owner/slug path.
page = client.datasets.list_items("my-org", "my-dataset", limit=50)
| Parameter | Type | Default | Description |
|---|---|---|---|
| owner | str | (required) | Dataset owner (organization slug). |
| slug | str | (required) | Dataset slug. |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[DatasetItem]
client.datasets.get_item()
Get a single item within a dataset.
item = client.datasets.get_item("my-org", "my-dataset", "item_uid")
| Parameter | Type | Description |
|---|---|---|
| owner | str | Dataset owner (organization slug). |
| slug | str | Dataset slug. |
| item_uid | str | Item unique identifier. |
Returns: DatasetItem
client.datasets.list_sequences()
List sequences in a dataset.
page = client.datasets.list_sequences("my-org", "my-dataset", limit=50)
| Parameter | Type | Default | Description |
|---|---|---|---|
| owner | str | (required) | Dataset owner (organization slug). |
| slug | str | (required) | Dataset slug. |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[DatasetSequence]
client.datasets.get_sequence()
Get a single sequence within a dataset.
seq = client.datasets.get_sequence("my-org", "my-dataset", "sequence_uid")
| Parameter | Type | Description |
|---|---|---|
| owner | str | Dataset owner (organization slug). |
| slug | str | Dataset slug. |
| sequence_uid | str | Sequence unique identifier. |
Returns: DatasetSequence
projects
client.projects.list()
List projects with cursor-based pagination.
page = client.projects.list(limit=20)
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[Project]
client.projects.get()
Get a single project by UID.
project = client.projects.get("770a9600-a40d-63f6-c938-668877660000")
| Parameter | Type | Description |
|---|---|---|
| uid | str | Project unique identifier. |
Returns: Project
tasks
client.tasks.list()
List tasks with optional filtering and cursor-based pagination.
page = client.tasks.list(project="proj_uid", status="pending", limit=50)
| Parameter | Type | Default | Description |
|---|---|---|---|
| project | str \| None | None | Filter by project UID. |
| status | str \| None | None | Filter by task status. |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[Task]
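Once tasks have been fetched, a quick tally by status is often the first thing needed. The following is a small sketch that works on any iterable of objects with a `status` attribute (the `Task` type documents `status` as optional); `count_by_status` is a hypothetical helper, not part of the SDK.

```python
from collections import Counter


def count_by_status(tasks):
    """Tally tasks by status; a missing status is counted as "unknown"."""
    return Counter(task.status or "unknown" for task in tasks)
```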
client.tasks.get()
Get a single task by UID.
task = client.tasks.get("990c1800-b62f-85a8-e150-880099880000")
| Parameter | Type | Description |
|---|---|---|
| uid | str | Task unique identifier. |
Returns: Task
exports
client.exports.list()
List exports with cursor-based pagination.
page = client.exports.list(limit=10)
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[Export]
client.exports.create()
Create a new export. Provide either project or dataset.
export = client.exports.create(project="proj_uid")
| Parameter | Type | Default | Description |
|---|---|---|---|
| project | str \| None | None | Project UID to export. |
| dataset | str \| None | None | Dataset UID to export. |
Returns: Export
client.exports.get()
Get a single export by UID.
export = client.exports.get("export_uid")
| Parameter | Type | Description |
|---|---|---|
| uid | str | Export unique identifier. |
Returns: Export
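An export is created first and fetched later, so callers typically poll until it is ready. This reference does not list the possible `status` values, so the sketch below treats a non-empty `download_url` as the readiness signal. That is an assumption, and `wait_for_export` is a hypothetical helper; a failed export would simply run into the timeout here.

```python
import time


def wait_for_export(get_export, uid, interval=5.0, timeout=600.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll get_export(uid) until download_url is set or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        export = get_export(uid)
        if export.download_url:  # assumed readiness signal
            return export
        if clock() >= deadline:
            raise TimeoutError(f"export {uid} not ready after {timeout}s")
        sleep(interval)
```

Usage would be along the lines of `export = wait_for_export(client.exports.get, export.uid)`.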
storage_configs
client.storage_configs.list()
List storage configurations.
page = client.storage_configs.list()
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[StorageConfig]
client.storage_configs.create()
Create a new storage configuration.
import os
config = client.storage_configs.create(
    name="Production S3",
    provider="s3",
    s3_bucket_name="my-bucket",
    s3_bucket_region="us-west-2",
    s3_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    s3_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Configuration name. |
| provider | str | Yes | "s3" or "google_cloud". |
| s3_bucket_name | str \| None | No | S3 bucket name. |
| s3_bucket_region | str \| None | No | S3 region (e.g., us-west-2). |
| s3_bucket_prefix | str \| None | No | S3 key prefix. |
| s3_access_key_id | str \| None | No | AWS access key ID. |
| s3_secret_access_key | str \| None | No | AWS secret access key. |
| s3_is_accelerated | bool \| None | No | Enable S3 Transfer Acceleration. |
| gc_storage_bucket_name | str \| None | No | GCS bucket name. |
| gc_storage_prefix | str \| None | No | GCS key prefix. |
| gc_storage_auth_json_content | str \| None | No | GCS service account JSON. |
Returns: StorageConfig
client.storage_configs.test()
Test a storage configuration.
result = client.storage_configs.test("config_uid")
Returns: dict[str, Any]
client.storage_configs.delete()
Delete a storage configuration.
client.storage_configs.delete("config_uid")
Returns: None
agents
client.agents.list()
List automation agents.
page = client.agents.list(limit=20)
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[Agent]
client.agents.get()
Get a single agent by UID.
agent = client.agents.get("agent_uid")
| Parameter | Type | Description |
|---|---|---|
| uid | str | Agent unique identifier. |
Returns: Agent
client.agents.create()
Create a new automation agent.
agent = client.agents.create(
    name="QA Bot",
    events=["task.completed"],
    callback_url="https://example.com/webhook",
    task_types=["annotation"],
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Agent name. |
| events | list[str] \| None | No | Events the agent listens to. |
| description | str \| None | No | Agent description. |
| callback_url | str \| None | No | URL called when events fire. |
| is_active | bool \| None | No | Enable or disable the agent. |
| project | str \| None | No | Scope to a specific project UID. |
| task_types | list[str] \| None | No | Task types the agent handles. |
Returns: Agent
client.agents.update()
Update an existing agent.
agent = client.agents.update("agent_uid", is_active=False, callback_url="https://example.com/new-webhook")
| Parameter | Type | Required | Description |
|---|---|---|---|
| uid | str | Yes | Agent unique identifier. |
| name | str \| None | No | Agent name. |
| events | list[str] \| None | No | Events to listen to. |
| description | str \| None | No | Agent description. |
| callback_url | str \| None | No | Callback URL. |
| is_active | bool \| None | No | Enable or disable the agent. |
| project | str \| None | No | Project UID scope. |
| task_types | list[str] \| None | No | Task types. |
Returns: Agent
client.agents.delete()
Delete an agent.
client.agents.delete("agent_uid")
Returns: None
client.agents.list_executions()
List execution history for an agent.
page = client.agents.list_executions("agent_uid", limit=50)
| Parameter | Type | Default | Description |
|---|---|---|---|
| uid | str | (required) | Agent unique identifier. |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[AgentExecution]
client.agents.test()
Send a test event to an agent.
result = client.agents.test("agent_uid")
# {"success": True}
Returns: dict[str, Any]
inference_providers
client.inference_providers.list()
List inference providers.
page = client.inference_providers.list()
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[InferenceProvider]
client.inference_providers.get()
Get a single inference provider by UID.
provider = client.inference_providers.get("provider_uid")
Returns: InferenceProvider
client.inference_providers.create()
Create a new inference provider.
provider = client.inference_providers.create(
    name="My SageMaker",
    provider_type="sagemaker",
    config={"endpoint": "my-endpoint", "region": "us-east-1"},
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Provider name. |
| provider_type | str | Yes | Provider type (e.g. "sagemaker", "http"). |
| config | dict[str, Any] | Yes | Provider-specific configuration. |
| description | str \| None | No | Provider description. |
| is_active | bool \| None | No | Enable or disable. |
| project | str \| None | No | Scope to a project UID. |
Returns: InferenceProvider
client.inference_providers.update()
Update an inference provider.
provider = client.inference_providers.update("provider_uid", is_active=False)
| Parameter | Type | Required | Description |
|---|---|---|---|
| uid | str | Yes | Provider unique identifier. |
| name | str \| None | No | Provider name. |
| description | str \| None | No | Provider description. |
| provider_type | str \| None | No | Provider type. |
| config | dict[str, Any] \| None | No | Configuration object. |
| is_active | bool \| None | No | Enable or disable. |
| project | str \| None | No | Project UID scope. |
Returns: InferenceProvider
client.inference_providers.delete()
Delete an inference provider.
client.inference_providers.delete("provider_uid")
Returns: None
client.inference_providers.test()
Test connectivity to an inference provider.
result = client.inference_providers.test("provider_uid")
# {"success": True, "message": "Connection successful", "tested_at": "2026-02-24T..."}
Returns: dict[str, Any]
auto_label_jobs
client.auto_label_jobs.list()
List auto-label jobs with optional filtering.
page = client.auto_label_jobs.list(project="proj_uid", limit=20)
| Parameter | Type | Default | Description |
|---|---|---|---|
| project | str \| None | None | Filter by project UID. |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[AutoLabelJob]
client.auto_label_jobs.get()
Get a single auto-label job by UID.
job = client.auto_label_jobs.get("job_uid")
print(f"Progress: {job.progress_pct}%")
Returns: AutoLabelJob
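The `AutoLabelJob` type documents `progress_pct` as optional, alongside `processed_items` and `total_items`. As a minimal sketch, a caller could fall back to the item counters when `progress_pct` is missing. `job_progress` is a hypothetical helper, and it assumes `progress_pct` is on a 0 to 100 scale, as the example above suggests.

```python
def job_progress(job):
    """Return job progress as a percentage between 0 and 100.

    Prefers progress_pct; otherwise derives the value from
    processed_items and total_items.
    """
    if job.progress_pct is not None:
        return float(job.progress_pct)
    if job.total_items <= 0:
        return 0.0  # nothing to process yet, avoid dividing by zero
    return 100.0 * job.processed_items / job.total_items
```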
client.auto_label_jobs.create()
Create a new auto-label job for a project.
job = client.auto_label_jobs.create(
    "proj_uid",
    model_type="sam3",
    confidence_threshold=0.85,
    labels=["car", "truck"],
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| project_uid | str | Yes | Project to auto-label. |
| model_type | str \| None | No | Model type to use. |
| confidence_threshold | float \| None | No | Minimum confidence score. |
| labels | list[str] \| None | No | Labels to apply. |
| dry_run | bool \| None | No | Preview without applying labels. |
Returns: AutoLabelJob
client.auto_label_jobs.cancel()
Cancel a running auto-label job.
client.auto_label_jobs.cancel("job_uid")
Returns: None
quality_targets
All quality target methods are scoped to a project.
client.quality_targets.list()
List quality targets for a project.
page = client.quality_targets.list("proj_uid")
| Parameter | Type | Default | Description |
|---|---|---|---|
| project_uid | str | (required) | Project unique identifier. |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[QualityTarget]
client.quality_targets.get()
Get a single quality target.
target = client.quality_targets.get("proj_uid", "target_uid")
| Parameter | Type | Description |
|---|---|---|
| project_uid | str | Project unique identifier. |
| uid | str | Quality target unique identifier. |
Returns: QualityTarget
client.quality_targets.create()
Create a quality target for a project.
target = client.quality_targets.create(
    "proj_uid",
    name="Accuracy Target",
    metric="accuracy",
    threshold=0.95,
    operator="gte",
    severity="critical",
    notify_webhook=True,
    notify_emails=["alerts@example.com"],
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| project_uid | str | Yes | Project unique identifier. |
| name | str | Yes | Target name. |
| metric | str | Yes | Metric to track. |
| threshold | float | Yes | Threshold value. |
| operator | str \| None | No | Comparison operator ("gte", "lte", etc.). |
| severity | str \| None | No | Severity level. |
| is_active | bool \| None | No | Enable or disable. |
| notify_webhook | bool \| None | No | Send webhook on breach. |
| notify_emails | list[str] \| None | No | Email addresses for breach alerts. |
Returns: QualityTarget
client.quality_targets.update()
Update a quality target.
target = client.quality_targets.update("proj_uid", "target_uid", threshold=0.9)
| Parameter | Type | Required | Description |
|---|---|---|---|
| project_uid | str | Yes | Project unique identifier. |
| uid | str | Yes | Quality target unique identifier. |
| name | str \| None | No | Target name. |
| metric | str \| None | No | Metric to track. |
| threshold | float \| None | No | Threshold value. |
| operator | str \| None | No | Comparison operator. |
| severity | str \| None | No | Severity level. |
| is_active | bool \| None | No | Enable or disable. |
| notify_webhook | bool \| None | No | Send webhook on breach. |
| notify_emails | list[str] \| None | No | Email addresses for breach alerts. |
Returns: QualityTarget
client.quality_targets.delete()
Delete a quality target.
client.quality_targets.delete("proj_uid", "target_uid")
Returns: None
client.quality_targets.evaluate()
Evaluate all quality targets for a project and return current results.
results = client.quality_targets.evaluate("proj_uid")
for r in results:
    print(f"{r.name}: {r.current_value} (breached: {r.is_breached})")
Returns: list[QualityTargetEvaluation]
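Evaluation results are usually triaged by severity. The sketch below groups the names of breached targets using only the documented `name`, `is_breached`, and `severity` fields of `QualityTargetEvaluation`; the `breached_by_severity` helper and the "unspecified" bucket are hypothetical.

```python
from collections import defaultdict


def breached_by_severity(results):
    """Group the names of breached targets by their severity level."""
    groups = defaultdict(list)
    for r in results:
        if r.is_breached:
            # severity is optional, so fall back to a placeholder bucket
            groups[r.severity or "unspecified"].append(r.name)
    return dict(groups)
```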
consensus
All consensus methods are scoped to a project.
client.consensus.get_summary()
Get consensus summary statistics for a project.
summary = client.consensus.get_summary("proj_uid")
print(f"Mean score: {summary.mean_score}")
print(f"Items with consensus: {summary.items_with_consensus}/{summary.total_items}")
Returns: ConsensusSummary
client.consensus.list_scores()
List consensus scores for individual items.
page = client.consensus.list_scores("proj_uid", limit=50)
| Parameter | Type | Default | Description |
|---|---|---|---|
| project_uid | str | (required) | Project unique identifier. |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[ConsensusScore]
client.consensus.compute()
Trigger consensus computation for a project.
result = client.consensus.compute("proj_uid")
print(result.status) # "started"
Returns: ConsensusComputeResult
client.consensus.get_config()
Get consensus configuration for a project.
config = client.consensus.get_config("proj_uid")
print(f"IoU threshold: {config.iou_threshold}")
Returns: ConsensusConfig
client.consensus.update_config()
Update consensus configuration. Uses PUT (full replacement).
config = client.consensus.update_config(
    "proj_uid",
    iou_threshold=0.7,
    min_agreement_ratio=0.8,
    min_annotations=3,
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| project_uid | str | Yes | Project unique identifier. |
| iou_threshold | float \| None | No | IoU threshold for matching. |
| min_agreement_ratio | float \| None | No | Minimum agreement ratio. |
| min_annotations | int \| None | No | Minimum annotations required. |
Returns: ConsensusConfig
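Because the update is a full replacement, a cautious pattern is to read the current configuration and send every field back, overriding only what should change. Whether the server resets omitted fields is not stated here, so treat this as a defensive sketch; `full_config_kwargs` and `CONFIG_FIELDS` are hypothetical names.

```python
CONFIG_FIELDS = ("iou_threshold", "min_agreement_ratio", "min_annotations")


def full_config_kwargs(current, **changes):
    """Return a complete set of config arguments: current values plus changes."""
    unknown = set(changes) - set(CONFIG_FIELDS)
    if unknown:
        raise ValueError(f"unknown config fields: {sorted(unknown)}")
    kwargs = {name: getattr(current, name) for name in CONFIG_FIELDS}
    kwargs.update(changes)
    return kwargs
```

A call could then look like `client.consensus.update_config("proj_uid", **full_config_kwargs(client.consensus.get_config("proj_uid"), iou_threshold=0.7))`.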
webhooks
client.webhooks.list()
List webhook subscriptions.
page = client.webhooks.list()
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[Webhook]
client.webhooks.get()
Get a single webhook by UID.
webhook = client.webhooks.get("webhook_uid")
Returns: Webhook
client.webhooks.create()
Create a new webhook subscription.
webhook = client.webhooks.create(
    target_url="https://example.com/webhook",
    events=["task.completed", "export.ready"],
    is_active=True,
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| target_url | str | Yes | URL to receive events. |
| events | list[str] | Yes | Events to subscribe to. |
| is_active | bool \| None | No | Enable the webhook (default: True). |
| secret | str \| None | No | HMAC signing secret (auto-generated if omitted). |
Returns: Webhook
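The `secret` parameter is described as an HMAC signing secret, but this reference does not specify the signing scheme or the header that carries the signature. As a rough sketch, assuming a hex-encoded HMAC-SHA256 over the raw request body (an assumption, not something documented here), a receiver could verify a delivery as follows. `sign_payload` and `verify_signature` are hypothetical helpers.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw body (assumed scheme, not documented)."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, received: str) -> bool:
    """Compare expected and received signatures in constant time."""
    return hmac.compare_digest(sign_payload(secret, body), received)
```

Using `hmac.compare_digest` rather than `==` avoids leaking timing information about how much of the signature matched.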
client.webhooks.update()
Update a webhook subscription.
webhook = client.webhooks.update("webhook_uid", is_active=False)
| Parameter | Type | Required | Description |
|---|---|---|---|
| uid | str | Yes | Webhook unique identifier. |
| target_url | str \| None | No | URL to receive events. |
| events | list[str] \| None | No | Events to subscribe to. |
| is_active | bool \| None | No | Enable or disable. |
Returns: Webhook
client.webhooks.delete()
Delete a webhook subscription.
client.webhooks.delete("webhook_uid")
Returns: None
client.webhooks.test()
Send a test event to a webhook.
result = client.webhooks.test("webhook_uid")
# {"success": True}
Returns: dict[str, Any]
webhook_deliveries
client.webhook_deliveries.list()
List webhook delivery attempts.
page = client.webhook_deliveries.list(limit=20)
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[WebhookDelivery]
client.webhook_deliveries.get()
Get a single webhook delivery by UID.
delivery = client.webhook_deliveries.get("delivery_uid")
print(f"Status: {delivery.status}, Attempts: {delivery.attempts}")
Returns: WebhookDelivery
organizations
client.organizations.list()
List organizations.
page = client.organizations.list(limit=20)
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[Organization]
client.organizations.get()
Get a single organization by slug.
org = client.organizations.get("my-org")
| Parameter | Type | Description |
|---|---|---|
| slug | str | Organization slug. |
Returns: Organization
client.organizations.create()
Create a new organization.
org = client.organizations.create(
    name="My Team",
    visibility="private",
    industry="technology",
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Organization name. |
| description | str \| None | No | Organization description. |
| logo | str \| None | No | Logo URL. |
| website | str \| None | No | Website URL. |
| industry | str \| None | No | Industry (use industry_choices() for options). |
| email | str \| None | No | Contact email. |
| phone | str \| None | No | Contact phone. |
| visibility | str \| None | No | "public" or "private". |
Returns: Organization
client.organizations.update()
Update an organization.
org = client.organizations.update("my-org", name="New Name", description="Updated desc")
| Parameter | Type | Required | Description |
|---|---|---|---|
| slug | str | Yes | Organization slug. |
| name | str \| None | No | Organization name. |
| description | str \| None | No | Description. |
| logo | str \| None | No | Logo URL. |
| website | str \| None | No | Website URL. |
| handle | str \| None | No | Organization handle. |
| industry | str \| None | No | Industry. |
| email | str \| None | No | Contact email. |
| phone | str \| None | No | Contact phone. |
| visibility | str \| None | No | Visibility setting. |
Returns: Organization
client.organizations.delete()
Delete an organization.
Warning: Deleting an organization is irreversible and may cascade-delete all associated projects, datasets, and other resources. Double-check the organization slug before calling this method.
client.organizations.delete("my-org")
Returns: None
client.organizations.industry_choices()
Get the list of valid industry choices.
choices = client.organizations.industry_choices()
Returns: dict[str, Any] (a dictionary containing industry choices)
client.organizations.list_members()
List members of an organization.
page = client.organizations.list_members("my-org", role="admin")
| Parameter | Type | Default | Description |
|---|---|---|---|
| slug | str | (required) | Organization slug. |
| search | str \| None | None | Search by name or email. |
| role | str \| None | None | Filter by role. |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[OrganizationMember]
client.organizations.remove_member()
Remove a member from an organization.
client.organizations.remove_member("my-org", "user_uid")
Returns: None
client.organizations.update_member_role()
Update a member’s role in an organization.
client.organizations.update_member_role("my-org", "user_uid", role="admin")
Returns: None
client.organizations.leave()
Leave an organization (as the current user).
client.organizations.leave("my-org")
Returns: None
client.organizations.transfer_ownership()
Transfer organization ownership to another member.
client.organizations.transfer_ownership("my-org", new_owner_uid="new_owner_uid")
Returns: None
client.organizations.list_invitations()
List pending invitations for an organization.
invitations = client.organizations.list_invitations("my-org")
Returns: list[Invitation]
client.organizations.create_invitation()
Invite a user to an organization.
invitation = client.organizations.create_invitation("my-org", email="user@example.com", role="member")
| Parameter | Type | Required | Description |
|---|---|---|---|
| slug | str | Yes | Organization slug. |
| email | str | Yes | Email to invite. |
| role | str \| None | No | Role for the invitee. |
Returns: Invitation
client.organizations.resend_invitation()
Resend a pending invitation.
client.organizations.resend_invitation("my-org", "invitation_uid")
Returns: None
client.organizations.cancel_invitation()
Cancel a pending invitation.
client.organizations.cancel_invitation("my-org", "invitation_uid")
Returns: None
client.organizations.list_teams()
List teams in an organization.
teams = client.organizations.list_teams("my-org")
Returns: list[Team]
client.organizations.create_team()
Create a team in an organization.
team = client.organizations.create_team("my-org", name="ML Engineers", description="Machine learning team")
| Parameter | Type | Required | Description |
|---|---|---|---|
| slug | str | Yes | Organization slug. |
| name | str | Yes | Team name. |
| description | str \| None | No | Team description. |
| color | str \| None | No | Team color. |
Returns: Team
client.organizations.get_team()
Get a team by slug.
team = client.organizations.get_team("my-org", "team_slug")
Returns: Team
client.organizations.update_team()
Update a team.
team = client.organizations.update_team("my-org", "team_slug", name="Updated Name")
Returns: Team
client.organizations.delete_team()
Delete a team.
client.organizations.delete_team("my-org", "team_slug")
Returns: None
client.organizations.list_team_members()
List members of a team.
members = client.organizations.list_team_members("my-org", "team_slug")
Returns: list[TeamMember]
client.organizations.add_team_member()
Add a member to a team.
member = client.organizations.add_team_member("my-org", "team_slug", user_uid="user_uid", role="member")
Returns: TeamMember
client.organizations.remove_team_member()
Remove a member from a team.
client.organizations.remove_team_member("my-org", "team_slug", "user_uid")
Returns: None
client.organizations.update_team_member_role()
Update a team member’s role.
client.organizations.update_team_member_role("my-org", "team_slug", "user_uid", role="lead")
Returns: None
slices
client.slices.list()
List slices for an owner.
page = client.slices.list("my-org", limit=20)
| Parameter | Type | Default | Description |
|---|---|---|---|
| owner | str | (required) | Owner slug (organization or user). |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[Slice]
client.slices.get()
Get a single slice.
s = client.slices.get("my-org", "my-slice")
| Parameter | Type | Description |
|---|---|---|
| owner | str | Owner slug. |
| slug | str | Slice slug. |
Returns: Slice
client.slices.create()
Create a new slice.
s = client.slices.create(
    name="Validation Set",
    visibility="private",
    sub_slices=[{"dataset": "dataset_uid", "filters": {}}],
    organization="my-org",
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Slice name. |
| visibility | str | Yes | "public" or "private". |
| sub_slices | list[dict[str, Any]] | Yes | Sub-slice definitions (dataset + filters). |
| organization | str \| None | No | Organization slug to own the slice. |
| random_selection_count | int \| None | No | Randomly select N items. |
Returns: Slice
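Each entry in `sub_slices` pairs a dataset with a filters dictionary, as in the example above. When a slice spans several datasets, the list can be built programmatically. This is a small sketch: `make_sub_slices` is a hypothetical helper, and the supported filter keys are not documented here, so `filters` is passed through unchanged.

```python
def make_sub_slices(dataset_uids, filters=None):
    """Build one sub-slice entry per dataset, each with its own filters dict."""
    return [
        {"dataset": uid, "filters": dict(filters or {})}
        for uid in dataset_uids
    ]
```

Copying `filters` per entry keeps later edits to one sub-slice from leaking into the others.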
client.slices.list_items()
List items in a slice.
page = client.slices.list_items("my-org", "my-slice", limit=50)
| Parameter | Type | Default | Description |
|---|---|---|---|
| owner | str | (required) | Owner slug. |
| slug | str | (required) | Slice slug. |
| limit | int \| None | None | Maximum results per page. |
| cursor | str \| None | None | Pagination cursor. |
Returns: CursorPage[SliceItem]
client.slices.get_item()
Get a single item within a slice.
item = client.slices.get_item("my-org", "my-slice", "item_uid")
| Parameter | Type | Description |
|---|---|---|
| owner | str | Owner slug. |
| slug | str | Slice slug. |
| item_uid | str | Item unique identifier. |
Returns: SliceItem
Types
Dataset
class Dataset(BaseModel):
    uid: str
    name: str
    slug: str
    item_count: int = 0
    data_type: str | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
DatasetItem
class DatasetItem(BaseModel):
    id: int | None = None
    uid: str
    key: str | None = None
    dataset: str | None = None
    url: str | None = None
    gpu_texture_url: str | None = None
    thumbnails: list[str] | None = None
    video_thumbnail: str | None = None
    metadata: dict[str, Any] | None = None
    export_snippet: dict[str, Any] | None = None
    annotations: dict[str, Any] | None = None
    crop_data: dict[str, Any] | None = None
    related_items: list[dict[str, Any]] | None = None
    related_sequence_uid: str | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
DatasetSequence
class DatasetSequence(BaseModel):
    uid: str
    key: str | None = None
    custom_uuid: str | None = None
    status: str | None = None
    featured_image: str | None = None
    number_of_frames: int | None = None
    views: list[dict[str, Any]] | None = None
    crop_data: dict[str, Any] | None = None
    predefined_labels: list[dict[str, Any]] | None = None
    frames: list[dict[str, Any]] | None = None
    metrics: dict[str, Any] | None = None
    dataset_uid: str | None = None
    allow_lidar_calibration: bool | None = None
    lidar_calibration_enabled: bool | None = None
    camera_calibration_enabled: bool | None = None
Project
class Project(BaseModel):
    uid: str
    name: str
    status: str | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
Task
class Task(BaseModel):
    uid: str
    type: str | None = None
    name: str | None = None
    status: str | None = None
    project: str | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
Export
class Export(BaseModel):
    uid: str
    status: str | None = None
    download_url: str | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
StorageConfig
class StorageConfig(BaseModel):
    uid: str
    name: str
    provider: str
    s3_bucket_name: str | None = None
    s3_bucket_region: str | None = None
    s3_bucket_prefix: str | None = None
    s3_is_accelerated: bool = False
    gc_storage_bucket_name: str | None = None
    gc_storage_prefix: str | None = None
    is_verified: bool = False
    last_verified_at: datetime | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
Agent
class Agent(BaseModel):
uid: str
name: str
description: str | None = None
events: list[str] = []
callback_url: str | None = None
is_active: bool = True
project: str | None = None
task_types: list[str] = []
secret: str | None = None
execution_stats: dict[str, int] | None = None
created_at: datetime | None = None
updated_at: datetime | None = None
AgentExecution
class AgentExecution(BaseModel):
uid: str
registration: str
event_type: str
task: str | None = None
result: str | None = None
status: str | None = None
action: str | None = None
event_payload: dict[str, Any] | None = None
response_payload: dict[str, Any] | None = None
error_message: str | None = None
started_at: datetime | None = None
completed_at: datetime | None = None
created_at: datetime | None = None
updated_at: datetime | None = None
InferenceProvider
class InferenceProvider(BaseModel):
    uid: str
    name: str
    description: str | None = None
    provider_type: str | None = None
    config: dict[str, Any] | None = None
    is_active: bool = True
    project: str | None = None
    last_test_at: datetime | None = None
    last_test_success: bool | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
AutoLabelJob
class AutoLabelJob(BaseModel):
    uid: str
    status: str | None = None
    model_type: str | None = None
    confidence_threshold: float | None = None
    labels: list[str] = []
    dry_run: bool = False
    total_items: int = 0
    processed_items: int = 0
    successful_items: int = 0
    failed_items: int = 0
    skipped_items: int = 0
    progress_pct: float | None = None
    error_message: str | None = None
    summary: dict[str, Any] | None = None
    started_at: datetime | None = None
    completed_at: datetime | None = None
    created_at: datetime | None = None
QualityTarget
class QualityTarget(BaseModel):
    uid: str
    name: str
    metric: str
    operator: str
    threshold: float
    severity: str | None = None
    is_active: bool = True
    notify_webhook: bool = True
    notify_emails: list[str] = []
    last_evaluated_at: datetime | None = None
    last_value: float | None = None
    is_breached: bool = False
    breach_count: int = 0
    last_breached_at: datetime | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
QualityTargetEvaluation
class QualityTargetEvaluation(BaseModel):
    uid: str
    name: str
    metric: str
    threshold: float
    operator: str
    current_value: float
    is_breached: bool
    severity: str | None = None
ConsensusConfig
class ConsensusConfig(BaseModel):
    uid: str
    iou_threshold: float = 0.5
    min_agreement_ratio: float = 0.5
    min_annotations: int = 2
    created_at: datetime | None = None
    updated_at: datetime | None = None
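The `iou_threshold` field suggests that annotations are compared by intersection over union (IoU). How Avala computes agreement internally is not documented here, so the following is only a sketch of the standard IoU formula for axis-aligned boxes. The `box_iou` helper and the `(x_min, y_min, x_max, y_max)` box format are assumptions made for illustration, not part of the SDK.

```python
from __future__ import annotations

# Hypothetical box format: (x_min, y_min, x_max, y_max). Not an SDK type.
Box = tuple[float, float, float, float]


def box_iou(a: Box, b: Box) -> float:
    """Return intersection over union for two axis-aligned boxes."""
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    # Width and height of the overlap region, clamped at zero when disjoint.
    inter_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    intersection = inter_w * inter_h
    union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - intersection
    return intersection / union if union > 0 else 0.0


# Two annotations could be treated as agreeing when IoU meets the threshold.
agrees = box_iou((0, 0, 2, 2), (0, 0, 2, 1)) >= 0.5
```

With the default `iou_threshold` of 0.5, the pair in the last line would count as agreeing under this assumed rule.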
ConsensusScore
class ConsensusScore(BaseModel):
    uid: str
    dataset_item_uid: str
    task_name: str
    score_type: str | None = None
    score: float
    annotator_count: int
    details: dict[str, Any] | None = None
    created_at: datetime | None = None
ConsensusSummary
class ConsensusSummary(BaseModel):
    mean_score: float
    median_score: float
    min_score: float
    max_score: float
    total_items: int
    items_with_consensus: int
    score_distribution: dict[str, Any] | None = None
    by_task_name: list[Any] | None = None
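The numeric fields of `ConsensusSummary` read as ordinary descriptive statistics over per-item `ConsensusScore.score` values. The server-side computation is not documented here, so the sketch below only shows how comparable figures could be derived locally with the standard library. `summarize_scores` is a hypothetical helper, not an SDK function.

```python
from __future__ import annotations

from statistics import mean, median


def summarize_scores(scores: list[float]) -> dict[str, float]:
    """Compute mean, median, min, and max for a non-empty list of scores."""
    if not scores:
        raise ValueError("scores must not be empty")
    return {
        "mean_score": mean(scores),
        "median_score": median(scores),
        "min_score": min(scores),
        "max_score": max(scores),
    }


# Example input: score values collected from a page of ConsensusScore objects.
summary = summarize_scores([0.5, 1.0, 0.75])
```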
ConsensusComputeResult
class ConsensusComputeResult(BaseModel):
    status: str
    message: str
Webhook
class Webhook(BaseModel):
    uid: str
    target_url: str
    events: list[str] = []
    is_active: bool = True
    secret: str | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
WebhookDelivery
class WebhookDelivery(BaseModel):
    uid: str
    subscription: str
    event_type: str
    payload: dict[str, Any] | None = None
    response_status: int | None = None
    response_body: str | None = None
    attempts: int = 0
    next_retry_at: datetime | None = None
    status: str | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
Organization
class Organization(BaseModel):
    uid: str
    name: str
    slug: str
    handle: str | None = None
    description: str | None = None
    logo: str | None = None
    website: str | None = None
    industry: str | None = None
    email: str | None = None
    phone: str | None = None
    visibility: str | None = None
    plan: str | None = None
    is_verified: bool = False
    is_active: bool = True
    member_count: int | None = None
    team_count: int | None = None
    dataset_count: int | None = None
    project_count: int | None = None
    slice_count: int | None = None
    role: str | None = None
    joined_at: datetime | None = None
    allowed_domains: list[str] | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
OrganizationMember
class OrganizationMember(BaseModel):
    user_uid: str
    username: str | None = None
    email: str | None = None
    full_name: str | None = None
    picture: str | None = None
    role: str | None = None
    created_at: datetime | None = None
Invitation
class Invitation(BaseModel):
    uid: str
    organization_name: str | None = None
    organization_slug: str | None = None
    invited_email: str
    invited_by_username: str | None = None
    role: str | None = None
    status: str | None = None
    expires_at: datetime | None = None
    is_expired: bool | None = None
    accept_url: str | None = None
    copy_link: str | None = None
    created_at: datetime | None = None
Team
class Team(BaseModel):
    uid: str
    name: str
    slug: str | None = None
    description: str | None = None
    color: str | None = None
    organization_uid: str | None = None
    organization_name: str | None = None
    organization_slug: str | None = None
    member_count: int | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
TeamMember
class TeamMember(BaseModel):
    user_uid: str
    username: str | None = None
    email: str | None = None
    full_name: str | None = None
    picture: str | None = None
    role: str | None = None
    created_at: datetime | None = None
Slice
class Slice(BaseModel):
    uid: str
    name: str
    slug: str | None = None
    owner_name: str | None = None
    organization: dict[str, Any] | None = None
    visibility: str | None = None
    status: str | None = None
    item_count: int | None = None
    sub_slices: list[dict[str, Any]] | None = None
    source_data: Any | None = None
    featured_slice_item_urls: list[str] | None = None
SliceItem
class SliceItem(BaseModel):
    id: int | None = None
    uid: str
    key: str | None = None
    dataset: str | None = None
    url: str | None = None
    gpu_texture_url: str | None = None
    thumbnails: list[str] | None = None
    video_thumbnail: str | None = None
    metadata: dict[str, Any] | None = None
    export_snippet: dict[str, Any] | None = None
    annotations: dict[str, Any] | None = None
    crop_data: dict[str, Any] | None = None
    related_items: list[dict[str, Any]] | None = None
    related_sequence_uid: str | None = None
    created_at: datetime | None = None
    updated_at: datetime | None = None
CursorPage
All list() methods return a CursorPage[T]:
@dataclass
class CursorPage(Generic[T]):
    items: list[T]
    next_cursor: str | None = None
    previous_cursor: str | None = None
| Property | Type | Description |
|---|---|---|
| `items` | `list[T]` | Items on the current page. |
| `next_cursor` | `str \| None` | Cursor for the next page. `None` if last page. |
| `previous_cursor` | `str \| None` | Cursor for the previous page. |
| `has_more` | `bool` | `True` if `next_cursor` is not `None`. |
Supports iteration and len():
page = client.datasets.list(limit=20)
for dataset in page:
    print(dataset.name)
print(f"{len(page)} datasets on this page")
if page.has_more:
    next_page = client.datasets.list(limit=20, cursor=page.next_cursor)
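To walk every page rather than just the next one, the cursor can be followed in a loop. The sketch below is not part of the SDK: `iter_all` is a hypothetical helper, and the small `CursorPage` stand-in only mirrors the fields documented above so the example runs without the SDK installed. It assumes nothing beyond `list()` accepting `limit` and `cursor`, as in the example above.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable, Generic, Iterator, TypeVar

T = TypeVar("T")


@dataclass
class CursorPage(Generic[T]):
    """Stand-in mirroring the documented CursorPage fields (not the SDK class)."""

    items: list[T] = field(default_factory=list)
    next_cursor: str | None = None
    previous_cursor: str | None = None

    @property
    def has_more(self) -> bool:
        return self.next_cursor is not None


def iter_all(list_fn: Callable[..., CursorPage[T]], limit: int = 20) -> Iterator[T]:
    """Yield every item by following next_cursor until the last page."""
    cursor: str | None = None
    while True:
        kwargs: dict[str, Any] = {"limit": limit}
        if cursor is not None:
            # Only pass a cursor once the previous page has supplied one.
            kwargs["cursor"] = cursor
        page = list_fn(**kwargs)
        yield from page.items
        if not page.has_more:
            break
        cursor = page.next_cursor
```

With a real client this might be used as `for dataset in iter_all(client.datasets.list): ...`. Because it is a generator, pages are fetched lazily as the loop advances.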
Errors
All errors inherit from AvalaError:
from avala.errors import (
    AvalaError,
    AuthenticationError,
    NotFoundError,
    RateLimitError,
    ValidationError,
    ServerError,
)
| Error | HTTP Status | Attributes |
|---|---|---|
| `AvalaError` | Any | `message`, `status_code`, `body` |
| `AuthenticationError` | 401 | Inherits from `AvalaError` |
| `NotFoundError` | 404 | Inherits from `AvalaError` |
| `RateLimitError` | 429 | `retry_after: float \| None` |
| `ValidationError` | 400 / 422 | `details: list[Any]` |
| `ServerError` | 5xx | Inherits from `AvalaError` |
try:
    dataset = client.datasets.get("nonexistent")
except NotFoundError as e:
    print(f"Not found: {e.message}")
except RateLimitError as e:
    print(f"Retry after {e.retry_after}s")
except AvalaError as e:
    print(f"Error ({e.status_code}): {e.message}")
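Since `RateLimitError` carries `retry_after`, a caller can wait for that long before trying again. The client already retries transient failures (5xx, timeouts) up to `max_retries`; whether 429 responses are covered by that is not stated here, so the sketch below handles them explicitly. `call_with_rate_limit_retry` is a hypothetical helper, and the two exception classes are minimal stand-ins for the ones in `avala.errors`, included only so the example is self-contained.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


class AvalaError(Exception):
    """Stand-in for avala.errors.AvalaError (illustrative only)."""


class RateLimitError(AvalaError):
    """Stand-in carrying the documented retry_after attribute."""

    def __init__(self, message: str, retry_after: float | None = None) -> None:
        super().__init__(message)
        self.retry_after = retry_after


def call_with_rate_limit_retry(
    fn: Callable[[], T],
    max_attempts: int = 3,
    default_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, sleeping for retry_after (or default_delay) after each 429."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return fn()
        except RateLimitError as exc:
            if attempt >= max_attempts:
                raise  # Out of attempts: surface the last rate-limit error.
            delay = exc.retry_after if exc.retry_after is not None else default_delay
            sleep(delay)
            attempt += 1
```

With the real SDK, the stand-in classes would be replaced by the import shown above, and usage might look like `call_with_rate_limit_retry(lambda: client.datasets.get(uid))`.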
Environment Variables
| Variable | Description | Default |
|---|---|---|
| `AVALA_API_KEY` | API key (used when `api_key` is not passed to the client). | Required |
| `AVALA_BASE_URL` | Base API URL override. | `https://api.avala.ai/api/v1` |
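The precedence implied by the table can be sketched as follows. This is an illustration, not the SDK's actual resolution code: `resolve_settings` is a hypothetical helper, and letting an explicit `base_url` argument win over `AVALA_BASE_URL` is an assumption modeled on the documented behavior of `api_key`.

```python
from __future__ import annotations

import os
from collections.abc import Mapping

DEFAULT_BASE_URL = "https://api.avala.ai/api/v1"


def resolve_settings(
    api_key: str | None = None,
    base_url: str | None = None,
    env: Mapping[str, str] | None = None,
) -> tuple[str, str]:
    """Return (api_key, base_url), preferring arguments over environment."""
    env = os.environ if env is None else env
    key = api_key or env.get("AVALA_API_KEY")
    if not key:
        raise ValueError("No API key: pass api_key or set AVALA_API_KEY")
    # Assumed order: explicit argument, then AVALA_BASE_URL, then the default.
    url = base_url or env.get("AVALA_BASE_URL") or DEFAULT_BASE_URL
    return key, url
```

Passing `env` explicitly keeps the function easy to test without mutating the process environment.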