Get up and running with Cyberdesk in under 5 minutes. This guide assumes you’ve already created workflows in the Cyberdesk Dashboard.
1. Install the SDK
```bash
pip install cyberdesk
```
2. Initialize the client and create a run
```python
import asyncio

from cyberdesk import CyberdeskClient, RunCreate

async def main():
    # Initialize the client
    client = CyberdeskClient('YOUR_API_KEY')

    # Create a run for your workflow
    run_data = RunCreate(
        workflow_id='your-workflow-id',
        machine_id='your-machine-id',
        input_values={
            'patient_id': '12345',
            'patient_first_name': 'John',
            'patient_last_name': 'Doe'
        }
    )
    response = await client.runs.create(run_data)

    if response.error:
        print(f"Error creating run: {response.error}")
        return

    run = response.data

    # Wait for the run to complete
    while run.status in ['scheduling', 'running']:
        await asyncio.sleep(5)  # Wait 5 seconds
        response = await client.runs.get(run.id)
        run = response.data

    # Get the output data
    if run.status == 'success':
        print('Patient data:', run.output_data)
    else:
        print('Run failed:', ', '.join(run.error or []))

# Run the async function
asyncio.run(main())
```
Create and manage workflows in the Cyberdesk Dashboard. The dashboard supports rich, multimodal prompts — you can insert images alongside text to help the agent understand tricky UI elements. Use the SDK to execute runs against those workflows.
If your workflow prompt references sensitive variables using the {$variable} syntax (for example, {$password}), pass those values via sensitive_input_values.
```python
from cyberdesk import RunCreate

run_data = RunCreate(
    workflow_id='workflow-uuid',
    machine_id='machine-uuid',
    input_values={
        # non-sensitive inputs
        'patient_id': '12345'
    },
    sensitive_input_values={
        # referenced in your prompt as {$password}
        'password': 's3cr3tP@ss'
    }
)
response = await client.runs.create(run_data)
```
Sensitive inputs are stored in a secure third‑party secret vault (Basis Theory) only for the duration of the run. They are not logged in Cyberdesk, and they are not sent to any LLMs. The values are only resolved at the last moment during actual computer actions (e.g., when typing). After the run completes, these sensitive values are deleted from the vault. On the dashboard, sensitive inputs are never displayed and will not be prefilled when repeating a run.
You can specify pool requirements when creating a run. This ensures your run is executed on a machine that belongs to ALL specified pools. This is especially useful for:
Running workflows on customer-specific machines
Requiring machines with specific software installed
Organizing machines by location or capability
```python
from cyberdesk import RunCreate

async def create_run_with_pools():
    # Get pool IDs (typically from your configuration or database)
    customer_pool_id = 'pool-uuid-1'  # e.g., "Customer A" pool
    excel_pool_id = 'pool-uuid-2'     # e.g., "Has Excel" pool

    run_data = RunCreate(
        workflow_id='workflow-uuid',
        # Machine must be in BOTH pools (intersection, not union)
        pool_ids=[customer_pool_id, excel_pool_id],
        input_values={
            'patient_id': '12345',
            'patient_first_name': 'John',
            'patient_last_name': 'Doe'
        }
    )

    response = await client.runs.create(run_data)

    if response.error:
        print(f"Failed to create run: {response.error}")
    else:
        print(f"Run created: {response.data.id}")
        print(f"Will execute on machine in pools: {run_data.pool_ids}")
        return response.data
```
Pool Matching Logic: When you specify multiple pools, Cyberdesk will only select machines that belong to ALL specified pools (intersection). For example, if you specify ["Customer A", "Has Excel"], only machines that are in both pools will be considered.
If you provide a machine_id when creating a run, pool_ids are ignored. Cyberdesk will only attempt the specified machine; if it’s busy or unavailable, the run will wait until that machine is free (no fallback to other machines or pools).
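For example, in this minimal sketch the run is pinned to one machine and the pool_ids value has no effect:

```python
from cyberdesk import RunCreate

run_data = RunCreate(
    workflow_id='workflow-uuid',
    machine_id='machine-uuid',   # pins the run to this exact machine
    pool_ids=['pool-uuid-1'],    # ignored because machine_id is set
    input_values={'patient_id': '12345'}
)
# If 'machine-uuid' is busy, this run waits for it (no fallback to pools)
response = await client.runs.create(run_data)
```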
Creating and Managing Pools: While you can manage pools via the SDK, we recommend using the Cyberdesk Dashboard for a more intuitive experience:
Navigate to any machine in the dashboard
Click on the machine to view its details
Add the machine to existing pools or create new pools
Assign multiple pools to organize machines by customer, capability, or location
Common pool strategies:
By Customer: “Customer A”, “Customer B”, etc.
By Software: “Has Excel”, “Has Chrome”, “Has Epic EHR”
By Environment: “Production”, “Staging”, “Development”
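If you prefer to script this setup rather than use the dashboard, a sketch like the following creates one pool per strategy with the PoolCreate model from the SDK reference below (the names and descriptions are just the examples above):

```python
from cyberdesk import PoolCreate

# Sketch: create one pool per strategy; names and descriptions are illustrative
strategy_pools = [
    ('Customer A', 'All machines for Customer A'),
    ('Has Excel', 'Machines with Excel installed'),
    ('Production', 'Production environment machines'),
]
for name, description in strategy_pools:
    response = await client.pools.create(PoolCreate(name=name, description=description))
    if response.error:
        print(f"Failed to create pool {name}: {response.error}")
```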
Here’s a robust pattern for waiting for runs to complete:
```python
import asyncio
from datetime import datetime, timedelta

async def wait_for_run_completion(client, run_id, timeout_seconds=300):
    """Wait for a run to complete with timeout."""
    start_time = datetime.now()
    timeout = timedelta(seconds=timeout_seconds)

    while datetime.now() - start_time < timeout:
        response = await client.runs.get(run_id)
        if response.error:
            raise Exception(f"Failed to get run status: {response.error}")

        run = response.data
        if run.status == 'success':
            return run
        if run.status in ['error', 'cancelled']:
            raise Exception(f"Run {run.status}: {', '.join(run.error or ['Unknown error'])}")

        await asyncio.sleep(5)  # Poll every 5 seconds

    raise TimeoutError(f"Run timed out after {timeout_seconds} seconds")

# Usage
try:
    completed_run = await wait_for_run_completion(client, run.id)
    print("Output:", completed_run.output_data)
except Exception as e:
    print(f"Run failed: {e}")
```
You can list all attachments for a specific run and filter them by type (INPUT or OUTPUT).
```python
from cyberdesk import AttachmentType

# List all attachments for a run
response = await client.run_attachments.list(run_id='run-uuid')
attachments = response.data.items

# List only output attachments
response = await client.run_attachments.list(
    run_id='run-uuid',
    attachment_type=AttachmentType.OUTPUT
)
output_files = response.data.items
```
Get a signed URL that triggers automatic download when accessed. This is perfect for web applications where you want to provide download links to users.
```python
# Get a download URL with custom expiration (default: 5 minutes)
response = await client.run_attachments.get_download_url(
    'attachment-uuid',
    expires_in=600  # 10 minutes
)

if response.data:
    print(f"Download URL: {response.data.url}")
    print(f"Expires in: {response.data.expires_in} seconds")
    # You can use this URL in your web app or share it
    # The URL will trigger automatic download when accessed
```
Download the file content directly as bytes. Useful when you need to process the file in memory.
```python
# Get the attachment metadata first
response = await client.run_attachments.get('attachment-uuid')
attachment_info = response.data

# Download the file content
response = await client.run_attachments.download(attachment_info.id)

if not response.error:
    # Save the file
    with open(attachment_info.filename, "wb") as f:
        f.write(response.data)
    print(f"Downloaded {attachment_info.filename}")
```
The SDK provides a convenience method that downloads and saves the file in one operation.
```python
# Save directly to a file
response = await client.run_attachments.save_to_file(
    'attachment-uuid',
    output_path='./downloads/'  # Will use original filename
)

if response.data:
    print(f"Saved to: {response.data['path']}")
    print(f"File size: {response.data['size']} bytes")

# Or specify a custom filename
response = await client.run_attachments.save_to_file(
    'attachment-uuid',
    output_path='./downloads/custom-name.pdf'
)
```
Here’s a full example of a workflow that processes a file.
Workflow Prompt: "Take the file at ~/CyberdeskTransfers/report.txt, add a summary to the end of it, and mark it for export."
Workflow Setting: includes_file_exports is set to True.
```python
import asyncio
import base64

from cyberdesk import CyberdeskClient, RunCreate, FileInput, AttachmentType

async def main():
    async with CyberdeskClient("YOUR_API_KEY") as client:
        # 1. Prepare and upload the input file
        report_content = "This is the initial report content."
        encoded_content = base64.b64encode(report_content.encode()).decode()

        run_data = RunCreate(
            workflow_id="your-file-processing-workflow-id",
            file_inputs=[
                FileInput(filename="report.txt", content=encoded_content)
            ]
        )
        response = await client.runs.create(run_data)
        run = response.data
        print(f"Run started: {run.id}")

        # 2. Wait for the run to complete
        completed_run = await wait_for_run_completion(client, run.id)
        print("Run finished with status:", completed_run.status)

        # 3. Find and download the output attachment
        if completed_run.status == 'success':
            response = await client.run_attachments.list(
                run_id=completed_run.id,
                attachment_type=AttachmentType.OUTPUT
            )
            output_attachments = response.data.items

            if output_attachments:
                processed_report = output_attachments[0]

                # Option 1: Get a download URL (for web apps)
                url_response = await client.run_attachments.get_download_url(processed_report.id)
                if url_response.data:
                    print(f"Download URL: {url_response.data.url}")
                    print(f"Valid for: {url_response.data.expires_in} seconds")

                # Option 2: Download the processed file directly
                response = await client.run_attachments.download(processed_report.id)
                if not response.error:
                    # Decode and print the content
                    processed_content = response.data.decode()
                    print("\n--- Processed Report ---")
                    print(processed_content)
                    print("------------------------")
                else:
                    print(f"Failed to download processed file: {response.error}")
            else:
                print("No output files were generated.")

# Assuming wait_for_run_completion is defined as in the previous examples
asyncio.run(main())
```
This example demonstrates the complete lifecycle: uploading a file with a run, executing a workflow that modifies it, and then retrieving the processed file from the run’s output attachments.
At its core, a session is a reservation of a single machine. While a session is active, that machine is dedicated to your session only — no unrelated runs will be scheduled onto it. This guarantees your multi‑step automations run back‑to‑back on the same desktop without interference.

What you get from a session:
Exclusive access to one machine for the session’s duration (strong scheduling guarantee)
Deterministic “step 1 → step 2 → …” behavior with no opportunistic interleaving
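As a minimal sketch (using the start_session and release_session_after options covered in detail below), reserving a machine and releasing it at the end looks roughly like:

```python
from cyberdesk import RunCreate

# Start a session: the machine stays reserved for you after this run finishes
first = client.runs.create_sync(RunCreate(
    workflow_id='first-step-workflow-id',
    start_session=True
)).data

# ...create more runs referencing first.session_id...

# The last run releases the machine when it completes
client.runs.create_sync(RunCreate(
    workflow_id='last-step-workflow-id',
    session_id=first.session_id,
    release_session_after=True
))
```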
Chains are a convenient way to create multiple runs that execute back‑to‑back in the same session. Instead of manually creating individual runs and managing their sequencing, you can define all your workflow steps upfront and let Cyberdesk handle the session management and execution order.
Once you have multiple workflows running in the same session, you’ll often want to pass outputs from earlier steps as inputs to later ones. Refs make this seamless — simply reference a previous step’s output using a JSON object:
{"$ref": "step1.outputs.result"}
You can construct these as plain Python dicts when building chain steps.
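Putting the two together, here is a hypothetical sketch of a chain whose second step consumes the first step's output. The ChainCreate model and client.chains.create method are assumed names (this page doesn't show the chain-creation API itself); the plain-dict ref is the documented part:

```python
# Hypothetical sketch: ChainCreate and client.chains.create are assumed
# names, not confirmed by this page. The {'$ref': ...} dict is documented.
from cyberdesk import ChainCreate  # assumed import

chain_data = ChainCreate(
    steps=[
        {
            'workflow_id': 'lookup-workflow-id',
            'input_values': {'patient_id': '12345'}
        },
        {
            'workflow_id': 'update-workflow-id',
            # pass step 1's output into step 2 as a plain Python dict
            'input_values': {'record': {'$ref': 'step1.outputs.result'}}
        },
    ]
)
response = await client.chains.create(chain_data)
```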
Ad‑hoc sessions without a chain (start with a single run, then add more)
You can start a session with a normal run and then submit additional runs referencing the same session_id — useful when downstream steps are conditional or discovered at runtime.
```python
# 1) Start a session and warm up the desktop
warmup = client.runs.create_sync(RunCreate(
    workflow_id='login-workflow-id',
    pool_ids=['customer-a'],
    start_session=True,
    input_values={'username': 'alice'}
)).data
session_id = warmup.session_id

# 2) Add another run in the same session — scheduling remains exclusive
client.runs.create_sync(RunCreate(
    workflow_id='search-workflow-id',
    session_id=session_id,
    input_values={'query': 'recent orders'}
))

# 3) Final run that releases the session when complete
client.runs.create_sync(RunCreate(
    workflow_id='cleanup-workflow-id',
    session_id=session_id,
    release_session_after=True,  # Release the session after this run completes
    input_values={'cleanup': 'true'}
))
```
Automatic session release with release_session_after
When creating individual runs in a session (not using chains), you can use release_session_after=True to automatically release the session when that run completes (regardless of success or failure):
```python
# This run will release the session after it completes
final_run = client.runs.create_sync(RunCreate(
    workflow_id='final-workflow-id',
    session_id=existing_session_id,
    release_session_after=True,
    input_values={'finalize': 'true'}
))
```
This is mainly a convenience: it saves you from creating the final run and then releasing the session in a separate call. Note: the session is released when the run completes, whether it succeeds, fails, or is cancelled, so the session doesn't remain locked if something goes wrong.
EHR workflows: Log into Epic, navigate to a specific patient, extract their data, then upload documents to their chart — all with no interruptions from other miscellaneous runs.
Financial reporting: Export monthly reports from your ERP system, transform the data in Excel, then re‑import the processed results — all back‑to‑back without interference.
Document processing: Download files from a web portal, process them with a local application, then upload the results back — ensuring no other runs interfere with your workflow.
When creating multiple runs in bulk, you can also specify pool requirements. All runs will be distributed across machines that match the pool criteria.
If you provide a machine_id in a bulk run request, pool_ids are ignored for those runs. Each run will only target the specified machine; if it is busy, the run will wait for that machine rather than falling back to other machines or pools.
```python
from cyberdesk import RunBulkCreate

async def bulk_create_with_pools():
    # Create 100 runs that require machines in specific pools
    bulk_data = RunBulkCreate(
        workflow_id='workflow-uuid',
        count=100,
        pool_ids=['customer-a-pool-id', 'excel-pool-id'],
        input_values={
            'task_type': 'data_extraction',
            'priority': 'high'
        }
    )

    response = await client.runs.bulk_create(bulk_data)

    if response.data:
        print(f"Created {len(response.data.created_runs)} runs")
        print(f"Failed: {response.data.failed_count}")
        # Each run will execute on machines that match all specified pools when available
```
Bulk Run Assignment: When bulk creating runs with pool requirements, Cyberdesk attempts to assign each run to any available machine that meets all specified pools. If no matching machine is available, runs remain in scheduling until one is free. No specific load balancing guarantees are made.
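Because runs can sit in scheduling for a while, you may want to poll them after a bulk create. Here is a sketch that reuses the wait_for_run_completion helper defined earlier on this page (bulk_data as in the example above):

```python
import asyncio

# Sketch: wait for every run from a bulk create, reusing the
# wait_for_run_completion helper from earlier on this page
response = await client.runs.bulk_create(bulk_data)
if response.data:
    results = await asyncio.gather(
        *[wait_for_run_completion(client, run.id)
          for run in response.data.created_runs],
        return_exceptions=True  # collect failures instead of aborting
    )
    failures = [r for r in results if isinstance(r, Exception)]
    print(f"Completed: {len(results) - len(failures)}, failed: {len(failures)}")
```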
Important: While the SDK provides full CRUD operations for all Cyberdesk resources, we strongly recommend using the Cyberdesk Dashboard for managing these resources. The dashboard provides a more intuitive interface for:
Creating and editing workflows
Managing machines
Viewing connections
Analyzing trajectories
The SDK methods below are provided for advanced use cases and automation scenarios.
Pools
```python
from cyberdesk import (
    PoolCreate, PoolUpdate, MachinePoolAssignment, MachinePoolUpdate
)

# List pools
response = await client.pools.list()
pools = response.data.items

# Create a pool
pool_data = PoolCreate(
    name='Customer A',
    description='All machines for Customer A'
)
response = await client.pools.create(pool_data)

# Get a pool (with optional machine list)
response = await client.pools.get('pool-id', include_machines=True)

# Update a pool
update_data = PoolUpdate(description='Updated description')
response = await client.pools.update('pool-id', update_data)

# Add machines to a pool
assignment_data = MachinePoolAssignment(
    machine_ids=['machine-1', 'machine-2']
)
response = await client.pools.add_machines('pool-id', assignment_data)

# Update a machine's pools
pool_update = MachinePoolUpdate(
    pool_ids=['pool-1', 'pool-2', 'pool-3']
)
response = await client.machines.update_pools('machine-id', pool_update)

# Delete a pool
response = await client.pools.delete('pool-id')
```
Machines
```python
from cyberdesk import MachineCreate, MachineUpdate

# List machines
response = await client.machines.list()
machines = response.data.items

# Create a machine
machine_data = MachineCreate(
    name='Epic EHR Machine',
    description='Production Epic environment'
)
response = await client.machines.create(machine_data)

# Get a machine
response = await client.machines.get('machine-id')
machine = response.data

# Update a machine
update_data = MachineUpdate(name='Updated Name')
response = await client.machines.update('machine-id', update_data)

# Delete a machine
response = await client.machines.delete('machine-id')
```
Workflows
```python
from cyberdesk import WorkflowCreate, WorkflowUpdate

# List workflows
response = await client.workflows.list()

# Create a workflow
workflow_data = WorkflowCreate(
    name='Patient Data Extraction',
    description='Extracts patient demographics and medications',
    main_prompt='Navigate to patient chart and extract data'
)
response = await client.workflows.create(workflow_data)

# Get a workflow
response = await client.workflows.get('workflow-id')

# Update a workflow
update_data = WorkflowUpdate(description='Updated description')
response = await client.workflows.update('workflow-id', update_data)

# Delete a workflow
response = await client.workflows.delete('workflow-id')
```
Connections
```python
from cyberdesk import ConnectionCreate, ConnectionStatus

# List connections
response = await client.connections.list()

# Create a connection
connection_data = ConnectionCreate(machine_id='machine-id')
response = await client.connections.create(connection_data)

# Filter by machine and status
response = await client.connections.list(
    machine_id='machine-id',
    status=ConnectionStatus.ACTIVE
)
```
Trajectories
```python
from cyberdesk import TrajectoryCreate, TrajectoryUpdate

# List trajectories
response = await client.trajectories.list()

# Get a trajectory
response = await client.trajectories.get('trajectory-id')

# Get latest trajectory for a workflow
response = await client.trajectories.get_latest_for_workflow('workflow-id')

# Create a trajectory
trajectory_data = TrajectoryCreate(
    workflow_id='workflow-id',
    steps=[]
)
response = await client.trajectories.create(trajectory_data)

# Update a trajectory
update_data = TrajectoryUpdate(steps=[])
response = await client.trajectories.update('trajectory-id', update_data)

# Delete a trajectory
response = await client.trajectories.delete('trajectory-id')
```
When working with multiple operations, use asyncio for better performance:
```python
import asyncio
import os  # needed for os.environ below

from cyberdesk import CyberdeskClient

async def process_multiple_patients(patient_ids):
    """Process multiple patients concurrently."""
    async with CyberdeskClient(os.environ['CYBERDESK_API_KEY']) as client:
        # Create runs concurrently
        # (create_patient_run is a helper you define, wrapping client.runs.create)
        tasks = [
            create_patient_run(client, patient_id)
            for patient_id in patient_ids
        ]
        runs = await asyncio.gather(*tasks)

        # Wait for all runs to complete
        results = await asyncio.gather(*[
            wait_for_run_completion(client, run.id)
            for run in runs if run
        ])

        return results

# Process 10 patients in parallel
results = asyncio.run(process_multiple_patients(patient_ids[:10]))
```