# Python SDK

> Complete guide for integrating Cyberdesk into your Python applications

## Quick Start

Get up and running with Cyberdesk in under 5 minutes. This guide assumes you've already created workflows in the [Cyberdesk Dashboard](https://cyberdesk.io/dashboard).

<Steps>
  <Step title="Install the SDK">
    <CodeGroup>
      ```bash pip theme={null}
      pip install cyberdesk
      ```

      ```bash poetry theme={null}
      poetry add cyberdesk
      ```

      ```bash pipenv theme={null}
      pipenv install cyberdesk
      ```
    </CodeGroup>
  </Step>

  <Step title="Initialize the client and create a run">
    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        import asyncio
        from cyberdesk import CyberdeskClient, RunCreate

        async def main():
            # Initialize the client
            client = CyberdeskClient('YOUR_API_KEY')
            
            # Create a run for your workflow
            run_data = RunCreate(
                workflow_id='your-workflow-id',
                machine_id='your-machine-id',
                input_values={
                    'patient_id': '12345',
                    'patient_first_name': 'John',
                    'patient_last_name': 'Doe'
                }
            )
            
            response = await client.runs.create(run_data)
            
            if response.error:
                print(f"Error creating run: {response.error}")
                return
                
            run = response.data
            
            # Wait for the run to complete
            while run.status in ['scheduling', 'running']:
                await asyncio.sleep(5)  # Wait 5 seconds
                response = await client.runs.get(run.id)
                run = response.data
            
            # Get the output data
            if run.status == 'success':
                print('Patient data:', run.output_data)
            else:
                print('Run failed:', ', '.join(run.error or []))

        # Run the async function
        asyncio.run(main())
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        from cyberdesk import CyberdeskClient, RunCreate
        import time

        # Initialize the client
        client = CyberdeskClient('YOUR_API_KEY')

        # Create a run for your workflow
        run_data = RunCreate(
            workflow_id='your-workflow-id',
            machine_id='your-machine-id',
            input_values={
                'patient_id': '12345',
                'patient_first_name': 'John',
                'patient_last_name': 'Doe'
            }
        )

        response = client.runs.create_sync(run_data)

        if response.error:
            print(f"Error creating run: {response.error}")
        else:
            run = response.data
            
            # Wait for the run to complete
            while run.status in ['scheduling', 'running']:
                time.sleep(5)  # Wait 5 seconds
                response = client.runs.get_sync(run.id)
                run = response.data
            
            # Get the output data
            if run.status == 'success':
                print('Patient data:', run.output_data)
            else:
                print('Run failed:', ', '.join(run.error or []))
        ```
      </Tab>
    </Tabs>
  </Step>
</Steps>

<Info>
  Create and manage workflows in the [Cyberdesk Dashboard](https://cyberdesk.io/dashboard). The dashboard supports rich, multimodal prompts — you can insert images alongside text to help the agent understand tricky UI elements. Use the SDK to execute runs against those workflows.
</Info>

## Installation & Setup

### Prerequisites

* Python 3.10 or higher
* pip, poetry, or pipenv for package management

### Installation

<Tabs>
  <Tab title="pip">
    ```bash theme={null}
    pip install cyberdesk
    ```
  </Tab>

  <Tab title="poetry">
    ```bash theme={null}
    poetry add cyberdesk
    ```
  </Tab>

  <Tab title="pipenv">
    ```bash theme={null}
    pipenv install cyberdesk
    ```
  </Tab>
</Tabs>

### Virtual Environment Setup

<Tip>
  Always use a virtual environment to avoid dependency conflicts:

  ```bash theme={null}
  # Create virtual environment
  python -m venv venv

  # Activate it
  # On macOS/Linux:
  source venv/bin/activate

  # On Windows:
  venv\Scripts\activate

  # Install the SDK
  pip install cyberdesk
  ```
</Tip>

## Authentication

### Creating a Client

```python theme={null}
from cyberdesk import CyberdeskClient

client = CyberdeskClient('YOUR_API_KEY')
```

### Custom Base URL

For self-hosted or enterprise deployments:

```python theme={null}
client = CyberdeskClient('YOUR_API_KEY', base_url='https://api.your-domain.com')
```

### Using Context Managers

The client supports standard context managers for proper resource cleanup:

```python theme={null}
with CyberdeskClient('YOUR_API_KEY') as client:
    response = client.runs.list_sync()
    # Client automatically cleans up when done
```

<Warning>
  Never hardcode API keys in your source code. Use environment variables:

  ```python theme={null}
  import os
  from cyberdesk import CyberdeskClient

  client = CyberdeskClient(os.environ['CYBERDESK_API_KEY'])
  ```
</Warning>
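
Indexing `os.environ` directly raises a bare `KeyError` when the variable is missing. A small wrapper can give a clearer message. The following is a minimal sketch; `load_api_key` is a hypothetical helper name, not part of the SDK.

```python
import os


def load_api_key(env_var="CYBERDESK_API_KEY"):
    """Return the API key from the environment, or raise a descriptive error."""
    api_key = os.environ.get(env_var, "").strip()
    if not api_key:
        raise RuntimeError(
            f"Set the {env_var} environment variable before creating the client."
        )
    return api_key
```

You would then pass the result to the constructor, for example `CyberdeskClient(load_api_key())`.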

## Automatic Retries

The SDK automatically retries failed requests with exponential backoff, following industry best practices similar to the Stripe SDK. This handles transient network issues and server errors gracefully without requiring any code changes.

### Default Behavior

* **Retry count**: 3 retries (4 total attempts)
* **Retryable errors**: Network failures, connection timeouts, DNS errors
* **Retryable status codes**: 408, 429, 500, 502, 503, 504, and `409` only when `Idempotency-Status: in_progress`
* **Non-retryable**: Most 4xx client errors
* **Backoff strategy**: Exponential backoff with full jitter (250ms initial, 8s max)
* **Retry-After header**: Respected when present
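
To make the defaults above concrete, here is a rough sketch of the retry decision and the full-jitter delay calculation. It is not the SDK's internal implementation: `is_retryable` and `backoff_delay` are hypothetical names, the constants mirror the values listed above, and treating `Retry-After` as a direct override of the computed delay is an assumption.

```python
import random

RETRYABLE_STATUS_CODES = {408, 429, 500, 502, 503, 504}
INITIAL_DELAY = 0.25  # seconds (250ms, from the list above)
MAX_DELAY = 8.0       # seconds


def is_retryable(status_code, headers=None):
    """Return True if a response with this status would be retried."""
    headers = headers or {}
    if status_code == 409:
        # 409 is retried only while the original request is still in progress.
        return headers.get("Idempotency-Status") == "in_progress"
    return status_code in RETRYABLE_STATUS_CODES


def backoff_delay(attempt, retry_after=None):
    """Seconds to wait before retry number `attempt` (0-based).

    Full jitter: pick uniformly between 0 and the capped exponential delay.
    Assumption: a Retry-After value, when present, replaces the computed delay.
    """
    if retry_after is not None:
        return float(retry_after)
    ceiling = min(MAX_DELAY, INITIAL_DELAY * (2 ** attempt))
    return random.uniform(0, ceiling)
```

Full jitter spreads retries from many clients across the whole delay window, which avoids synchronized bursts against a recovering server.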

### Configuring Retries

```python theme={null}
from cyberdesk import CyberdeskClient, RetryConfig
import httpx

# Default: 3 retries (4 total attempts)
client = CyberdeskClient('YOUR_API_KEY')

# More retries for unreliable networks
client = CyberdeskClient('YOUR_API_KEY', retry=RetryConfig(max_retries=5))

# Disable retries entirely
client = CyberdeskClient('YOUR_API_KEY', retry=RetryConfig(max_retries=0))

# With custom timeout configuration
client = CyberdeskClient(
    'YOUR_API_KEY',
    retry=RetryConfig(max_retries=3),
    timeout=httpx.Timeout(60.0, connect=10.0)
)
```

<Info>
  Retries only occur for network-level failures and specific HTTP status codes that indicate temporary server issues. Most client errors (4xx) are not retried because they indicate a problem with the request itself.
</Info>

## Idempotency

For write requests (POST, PUT, PATCH, DELETE), the SDK automatically adds an `Idempotency-Key` header. If a completed request is retried with the same key, the API replays the stored response. If the original request is still in progress, the API returns `409` with `Retry-After`. `5xx` responses are not replay-cached, so a later retry with the same key may execute again.

### Default Behavior

* **Enabled by default**: All write requests automatically include an idempotency key
* **Key generation**: Uses `uuid.uuid4()` to generate unique keys
* **Server-side handling**: Completed responses are replayed for duplicate keys; in-progress duplicates return `409`
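
The server-side behavior described above can be modeled with a toy in-memory store. This is only an illustration of the replay rules, not how the Cyberdesk API is implemented; the class and method names are hypothetical.

```python
class IdempotencyStore:
    """Toy in-memory model of the replay behavior described above."""

    def __init__(self):
        self._completed = {}       # key -> (status, body) of finished requests
        self._in_progress = set()  # keys whose first request is still running

    def begin(self, key):
        """Return a response to short-circuit with, or None to execute the request."""
        if key in self._completed:
            return self._completed[key]  # replay the stored response
        if key in self._in_progress:
            return (409, {"detail": "in progress"})
        self._in_progress.add(key)
        return None

    def finish(self, key, status, body):
        """Record the outcome; 5xx responses are not cached for replay."""
        self._in_progress.discard(key)
        if status < 500:
            self._completed[key] = (status, body)


store = IdempotencyStore()
first = store.begin("key-1")                 # None: execute the request
duplicate = store.begin("key-1")             # 409 while still in progress
store.finish("key-1", 201, {"id": "run-1"})
replayed = store.begin("key-1")              # stored 201 response is replayed

store.begin("key-2")
store.finish("key-2", 503, {"detail": "unavailable"})
after_5xx = store.begin("key-2")             # None: the request may execute again
```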

### Configuring Idempotency

```python theme={null}
from cyberdesk import CyberdeskClient
import uuid

# Default: idempotency enabled with auto-generated keys
client = CyberdeskClient('YOUR_API_KEY')

# Disable idempotency (not recommended for production)
client = CyberdeskClient('YOUR_API_KEY', idempotency_enabled=False)

# Custom key generator (e.g., to prefix keys with a business identifier)
order_id = '12345'  # placeholder: use an identifier from your application
client = CyberdeskClient(
    'YOUR_API_KEY',
    idempotency_key_generator=lambda: f"order-{order_id}-{uuid.uuid4()}"
)
```

<Tip>
  You generally don't need to configure idempotency; the defaults work well. The SDK generates and attaches keys automatically, so retrying a completed write is safe without extra plumbing in your app.
</Tip>

## Sync vs Async

The Cyberdesk Python SDK provides both synchronous and asynchronous methods for all operations.

### When to Use Async

Use async methods when:

* Building web applications with async frameworks (FastAPI, aiohttp)
* Making multiple concurrent API calls
* Integrating with other async libraries
* Building high-performance applications

### When to Use Sync

Use sync methods when:

* Writing simple scripts
* Working in Jupyter notebooks
* Integrating with sync-only frameworks (Django, Flask)
* Learning or prototyping

### Method Naming Convention

* Async methods: `client.resource.method()`
* Sync methods: `client.resource.method_sync()`

```python theme={null}
# Async
response = await client.runs.create(data)

# Sync
response = client.runs.create_sync(data)
```
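
The main benefit of the async methods is concurrency. The sketch below shows the pattern with `asyncio.gather`; `fetch_run_status` is a hypothetical stand-in for an awaitable SDK call such as `client.runs.get(run_id)`, so the example runs without network access.

```python
import asyncio


async def fetch_run_status(run_id):
    """Stand-in for an awaitable API call such as `client.runs.get(run_id)`."""
    await asyncio.sleep(0.01)  # simulate network latency
    return {"id": run_id, "status": "success"}


async def fetch_all(run_ids):
    # gather() runs the coroutines concurrently and preserves input order.
    return await asyncio.gather(*(fetch_run_status(r) for r in run_ids))


results = asyncio.run(fetch_all(["run-1", "run-2", "run-3"]))
```

The three simulated calls overlap, so the total wait is roughly one call's latency and not the sum of all three.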

## Working with Runs

Runs are the primary way to execute workflows in Cyberdesk. Here's everything you need to know about managing runs through the SDK.

### Creating a Run

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    from cyberdesk import RunCreate

    async def create_patient_data_run():
        run_data = RunCreate(
            workflow_id='workflow-uuid',
            machine_id='machine-uuid',
            input_values={
                'patient_id': '12345',
                'patient_first_name': 'John',
                'patient_last_name': 'Doe',
                'insurance': {'provider': 'Blue Cross', 'policy_number': 'BC123'}
            }
        )
        
        response = await client.runs.create(run_data)
        
        if response.error:
            print(f"Failed to create run: {response.error}")
        else:
            print(f"Run created: {response.data.id}")
            return response.data
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    from cyberdesk import RunCreate

    run_data = RunCreate(
        workflow_id='workflow-uuid',
        machine_id='machine-uuid',
        input_values={
            'patient_id': '12345',
            'patient_first_name': 'John',
            'patient_last_name': 'Doe',
            'insurance': {'provider': 'Blue Cross', 'policy_number': 'BC123'}
        }
    )

    response = client.runs.create_sync(run_data)

    if response.error:
        print(f"Failed to create run: {response.error}")
    else:
        print(f"Run created: {response.data.id}")
    ```
  </Tab>
</Tabs>

<Tip>
  Pass nested objects and access them in prompts with dot notation like `{insurance.provider}`. See [Structured Inputs](/concepts/structured-inputs).
</Tip>
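
To illustrate what dot notation means for nested inputs, here is a small sketch that resolves placeholders against a nested dictionary. It only demonstrates the concept; `resolve_placeholders` is a hypothetical function, and Cyberdesk's own prompt substitution may behave differently.

```python
import re


def resolve_placeholders(template, values):
    """Replace {a.b.c} placeholders with values looked up in a nested dict."""

    def lookup(match):
        current = values
        for part in match.group(1).split("."):
            current = current[part]  # raises KeyError if the path is missing
        return str(current)

    return re.sub(r"\{([A-Za-z_][\w.]*)\}", lookup, template)


prompt = resolve_placeholders(
    "Verify coverage with {insurance.provider} for patient {patient_id}.",
    {"patient_id": "12345", "insurance": {"provider": "Blue Cross"}},
)
```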

### Creating a Run with Sensitive Input Values

If your workflow prompt references sensitive variables using the `{$variable}` syntax (for example, `{$password}`), pass those values via `sensitive_input_values`.

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    from cyberdesk import RunCreate

    run_data = RunCreate(
        workflow_id='workflow-uuid',
        machine_id='machine-uuid',
        input_values={
            # non-sensitive inputs
            'patient_id': '12345'
        },
        sensitive_input_values={
            # referenced in your prompt as {$password}
            'password': 's3cr3tP@ss'
        }
    )

    response = await client.runs.create(run_data)
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    from cyberdesk import RunCreate

    run_data = RunCreate(
        workflow_id='workflow-uuid',
        machine_id='machine-uuid',
        input_values={
            'patient_id': '12345'
        },
        sensitive_input_values={
            'password': 's3cr3tP@ss'
        }
    )

    response = client.runs.create_sync(run_data)
    ```
  </Tab>
</Tabs>

<Info>
  Sensitive inputs are stored in a secure third‑party secret vault (Basis Theory) only for the duration of the run. They are not logged in Cyberdesk, and they are not sent to any LLMs. The values are only resolved at the last moment during actual computer actions (e.g., when typing). After the run completes, these sensitive values are deleted from the vault. On the dashboard, sensitive inputs are never displayed and will not be prefilled when repeating a run.
</Info>

### Creating a Run with Machine Pools

You can specify pool requirements when creating a run. This ensures your run is executed on a machine that belongs to ALL specified pools. This is especially useful for:

* Running workflows on customer-specific machines
* Requiring machines with specific software installed
* Organizing machines by location or capability

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    from cyberdesk import RunCreate

    async def create_run_with_pools():
        # Get pool IDs (typically from your configuration or database)
        customer_pool_id = 'pool-uuid-1'  # e.g., "Customer A" pool
        excel_pool_id = 'pool-uuid-2'     # e.g., "Has Excel" pool
        
        run_data = RunCreate(
            workflow_id='workflow-uuid',
            # Machine must be in BOTH pools (intersection, not union)
            pool_ids=[customer_pool_id, excel_pool_id],
            input_values={
                'patient_id': '12345',
                'patient_first_name': 'John',
                'patient_last_name': 'Doe'
            }
        )
        
        response = await client.runs.create(run_data)
        
        if response.error:
            print(f"Failed to create run: {response.error}")
        else:
            print(f"Run created: {response.data.id}")
            print(f"Will execute on machine in pools: {run_data.pool_ids}")
            return response.data
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    from cyberdesk import RunCreate

    # Get pool IDs (typically from your configuration or database)
    customer_pool_id = 'pool-uuid-1'  # e.g., "Customer A" pool
    excel_pool_id = 'pool-uuid-2'     # e.g., "Has Excel" pool

    run_data = RunCreate(
        workflow_id='workflow-uuid',
        # Machine must be in BOTH pools (intersection, not union)
        pool_ids=[customer_pool_id, excel_pool_id],
        input_values={
            'patient_id': '12345',
            'patient_first_name': 'John',
            'patient_last_name': 'Doe'
        }
    )

    response = client.runs.create_sync(run_data)

    if response.error:
        print(f"Failed to create run: {response.error}")
    else:
        print(f"Run created: {response.data.id}")
        print(f"Will execute on machine in pools: {run_data.pool_ids}")
    ```
  </Tab>
</Tabs>

<Note>
  **Pool Matching Logic:** When you specify multiple pools, Cyberdesk will only select machines that belong to **ALL** specified pools (intersection). For example, if you specify `["Customer A", "Has Excel"]`, only machines that are in both pools will be considered.
</Note>
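
The intersection rule can be expressed as a subset test. The sketch below is illustrative only; `eligible_machines` and the sample data are hypothetical, and it uses pool names where the API uses pool IDs.

```python
def eligible_machines(machine_pools, required_pools):
    """Return IDs of machines that belong to every required pool."""
    required = set(required_pools)
    return [
        machine_id
        for machine_id, pools in machine_pools.items()
        if required <= set(pools)  # subset test: all required pools present
    ]


machines = {
    "machine-1": ["Customer A", "Has Excel"],
    "machine-2": ["Customer A"],
    "machine-3": ["Has Excel", "Customer A", "US-East"],
}
matches = eligible_machines(machines, ["Customer A", "Has Excel"])
```

Here `machine-2` is excluded because it is missing the "Has Excel" pool, while extra pools on `machine-3` do not disqualify it.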

<Note>
  If you provide a `machine_id` when creating a run, `pool_ids` are ignored. Cyberdesk will only attempt the specified machine; if it's busy or unavailable, the run will wait until that machine is free (no fallback to other machines or pools).
</Note>

<Tip>
  **Creating and Managing Pools:** While you can manage pools via the SDK, we recommend using the [Cyberdesk Dashboard](https://cyberdesk.io/dashboard) for a more intuitive experience:

  1. Navigate to any machine in the dashboard
  2. Click on the machine to view its details
  3. Add the machine to existing pools or create new pools
  4. Assign multiple pools to organize machines by customer, capability, or location

  Common pool strategies:

  * **By Customer**: "Customer A", "Customer B", etc.
  * **By Software**: "Has Excel", "Has Chrome", "Has Epic EHR"
  * **By Environment**: "Production", "Staging", "Development"
  * **By Location**: "US-East", "EU-West", etc.
</Tip>

### Creating a Run with File Inputs

You can attach files to a run at creation. This is useful for workflows that need to process or manipulate files on the remote machine.

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    import base64
    from cyberdesk import RunCreate, FileInput

    async def create_run_with_file():
        # Read and encode a file
        with open("path/to/your/file.txt", "rb") as f:
            content = base64.b64encode(f.read()).decode("utf-8")

        run_data = RunCreate(
            workflow_id='workflow-uuid',
            file_inputs=[
                FileInput(
                    filename="file.txt",
                    content=content,
                    target_path="C:/Users/Default/Desktop/file.txt", # Optional
                    cleanup_imports_after_run=True # Optional
                )
            ]
        )
        
        response = await client.runs.create(run_data)
        
        if response.error:
            print(f"Failed to create run: {response.error}")
        else:
            print(f"Run created with file attachment: {response.data.id}")
            return response.data
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    import base64
    from cyberdesk import RunCreate, FileInput

    # Read and encode a file
    with open("path/to/your/file.txt", "rb") as f:
        content = base64.b64encode(f.read()).decode("utf-8")

    run_data = RunCreate(
        workflow_id='workflow-uuid',
        file_inputs=[
            FileInput(
                filename="file.txt",
                content=content,
                target_path="C:/Users/Default/Desktop/file.txt", # Optional
                cleanup_imports_after_run=True # Optional
            )
        ]
    )

    response = client.runs.create_sync(run_data)

    if response.error:
        print(f"Failed to create run: {response.error}")
    else:
        print(f"Run created with file attachment: {response.data.id}")
    ```
  </Tab>
</Tabs>

<ResponseField name="filename" type="string" required>
  The name of the file, including its extension.
</ResponseField>

<ResponseField name="content" type="string" required>
  The base64-encoded content of the file.
</ResponseField>

<ResponseField name="target_path" type="string">
  The absolute path on the remote machine where the file should be saved. If not provided, it defaults to `~/CyberdeskTransfers/`.
</ResponseField>

<ResponseField name="cleanup_imports_after_run" type="boolean">
  If `True`, the file will be deleted from the remote machine after the run completes (whether it succeeds or fails). Defaults to `False`.
</ResponseField>
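
If you attach files often, the read-and-encode step can be wrapped in a helper. This is a minimal sketch; `encode_file` is a hypothetical name, and it returns only the two required fields described above.

```python
import base64
import tempfile
from pathlib import Path


def encode_file(path):
    """Return the filename and base64-encoded content for a local file."""
    file_path = Path(path)
    return {
        "filename": file_path.name,
        "content": base64.b64encode(file_path.read_bytes()).decode("ascii"),
    }


# Demo with a temporary file so the example is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "report.txt"
    sample.write_bytes(b"hello")
    payload = encode_file(sample)
```

The resulting dictionary uses the same keyword names as the `FileInput` examples above, so it could be passed as `FileInput(**payload)`.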

### Listing Runs

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    # List all runs
    response = await client.runs.list()
    runs = response.data.items

    # List with pagination
    response = await client.runs.list(skip=0, limit=20)

    # Filter by status
    from cyberdesk import RunStatus
    response = await client.runs.list(status=RunStatus.SUCCESS)

    # Filter by workflow
    response = await client.runs.list(workflow_id='workflow-uuid')

    # Multiple filters
    response = await client.runs.list(
        workflow_id='workflow-uuid',
        status=RunStatus.RUNNING,
        limit=10
    )
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    # List all runs
    response = client.runs.list_sync()
    runs = response.data.items

    # List with pagination
    response = client.runs.list_sync(skip=0, limit=20)

    # Filter by status
    from cyberdesk import RunStatus
    response = client.runs.list_sync(status=RunStatus.SUCCESS)

    # Filter by workflow
    response = client.runs.list_sync(workflow_id='workflow-uuid')
    ```
  </Tab>
</Tabs>

#### Faster lists with fields projection

<Note>
  Use the optional `fields` parameter to return only selected fields per run. This reduces payload size and speeds up listing, especially when you do not need heavy JSON fields like `run_message_history`.
</Note>

<Info>
  Need to fetch screenshots referenced in `run_message_history`? See [Run Screenshots](/concepts/run-screenshots) for signed URL examples.
</Info>

* Base fields always included: `id`, `workflow_id`, `machine_id`, `status`, `created_at`.
* Add more by passing `fields=[...]`.

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    from cyberdesk.client import RunField

    # No `fields` argument: full records are returned
    res = await client.runs.list()
    for item in res.data.items:
        print(item.id, item.status)

    # Include inputs only (still avoids run_message_history)
    res = await client.runs.list(fields=[RunField.INPUT_VALUES])

    # Include a couple of specific fields
    res = await client.runs.list(fields=[RunField.INPUT_VALUES, RunField.SESSION_ID])

    # Include attachments but skip history for speed
    res = await client.runs.list(fields=[RunField.INPUT_ATTACHMENT_IDS, RunField.OUTPUT_ATTACHMENT_IDS])
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    from cyberdesk.client import RunField

    # No `fields` argument: full records are returned
    res = client.runs.list_sync()

    # Include inputs only
    res = client.runs.list_sync(fields=[RunField.INPUT_VALUES])

    # Include attachments (skipping run_message_history)
    res = client.runs.list_sync(fields=[RunField.INPUT_ATTACHMENT_IDS, RunField.OUTPUT_ATTACHMENT_IDS])
    ```
  </Tab>
</Tabs>

<Info>
  If you need full records (including `run_message_history`), call `list()` without `fields`.
</Info>
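
The projection rules can be summarized in a few lines. This sketch models the behavior described in this section on plain dictionaries; `project_run` is a hypothetical function, and the real projection happens on the server.

```python
BASE_FIELDS = ("id", "workflow_id", "machine_id", "status", "created_at")


def project_run(run, fields=None):
    """Full record when `fields` is omitted, else base fields plus requested ones."""
    if fields is None:
        return dict(run)
    wanted = list(BASE_FIELDS) + [f for f in fields if f not in BASE_FIELDS]
    return {name: run[name] for name in wanted if name in run}


run = {
    "id": "r1",
    "workflow_id": "w1",
    "machine_id": "m1",
    "status": "success",
    "created_at": "2024-01-01T00:00:00Z",
    "input_values": {"patient_id": "12345"},
    "run_message_history": ["...large payload..."],
}
slim = project_run(run, ["input_values"])
```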

### Getting a Specific Run

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    response = await client.runs.get('run-uuid')

    if response.data:
        run = response.data
        print(f"Run status: {run.status}")
        print(f"Output data: {run.output_data}")
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    response = client.runs.get_sync('run-uuid')

    if response.data:
        run = response.data
        print(f"Run status: {run.status}")
        print(f"Output data: {run.output_data}")
    ```
  </Tab>
</Tabs>

### Updating a Run

<Note>
  Run updates are typically handled automatically by the Cyberdesk system. Manual updates are rarely needed.
</Note>

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    from cyberdesk import RunUpdate, RunStatus

    update_data = RunUpdate(status=RunStatus.CANCELLED)
    response = await client.runs.update('run-uuid', update_data)
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    from cyberdesk import RunUpdate, RunStatus

    update_data = RunUpdate(status=RunStatus.CANCELLED)
    response = client.runs.update_sync('run-uuid', update_data)
    ```
  </Tab>
</Tabs>

### Retrying a Run (same run\_id)

Use retry when you want to re-run the exact same run id, clearing outputs and optionally providing fresh inputs/files.

* **Sensitive values:** always re-send `sensitive_input_values` on retry; secrets are deleted after each run.
* **File inputs:** if you set `cleanup_imports_after_run=True`, files are deleted from the remote machine after the run; include `file_inputs` again if you need a fresh copy or when no input attachments exist (providing `file_inputs` replaces prior input attachments).
* **Regular inputs:** only send `input_values` if you want to change them; otherwise the previous ones are reused.

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    import os
    from cyberdesk import FileInput
    from cyberdesk.client import RunRetry

    retry_data = RunRetry(
        input_values={'query': 'new query'},           # optional
        sensitive_input_values={'password': os.environ['APP_PASSWORD']},
        file_inputs=[                                   # providing file_inputs replaces prior input attachments
            FileInput(filename='input.pdf', content=base64_pdf)  # base64_pdf: base64-encoded file content you prepared earlier
        ],
        reuse_session=True,                            # default: keep existing session
        # session_id='existing-session-uuid',
        # release_session_after=True,
        # machine_id='specific-machine-uuid',
        # pool_ids=['pool-a', 'pool-b'],               # used only when no machine_id is set
    )

    response = await client.runs.retry('run-uuid', retry_data)
    if response.error:
        print('Failed to retry run:', response.error)
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    from cyberdesk import FileInput
    from cyberdesk.client import RunRetry

    retry_data = RunRetry(
        input_values={'query': 'new query'},
        file_inputs=[FileInput(filename='input.pdf', content=base64_pdf)],  # base64_pdf: base64-encoded file content you prepared earlier
        reuse_session=True,
    )

    response = client.runs.retry_sync('run-uuid', retry_data)
    if response.error:
        print('Failed to retry run:', response.error)
    ```
  </Tab>
</Tabs>

<Info>
  Behavior:

  * Retry is allowed only for terminal runs: <code>success</code>, <code>task\_failed</code>, <code>error</code>, or <code>cancelled</code>.
  * Outputs, history, and output attachments are always cleared.
  * Prior input attachments are kept unless you provide <code>file\_inputs</code> (then they are replaced).
  * If you provide <code>sensitive\_input\_values</code>, new secrets are created; otherwise sensitive aliases are cleared.
  * When a <code>session\_id</code> is present and the session is busy, immediate assignment is skipped and the retried run queues.
  * Avoid sending both <code>machine\_id</code> and <code>pool\_ids</code> on retry; if both are present, <code>machine\_id</code> takes precedence and <code>pool\_ids</code> are ignored.
</Info>
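
A client-side pre-flight check can catch the two most common mistakes before calling retry. This is a sketch under the rules listed above; `check_retry_request` is a hypothetical helper, and the API remains the source of truth.

```python
TERMINAL_STATUSES = {"success", "task_failed", "error", "cancelled"}


def check_retry_request(status, machine_id=None, pool_ids=None):
    """Return a list of problems to fix before calling retry (empty if none)."""
    problems = []
    if status not in TERMINAL_STATUSES:
        problems.append(
            f"run is '{status}'; wait for a terminal status before retrying"
        )
    if machine_id and pool_ids:
        problems.append("send either machine_id or pool_ids, not both")
    return problems
```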

### Deleting a Run

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    response = await client.runs.delete('run-uuid')

    if not response.error:
        print("Run deleted successfully")
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    response = client.runs.delete_sync('run-uuid')

    if not response.error:
        print("Run deleted successfully")
    ```
  </Tab>
</Tabs>

### Polling for Run Completion

Here's a robust pattern for waiting for runs to complete:

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    import asyncio
    from datetime import datetime, timedelta

    async def wait_for_run_completion(client, run_id, timeout_seconds=300):
        """Wait for a run to complete with timeout."""
        start_time = datetime.now()
        timeout = timedelta(seconds=timeout_seconds)
        
        while datetime.now() - start_time < timeout:
            response = await client.runs.get(run_id)
            
            if response.error:
                raise Exception(f"Failed to get run status: {response.error}")
            
            run = response.data
            
            if run.status == 'success':
                return run
            
            if run.status in ['error', 'task_failed', 'cancelled']:
                raise Exception(f"Run {run.status}: {', '.join(run.error or ['Unknown error'])}")
            
            await asyncio.sleep(5)  # Poll every 5 seconds
        
        raise TimeoutError(f"Run timed out after {timeout_seconds} seconds")

    # Usage
    try:
        completed_run = await wait_for_run_completion(client, run.id)
        print("Output:", completed_run.output_data)
    except Exception as e:
        print(f"Run failed: {e}")
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    import time
    from datetime import datetime, timedelta

    def wait_for_run_completion_sync(client, run_id, timeout_seconds=300):
        """Wait for a run to complete with timeout."""
        start_time = datetime.now()
        timeout = timedelta(seconds=timeout_seconds)
        
        while datetime.now() - start_time < timeout:
            response = client.runs.get_sync(run_id)
            
            if response.error:
                raise Exception(f"Failed to get run status: {response.error}")
            
            run = response.data
            
            if run.status == 'success':
                return run
            
            if run.status in ['error', 'task_failed', 'cancelled']:
                raise Exception(f"Run {run.status}: {', '.join(run.error or ['Unknown error'])}")
            
            time.sleep(5)  # Poll every 5 seconds
        
        raise TimeoutError(f"Run timed out after {timeout_seconds} seconds")

    # Usage
    try:
        completed_run = wait_for_run_completion_sync(client, run.id)
        print("Output:", completed_run.output_data)
    except Exception as e:
        print(f"Run failed: {e}")
    ```
  </Tab>
</Tabs>
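
One possible variation takes the fetch call as a parameter, which makes the loop testable without network access, and measures the timeout with `time.monotonic()` so system clock adjustments do not affect it. The helper name `poll_until` and its parameters are hypothetical, not part of the SDK.

```python
import time


def poll_until(fetch, is_done, timeout_seconds=300.0, interval_seconds=5.0):
    """Call fetch() until is_done(result) is true or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        result = fetch()
        if is_done(result):
            return result
        # Stop if the next poll would land after the deadline.
        if time.monotonic() + interval_seconds > deadline:
            raise TimeoutError(f"Gave up after {timeout_seconds} seconds")
        time.sleep(interval_seconds)


# Demo with a stub that reports 'running' twice, then 'success'.
statuses = iter(["running", "running", "success"])
final = poll_until(
    fetch=lambda: next(statuses),
    is_done=lambda status: status not in ("scheduling", "running"),
    timeout_seconds=1.0,
    interval_seconds=0.01,
)
```

With the SDK, `fetch` could wrap `client.runs.get_sync(run_id)` and `is_done` could inspect the returned run's status.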

## Working with File Attachments

Manage files associated with your runs, such as input files uploaded at creation or output files generated by a workflow.

### Listing Run Attachments

You can list all attachments for a specific run and filter them by type (`INPUT` or `OUTPUT`).

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    from cyberdesk import AttachmentType

    # List all attachments for a run
    response = await client.run_attachments.list(run_id='run-uuid')
    attachments = response.data.items

    # List only output attachments
    response = await client.run_attachments.list(
        run_id='run-uuid',
        attachment_type=AttachmentType.OUTPUT
    )
    output_files = response.data.items
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    from cyberdesk import AttachmentType

    # List all attachments for a run
    response = client.run_attachments.list_sync(run_id='run-uuid')
    attachments = response.data.items

    # List only input attachments
    response = client.run_attachments.list_sync(
        run_id='run-uuid',
        attachment_type=AttachmentType.INPUT
    )
    input_files = response.data.items
    ```
  </Tab>
</Tabs>

### Downloading an Attachment

There are multiple ways to download attachments depending on your use case:

#### Method 1: Get a Download URL

Get a signed URL that triggers automatic download when accessed. This is perfect for web applications where you want to provide download links to users.

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    # Get a download URL with custom expiration (default: 5 minutes)
    response = await client.run_attachments.get_download_url(
        'attachment-uuid',
        expires_in=600  # 10 minutes
    )

    if response.data:
        print(f"Download URL: {response.data.url}")
        print(f"Expires in: {response.data.expires_in} seconds")
        
        # You can use this URL in your web app or share it
        # The URL will trigger automatic download when accessed
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    # Get a download URL with custom expiration
    response = client.run_attachments.get_download_url_sync(
        'attachment-uuid',
        expires_in=300  # 5 minutes (default)
    )

    if response.data:
        print(f"Download URL: {response.data.url}")
        print(f"Expires in: {response.data.expires_in} seconds")
    ```
  </Tab>
</Tabs>

#### Method 2: Download Raw File Content

Download the file content directly as bytes. Useful when you need to process the file in memory.

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    # Get the attachment metadata first
    response = await client.run_attachments.get('attachment-uuid')
    attachment_info = response.data

    # Download the file content
    response = await client.run_attachments.download(attachment_info.id)

    if not response.error:
        # Save the file
        with open(attachment_info.filename, "wb") as f:
            f.write(response.data)
        print(f"Downloaded {attachment_info.filename}")
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    # Get the attachment metadata first
    response = client.run_attachments.get_sync('attachment-uuid')
    attachment_info = response.data

    # Download the file content
    response = client.run_attachments.download_sync(attachment_info.id)

    if not response.error:
        # Save the file
        with open(attachment_info.filename, "wb") as f:
            f.write(response.data)
        print(f"Downloaded {attachment_info.filename}")
    ```
  </Tab>
</Tabs>
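
If the goal is in-memory processing rather than saving, the downloaded bytes can be passed straight to a parser. The sketch below assumes the attachment is a UTF-8 CSV file. `summarize_csv_bytes` is a hypothetical helper, and `sample` stands in for the bytes found in `response.data`.

```python
import csv
import hashlib
import io


def summarize_csv_bytes(data: bytes) -> dict:
    """Parse CSV content held in memory and return a small summary."""
    text = data.decode("utf-8")
    rows = list(csv.DictReader(io.StringIO(text)))
    return {
        "sha256": hashlib.sha256(data).hexdigest(),  # fingerprint of the raw bytes
        "row_count": len(rows),
        "columns": list(rows[0].keys()) if rows else [],
    }


# Stand-in for the bytes returned in response.data
sample = b"name,total\nalice,3\nbob,5\n"
summary = summarize_csv_bytes(sample)
```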

#### Method 3: Save to File (Convenience Method)

The SDK provides a convenience method that downloads and saves the file in one operation.

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    # Save directly to a file
    response = await client.run_attachments.save_to_file(
        'attachment-uuid',
        output_path='./downloads/'  # Will use original filename
    )

    if response.data:
        print(f"Saved to: {response.data['path']}")
        print(f"File size: {response.data['size']} bytes")

    # Or specify a custom filename
    response = await client.run_attachments.save_to_file(
        'attachment-uuid',
        output_path='./downloads/custom-name.pdf'
    )
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    # Save directly to a file
    response = client.run_attachments.save_to_file_sync(
        'attachment-uuid',
        output_path='./downloads/'  # Will use original filename
    )

    if response.data:
        print(f"Saved to: {response.data['path']}")
        print(f"File size: {response.data['size']} bytes")
    ```
  </Tab>
</Tabs>

### Example: Upload, Process, and Download

Here's a full example of a workflow that processes a file.

1. **Workflow Prompt**: `"Take the file at ~/CyberdeskTransfers/report.txt, add a summary to the end of it, and mark it for export."`
2. **Workflow Setting**: `includes_file_exports` is set to `True`.

```python theme={null}
import asyncio
import base64
from cyberdesk import CyberdeskClient, RunCreate, FileInput, AttachmentType

async def main():
    with CyberdeskClient("YOUR_API_KEY") as client:
        # 1. Prepare and upload the input file
        report_content = "This is the initial report content."
        encoded_content = base64.b64encode(report_content.encode()).decode()

        run_data = RunCreate(
            workflow_id="your-file-processing-workflow-id",
            file_inputs=[
                FileInput(filename="report.txt", content=encoded_content)
            ]
        )
        response = await client.runs.create(run_data)
        if response.error:
            print(f"Failed to create run: {response.error}")
            return
        run = response.data
        print(f"Run started: {run.id}")

        # 2. Wait for the run to complete
        completed_run = await wait_for_run_completion(client, run.id)
        print("Run finished with status:", completed_run.status)

        # 3. Find and download the output attachment
        if completed_run.status == 'success':
            response = await client.run_attachments.list(
                run_id=completed_run.id,
                attachment_type=AttachmentType.OUTPUT
            )
            output_attachments = response.data.items
            
            if output_attachments:
                processed_report = output_attachments[0]
                
                # Option 1: Get a download URL (for web apps)
                url_response = await client.run_attachments.get_download_url(processed_report.id)
                if url_response.data:
                    print(f"Download URL: {url_response.data.url}")
                    print(f"Valid for: {url_response.data.expires_in} seconds")
                
                # Option 2: Download the processed file directly
                response = await client.run_attachments.download(processed_report.id)
                
                if not response.error:
                    # Decode and print the content
                    processed_content = response.data.decode()
                    print("\n--- Processed Report ---")
                    print(processed_content)
                    print("------------------------")
                else:
                    print(f"Failed to download processed file: {response.error}")
            else:
                print("No output files were generated.")

# Assuming wait_for_run_completion is defined as in the previous examples
asyncio.run(main())
```

This example demonstrates the complete lifecycle: uploading a file with a run, executing a workflow that modifies it, and then retrieving the processed file from the run's output attachments.
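
The example encodes an in-memory string. For a file that already exists on disk, the same Base64 step might be wrapped as in the sketch below. `encode_file_for_upload` is a hypothetical helper; the dict it returns mirrors the `filename` and `content` arguments passed to `FileInput` above.

```python
import base64
import tempfile
from pathlib import Path


def encode_file_for_upload(path: Path) -> dict:
    """Read a file and return its name plus Base64-encoded content."""
    raw = path.read_bytes()
    return {
        "filename": path.name,
        "content": base64.b64encode(raw).decode("ascii"),
    }


# Demonstration with a temporary file standing in for a real report
with tempfile.TemporaryDirectory() as tmp:
    report = Path(tmp) / "report.txt"
    report.write_text("This is the initial report content.", encoding="utf-8")
    payload = encode_file_for_upload(report)
```

Reading with `read_bytes()` keeps the helper usable for binary files such as PDFs as well as text.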

## Sessions and Chained Runs

At its core, a session is a reservation of a single machine. While a session is active, that machine is dedicated to your session only — no unrelated runs will be scheduled onto it. This guarantees your multi‑step automations run back‑to‑back on the same desktop without interference.

What you get from a session:

* Exclusive access to one machine for the session's duration (strong scheduling guarantee)
* Deterministic "step 1 → step 2 → …" behavior with no opportunistic interleaving

**Chains** are a convenient way to create multiple runs that execute back‑to‑back in the same session. Instead of manually creating individual runs and managing their sequencing, you can define all your workflow steps upfront and let Cyberdesk handle the session management and execution order.

### Passing data between steps with refs

Once you have multiple workflows running in the same session, you'll often want to pass outputs from earlier steps as inputs to later ones. Refs make this seamless — simply reference a previous step's output using a JSON object:

```python theme={null}
{"$ref": "step1.outputs.result"}
```

You can construct these as plain Python dicts when building chain steps.
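
A small builder can keep ref strings consistent across steps. This is only a sketch: `output_ref` is a hypothetical helper, and it assumes the first segment of the ref is the alias of the step that produced the output, as in the `step1.outputs.result` example above.

```python
def output_ref(step_alias: str, output_name: str) -> dict:
    """Build a ref dict pointing at `output_name` from the step named `step_alias`."""
    return {"$ref": f"{step_alias}.outputs.{output_name}"}


# Inputs for a later step that consume an earlier step's output
step2_inputs = {"search_query": output_ref("step1", "result")}
```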

### Start a new session and run a chain (best when you know the whole sequence)

```python theme={null}
import os
from cyberdesk import CyberdeskClient
from cyberdesk.client import WorkflowChainCreate

client = CyberdeskClient(os.environ['CYBERDESK_API_KEY'])

chain = WorkflowChainCreate.from_dict({
    "shared_inputs": {
        "search_query": "red panda facts"
    },
    "shared_sensitive_inputs": {
        "api_key": "shared-secret-key"
    },
    # Filter machines by pools; or use machine_id to target one machine
    "pool_ids": ["pool-with-chrome", "customer-a"],
    "keep_session_after_completion": False,
    "steps": [
        {
            "workflow_id": "step-1-workflow-id",
            "session_alias": "step1",
            "inputs": {
                "topic": "red panda"
            },
            # Step-specific sensitive inputs that override or extend shared_sensitive_inputs
            "sensitive_inputs": {
                "username": "user1",
                "password": "secret123"
            }
        },
        {
            "workflow_id": "step-2-workflow-id",
            "session_alias": "step2",
            "inputs": {
                # Use output of step1 as an input to step2
                "search_query": {"$ref": "step1.outputs.result"}
            },
            # Step-specific sensitive inputs that override or extend shared_sensitive_inputs
            "sensitive_inputs": {
                "security_token": "step2-token"
            }
        }
    ]
})

resp = client.runs.chain_sync(chain)
print("session:", resp.data.session_id)
print("run_ids:", resp.data.run_ids)
```

Notes:

* Provide `machine_id` to target a specific machine, or `pool_ids` to match any machine in **all** specified pools (intersection).
* The chain runs on one reserved session. If you omit `session_id`, the API creates one and reserves a machine before step 1.
* `shared_inputs` are merged into each step, and step-level `inputs` override shared values when the same key appears in both places.
* **`shared_sensitive_inputs`** are available to all steps in the chain.
* **`sensitive_inputs`** in individual steps provide step-specific sensitive values that override or extend the shared ones. If the same key exists in both, the step-specific value takes precedence.
* `shared_file_inputs` (if provided) are attached to the first run in the chain.
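
The precedence rule in these notes amounts to a dictionary merge in which the step's values win. The sketch below only illustrates that rule locally. Cyberdesk performs the actual merge, `effective_inputs` is a hypothetical helper, and the `locale` key is made up for the illustration.

```python
def effective_inputs(shared: dict, step: dict) -> dict:
    """Merge shared values into a step; step-level keys win on conflict."""
    return {**shared, **step}


shared_inputs = {"search_query": "red panda facts", "locale": "en"}
step_inputs = {"search_query": "red panda diet"}

# search_query comes from the step, locale falls through from the shared values
merged = effective_inputs(shared_inputs, step_inputs)
```

The same reasoning applies to `shared_sensitive_inputs` and step-level `sensitive_inputs`.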

### Join an existing session

```python theme={null}
from cyberdesk.client import WorkflowChainCreate

chain = WorkflowChainCreate.from_dict({
    "session_id": "existing-session-uuid",
    "steps": [
        {"workflow_id": "wf-a", "session_alias": "warmup"},
        {"workflow_id": "wf-b", "session_alias": "extract", "inputs": {"query": "current patient"}},
    ]
})
client.runs.chain_sync(chain)
```

For chains, provide either `session_id` or `machine_id`/`pool_ids`, not both.
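
You may want to catch a conflicting configuration before sending the request. The following is a minimal client-side check, assuming the chain is still a plain dict at that point. `check_chain_targeting` is a hypothetical helper, not an SDK function.

```python
def check_chain_targeting(chain: dict) -> None:
    """Raise ValueError if a chain dict mixes session_id with machine targeting."""
    has_session = bool(chain.get("session_id"))
    has_target = bool(chain.get("machine_id")) or bool(chain.get("pool_ids"))
    if has_session and has_target:
        raise ValueError("Provide session_id or machine_id/pool_ids, not both")


# Joining an existing session without machine targeting passes the check
check_chain_targeting({"session_id": "existing-session-uuid", "steps": []})

# Mixing session_id with pool_ids is rejected
try:
    check_chain_targeting({"session_id": "s", "pool_ids": ["customer-a"], "steps": []})
    conflict_detected = False
except ValueError:
    conflict_detected = True
```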

### Keep the session alive after the chain

```python theme={null}
from cyberdesk.client import WorkflowChainCreate

client.runs.chain_sync(WorkflowChainCreate.from_dict({
    "pool_ids": ["customer-a"],
    "keep_session_after_completion": True,
    "steps": [ ... ]
}))
```

Later you can start another chain with `session_id` to continue work on the same machine.

### Ad‑hoc sessions without a chain (start with a single run, then add more)

You can start a session with a normal run and then submit additional runs referencing the same `session_id` — useful when downstream steps are conditional or discovered at runtime.

```python theme={null}
# 1) Start a session and warm up the desktop
warmup = client.runs.create_sync(RunCreate(
    workflow_id='login-workflow-id',
    pool_ids=['customer-a'],
    start_session=True,
    input_values={'username': 'alice'}
)).data

session_id = warmup.session_id

# 2) Add another run in the same session — scheduling remains exclusive
client.runs.create_sync(RunCreate(
    workflow_id='search-workflow-id',
    session_id=session_id,
    input_values={'query': 'recent orders'}
))

# 3) Final run that releases the session when complete
client.runs.create_sync(RunCreate(
    workflow_id='cleanup-workflow-id',
    session_id=session_id,
    release_session_after=True,  # Release the session after this run completes
    input_values={'cleanup': 'true'}
))
```

### Automatic session release with release\_session\_after

When creating individual runs in a session (not using chains), you can use `release_session_after=True` to automatically release the session when that run completes (regardless of success or failure):

```python theme={null}
# This run will release the session after it completes
final_run = client.runs.create_sync(RunCreate(
    workflow_id='final-workflow-id',
    session_id=existing_session_id,
    release_session_after=True,
    input_values={'finalize': 'true'}
))
```

This is mainly a convenience: the request that creates the session's final run also schedules the release, so you don't need a separate step to end the session.

Note: The session is released when the run completes, whether it succeeds, fails, or is cancelled. This ensures the session doesn't remain locked if something goes wrong.

### Detecting session completion via webhooks

The `release_session_after` field on a run indicates whether this run released the session. You can use this in your webhook handler to detect when all runs in a session are complete:

```python theme={null}
# In your webhook handler for "run_complete" events
if event["run"].get("release_session_after") is True:
    # This run released the session - all runs in this session are done
    print(f"Session {event['run']['session_id']} was released by run {event['run']['id']}")
    print(f"Final status: {event['run']['status']}")  # 'success', 'task_failed', 'error', or 'cancelled'
```

This field is `True` when any of the following applies:

* You explicitly set `release_session_after=True` on a run
* A chain completes with `keep_session_after_completion=False` (the last run gets this flag)
* A run errors, task-fails, or is cancelled and causes the session to be released

See [Detecting session completion via webhooks](/concepts/sessions-and-chains#detecting-session-completion-via-webhooks) for more details.

### Polling chain runs

The chain API returns `run_ids` in creation order. You can poll the runs individually, or [receive a webhook when any of those runs completes](/webhooks/quickstart).

```python theme={null}
chain = client.runs.chain_sync(...).data
for run_id in chain.run_ids:
    completed = wait_for_run_completion_sync(client, run_id, 600)
    print(completed.status, completed.output_data)
```
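
The snippet above relies on the `wait_for_run_completion_sync` helper from earlier examples. For reference, the general shape of such a polling loop is sketched below with the status lookup injected as a callable, so it runs without the SDK. `poll_until_terminal` and `TERMINAL_STATUSES` are hypothetical names; the status values are the ones used elsewhere in this guide.

```python
import time
from typing import Callable

# Final statuses mentioned in this guide; anything else is treated as in progress
TERMINAL_STATUSES = {"success", "task_failed", "error", "cancelled"}


def poll_until_terminal(
    get_status: Callable[[], str],
    timeout_s: float = 600,
    interval_s: float = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call `get_status` until it returns a terminal status or the timeout passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run still '{status}' after {timeout_s} seconds")
        sleep(interval_s)


# Demonstration with a fake status source instead of a real API call
fake_statuses = iter(["scheduling", "running", "running", "success"])
final_status = poll_until_terminal(lambda: next(fake_statuses), sleep=lambda _: None)
```

In real use, `get_status` would wrap a call that fetches the run and returns its `status`.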

### Real‑world cases that require sessions

* **EHR workflows**: Log into Epic, navigate to a specific patient, extract their data, then upload documents to their chart — all with no interruptions from other miscellaneous runs.
* **Financial reporting**: Export monthly reports from your ERP system, transform the data in Excel, then re‑import the processed results — all back‑to‑back without interference.
* **Document processing**: Download files from a web portal, process them with a local application, then upload the results back — ensuring no other runs interfere with your workflow.

### Bulk Creating Runs with Pools

When creating multiple runs in bulk, you can also specify pool requirements. All runs will be distributed across machines that match the pool criteria.

<Note>
  If you provide a `machine_id` in a bulk run request, `pool_ids` are ignored for those runs. Each run will only target the specified machine; if it is busy, the run will wait for that machine rather than falling back to other machines or pools.
</Note>

<Tabs>
  <Tab title="Async">
    ```python theme={null}
    from cyberdesk import RunBulkCreate

    async def bulk_create_with_pools():
        # Create 100 runs that require machines in specific pools
        bulk_data = RunBulkCreate(
            workflow_id='workflow-uuid',
            count=100,
            pool_ids=['customer-a-pool-id', 'excel-pool-id'],
            input_values={
                'task_type': 'data_extraction',
                'priority': 'high'
            }
        )
        
        response = await client.runs.bulk_create(bulk_data)
        
        if response.data:
            print(f"Created {len(response.data.created_runs)} runs")
            print(f"Failed: {response.data.failed_count}")
            # Each run will execute on machines that match all specified pools when available
    ```
  </Tab>

  <Tab title="Sync">
    ```python theme={null}
    from cyberdesk import RunBulkCreate

    bulk_data = RunBulkCreate(
        workflow_id='workflow-uuid',
        count=100,
        pool_ids=['customer-a-pool-id', 'excel-pool-id'],
        input_values={
            'task_type': 'data_extraction',
            'priority': 'high'
        }
    )

    response = client.runs.bulk_create_sync(bulk_data)

    if response.data:
        print(f"Created {len(response.data.created_runs)} runs")
        print(f"Failed: {response.data.failed_count}")
        # Each run will execute on machines that match all specified pools when available
    ```
  </Tab>
</Tabs>

<Info>
  **Bulk Run Assignment:** When bulk creating runs with pool requirements, Cyberdesk tries to assign each run to an available machine that belongs to all specified pools. If no matching machine is available, the run stays in the `scheduling` state until one is free. Cyberdesk makes no specific load-balancing guarantees.
</Info>
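
The "all specified pools" rule is a set-inclusion test: a machine qualifies only if its pools contain every requested pool. The sketch below illustrates the rule with made-up data. It is not Cyberdesk's scheduler, and `eligible_machines` and the `fleet` mapping are hypothetical.

```python
def eligible_machines(machines: dict[str, set[str]], required_pools: list[str]) -> list[str]:
    """Return IDs of machines that belong to every pool in `required_pools`."""
    required = set(required_pools)
    return sorted(mid for mid, pools in machines.items() if required <= pools)


# Hypothetical fleet: machine ID -> pools the machine belongs to
fleet = {
    "machine-1": {"customer-a-pool-id", "excel-pool-id"},
    "machine-2": {"customer-a-pool-id"},
    "machine-3": {"excel-pool-id", "customer-a-pool-id", "chrome-pool-id"},
}

# machine-2 is excluded because it is missing excel-pool-id
matches = eligible_machines(fleet, ["customer-a-pool-id", "excel-pool-id"])
```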

## Real-World Example: Healthcare Integration

Here's a complete example of retrieving patient data from an Epic EHR system using Cyberdesk:

<Tabs>
  <Tab title="FastAPI Integration">
    ```python theme={null}
    from fastapi import FastAPI, HTTPException, Body
    from cyberdesk import CyberdeskClient, RunCreate
    import os

    app = FastAPI()
    client = CyberdeskClient(os.environ['CYBERDESK_API_KEY'])

    @app.post("/patients/lookup")
    async def get_patient_data(
        patient_id: str = Body(...),
        patient_first_name: str = Body(...),
        patient_last_name: str = Body(...)
    ):
        """Retrieve patient data from Epic EHR."""
        
        try:
            # Create a run to fetch patient data
            run_data = RunCreate(
                workflow_id='550e8400-e29b-41d4-a716-446655440000',  # Your Epic workflow ID
                machine_id='550e8400-e29b-41d4-a716-446655440001',   # Your Epic machine ID
                input_values={
                    'patient_id': patient_id,
                    'patient_first_name': patient_first_name,
                    'patient_last_name': patient_last_name
                }
            )
            
            response = await client.runs.create(run_data)
            if response.error:
                raise HTTPException(status_code=500, detail=f"Failed to create run: {response.error}")
            
            run = response.data
            print(f"Fetching data for patient {patient_first_name} {patient_last_name} ({patient_id})...")
            
            # Wait for completion (2 minute timeout)
            completed_run = await wait_for_run_completion(client, run.id, 120)
            
            # Process the patient data
            patient_data = completed_run.output_data
            
            return {
                'patientId': patient_id,
                'demographics': patient_data['demographics'],
                'medications': patient_data['medications'],
                'vitals': patient_data['recentVitals'],
                'lastUpdated': patient_data['lastUpdated']
            }
            
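        except HTTPException:
            # Re-raise HTTP errors raised above so their status and detail are preserved
            raise
        except TimeoutError:
            raise HTTPException(status_code=504, detail="Request timed out")
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))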
    ```
  </Tab>

  <Tab title="Django Integration">
    ```python theme={null}
    # views.py
    from django.http import JsonResponse
    from django.views import View
    from cyberdesk import CyberdeskClient, RunCreate
    import os
    import json

    client = CyberdeskClient(os.environ['CYBERDESK_API_KEY'])

    class PatientLookupView(View):
        def post(self, request):
            """Retrieve patient data from Epic EHR."""
            
            try:
                data = json.loads(request.body)
                patient_id = data['patient_id']
                patient_first_name = data['patient_first_name']
                patient_last_name = data['patient_last_name']
                
                # Create a run to fetch patient data
                run_data = RunCreate(
                    workflow_id='550e8400-e29b-41d4-a716-446655440000',  # Your Epic workflow ID
                    machine_id='550e8400-e29b-41d4-a716-446655440001',   # Your Epic machine ID
                    input_values={
                        'patient_id': patient_id,
                        'patient_first_name': patient_first_name,
                        'patient_last_name': patient_last_name
                    }
                )
                
                response = client.runs.create_sync(run_data)
                if response.error:
                    return JsonResponse({'error': f"Failed to create run: {response.error}"}, status=500)
                
                run = response.data
                print(f"Fetching data for patient {patient_first_name} {patient_last_name} ({patient_id})...")
                
                # Wait for completion
                completed_run = wait_for_run_completion_sync(client, run.id, 120)
                
                # Process the patient data
                patient_data = completed_run.output_data
                
                return JsonResponse({
                    'patientId': patient_id,
                    'demographics': patient_data['demographics'],
                    'medications': patient_data['medications'],
                    'vitals': patient_data['recentVitals'],
                    'lastUpdated': patient_data['lastUpdated']
                })
                
            except TimeoutError:
                return JsonResponse({'error': 'Request timed out'}, status=504)
            except Exception as e:
                return JsonResponse({'error': str(e)}, status=500)
    ```
  </Tab>

  <Tab title="Script Example">
    ```python theme={null}
    #!/usr/bin/env python3
    """
    Script to extract patient data from Epic EHR and save to CSV.
    """

    import csv
    import asyncio
    from datetime import datetime
    from cyberdesk import CyberdeskClient, RunCreate
    import os

    async def extract_patient_data(client, patient_list):
        """Extract data for multiple patients."""
        results = []
        
        for patient in patient_list:
            print(f"Processing patient {patient['first_name']} {patient['last_name']} ({patient['id']})...")
            
            try:
                # Create run for patient data
                run_data = RunCreate(
                    workflow_id='550e8400-e29b-41d4-a716-446655440000',  # Your Epic workflow ID
                    machine_id='550e8400-e29b-41d4-a716-446655440001',   # Your Epic machine ID
                    input_values={
                        'patient_id': patient['id'],
                        'patient_first_name': patient['first_name'],
                        'patient_last_name': patient['last_name']
                    }
                )
                
                response = await client.runs.create(run_data)
                if response.error:
                    print(f"Error for patient {patient['id']}: {response.error}")
                    continue
                
                # Wait for completion
                completed_run = await wait_for_run_completion(client, response.data.id)
                
                patient_data = completed_run.output_data
                results.append({
                    'patient_id': patient['id'],
                    'first_name': patient['first_name'],
                    'last_name': patient['last_name'],
                    'dob': patient_data.get('demographics', {}).get('dateOfBirth'),
                    'mrn': patient_data.get('demographics', {}).get('mrn'),
                    'phone': patient_data.get('demographics', {}).get('phone'),
                    'email': patient_data.get('demographics', {}).get('email'),
                    'medication_count': len(patient_data.get('medications', [])),
                    'extracted_at': datetime.now().isoformat()
                })
                
            except Exception as e:
                print(f"Failed to process patient {patient['id']}: {e}")
        
        return results

    async def main():
        # Read patient list from CSV
        patient_list = []
        with open('patients.csv', 'r') as f:
            reader = csv.DictReader(f)
            for row in reader:
                patient_list.append({
                    'id': row['patient_id'],
                    'first_name': row['first_name'],
                    'last_name': row['last_name']
                })
        
        with CyberdeskClient(os.environ['CYBERDESK_API_KEY']) as client:
            results = await extract_patient_data(client, patient_list)
        
        # Save to CSV
        with open('patient_data_export.csv', 'w', newline='') as f:
            if results:
                writer = csv.DictWriter(f, fieldnames=results[0].keys())
                writer.writeheader()
                writer.writerows(results)
        
        print(f"Exported {len(results)} patient records to patient_data_export.csv")

    if __name__ == "__main__":
        asyncio.run(main())
    ```
  </Tab>
</Tabs>
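
The web handlers above index directly into `output_data`, which raises a bare `KeyError` (or a `TypeError` if the run produced no output) when the workflow did not return the expected fields. One way to surface a clearer error is to validate the shape first. This is a sketch: `build_patient_payload` and `REQUIRED_KEYS` are hypothetical, and the key names are simply the ones used in the examples.

```python
from typing import Optional

# Keys the handlers above expect to find in the run's output
REQUIRED_KEYS = ("demographics", "medications", "recentVitals", "lastUpdated")


def build_patient_payload(patient_id: str, output_data: Optional[dict]) -> dict:
    """Map run output to the response shape, failing loudly on missing keys."""
    if not output_data:
        raise ValueError("Run produced no output data")
    missing = [key for key in REQUIRED_KEYS if key not in output_data]
    if missing:
        raise ValueError(f"Output data is missing keys: {', '.join(missing)}")
    return {
        "patientId": patient_id,
        "demographics": output_data["demographics"],
        "medications": output_data["medications"],
        "vitals": output_data["recentVitals"],
        "lastUpdated": output_data["lastUpdated"],
    }


# Demonstration with made-up output data
example_output = {
    "demographics": {"mrn": "000"},
    "medications": [],
    "recentVitals": {},
    "lastUpdated": "2024-01-01T00:00:00Z",
}
payload = build_patient_payload("12345", example_output)
```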

## Other SDK Resources

<Warning>
  **Important:** While the SDK provides full CRUD operations for all Cyberdesk resources, we strongly recommend using the [Cyberdesk Dashboard](https://cyberdesk.io/dashboard) for managing these resources. The dashboard provides a more intuitive interface for:

  * Creating and editing workflows
  * Managing machines
  * Viewing connections
  * Analyzing trajectories

  The SDK methods below are provided for advanced use cases and automation scenarios.
</Warning>

<AccordionGroup>
  <Accordion title="Pools">
    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        from cyberdesk import PoolCreate, PoolUpdate, MachinePoolUpdate

        # List pools
        response = await client.pools.list()
        pools = response.data.items

        # Create a pool
        pool_data = PoolCreate(
            name='Customer A',
            description='All machines for Customer A'
        )
        response = await client.pools.create(pool_data)

        # Get a pool (with optional machine list)
        response = await client.pools.get('pool-id', include_machines=True)

        # Update a pool
        update_data = PoolUpdate(description='Updated description')
        response = await client.pools.update('pool-id', update_data)

        # Add machines to a pool
        from cyberdesk import MachinePoolAssignment
        assignment_data = MachinePoolAssignment(
            machine_ids=['machine-1', 'machine-2']
        )
        response = await client.pools.add_machines('pool-id', assignment_data)

        # Update a machine's pools
        pool_update = MachinePoolUpdate(
            pool_ids=['pool-1', 'pool-2', 'pool-3']
        )
        response = await client.machines.update_pools('machine-id', pool_update)

        # Delete a pool
        response = await client.pools.delete('pool-id')
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        from cyberdesk import PoolCreate, PoolUpdate, MachinePoolUpdate

        # List pools
        response = client.pools.list_sync()
        pools = response.data.items

        # Create a pool
        pool_data = PoolCreate(
            name='Customer A',
            description='All machines for Customer A'
        )
        response = client.pools.create_sync(pool_data)

        # Get a pool
        response = client.pools.get_sync('pool-id', include_machines=True)

        # Update a pool
        update_data = PoolUpdate(description='Updated description')
        response = client.pools.update_sync('pool-id', update_data)

        # Add machines to a pool
        from cyberdesk import MachinePoolAssignment
        assignment_data = MachinePoolAssignment(
            machine_ids=['machine-1', 'machine-2']
        )
        response = client.pools.add_machines_sync('pool-id', assignment_data)

        # Update a machine's pools
        pool_update = MachinePoolUpdate(
            pool_ids=['pool-1', 'pool-2', 'pool-3']
        )
        response = client.machines.update_pools_sync('machine-id', pool_update)

        # Delete a pool
        response = client.pools.delete_sync('pool-id')
        ```
      </Tab>
    </Tabs>
  </Accordion>

  <Accordion title="Machines">
    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        from cyberdesk import MachineCreate, MachineUpdate

        # List machines
        response = await client.machines.list()
        machines = response.data.items

        # Create a machine
        machine_data = MachineCreate(
            name='Epic EHR Machine',
            description='Production Epic environment'
        )
        response = await client.machines.create(machine_data)

        # Get a machine
        response = await client.machines.get('machine-id')
        machine = response.data

        # Update a machine
        update_data = MachineUpdate(name='Updated Name')
        response = await client.machines.update('machine-id', update_data)

        # Delete a machine
        response = await client.machines.delete('machine-id')
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        from cyberdesk import MachineCreate, MachineUpdate

        # List machines
        response = client.machines.list_sync()
        machines = response.data.items

        # Create a machine
        machine_data = MachineCreate(
            name='Epic EHR Machine',
            description='Production Epic environment'
        )
        response = client.machines.create_sync(machine_data)

        # Get a machine
        response = client.machines.get_sync('machine-id')

        # Update a machine
        update_data = MachineUpdate(name='Updated Name')
        response = client.machines.update_sync('machine-id', update_data)

        # Delete a machine
        response = client.machines.delete_sync('machine-id')
        ```
      </Tab>
    </Tabs>
  </Accordion>

  <Accordion title="Workflows">
    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        from cyberdesk import WorkflowCreate, WorkflowUpdate

        # List workflows
        response = await client.workflows.list()

        # Create a workflow
        workflow_data = WorkflowCreate(
            name='Patient Data Extraction',
            description='Extracts patient demographics and medications',
            main_prompt='Navigate to patient chart and extract data'
        )
        response = await client.workflows.create(workflow_data)

        # Get a workflow
        response = await client.workflows.get('workflow-id')

        # Update a workflow
        update_data = WorkflowUpdate(description='Updated description')
        response = await client.workflows.update('workflow-id', update_data)

        # Delete a workflow
        response = await client.workflows.delete('workflow-id')
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        from cyberdesk import WorkflowCreate, WorkflowUpdate

        # List workflows
        response = client.workflows.list_sync()

        # Create a workflow
        workflow_data = WorkflowCreate(
            name='Patient Data Extraction',
            description='Extracts patient demographics and medications',
            main_prompt='Navigate to patient chart and extract data'
        )
        response = client.workflows.create_sync(workflow_data)

        # Get a workflow
        response = client.workflows.get_sync('workflow-id')

        # Update a workflow
        update_data = WorkflowUpdate(description='Updated description')
        response = client.workflows.update_sync('workflow-id', update_data)

        # Delete a workflow
        response = client.workflows.delete_sync('workflow-id')
        ```
      </Tab>
    </Tabs>
  </Accordion>

  <Accordion title="Workflow Prompt Images">
    Upload and manage images for use in workflow prompts. The returned `supabase_url` can be embedded directly in workflow prompt HTML.

    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        # Upload an image from a file path
        response = await client.workflows.upload_prompt_image(file_path='./screenshot.png')
        if response.data:
            print(f"Supabase URL: {response.data.supabase_url}")
            print(f"Signed URL (for preview): {response.data.signed_url}")
            # Use the supabase_url in your workflow prompt HTML:
            # <img src="supabase://workflow-prompt-images/org_xxx/prompt-assets/screenshot.png" alt="Screenshot">

        # Upload from raw bytes
        with open('./image.png', 'rb') as f:
            image_bytes = f.read()
        response = await client.workflows.upload_prompt_image(
            file_content=image_bytes,
            filename='my-image.png',
            content_type='image/png'
        )

        # List all prompt images
        response = await client.workflows.list_prompt_images()
        for img in response.data:
            print(f"{img.filename}: {img.supabase_url}")

        # Get a fresh signed URL for an existing image
        response = await client.workflows.get_prompt_image_signed_url(
            path='org_xxx/prompt-assets/my-image.png'
        )
        print(f"Signed URL: {response.data.signed_url}")
        print(f"Expires in: {response.data.expires_in} seconds")

        # Delete an image
        response = await client.workflows.delete_prompt_image(
            path='org_xxx/prompt-assets/my-image.png'
        )
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        # Upload an image from a file path
        response = client.workflows.upload_prompt_image_sync(file_path='./screenshot.png')
        if response.data:
            print(f"Supabase URL: {response.data.supabase_url}")
            # Use in workflow prompt: <img src="{response.data.supabase_url}" alt="Screenshot">

        # List all prompt images
        response = client.workflows.list_prompt_images_sync()
        for img in response.data:
            print(f"{img.filename}: {img.supabase_url}")

        # Get a fresh signed URL
        response = client.workflows.get_prompt_image_signed_url_sync(
            path='org_xxx/prompt-assets/my-image.png'
        )

        # Delete an image
        response = client.workflows.delete_prompt_image_sync(
            path='org_xxx/prompt-assets/my-image.png'
        )
        ```
      </Tab>
    </Tabs>

    <Info>
      **Using prompt images in workflows:** After uploading, copy the `supabase_url` and use it in your workflow's `main_prompt` HTML:

      ```html theme={null}
      <p>Click on the button shown in this screenshot:</p>
      <img src="supabase://workflow-prompt-images/org_xxx/prompt-assets/button.png" alt="Button to click">
      <p>Then proceed to fill out the form.</p>
      ```

      Cyberdesk automatically resolves these URLs when running workflows, displaying the images to the AI agent.
    </Info>
  </Accordion>

  <Accordion title="Connections">
    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        from cyberdesk import ConnectionCreate, ConnectionStatus

        # List connections
        response = await client.connections.list()

        # Create a connection
        connection_data = ConnectionCreate(machine_id='machine-id')
        response = await client.connections.create(connection_data)

        # Filter by machine and status
        response = await client.connections.list(
            machine_id='machine-id',
            status=ConnectionStatus.CONNECTED
        )
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        from cyberdesk import ConnectionCreate

        # List connections
        response = client.connections.list_sync()

        # Create a connection
        connection_data = ConnectionCreate(machine_id='machine-id')
        response = client.connections.create_sync(connection_data)

        # Filter by machine and status
        response = client.connections.list_sync(
            machine_id='machine-id',
            status='connected'
        )
        ```
      </Tab>
    </Tabs>
  </Accordion>

  <Accordion title="Trajectories">
    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        from cyberdesk import TrajectoryCreate, TrajectoryUpdate

        # List trajectories
        response = await client.trajectories.list()

        # Get a trajectory
        response = await client.trajectories.get('trajectory-id')

        # Get latest trajectory for a workflow
        response = await client.trajectories.get_latest_for_workflow('workflow-id')

        # Create a trajectory
        trajectory_data = TrajectoryCreate(
            workflow_id='workflow-id',
            steps=[]
        )
        response = await client.trajectories.create(trajectory_data)

        # Update a trajectory
        update_data = TrajectoryUpdate(steps=[])
        response = await client.trajectories.update('trajectory-id', update_data)

        # Duplicate a trajectory (creates a copy with fresh image copies)
        response = await client.trajectories.duplicate('trajectory-id')

        # Delete a trajectory
        response = await client.trajectories.delete('trajectory-id')
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        from cyberdesk import TrajectoryCreate, TrajectoryUpdate

        # List trajectories
        response = client.trajectories.list_sync()

        # Get a trajectory
        response = client.trajectories.get_sync('trajectory-id')

        # Get latest trajectory for a workflow
        response = client.trajectories.get_latest_for_workflow_sync('workflow-id')

        # Create a trajectory
        trajectory_data = TrajectoryCreate(
            workflow_id='workflow-id',
            steps=[]
        )
        response = client.trajectories.create_sync(trajectory_data)

        # Update a trajectory
        update_data = TrajectoryUpdate(steps=[])
        response = client.trajectories.update_sync('trajectory-id', update_data)

        # Duplicate a trajectory (creates a copy with fresh image copies)
        response = client.trajectories.duplicate_sync('trajectory-id')

        # Delete a trajectory
        response = client.trajectories.delete_sync('trajectory-id')
        ```
      </Tab>
    </Tabs>
  </Accordion>

  <Accordion title="Workflow Tags">
    Organize your workflows with tags. Tags support emojis, colors, and optional grouping for mutual exclusivity.

    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        from cyberdesk import WorkflowTagCreate, WorkflowTagUpdate

        # List all tags (with workflow counts)
        response = await client.workflow_tags.list()
        for tag in response.data:
            print(f"{tag.emoji or ''} {tag.name}: {tag.workflow_count} workflows")

        # Create a tag
        response = await client.workflow_tags.create(
            name="Production",
            emoji="🚀",
            color="green",
            description="Production-ready workflows"
        )

        # Create a tag in a group (for mutual exclusivity)
        response = await client.workflow_tags.create(
            name="High Priority",
            emoji="🔴",
            group_id="priority-group-id"
        )

        # Get a specific tag
        response = await client.workflow_tags.get('tag-id')

        # Update a tag
        response = await client.workflow_tags.update('tag-id', name="Updated Name", emoji="✨")

        # Archive a tag (soft delete - keeps on existing workflows)
        response = await client.workflow_tags.archive('tag-id')

        # Unarchive a tag
        response = await client.workflow_tags.unarchive('tag-id')

        # Delete a tag (hard delete)
        response = await client.workflow_tags.delete('tag-id')

        # Reorder tags (for drag-and-drop UI)
        response = await client.workflow_tags.reorder(['tag-3', 'tag-1', 'tag-2'])

        # Add tags to a workflow
        response = await client.workflow_tags.add_to_workflow(
            workflow_id='workflow-id',
            tag_ids=['tag-1', 'tag-2']
        )

        # Remove a tag from a workflow
        response = await client.workflow_tags.remove_from_workflow('workflow-id', 'tag-id')

        # Get all tags for a workflow
        response = await client.workflow_tags.get_for_workflow('workflow-id')

        # Bulk add tags to multiple workflows
        response = await client.workflow_tags.bulk_add_to_workflows(
            workflow_ids=['wf-1', 'wf-2', 'wf-3'],
            tag_ids=['production-tag-id']
        )
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        from cyberdesk import WorkflowTagCreate, WorkflowTagUpdate

        # List all tags (with workflow counts)
        response = client.workflow_tags.list_sync()
        for tag in response.data:
            print(f"{tag.emoji or ''} {tag.name}: {tag.workflow_count} workflows")

        # Create a tag
        response = client.workflow_tags.create_sync(
            name="Production",
            emoji="🚀",
            color="green"
        )

        # Add tags to a workflow
        response = client.workflow_tags.add_to_workflow_sync('workflow-id', ['tag-1', 'tag-2'])

        # Archive/unarchive tags
        response = client.workflow_tags.archive_sync('tag-id')
        response = client.workflow_tags.unarchive_sync('tag-id')

        # Reorder tags
        response = client.workflow_tags.reorder_sync(['tag-3', 'tag-1', 'tag-2'])
        ```
      </Tab>
    </Tabs>

    <Info>
      **Mutual Exclusivity:** When a tag belongs to a group, adding it to a workflow automatically removes any other tag from the same group. This is useful for status-like tags (e.g., "Draft" vs "Published").
    </Info>
  </Accordion>

  <Accordion title="Workflow Tag Groups">
    Group tags for organization and mutual exclusivity. Only one tag from a group can be assigned to a workflow at a time.

    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        from cyberdesk import WorkflowTagGroupCreate, WorkflowTagGroupUpdate

        # List all tag groups
        response = await client.workflow_tag_groups.list()
        for group in response.data:
            print(f"{group.emoji or ''} {group.name}")

        # Create a tag group
        response = await client.workflow_tag_groups.create(
            name="Priority",
            emoji="🔥",
            color="red",
            description="Priority levels - only one per workflow"
        )

        # Get a specific group
        response = await client.workflow_tag_groups.get('group-id')

        # Update a group
        response = await client.workflow_tag_groups.update('group-id', name="Updated Priority")

        # Delete a group (tags become ungrouped, not deleted)
        response = await client.workflow_tag_groups.delete('group-id')

        # Reorder groups (for drag-and-drop UI)
        response = await client.workflow_tag_groups.reorder(['group-2', 'group-1', 'group-3'])
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        from cyberdesk import WorkflowTagGroupCreate

        # List all tag groups
        response = client.workflow_tag_groups.list_sync()

        # Create a tag group
        response = client.workflow_tag_groups.create_sync(
            name="Priority",
            emoji="🔥",
            color="red"
        )

        # Reorder groups
        response = client.workflow_tag_groups.reorder_sync(['group-2', 'group-1', 'group-3'])
        ```
      </Tab>
    </Tabs>
  </Accordion>

  <Accordion title="Model Configurations">
    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        import os

        from cyberdesk import ModelConfigurationCreate, ModelConfigurationUpdate

        # List all model configurations (system defaults + org-owned)
        response = await client.model_configurations.list()
        configs = response.data

        # Create a custom model configuration
        config_data = ModelConfigurationCreate(
            name='My GPT-4o',
            provider='openai',
            model_id='gpt-4o',
            api_key=os.environ['OPENAI_API_KEY'],  # Stored securely
            description='Custom OpenAI config with our API key'
        )
        response = await client.model_configurations.create(config_data)

        # Get a specific configuration
        response = await client.model_configurations.get('config-id')

        # Update a configuration
        update_data = ModelConfigurationUpdate(name='Updated Name')
        response = await client.model_configurations.update('config-id', update_data)

        # Delete a configuration
        response = await client.model_configurations.delete('config-id')
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        import os

        from cyberdesk import ModelConfigurationCreate, ModelConfigurationUpdate

        # List all model configurations
        response = client.model_configurations.list_sync()

        # Create a custom model configuration
        config_data = ModelConfigurationCreate(
            name='My GPT-4o',
            provider='openai',
            model_id='gpt-4o',
            api_key=os.environ['OPENAI_API_KEY'],
            description='Custom OpenAI config with our API key'
        )
        response = client.model_configurations.create_sync(config_data)

        # Get a specific configuration
        response = client.model_configurations.get_sync('config-id')

        # Update a configuration
        update_data = ModelConfigurationUpdate(name='Updated Name')
        response = client.model_configurations.update_sync('config-id', update_data)

        # Delete a configuration
        response = client.model_configurations.delete_sync('config-id')
        ```
      </Tab>
    </Tabs>
  </Accordion>

  <Accordion title="Usage">
    <Tabs>
      <Tab title="Async">
        ```python theme={null}
        from datetime import datetime
        from cyberdesk import UsageMode

        # Aggregate usage data for a date range
        response = await client.usage.aggregate(
            from_date=datetime(2025, 1, 1),
            to_date=datetime(2025, 1, 31),
            mode=UsageMode.SIMULATED  # or UsageMode.BILLED for Stripe billing
        )

        if response.data:
            usage = response.data
            print(f"Runs: {usage.runs_counted}")
            print(f"Agentic steps: {usage.total_agentic_steps}")
            print(f"Cached steps: {usage.total_cached_steps}")
        ```
      </Tab>

      <Tab title="Sync">
        ```python theme={null}
        from datetime import datetime
        from cyberdesk import UsageMode

        # Aggregate usage data for a date range
        response = client.usage.aggregate_sync(
            from_date=datetime(2025, 1, 1),
            to_date=datetime(2025, 1, 31),
            mode=UsageMode.SIMULATED
        )

        if response.data:
            usage = response.data
            print(f"Runs: {usage.runs_counted}")
            print(f"Agentic steps: {usage.total_agentic_steps}")
            print(f"Cached steps: {usage.total_cached_steps}")
        ```
      </Tab>
    </Tabs>

    See [Usage-Based Billing](/additional-details/usage-based-billing#programmatic-usage-data) for more details.
  </Accordion>
</AccordionGroup>

## Error Handling

All SDK methods return an `ApiResponse` object with `data` and `error` attributes:

```python theme={null}
response = await client.runs.create(run_data)

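if response.error:
    # Transport failures and unexpected HTTP statuses arrive as exceptions
    status_code = getattr(response.error, 'status_code', None)
    if status_code == 401:
        # Chain the original error so the traceback keeps its context
        raise PermissionError("Invalid API key") from response.error
    raise response.error

# 422 validation errors are returned in response.data.detail, not response.error
validation_errors = getattr(response.data, 'detail', None)
if validation_errors:
    print("Validation failed:", validation_errors)
else:
    print(f"Run created: {response.data.id}")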
```

### Common Error Types

* **Unexpected HTTP status / transport errors**: surfaced in `response.error` as exceptions. For HTTP failures, check `getattr(response.error, "status_code", None)`.
* **Validation errors (`422`)**: returned in `response.data.detail`, not `response.error`.
* **Successful responses**: returned in `response.data`.
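
The three outcomes above can be told apart with a small helper. This is a minimal sketch, not part of the SDK: `classify_response` is a hypothetical name, and the `SimpleNamespace` objects only stand in for real `ApiResponse` values so the snippet runs without network access.

```python
from types import SimpleNamespace


def classify_response(response):
    """Return 'error', 'validation', or 'success' for an ApiResponse-like object.

    Hypothetical helper (not part of the SDK); it relies only on the
    `error` and `data` attributes described above.
    """
    if response.error:
        # Transport failures and unexpected HTTP statuses land here.
        return "error"
    if getattr(response.data, "detail", None):
        # 422 validation errors arrive in response.data.detail.
        return "validation"
    return "success"


# Stand-in objects that mimic the three response shapes.
failed = SimpleNamespace(error=RuntimeError("boom"), data=None)
invalid = SimpleNamespace(error=None, data=SimpleNamespace(detail=["field required"]))
ok = SimpleNamespace(error=None, data=SimpleNamespace(id="run-123"))

print(classify_response(failed), classify_response(invalid), classify_response(ok))
```

Checking `response.error` first matters: when it is set, `response.data` may be empty, so the validation check should only run afterwards.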

### Exception Handling Pattern

```python theme={null}
import logging

logger = logging.getLogger(__name__)

async def safe_run_creation(client, run_data):
    response = await client.runs.create(run_data)

    if response.error:
        logger.error("API error: %s", response.error)
        raise response.error

    validation_errors = getattr(response.data, "detail", None)
    if validation_errors:
        raise ValueError(f"Validation failed: {validation_errors}")

    return response.data
```

## Type Hints and IDE Support

The SDK exports typed response models and status enums, so IDEs and type checkers can autocomplete fields and flag mistakes:

```python theme={null}
from cyberdesk import (
    CyberdeskClient,
    RunResponse,
    RunStatus,
    MachineStatus,
    ConnectionStatus
)
from typing import Optional

def process_run(run: RunResponse) -> Optional[dict]:
    """Process a completed run."""
    if run.status == RunStatus.SUCCESS:
        # IDE knows output_data is available
        return run.output_data
    elif run.status == RunStatus.TASK_FAILED:
        print(f"Run failed: {', '.join(run.error or [])}")
        return None
    else:
        print(f"Run status: {run.status}")
        return None
```

## Working with Jupyter Notebooks

In Jupyter notebooks, the `_sync` methods are the simplest option. The example below loads recent runs into a pandas DataFrame for analysis:

```python theme={null}
# In Jupyter, use sync methods or nest async code
from cyberdesk import CyberdeskClient
import pandas as pd

client = CyberdeskClient('YOUR_API_KEY')

# Get recent runs
response = client.runs.list_sync(limit=10)
runs = response.data.items

# Convert to DataFrame for analysis
df = pd.DataFrame([
    {
        'id': run.id,
        'workflow_id': run.workflow_id,
        'status': run.status,
        'created_at': run.created_at,
        'duration': (run.ended_at - run.created_at).total_seconds() if run.ended_at else None
    }
    for run in runs
])

# Analyze run performance
df.groupby('status').agg({
    'id': 'count',
    'duration': 'mean'
})
```

## Best Practices

<CardGroup cols={2}>
  <Card title="Use Environment Variables" icon="key">
    Store API keys and workflow IDs in environment variables, never in code.
  </Card>

  <Card title="Built-in Retry Logic" icon="rotate">
    The SDK automatically retries on transient failures with exponential backoff. Adjust `retry=RetryConfig(...)` if needed.
  </Card>

  <Card title="Handle Timeouts" icon="clock">
    Set reasonable timeouts for run completion based on your workflow complexity.
  </Card>

  <Card title="Log Everything" icon="scroll">
    Keep detailed logs of run IDs and statuses for debugging and audit trails.
  </Card>

  <Card title="Use Type Hints" icon="code">
    Leverage type hints for better IDE support and fewer runtime errors.
  </Card>

  <Card title="Close Connections" icon="plug">
    Use context managers or explicitly close clients to free resources.
  </Card>
</CardGroup>
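
As one way to apply the timeout advice, the sketch below bounds a polling loop with `asyncio.wait_for`. It is an illustration under stated assumptions: `wait_with_timeout`, `fetch_status`, and `make_fake_status_source` are hypothetical names, the fake status source replaces a real `client.runs.get` call, and `'success'` is only a placeholder terminal status. The `'scheduling'` and `'running'` values are the in-progress statuses used in the Quick Start.

```python
import asyncio


async def wait_with_timeout(fetch_status, timeout_s, poll_interval_s=5.0):
    """Poll `fetch_status` until the run leaves an in-progress state.

    `fetch_status` is a hypothetical async callable returning a status string;
    in real code it might wrap `client.runs.get(run_id)`.
    Returns the final status, or None if `timeout_s` elapses first.
    """
    async def poll():
        while True:
            status = await fetch_status()
            if status not in ("scheduling", "running"):
                return status
            await asyncio.sleep(poll_interval_s)

    try:
        return await asyncio.wait_for(poll(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return None


def make_fake_status_source(statuses):
    """Build a stand-in source that yields `statuses` in order, repeating the last one."""
    remaining = list(statuses)

    async def fetch_status():
        return remaining.pop(0) if len(remaining) > 1 else remaining[0]

    return fetch_status


# A run that finishes within the limit, and one that never leaves 'running'.
finished = asyncio.run(
    wait_with_timeout(
        make_fake_status_source(["scheduling", "running", "success"]),
        timeout_s=1.0,
        poll_interval_s=0.01,
    )
)
timed_out = asyncio.run(
    wait_with_timeout(
        make_fake_status_source(["running"]),
        timeout_s=0.05,
        poll_interval_s=0.01,
    )
)
print(finished, timed_out)
```

Returning `None` on timeout keeps the caller in control of what happens next, such as logging the run ID or retrying later.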

## Performance Optimization

### Concurrent Operations

When you need to create or poll many runs, issue the calls concurrently with `asyncio.gather` instead of awaiting them one at a time:

```python theme={null}
import asyncio
import os
from cyberdesk import CyberdeskClient

async def process_multiple_patients(patient_ids):
    """Process multiple patients concurrently."""
    with CyberdeskClient(os.environ['CYBERDESK_API_KEY']) as client:
        # Create runs concurrently
        tasks = [
            create_patient_run(client, patient_id) 
            for patient_id in patient_ids
        ]
        runs = await asyncio.gather(*tasks)
        
        # Wait for all runs to complete
        results = await asyncio.gather(*[
            wait_for_run_completion(client, run.id) 
            for run in runs if run
        ])
        
        return results

# Process 10 patients in parallel
results = asyncio.run(process_multiple_patients(patient_ids[:10]))
```
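
An unbounded `asyncio.gather` starts every call at once. If you would rather cap the number of in-flight requests, one option is an `asyncio.Semaphore`. The sketch below is illustrative only: `gather_bounded` and `fake_create_run` are hypothetical names, and the fake coroutine stands in for a real SDK call such as `client.runs.create`.

```python
import asyncio


async def gather_bounded(coro_factories, limit):
    """Run zero-argument coroutine factories with at most `limit` in flight.

    Results come back in input order. Exceptions are returned as values
    (return_exceptions=True) so one failure does not hide the other results.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(
        *(run_one(factory) for factory in coro_factories),
        return_exceptions=True,
    )


async def demo():
    in_flight = 0
    peak = 0

    async def fake_create_run(patient_id):
        # Stand-in for an SDK call; tracks how many calls overlap.
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return f"run-for-{patient_id}"

    patient_ids = ["p1", "p2", "p3", "p4", "p5"]
    # Bind each ID as a default argument so every factory keeps its own value.
    factories = [lambda pid=pid: fake_create_run(pid) for pid in patient_ids]
    results = await gather_bounded(factories, limit=2)
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```

Passing factories rather than already-created coroutines means no call begins until a semaphore slot is free.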

### Connection Pooling

The SDK automatically manages connection pooling for optimal performance. No additional configuration is needed.

## Next Steps

<CardGroup>
  <Card title="API Reference" icon="book" href="/api-reference">
    Explore the complete API documentation
  </Card>

  <Card title="Dashboard" icon="browser" href="https://cyberdesk.io/dashboard">
    Create and manage workflows in the dashboard
  </Card>

  <Card title="Examples" icon="code" href="https://github.com/cyberdesk-io/examples">
    Browse more code examples and use cases
  </Card>
</CardGroup>
