
CTClient

Synchronous Python client for Kaizen.

CTClient wraps all CT API endpoints with typed returns, in-memory TTL caching, environment variable configuration, and proper error handling.

from kaizen_sdk import CTClient

CTClient supports the context manager protocol. Use `with CTClient() as client:` to ensure the underlying HTTP connection pool is closed automatically on exit.

```python
with CTClient(api_key="sk-...") as client:
    prompt = client.get_prompt("summarize_ticket")
    print(prompt.prompt_text)
```

Constructor

```python
CTClient(
    api_key: str | None = None,
    base_url: str | None = None,
    cache_ttl: float = 300.0,
    timeout: float = 30.0,
) -> None
```

Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| api_key | str \| None | None | CT API key. Falls back to the KAIZEN_API_KEY environment variable. Raises CTError if neither is set. |
| base_url | str \| None | None | Base URL of the CT server. Falls back to the KAIZEN_BASE_URL env var, then http://localhost:8000. |
| cache_ttl | float | 300.0 | Time-to-live in seconds for the in-memory prompt cache. |
| timeout | float | 30.0 | HTTP request timeout in seconds. |

Example

```python
from kaizen_sdk import CTClient

# From environment variables (KAIZEN_API_KEY, KAIZEN_BASE_URL)
client = CTClient()

# Explicit configuration
client = CTClient(
    api_key="sk-my-key",
    base_url="https://ct.my-company.com",
    cache_ttl=600.0,
    timeout=60.0,
)
```
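The fallback order described above (explicit argument, then environment variable, then default or error) can be sketched as a standalone helper. `resolve_config` is a hypothetical name for illustration only, not part of the SDK:

```python
import os

# Hypothetical helper mirroring the constructor's documented fallback order.
# The real client raises CTError where this sketch raises ValueError.
def resolve_config(api_key=None, base_url=None):
    key = api_key or os.environ.get("KAIZEN_API_KEY")
    if key is None:
        raise ValueError("No API key: pass api_key= or set KAIZEN_API_KEY")
    url = base_url or os.environ.get("KAIZEN_BASE_URL") or "http://localhost:8000"
    return key, url
```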

log_feedback

```python
log_feedback(
    task_id: str,
    inputs: dict | None = None,
    output: str | None = None,
    score: float | None = None,
    source: str = "sdk",
    metadata: dict | None = None,
) -> FeedbackResult
```

Log a feedback entry for a task. Feedback is the training signal that drives prompt optimization.

Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| task_id | str | required | UUID or string ID of the task. |
| inputs | dict \| None | None | Input variables passed to the LLM (e.g. {"user_message": "..."}). |
| output | str \| None | None | The LLM's output text. |
| score | float \| None | None | Quality score in the range [0.0, 1.0]. None if not yet scored. |
| source | str | "sdk" | Source label for the feedback entry (e.g. "sdk", "human", "auto"). |
| metadata | dict \| None | None | Arbitrary JSON metadata to attach to this feedback entry. |

Returns

FeedbackResult — the created feedback record.

Example

```python
from kaizen_sdk import CTClient

with CTClient() as client:
    result = client.log_feedback(
        task_id="3b4e1f2a-...",
        inputs={"user_message": "Summarize this ticket"},
        output="The ticket is about a login issue.",
        score=0.9,
        metadata={"session_id": "abc123"},
    )
    print(result.id)  # UUID of the created feedback record
```

get_prompt

```python
get_prompt(task_id: str) -> Prompt
```

Retrieve the active prompt for a task. Results are TTL-cached in memory — repeated calls within cache_ttl seconds return the cached value without hitting the API.

Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| task_id | str | required | UUID or string ID of the task. |

Returns

Prompt — the active prompt version for this task.

Example

```python
with CTClient() as client:
    prompt = client.get_prompt("3b4e1f2a-...")
    print(prompt.prompt_text)
    print(f"Version: {prompt.version_number}, Score: {prompt.eval_score}")
```

Prompts are cached for cache_ttl seconds (default 300s). Use activate_prompt after switching versions — it automatically invalidates the cache.
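The SDK's cache internals aren't documented here; a minimal sketch of the TTL pattern described above (entries expire after cache_ttl seconds, and activation invalidates the affected key):

```python
import time

class TTLCache:
    """Minimal TTL cache sketch; not the SDK's actual implementation."""

    def __init__(self, ttl: float = 300.0):
        self.ttl = ttl
        self._store: dict = {}  # key -> (value, expiry timestamp)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # expired: evict and report a miss
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.monotonic() + self.ttl)

    def invalidate(self, key):
        # What activate_prompt does for the task's cached prompt.
        self._store.pop(key, None)
```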


activate_prompt

```python
activate_prompt(task_id: str, version_id: str) -> Prompt
```

Activate a specific prompt version and invalidate the local prompt cache for the task.

Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| task_id | str | required | UUID or string ID of the task. |
| version_id | str | required | UUID of the prompt version to activate. |

Returns

Prompt — the newly activated prompt version.

Example

```python
with CTClient() as client:
    # Activate a specific version
    prompt = client.activate_prompt(
        task_id="3b4e1f2a-...",
        version_id="7c8d9e0f-...",
    )
    print(f"Activated version {prompt.version_number}")
```

trigger_optimization

```python
trigger_optimization(task_id: str) -> OptimizeResult
```

Trigger a DSPy optimization run for a task. Returns a job handle and a cost estimate.

Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| task_id | str | required | UUID or string ID of the task to optimize. |

Returns

OptimizeResult — contains the queued Job and a CostEstimate.

Example

```python
with CTClient() as client:
    result = client.trigger_optimization("3b4e1f2a-...")
    print(f"Job ID: {result.job.id}")
    print(f"Estimated cost: ${result.cost_estimate.estimated_cost_usd:.4f}")
    if result.budget_warning:
        print(f"Warning: {result.budget_warning}")
```

get_job

```python
get_job(job_id: str) -> Job
```

Retrieve the current status of an optimization job. Poll this to wait for completion.

Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| job_id | str | required | UUID of the optimization job. |

Returns

Job — the job with its current status and progress information.

Example

```python
import time

with CTClient() as client:
    result = client.trigger_optimization("3b4e1f2a-...")
    job_id = str(result.job.id)

    # Poll until done
    while True:
        job = client.get_job(job_id)
        print(f"Status: {job.status} ({job.progress_step})")
        if job.status in ("SUCCESS", "FAILURE", "PR_FAILED"):
            break
        time.sleep(10)

    if job.status == "SUCCESS":
        print(f"PR: {job.pr_url}")
```
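The polling loop above can be factored into a reusable helper with a timeout. `wait_for_job` is a hypothetical name, not an SDK function; the fetch callable is injected (e.g. `lambda jid: client.get_job(jid)`) so the pattern can be exercised without a live server:

```python
import time

TERMINAL_STATUSES = ("SUCCESS", "FAILURE", "PR_FAILED")

# Hypothetical helper; fetch is any callable mapping job_id -> an object
# with a .status attribute, such as lambda jid: client.get_job(jid).
def wait_for_job(fetch, job_id, interval=10.0, timeout=3600.0, sleep=time.sleep):
    deadline = time.monotonic() + timeout
    while True:
        job = fetch(job_id)
        if job.status in TERMINAL_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still {job.status} after {timeout}s")
        sleep(interval)
```

Injecting `sleep` also lets tests run instantly by passing a no-op.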

list_tasks

```python
list_tasks() -> list[Task]
```

List all tasks configured in the CT server.

Returns

list[Task] — all tasks.

Example

```python
with CTClient() as client:
    tasks = client.list_tasks()
    for task in tasks:
        print(f"{task.id}: {task.name} ({task.feedback_count} feedback entries)")
```

create_task

```python
create_task(
    name: str,
    description: str | None = None,
    schema_json: dict | None = None,
    feedback_threshold: int = 50,
    **kwargs,
) -> Task
```

Create a new task on the CT server.

Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | required | Unique human-readable name for the task (e.g. "summarize_ticket"). |
| description | str \| None | None | Optional description of what this task does. |
| schema_json | dict \| None | None | JSON schema describing the expected input structure. |
| feedback_threshold | int | 50 | Number of feedback entries to collect before triggering auto-optimization. |
| **kwargs | | | Additional task fields passed directly to the API (e.g. teacher_model, judge_model). |
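schema_json takes a standard JSON Schema dict describing the task's inputs. A sketch of what one might look like for a ticket-summarization task; the field names are illustrative assumptions, not a documented schema:

```python
# Illustrative JSON Schema for a task's input variables.
# The "user_message" field name is an assumption for this example.
ticket_schema = {
    "type": "object",
    "properties": {
        "user_message": {
            "type": "string",
            "description": "Raw ticket text to summarize",
        },
    },
    "required": ["user_message"],
}
```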

Returns

Task — the newly created task.

Example

```python
with CTClient() as client:
    task = client.create_task(
        name="summarize_ticket",
        description="Summarizes customer support tickets into one sentence.",
        feedback_threshold=100,
        teacher_model="gpt-4o",
        judge_model="gpt-4o-mini",
    )
    print(f"Created task: {task.id}")
```

score

```python
score(trace_id: str, score: float, scored_by: str = "sdk") -> TraceResult
```

Score a previously captured trace by its ID.

Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| trace_id | str | required | UUID of the trace to score. Obtained from result.ct_trace_id after an instrumented LLM call. |
| score | float | required | Quality score in the range [0.0, 1.0]. |
| scored_by | str | "sdk" | Label identifying who/what generated the score. |

Returns

TraceResult — the updated trace with score applied.

Example

```python
import litellm

from kaizen_sdk import CTClient, instrument

instrument(litellm)

response = litellm.completion(
    model="gpt-4o-mini",
    # SUMMARIZE_TICKET is a prompt string defined elsewhere in your code
    messages=[{"role": "user", "content": SUMMARIZE_TICKET}],
)

# Score the captured trace
with CTClient() as client:
    trace = client.score(
        trace_id=response.ct_trace_id,
        score=0.85,
        scored_by="human-review",
    )
    print(f"Scored trace {trace.id}: {trace.score}")
```

close

```python
close() -> None
```

Close the underlying HTTP connection pool. Call this when done with the client if not using the context manager.

Example

```python
client = CTClient()
try:
    tasks = client.list_tasks()
finally:
    client.close()
```

Prefer using CTClient as a context manager (`with CTClient() as client:`) — it calls close() automatically on exit.
