AsyncCTClient
Asynchronous Python client for Kaizen.
AsyncCTClient is the async mirror of CTClient. All methods are async def and must be awaited. Identical constructor parameters and method signatures — only the calling convention differs.
from kaizen_sdk import AsyncCTClientAsyncCTClient supports the async context manager protocol. Use async with AsyncCTClient() as client: to ensure the underlying HTTP connection pool is closed automatically.
async with AsyncCTClient(api_key="sk-...") as client:
prompt = await client.get_prompt("summarize_ticket")
print(prompt.prompt_text)Constructor
AsyncCTClient(
api_key: str | None = None,
base_url: str | None = None,
cache_ttl: float = 300.0,
timeout: float = 30.0,
) -> NoneParameters
| Name | Type | Default | Description |
|---|---|---|---|
api_key | str | None | None | CT API key. Falls back to the KAIZEN_API_KEY environment variable. Raises CTError if neither is set. |
base_url | str | None | None | Base URL of the CT server. Falls back to KAIZEN_BASE_URL env var, then http://localhost:8000. |
cache_ttl | float | 300.0 | Time-to-live in seconds for the in-memory prompt cache. |
timeout | float | 30.0 | HTTP request timeout in seconds. |
Example
import asyncio
from kaizen_sdk import AsyncCTClient
async def main():
async with AsyncCTClient() as client:
tasks = await client.list_tasks()
print(tasks)
asyncio.run(main())log_feedback
async def log_feedback(
task_id: str,
inputs: dict | None = None,
output: str | None = None,
score: float | None = None,
source: str = "sdk",
metadata: dict | None = None,
) -> FeedbackResultLog a feedback entry for a task asynchronously.
Parameters
| Name | Type | Default | Description |
|---|---|---|---|
task_id | str | required | UUID or string ID of the task. |
inputs | dict | None | None | Input variables passed to the LLM. |
output | str | None | None | The LLM’s output text. |
score | float | None | None | Quality score in [0.0, 1.0]. |
source | str | "sdk" | Source label for the feedback entry. |
metadata | dict | None | None | Arbitrary JSON metadata. |
Returns
FeedbackResult — the created feedback record.
Example
async with AsyncCTClient() as client:
result = await client.log_feedback(
task_id="3b4e1f2a-...",
inputs={"user_message": "Summarize this ticket"},
output="The ticket is about a login issue.",
score=0.9,
)
print(result.id)get_prompt
async def get_prompt(task_id: str) -> PromptRetrieve the active prompt for a task (TTL-cached).
Parameters
| Name | Type | Default | Description |
|---|---|---|---|
task_id | str | required | UUID or string ID of the task. |
Returns
Prompt — the active prompt version.
Example
async with AsyncCTClient() as client:
prompt = await client.get_prompt("3b4e1f2a-...")
print(prompt.prompt_text)activate_prompt
async def activate_prompt(task_id: str, version_id: str) -> PromptActivate a specific prompt version and invalidate the local cache.
Parameters
| Name | Type | Default | Description |
|---|---|---|---|
task_id | str | required | UUID or string ID of the task. |
version_id | str | required | UUID of the prompt version to activate. |
Returns
Prompt — the activated prompt version.
Example
async with AsyncCTClient() as client:
prompt = await client.activate_prompt(
task_id="3b4e1f2a-...",
version_id="7c8d9e0f-...",
)
print(f"Activated version {prompt.version_number}")trigger_optimization
async def trigger_optimization(task_id: str) -> OptimizeResultTrigger a DSPy optimization run for a task.
Parameters
| Name | Type | Default | Description |
|---|---|---|---|
task_id | str | required | UUID or string ID of the task to optimize. |
Returns
OptimizeResult — queued job and cost estimate.
Example
async with AsyncCTClient() as client:
result = await client.trigger_optimization("3b4e1f2a-...")
print(f"Job ID: {result.job.id}")
print(f"Estimated cost: ${result.cost_estimate.estimated_cost_usd:.4f}")get_job
async def get_job(job_id: str) -> JobRetrieve the current status of an optimization job.
Parameters
| Name | Type | Default | Description |
|---|---|---|---|
job_id | str | required | UUID of the optimization job. |
Returns
Job — the job with its current status.
Example
import asyncio
async with AsyncCTClient() as client:
result = await client.trigger_optimization("3b4e1f2a-...")
job_id = str(result.job.id)
while True:
job = await client.get_job(job_id)
print(f"Status: {job.status} — {job.progress_step}")
if job.status in ("SUCCESS", "FAILURE", "PR_FAILED"):
break
await asyncio.sleep(10)
if job.status == "SUCCESS":
print(f"PR: {job.pr_url}")list_tasks
async def list_tasks() -> list[Task]List all tasks configured on the CT server.
Returns
list[Task] — all tasks.
Example
async with AsyncCTClient() as client:
tasks = await client.list_tasks()
for task in tasks:
print(f"{task.id}: {task.name}")create_task
async def create_task(
name: str,
description: str | None = None,
schema_json: dict | None = None,
feedback_threshold: int = 50,
**kwargs,
) -> TaskCreate a new task on the CT server.
Parameters
| Name | Type | Default | Description |
|---|---|---|---|
name | str | required | Unique human-readable task name. |
description | str | None | None | Optional description. |
schema_json | dict | None | None | JSON schema for expected inputs. |
feedback_threshold | int | 50 | Feedback count before auto-optimization triggers. |
**kwargs | Additional task fields (e.g. teacher_model, judge_model). |
Returns
Task — the newly created task.
Example
async with AsyncCTClient() as client:
task = await client.create_task(
name="classify_intent",
description="Classifies customer intent from support messages.",
feedback_threshold=75,
)
print(f"Created: {task.id}")score
async def score(trace_id: str, score: float, scored_by: str = "sdk") -> TraceResultScore a previously captured trace by its ID.
Parameters
| Name | Type | Default | Description |
|---|---|---|---|
trace_id | str | required | UUID of the trace. Obtained from result.ct_trace_id. |
score | float | required | Quality score in [0.0, 1.0]. |
scored_by | str | "sdk" | Label identifying who/what scored the trace. |
Returns
TraceResult — the updated trace with score applied.
Example
async with AsyncCTClient() as client:
trace = await client.score(
trace_id=response.ct_trace_id,
score=0.85,
scored_by="human-review",
)close
async def close() -> NoneClose the underlying async HTTP connection pool.
Example
client = AsyncCTClient()
try:
tasks = await client.list_tasks()
finally:
await client.close()Prefer async with AsyncCTClient() as client: — it calls await client.close() automatically.
Sync vs Async Comparison
Sync (CTClient) | Async (AsyncCTClient) |
|---|---|
with CTClient() as client: | async with AsyncCTClient() as client: |
client.get_prompt(task_id) | await client.get_prompt(task_id) |
client.log_feedback(...) | await client.log_feedback(...) |
client.close() | await client.close() |
Use CTClient for scripts, CLIs, and synchronous web frameworks (Flask, Django). Use AsyncCTClient for async frameworks (FastAPI, aiohttp) and when you need concurrent CT calls.