
AsyncCTClient

Asynchronous Python client for Kaizen.

AsyncCTClient is the async mirror of CTClient. All methods are async def and must be awaited. The constructor parameters and method signatures are identical to those of CTClient; only the calling convention differs.

from kaizen_sdk import AsyncCTClient

AsyncCTClient supports the async context manager protocol. Use async with AsyncCTClient() as client: to ensure the underlying HTTP connection pool is closed automatically.

async with AsyncCTClient(api_key="sk-...") as client:
    prompt = await client.get_prompt("summarize_ticket")
    print(prompt.prompt_text)
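To illustrate what the async context manager protocol guarantees, here is a minimal sketch built around a hypothetical stand-in class (FakeAsyncClient, which is not part of kaizen_sdk). It assumes the common pattern in which `__aexit__` awaits `close()`; the SDK's actual implementation may differ. The point is that cleanup runs even when the body of the `async with` block raises.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for AsyncCTClient; not the real SDK class."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        # A real client would release its HTTP connection pool here.
        self.closed = True

    async def __aenter__(self) -> "FakeAsyncClient":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and when the body raises. Returning None
        # lets any exception propagate to the caller.
        await self.close()


async def demo() -> bool:
    client = FakeAsyncClient()
    try:
        async with client:
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError:
        pass
    # True if close() ran despite the exception.
    return client.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```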

Constructor

AsyncCTClient(
    api_key: str | None = None,
    base_url: str | None = None,
    cache_ttl: float = 300.0,
    timeout: float = 30.0,
) -> None

Parameters

Name       Type        Default  Description
api_key    str | None  None     CT API key. Falls back to the KAIZEN_API_KEY environment variable. Raises CTError if neither is set.
base_url   str | None  None     Base URL of the CT server. Falls back to KAIZEN_BASE_URL env var, then http://localhost:8000.
cache_ttl  float       300.0    Time-to-live in seconds for the in-memory prompt cache.
timeout    float       30.0     HTTP request timeout in seconds.
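The fallback order described in the table can be sketched as a small standalone function. The helper name resolve_config is hypothetical, and ValueError stands in for the SDK's CTError. Treating an empty string the same as an unset value is also an assumption, not documented behavior.

```python
import os

DEFAULT_BASE_URL = "http://localhost:8000"


def resolve_config(api_key=None, base_url=None, env=None):
    """Hypothetical helper mirroring the documented fallback order."""
    env = os.environ if env is None else env

    # Explicit argument first, then the environment variable.
    key = api_key or env.get("KAIZEN_API_KEY")
    if not key:
        # The SDK is documented to raise CTError here; ValueError is a stand-in.
        raise ValueError("no API key: pass api_key or set KAIZEN_API_KEY")

    # Explicit argument, then environment variable, then the local default.
    url = base_url or env.get("KAIZEN_BASE_URL") or DEFAULT_BASE_URL
    return key, url
```

Passing a plain dict as env keeps the function easy to exercise without touching the real process environment.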

Example

import asyncio

from kaizen_sdk import AsyncCTClient


async def main():
    async with AsyncCTClient() as client:
        tasks = await client.list_tasks()
        print(tasks)


asyncio.run(main())

log_feedback

async def log_feedback(
    task_id: str,
    inputs: dict | None = None,
    output: str | None = None,
    score: float | None = None,
    source: str = "sdk",
    metadata: dict | None = None,
) -> FeedbackResult

Log a feedback entry for a task asynchronously.

Parameters

Name      Type          Default   Description
task_id   str           required  UUID or string ID of the task.
inputs    dict | None   None      Input variables passed to the LLM.
output    str | None    None      The LLM’s output text.
score     float | None  None      Quality score in [0.0, 1.0].
source    str           "sdk"     Source label for the feedback entry.
metadata  dict | None   None      Arbitrary JSON metadata.

Returns

FeedbackResult — the created feedback record.

Example

async with AsyncCTClient() as client:
    result = await client.log_feedback(
        task_id="3b4e1f2a-...",
        inputs={"user_message": "Summarize this ticket"},
        output="The ticket is about a login issue.",
        score=0.9,
    )
    print(result.id)
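Because score is documented as a value in [0.0, 1.0], feedback collected on another scale (for example a 1 to 5 star rating) has to be rescaled before it is logged. The helper below is a hypothetical sketch of a linear rescaling. It is not part of kaizen_sdk, and whether the server rejects out-of-range scores is not stated here.

```python
def normalize_rating(rating: float, low: float = 1.0, high: float = 5.0) -> float:
    """Map a rating on [low, high] linearly onto [0.0, 1.0] (hypothetical helper)."""
    if high <= low:
        raise ValueError("high must be greater than low")
    if not low <= rating <= high:
        raise ValueError(f"rating {rating} is outside [{low}, {high}]")
    return (rating - low) / (high - low)
```

The result could then be passed as the score argument, e.g. score=normalize_rating(4).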

get_prompt

async def get_prompt(task_id: str) -> Prompt

Retrieve the active prompt for a task (TTL-cached).

Parameters

Name     Type  Default   Description
task_id  str   required  UUID or string ID of the task.

Returns

Prompt — the active prompt version.

Example

async with AsyncCTClient() as client:
    prompt = await client.get_prompt("3b4e1f2a-...")
    print(prompt.prompt_text)
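The following sketch shows one way a TTL cache of the kind mentioned above can behave. The TTLCache class is hypothetical and is not the SDK's internal cache; it assumes per-key expiry measured with a monotonic clock. Its invalidate method corresponds to the cache invalidation that activate_prompt is documented to perform.

```python
import time


class TTLCache:
    """Hypothetical in-memory cache with per-entry expiry."""

    def __init__(self, ttl: float = 300.0, clock=time.monotonic) -> None:
        self._ttl = ttl
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries = {}   # key -> (expires_at, value)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Entry is stale: drop it so the caller refetches from the server.
            del self._entries[key]
            return None
        return value

    def set(self, key, value) -> None:
        self._entries[key] = (self._clock() + self._ttl, value)

    def invalidate(self, key) -> None:
        # Removing a missing key is a no-op.
        self._entries.pop(key, None)
```

With a cache like this, repeated get_prompt calls for the same task within cache_ttl seconds would not need a network round trip.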

activate_prompt

async def activate_prompt(task_id: str, version_id: str) -> Prompt

Activate a specific prompt version and invalidate the local cache.

Parameters

Name        Type  Default   Description
task_id     str   required  UUID or string ID of the task.
version_id  str   required  UUID of the prompt version to activate.

Returns

Prompt — the activated prompt version.

Example

async with AsyncCTClient() as client:
    prompt = await client.activate_prompt(
        task_id="3b4e1f2a-...",
        version_id="7c8d9e0f-...",
    )
    print(f"Activated version {prompt.version_number}")

trigger_optimization

async def trigger_optimization(task_id: str) -> OptimizeResult

Trigger a DSPy optimization run for a task.

Parameters

Name     Type  Default   Description
task_id  str   required  UUID or string ID of the task to optimize.

Returns

OptimizeResult — queued job and cost estimate.

Example

async with AsyncCTClient() as client:
    result = await client.trigger_optimization("3b4e1f2a-...")
    print(f"Job ID: {result.job.id}")
    print(f"Estimated cost: ${result.cost_estimate.estimated_cost_usd:.4f}")

get_job

async def get_job(job_id: str) -> Job

Retrieve the current status of an optimization job.

Parameters

Name    Type  Default   Description
job_id  str   required  UUID of the optimization job.

Returns

Job — the job with its current status.

Example

import asyncio

async with AsyncCTClient() as client:
    result = await client.trigger_optimization("3b4e1f2a-...")
    job_id = str(result.job.id)

    # Poll every 10 seconds until the job reaches a terminal status.
    while True:
        job = await client.get_job(job_id)
        print(f"Status: {job.status} ({job.progress_step})")
        if job.status in ("SUCCESS", "FAILURE", "PR_FAILED"):
            break
        await asyncio.sleep(10)

    if job.status == "SUCCESS":
        print(f"PR: {job.pr_url}")
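The polling loop above runs until the job finishes. A bounded variant can be sketched with asyncio.wait_for, as below. The wait_for_job helper and the FakeClient stand-in are hypothetical and not part of kaizen_sdk, and the non-terminal status names used with the fake ("PENDING", "RUNNING") are placeholders rather than documented values.

```python
import asyncio
from types import SimpleNamespace

# Terminal statuses taken from the polling example in this section.
TERMINAL_STATUSES = ("SUCCESS", "FAILURE", "PR_FAILED")


async def wait_for_job(client, job_id: str, interval: float = 10.0, timeout: float = 600.0):
    """Poll client.get_job until the job reaches a terminal status.

    Raises asyncio.TimeoutError if `timeout` seconds elapse first.
    """

    async def _poll():
        while True:
            job = await client.get_job(job_id)
            if job.status in TERMINAL_STATUSES:
                return job
            await asyncio.sleep(interval)

    return await asyncio.wait_for(_poll(), timeout=timeout)


class FakeClient:
    """Hypothetical stand-in that reports a scripted sequence of statuses."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    async def get_job(self, job_id: str):
        return SimpleNamespace(id=job_id, status=next(self._statuses))


if __name__ == "__main__":
    fake = FakeClient(["PENDING", "RUNNING", "SUCCESS"])  # placeholder statuses
    job = asyncio.run(wait_for_job(fake, "job-1", interval=0.0))
    print(job.status)
```

With a real client, the same helper would be awaited inside the async with block, for example job = await wait_for_job(client, job_id).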

list_tasks

async def list_tasks() -> list[Task]

List all tasks configured on the CT server.

Returns

list[Task] — all tasks.

Example

async with AsyncCTClient() as client:
    tasks = await client.list_tasks()
    for task in tasks:
        print(f"{task.id}: {task.name}")

create_task

async def create_task(
    name: str,
    description: str | None = None,
    schema_json: dict | None = None,
    feedback_threshold: int = 50,
    **kwargs,
) -> Task

Create a new task on the CT server.

Parameters

Name                Type         Default   Description
name                str          required  Unique human-readable task name.
description         str | None   None      Optional description.
schema_json         dict | None  None      JSON schema for expected inputs.
feedback_threshold  int          50        Feedback count before auto-optimization triggers.
**kwargs            -            -         Additional task fields (e.g. teacher_model, judge_model).

Returns

Task — the newly created task.

Example

async with AsyncCTClient() as client:
    task = await client.create_task(
        name="classify_intent",
        description="Classifies customer intent from support messages.",
        feedback_threshold=75,
    )
    print(f"Created: {task.id}")
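Since task names are described as unique, setup scripts that run more than once may want to reuse an existing task rather than create it again. The sketch below combines list_tasks and create_task into a hypothetical get_or_create_task helper; it is not an SDK method. FakeClient is an in-memory stand-in used only so the example runs on its own. How the server responds to a duplicate name is not specified here, and the check-then-create sequence is not atomic.

```python
import asyncio
from types import SimpleNamespace


async def get_or_create_task(client, name: str, **fields):
    """Return the task called `name`, creating it only if it is missing."""
    for task in await client.list_tasks():
        if task.name == name:
            return task
    return await client.create_task(name=name, **fields)


class FakeClient:
    """Hypothetical in-memory stand-in for AsyncCTClient."""

    def __init__(self) -> None:
        self._tasks = []

    async def list_tasks(self):
        return list(self._tasks)

    async def create_task(self, name: str, **fields):
        task = SimpleNamespace(id=f"task-{len(self._tasks) + 1}", name=name, **fields)
        self._tasks.append(task)
        return task


async def demo():
    client = FakeClient()
    first = await get_or_create_task(client, "classify_intent", feedback_threshold=75)
    second = await get_or_create_task(client, "classify_intent")
    # The second call finds the existing task instead of creating a duplicate.
    return first, second, len(await client.list_tasks())


if __name__ == "__main__":
    first, second, count = asyncio.run(demo())
    print(first.id, second.id, count)
```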

score

async def score(trace_id: str, score: float, scored_by: str = "sdk") -> TraceResult

Score a previously captured trace by its ID.

Parameters

Name       Type   Default   Description
trace_id   str    required  UUID of the trace. Obtained from result.ct_trace_id.
score      float  required  Quality score in [0.0, 1.0].
scored_by  str    "sdk"     Label identifying who/what scored the trace.

Returns

TraceResult — the updated trace with score applied.

Example

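async with AsyncCTClient() as client:
    # `response` is assumed to be the result of an earlier traced call
    # that exposes a ct_trace_id attribute.
    trace = await client.score(
        trace_id=response.ct_trace_id,
        score=0.85,
        scored_by="human-review",
    )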

close

async def close() -> None

Close the underlying async HTTP connection pool.

Example

client = AsyncCTClient()
try:
    tasks = await client.list_tasks()
finally:
    await client.close()

Prefer async with AsyncCTClient() as client: where possible, since it calls await client.close() automatically when the block exits.


Sync vs Async Comparison

Sync (CTClient)             Async (AsyncCTClient)
with CTClient() as client:  async with AsyncCTClient() as client:
client.get_prompt(task_id)  await client.get_prompt(task_id)
client.log_feedback(...)    await client.log_feedback(...)
client.close()              await client.close()

Use CTClient for scripts, CLIs, and synchronous web frameworks (Flask, Django). Use AsyncCTClient for async frameworks (FastAPI, aiohttp) and when you need concurrent CT calls.
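As an illustration of concurrent calls, the sketch below fetches several prompts at once with asyncio.gather. FakeAsyncClient is a hypothetical stand-in with simulated latency so the example runs without a server. With the real AsyncCTClient the same fetch_prompts helper (also hypothetical) would be awaited inside an async with block.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for AsyncCTClient with simulated latency."""

    async def get_prompt(self, task_id: str) -> str:
        await asyncio.sleep(0.05)  # simulated network round trip
        return f"prompt for {task_id}"


async def fetch_prompts(client, task_ids):
    # gather runs the coroutines concurrently and returns results
    # in the same order as the inputs.
    prompts = await asyncio.gather(*(client.get_prompt(t) for t in task_ids))
    return dict(zip(task_ids, prompts))


if __name__ == "__main__":
    result = asyncio.run(fetch_prompts(FakeAsyncClient(), ["a", "b", "c"]))
    print(result)
```

Because the three simulated requests overlap, the total wait is close to one round trip rather than three. By default asyncio.gather propagates the first exception raised by any call; passing return_exceptions=True collects failures alongside successful results instead.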
