Quick Start

Get up and running with Cyberdesk in under 5 minutes. This guide assumes you’ve already created workflows in the Cyberdesk Dashboard.

1. Install the SDK

pip install cyberdesk

2. Initialize the client and create a run

import asyncio
from cyberdesk import CyberdeskClient, RunCreate

async def main():
    # Initialize the client
    client = CyberdeskClient('YOUR_API_KEY')
    
    # Create a run for your workflow
    run_data = RunCreate(
        workflow_id='your-workflow-id',
        machine_id='your-machine-id',
        input_values={
            'patient_id': '12345',
            'patient_first_name': 'John',
            'patient_last_name': 'Doe'
        }
    )
    
    response = await client.runs.create(run_data)
    
    if response.error:
        print(f"Error creating run: {response.error}")
        return
        
    run = response.data
    
    # Wait for the run to complete
    while run.status in ['scheduling', 'running']:
        await asyncio.sleep(5)  # Wait 5 seconds
        response = await client.runs.get(run.id)
        if response.error:
            print(f"Error fetching run: {response.error}")
            return
        run = response.data
    
    # Get the output data
    if run.status == 'success':
        print('Patient data:', run.output_data)
    else:
        print('Run failed:', ', '.join(run.error or []))

# Run the async function
asyncio.run(main())
Create and manage workflows in the Cyberdesk Dashboard. The dashboard supports rich, multimodal prompts — you can insert images alongside text to help the agent understand tricky UI elements. Use the SDK to execute runs against those workflows.

Installation & Setup

Prerequisites

  • Python 3.8 or higher
  • pip, poetry, or pipenv for package management

Installation

pip install cyberdesk

Virtual Environment Setup

Always use a virtual environment to avoid dependency conflicts:
# Create virtual environment
python -m venv venv

# Activate it
# On macOS/Linux:
source venv/bin/activate

# On Windows:
venv\Scripts\activate

# Install the SDK
pip install cyberdesk

Authentication

Creating a Client

from cyberdesk import CyberdeskClient

client = CyberdeskClient('YOUR_API_KEY')

Custom Base URL

For self-hosted or enterprise deployments:
client = CyberdeskClient('YOUR_API_KEY', base_url='https://api.your-domain.com')

Using Context Managers

The client supports context managers for proper resource cleanup:
async with CyberdeskClient('YOUR_API_KEY') as client:
    response = await client.runs.list()
    # Client automatically cleans up when done
Never hardcode API keys in your source code. Use environment variables:
import os
from cyberdesk import CyberdeskClient

client = CyberdeskClient(os.environ['CYBERDESK_API_KEY'])
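
If the variable is unset, `os.environ[...]` raises a bare `KeyError`. A small wrapper can fail earlier with a clearer message. The sketch below is illustrative only: `load_api_key` is a hypothetical helper, not part of the SDK, and only the `CYBERDESK_API_KEY` variable name comes from the example above.

```python
import os
from typing import Mapping, Optional


def load_api_key(env: Optional[Mapping[str, str]] = None,
                 var_name: str = "CYBERDESK_API_KEY") -> str:
    """Return the API key from the environment, or raise a clear error.

    Hypothetical helper for illustration; it is not an SDK function.
    """
    source = os.environ if env is None else env
    key = source.get(var_name, "").strip()
    if not key:
        raise RuntimeError(
            f"Set the {var_name} environment variable before creating a client"
        )
    return key
```

With this in place, the client could be created as `CyberdeskClient(load_api_key())`.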

Sync vs Async

The Cyberdesk Python SDK provides both synchronous and asynchronous methods for all operations.

When to Use Async

Use async methods when:
  • Building web applications with async frameworks (FastAPI, aiohttp)
  • Making multiple concurrent API calls
  • Integrating with other async libraries
  • Building high-performance applications

When to Use Sync

Use sync methods when:
  • Writing simple scripts
  • Working in Jupyter notebooks
  • Integrating with frameworks that are primarily synchronous (Django, Flask)
  • Learning or prototyping

Method Naming Convention

  • Async methods: client.resource.method()
  • Sync methods: client.resource.method_sync()
# Async
response = await client.runs.create(data)

# Sync
response = client.runs.create_sync(data)

Working with Runs

Runs are the primary way to execute workflows in Cyberdesk. Here’s everything you need to know about managing runs through the SDK.

Creating a Run

from cyberdesk import RunCreate

async def create_patient_data_run():
    run_data = RunCreate(
        workflow_id='workflow-uuid',
        machine_id='machine-uuid',
        input_values={
            'patient_id': '12345',
            'patient_first_name': 'John',
            'patient_last_name': 'Doe'
        }
    )
    
    response = await client.runs.create(run_data)
    
    if response.error:
        print(f"Failed to create run: {response.error}")
    else:
        print(f"Run created: {response.data.id}")
        return response.data

Creating a Run with Machine Pools

You can specify pool requirements when creating a run. This ensures your run is executed on a machine that belongs to ALL specified pools. This is especially useful for:
  • Running workflows on customer-specific machines
  • Requiring machines with specific software installed
  • Organizing machines by location or capability
from cyberdesk import RunCreate

async def create_run_with_pools():
    # Get pool IDs (typically from your configuration or database)
    customer_pool_id = 'pool-uuid-1'  # e.g., "Customer A" pool
    excel_pool_id = 'pool-uuid-2'     # e.g., "Has Excel" pool
    
    run_data = RunCreate(
        workflow_id='workflow-uuid',
        # Machine must be in BOTH pools (intersection, not union)
        pool_ids=[customer_pool_id, excel_pool_id],
        input_values={
            'patient_id': '12345',
            'patient_first_name': 'John',
            'patient_last_name': 'Doe'
        }
    )
    
    response = await client.runs.create(run_data)
    
    if response.error:
        print(f"Failed to create run: {response.error}")
    else:
        print(f"Run created: {response.data.id}")
        print(f"Will execute on machine in pools: {run_data.pool_ids}")
        return response.data
Pool Matching Logic: When you specify multiple pools, Cyberdesk will only select machines that belong to ALL specified pools (intersection). For example, if you specify ["Customer A", "Has Excel"], only machines that are in both pools will be considered.
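
The intersection rule can be modeled locally with plain sets. This is a minimal sketch of the rule as described, not of how Cyberdesk schedules runs; the `eligible_machines` function and the `fleet` data are made up for illustration.

```python
from typing import Dict, Iterable, List, Set


def eligible_machines(machine_pools: Dict[str, Set[str]],
                      required_pools: Iterable[str]) -> List[str]:
    """Return machines that belong to every required pool (intersection)."""
    required = set(required_pools)
    return sorted(
        machine
        for machine, pools in machine_pools.items()
        if required <= pools  # subset test: all required pools are present
    )


# Hypothetical fleet: machine name -> pools it belongs to
fleet = {
    "machine-1": {"Customer A", "Has Excel"},
    "machine-2": {"Customer A"},
    "machine-3": {"Has Excel", "Has Chrome"},
}
```

Here `eligible_machines(fleet, ["Customer A", "Has Excel"])` selects only `machine-1`, because it is the only machine in both pools.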
If you provide a machine_id when creating a run, pool_ids are ignored. Cyberdesk will only attempt the specified machine; if it’s busy or unavailable, the run will wait until that machine is free (no fallback to other machines or pools).
Creating and Managing Pools: While you can manage pools via the SDK, we recommend using the Cyberdesk Dashboard for a more intuitive experience:
  1. Navigate to any machine in the dashboard
  2. Click on the machine to view its details
  3. Add the machine to existing pools or create new pools
  4. Assign multiple pools to organize machines by customer, capability, or location
Common pool strategies:
  • By Customer: “Customer A”, “Customer B”, etc.
  • By Software: “Has Excel”, “Has Chrome”, “Has Epic EHR”
  • By Environment: “Production”, “Staging”, “Development”
  • By Location: “US-East”, “EU-West”, etc.

Creating a Run with File Inputs

You can attach files to a run at creation. This is useful for workflows that need to process or manipulate files on the remote machine.
import base64
from cyberdesk import RunCreate, FileInput

async def create_run_with_file():
    # Read and encode a file
    with open("path/to/your/file.txt", "rb") as f:
        content = base64.b64encode(f.read()).decode("utf-8")

    run_data = RunCreate(
        workflow_id='workflow-uuid',
        file_inputs=[
            FileInput(
                filename="file.txt",
                content=content,
                target_path="C:/Users/Default/Desktop/file.txt", # Optional
                cleanup_imports_after_run=True # Optional
            )
        ]
    )
    
    response = await client.runs.create(run_data)
    
    if response.error:
        print(f"Failed to create run: {response.error}")
    else:
        print(f"Run created with file attachment: {response.data.id}")
        return response.data

FileInput fields:
  • filename (string, required): The name of the file, including its extension.
  • content (string, required): The base64-encoded content of the file.
  • target_path (string, optional): The absolute path on the remote machine where the file should be saved. If not provided, it defaults to ~/CyberdeskTransfers/.
  • cleanup_imports_after_run (boolean, optional): If True, the file will be deleted from the remote machine after the run completes (whether it succeeds or fails). Defaults to False.
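
Because `content` must be a base64 string rather than raw bytes, it can help to keep the encoding step in one place. The helper below is a sketch using only the standard library; `encode_file_fields` is a hypothetical name, not an SDK function.

```python
import base64
import os
from typing import Dict


def encode_file_fields(path: str) -> Dict[str, str]:
    """Read a file and return its filename and base64-encoded content."""
    with open(path, "rb") as f:
        raw = f.read()
    return {
        "filename": os.path.basename(path),
        # b64encode returns bytes; decode to get a plain string
        "content": base64.b64encode(raw).decode("ascii"),
    }
```

The returned dictionary uses the same keyword names as the example above, so it could be passed as `FileInput(**encode_file_fields("path/to/your/file.txt"))`.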

Listing Runs

# List all runs
response = await client.runs.list()
runs = response.data.items

# List with pagination
response = await client.runs.list(skip=0, limit=20)

# Filter by status
from cyberdesk import RunStatus
response = await client.runs.list(status=RunStatus.SUCCESS)

# Filter by workflow
response = await client.runs.list(workflow_id='workflow-uuid')

# Multiple filters
response = await client.runs.list(
    workflow_id='workflow-uuid',
    status=RunStatus.RUNNING,
    limit=10
)
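
To walk through every page, `skip` can be advanced by `limit` until a page comes back short. The sketch below takes the page fetcher as a parameter so it does not depend on the SDK; `collect_all` and `fetch_page` are hypothetical names, and stopping on a short page is an assumption about how the listing behaves.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def collect_all(fetch_page: Callable[[int, int], Awaitable[List[Any]]],
                      page_size: int = 20) -> List[Any]:
    """Fetch pages with skip/limit until a short page signals the end."""
    items: List[Any] = []
    skip = 0
    while True:
        page = await fetch_page(skip, page_size)
        items.extend(page)
        if len(page) < page_size:  # assumption: a short page means no more results
            return items
        skip += page_size


# A fetch_page for runs might look like this (untested against the SDK):
#
#     async def fetch_runs(skip, limit):
#         response = await client.runs.list(skip=skip, limit=limit)
#         if response.error:
#             raise RuntimeError(f"Failed to list runs: {response.error}")
#         return response.data.items
```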

Getting a Specific Run

response = await client.runs.get('run-uuid')

if response.data:
    run = response.data
    print(f"Run status: {run.status}")
    print(f"Output data: {run.output_data}")

Updating a Run

Run updates are typically handled automatically by the Cyberdesk system. Manual updates are rarely needed.
from cyberdesk import RunUpdate, RunStatus

update_data = RunUpdate(status=RunStatus.CANCELLED)
response = await client.runs.update('run-uuid', update_data)

Deleting a Run

response = await client.runs.delete('run-uuid')

if not response.error:
    print("Run deleted successfully")

Polling for Run Completion

Here’s a robust pattern for waiting for runs to complete:
import asyncio
from datetime import datetime, timedelta

async def wait_for_run_completion(client, run_id, timeout_seconds=300):
    """Wait for a run to complete with timeout."""
    start_time = datetime.now()
    timeout = timedelta(seconds=timeout_seconds)
    
    while datetime.now() - start_time < timeout:
        response = await client.runs.get(run_id)
        
        if response.error:
            raise Exception(f"Failed to get run status: {response.error}")
        
        run = response.data
        
        if run.status == 'success':
            return run
        
        if run.status in ['error', 'cancelled']:
            raise Exception(f"Run {run.status}: {', '.join(run.error or ['Unknown error'])}")
        
        await asyncio.sleep(5)  # Poll every 5 seconds
    
    raise TimeoutError(f"Run timed out after {timeout_seconds} seconds")

# Usage (inside an async function; client and run come from the earlier examples)
try:
    completed_run = await wait_for_run_completion(client, run.id)
    print("Output:", completed_run.output_data)
except Exception as e:
    print(f"Run failed: {e}")

Working with File Attachments

Manage files associated with your runs, such as input files uploaded at creation or output files generated by a workflow.

Listing Run Attachments

You can list all attachments for a specific run and filter them by type (INPUT or OUTPUT).
from cyberdesk import AttachmentType

# List all attachments for a run
response = await client.run_attachments.list(run_id='run-uuid')
attachments = response.data.items

# List only output attachments
response = await client.run_attachments.list(
    run_id='run-uuid',
    attachment_type=AttachmentType.OUTPUT
)
output_files = response.data.items

Downloading an Attachment

There are multiple ways to download attachments depending on your use case:

Method 1: Get a Download URL

Get a signed URL that triggers automatic download when accessed. This is perfect for web applications where you want to provide download links to users.
# Get a download URL with custom expiration (default: 5 minutes)
response = await client.run_attachments.get_download_url(
    'attachment-uuid',
    expires_in=600  # 10 minutes
)

if response.data:
    print(f"Download URL: {response.data.url}")
    print(f"Expires in: {response.data.expires_in} seconds")
    
    # You can use this URL in your web app or share it
    # The URL will trigger automatic download when accessed

Method 2: Download Raw File Content

Download the file content directly as bytes. Useful when you need to process the file in memory.
# Get the attachment metadata first
response = await client.run_attachments.get('attachment-uuid')
attachment_info = response.data

# Download the file content
response = await client.run_attachments.download(attachment_info.id)

if not response.error:
    # Save the file
    with open(attachment_info.filename, "wb") as f:
        f.write(response.data)
    print(f"Downloaded {attachment_info.filename}")

Method 3: Save to File (Convenience Method)

The SDK provides a convenience method that downloads and saves the file in one operation.
# Save directly to a file
response = await client.run_attachments.save_to_file(
    'attachment-uuid',
    output_path='./downloads/'  # Will use original filename
)

if response.data:
    print(f"Saved to: {response.data['path']}")
    print(f"File size: {response.data['size']} bytes")

# Or specify a custom filename
response = await client.run_attachments.save_to_file(
    'attachment-uuid',
    output_path='./downloads/custom-name.pdf'
)

Example: Upload, Process, and Download

Here’s a full example of a workflow that processes a file.
  1. Workflow Prompt: "Take the file at ~/CyberdeskTransfers/report.txt, add a summary to the end of it, and mark it for export."
  2. Workflow Setting: includes_file_exports is set to True.
import asyncio
import base64
from cyberdesk import CyberdeskClient, RunCreate, FileInput, AttachmentType

async def main():
    async with CyberdeskClient("YOUR_API_KEY") as client:
        # 1. Prepare and upload the input file
        report_content = "This is the initial report content."
        encoded_content = base64.b64encode(report_content.encode()).decode()

        run_data = RunCreate(
            workflow_id="your-file-processing-workflow-id",
            file_inputs=[
                FileInput(filename="report.txt", content=encoded_content)
            ]
        )
        response = await client.runs.create(run_data)
        if response.error:
            print(f"Failed to create run: {response.error}")
            return
        run = response.data
        print(f"Run started: {run.id}")

        # 2. Wait for the run to complete
        completed_run = await wait_for_run_completion(client, run.id)
        print("Run finished with status:", completed_run.status)

        # 3. Find and download the output attachment
        if completed_run.status == 'success':
            response = await client.run_attachments.list(
                run_id=completed_run.id,
                attachment_type=AttachmentType.OUTPUT
            )
            output_attachments = response.data.items
            
            if output_attachments:
                processed_report = output_attachments[0]
                
                # Option 1: Get a download URL (for web apps)
                url_response = await client.run_attachments.get_download_url(processed_report.id)
                if url_response.data:
                    print(f"Download URL: {url_response.data.url}")
                    print(f"Valid for: {url_response.data.expires_in} seconds")
                
                # Option 2: Download the processed file directly
                response = await client.run_attachments.download(processed_report.id)
                
                if not response.error:
                    # Decode and print the content
                    processed_content = response.data.decode()
                    print("\n--- Processed Report ---")
                    print(processed_content)
                    print("------------------------")
                else:
                    print(f"Failed to download processed file: {response.error}")
            else:
                print("No output files were generated.")

# Assuming wait_for_run_completion is defined as in the previous examples
asyncio.run(main())
This example demonstrates the complete lifecycle: uploading a file with a run, executing a workflow that modifies it, and then retrieving the processed file from the run’s output attachments.

Bulk Creating Runs with Pools

When creating multiple runs in bulk, you can also specify pool requirements. All runs will be distributed across machines that match the pool criteria.
If you provide a machine_id in a bulk run request, pool_ids are ignored for those runs. Each run will only target the specified machine; if it is busy, the run will wait for that machine rather than falling back to other machines or pools.
from cyberdesk import RunBulkCreate

async def bulk_create_with_pools():
    # Create 100 runs that require machines in specific pools
    bulk_data = RunBulkCreate(
        workflow_id='workflow-uuid',
        count=100,
        pool_ids=['customer-a-pool-id', 'excel-pool-id'],
        input_values={
            'task_type': 'data_extraction',
            'priority': 'high'
        }
    )
    
    response = await client.runs.bulk_create(bulk_data)
    
    if response.data:
        print(f"Created {len(response.data.created_runs)} runs")
        print(f"Failed: {response.data.failed_count}")
        # Each run will execute on machines that match all specified pools when available
Bulk Run Assignment: When bulk creating runs with pool requirements, Cyberdesk attempts to assign each run to any available machine that meets all specified pools. If no matching machine is available, runs remain in scheduling until one is free. No specific load balancing guarantees are made.

Real-World Example: Healthcare Integration

Here’s a complete example of retrieving patient data from an Epic EHR system using Cyberdesk:
import os

from fastapi import FastAPI, HTTPException, Body
from cyberdesk import CyberdeskClient, RunCreate

# wait_for_run_completion is the helper defined in "Polling for Run Completion" above

app = FastAPI()
client = CyberdeskClient(os.environ['CYBERDESK_API_KEY'])

@app.post("/patients/lookup")
async def get_patient_data(
    patient_id: str = Body(...),
    patient_first_name: str = Body(...),
    patient_last_name: str = Body(...)
):
    """Retrieve patient data from Epic EHR."""
    
    try:
        # Create a run to fetch patient data
        run_data = RunCreate(
            workflow_id='550e8400-e29b-41d4-a716-446655440000',  # Your Epic workflow ID
            machine_id='550e8400-e29b-41d4-a716-446655440001',   # Your Epic machine ID
            input_values={
                'patient_id': patient_id,
                'patient_first_name': patient_first_name,
                'patient_last_name': patient_last_name
            }
        )
        
        response = await client.runs.create(run_data)
        if response.error:
            raise HTTPException(status_code=500, detail=f"Failed to create run: {response.error}")
        
        run = response.data
        print(f"Fetching data for patient {patient_first_name} {patient_last_name} ({patient_id})...")
        
        # Wait for completion (2 minute timeout)
        completed_run = await wait_for_run_completion(client, run.id, 120)
        
        # Process the patient data
        patient_data = completed_run.output_data
        
        return {
            'patientId': patient_id,
            'demographics': patient_data['demographics'],
            'medications': patient_data['medications'],
            'vitals': patient_data['recentVitals'],
            'lastUpdated': patient_data['lastUpdated']
        }
        
    except HTTPException:
        # Re-raise HTTP errors raised above instead of re-wrapping them below
        raise
    except TimeoutError:
        raise HTTPException(status_code=504, detail="Request timed out")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Other SDK Resources

Important: While the SDK provides full CRUD operations for all Cyberdesk resources, we strongly recommend using the Cyberdesk Dashboard for managing these resources. The dashboard provides a more intuitive interface for:
  • Creating and editing workflows
  • Managing machines
  • Viewing connections
  • Analyzing trajectories
The equivalent SDK methods are intended for advanced use cases and automation scenarios.

Error Handling

All SDK methods return an ApiResponse object with data and error attributes:
response = await client.runs.create(run_data)

if response.error:
    # Handle error
    print(f"Error details: {response.error}")
else:
    # Use data
    print(f"Run created: {response.data.id}")

Common Error Types

ValidationError (dict): Invalid input parameters
{
    "message": "Validation failed",
    "details": {
        "workflow_id": "Invalid UUID format"
    }
}
AuthenticationError (dict): Invalid or missing API key
{
    "message": "Authentication failed",
    "status": 401
}
RateLimitError (dict): Too many requests
{
    "message": "Rate limit exceeded",
    "status": 429,
    "retry_after": 60
}
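
Given error dictionaries shaped like the samples above, deciding whether to retry comes down to reading `status` and `retry_after`. The function below is a sketch under that assumption; `retry_delay_seconds` is a hypothetical helper, and real error payloads may carry other fields.

```python
from typing import Any, Mapping, Optional


def retry_delay_seconds(error: Mapping[str, Any],
                        default_delay: float = 60.0) -> Optional[float]:
    """Return how long to wait before retrying, or None if a retry will not help."""
    status = error.get("status")
    if status == 429:
        # Rate limited: honor retry_after when the payload includes it
        return float(error.get("retry_after", default_delay))
    # Authentication (401) and validation errors need a config or input fix,
    # so retrying the same request is pointless
    return None
```

A caller could sleep for the returned number of seconds and retry, or surface the error immediately when the result is `None`.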

Exception Handling Pattern

import asyncio
import logging

logger = logging.getLogger(__name__)

async def safe_run_creation(client, run_data):
    """Create a run with proper error handling."""
    try:
        response = await client.runs.create(run_data)

        if response.error:
            # Log the error
            logger.error(f"API error: {response.error}")

            # Errors are documented above as dicts, so read fields with .get()
            error = response.error if isinstance(response.error, dict) else {}
            status = error.get('status')

            if status == 429:
                # Rate limited - wait, then retry once
                await asyncio.sleep(error.get('retry_after', 60))
                response = await client.runs.create(run_data)
                if response.error:
                    raise Exception(f"Retry failed: {response.error}")
                return response.data
            elif status == 401:
                raise Exception("Invalid API key")

            raise Exception(f"Failed to create run: {response.error}")

        return response.data

    except Exception:
        logger.exception("Error creating run")
        raise

Type Hints and IDE Support

The SDK provides comprehensive type hints for better IDE support:
from cyberdesk import (
    CyberdeskClient,
    RunResponse,
    RunStatus,
    MachineStatus,
    ConnectionStatus
)
from typing import Optional

def process_run(run: RunResponse) -> Optional[dict]:
    """Process a completed run."""
    if run.status == RunStatus.SUCCESS:
        # IDE knows output_data is available
        return run.output_data
    elif run.status == RunStatus.FAILED:
        print(f"Run failed: {', '.join(run.error or [])}")
        return None
    else:
        print(f"Run status: {run.status}")
        return None

Working with Jupyter Notebooks

The SDK works seamlessly in Jupyter notebooks:
# In Jupyter, use sync methods or nest async code
from cyberdesk import CyberdeskClient
import pandas as pd

client = CyberdeskClient('YOUR_API_KEY')

# Get recent runs
response = client.runs.list_sync(limit=10)
runs = response.data.items

# Convert to DataFrame for analysis
df = pd.DataFrame([
    {
        'id': run.id,
        'workflow_id': run.workflow_id,
        'status': run.status,
        'created_at': run.created_at,
        'duration': (run.completed_at - run.created_at).total_seconds() if run.completed_at else None
    }
    for run in runs
])

# Analyze run performance
df.groupby('status').agg({
    'id': 'count',
    'duration': 'mean'
})

Best Practices

Use Environment Variables

Store API keys and workflow IDs in environment variables, never in code.

Implement Retry Logic

Add exponential backoff for transient failures and rate limits.
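
One way to do this is a generic wrapper that doubles the delay after each failed attempt and adds jitter. This is a minimal sketch, not an SDK feature: `retry_with_backoff` and its parameters are hypothetical, and catching every `Exception` is a simplification that real code should narrow to transient errors.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Retry an async operation, doubling the delay cap after each failure."""
    for attempt in range(max_attempts):
        try:
            return await operation()
        except Exception:  # simplification: narrow to transient errors in real code
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter spreads retries out so clients do not retry in lockstep
            await sleep(random.uniform(0, delay))
    raise RuntimeError("max_attempts must be at least 1")
```

The `sleep` parameter exists so tests can substitute a fake and avoid real waiting. The operation is passed as a zero-argument callable, for example `lambda: safe_call(client, run_data)` for some coroutine function `safe_call` of your own that raises on failure.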

Handle Timeouts

Set reasonable timeouts for run completion based on your workflow complexity.

Log Everything

Keep detailed logs of run IDs and statuses for debugging and audit trails.

Use Type Hints

Leverage type hints for better IDE support and fewer runtime errors.

Close Connections

Use context managers or explicitly close clients to free resources.

Performance Optimization

Concurrent Operations

When working with multiple operations, use asyncio for better performance:
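import asyncio
import os
from cyberdesk import CyberdeskClient

# create_patient_run(client, patient_id) is a helper you define; it is assumed to
# create a run and return response.data (or None on error).
# wait_for_run_completion is defined in "Polling for Run Completion" above.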

async def process_multiple_patients(patient_ids):
    """Process multiple patients concurrently."""
    async with CyberdeskClient(os.environ['CYBERDESK_API_KEY']) as client:
        # Create runs concurrently
        tasks = [
            create_patient_run(client, patient_id) 
            for patient_id in patient_ids
        ]
        runs = await asyncio.gather(*tasks)
        
        # Wait for all runs to complete
        results = await asyncio.gather(*[
            wait_for_run_completion(client, run.id) 
            for run in runs if run
        ])
        
        return results

# Process 10 patients in parallel
results = asyncio.run(process_multiple_patients(patient_ids[:10]))
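
An unbounded `asyncio.gather` starts every task at once, and one failure raises out of the whole batch. For larger batches, a semaphore can cap how many operations run at a time. The sketch below uses only the standard library; `gather_limited` is a hypothetical helper, and the right concurrency limit for your account is not something this example can tell you.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List


async def gather_limited(items: Iterable[Any],
                         worker: Callable[[Any], Awaitable[Any]],
                         max_concurrency: int = 5) -> List[Any]:
    """Run worker(item) for every item, at most max_concurrency at a time.

    Results keep input order. A failed item yields its exception object
    instead of cancelling the others.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item: Any) -> Any:
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(
        *(guarded(item) for item in items),
        return_exceptions=True,
    )
```

Callers should check each result with `isinstance(result, Exception)` before using it, since failures are returned rather than raised.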

Connection Pooling

The SDK automatically manages connection pooling for optimal performance. No additional configuration is needed.

Next Steps