The Precipiq Python SDK gives you a thread-safe sync client (Precipiq) and an async twin (AsyncPrecipiq) that buffer decisions locally and flush them to the ledger in the background. Both clients share the same public surface, so you can switch between them without rewriting your integration logic.
Install
The version documented on this page is 0.1.0.
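A pip install is the expected route; the package name below assumes it matches the import name `precipiq`:

```shell
pip install "precipiq==0.1.0"
```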
Overview
The public surface of the precipiq package:
Precipiq / AsyncPrecipiq — sync and async clients
precipiq.integrations.langchain — LangChain callback handler
precipiq.integrations.crewai — CrewAI callback handler
Both integration modules are optional at import time: you never pay the cost of importing LangChain or CrewAI unless you explicitly import the adapter yourself.
```python
from precipiq import Precipiq

pq = Precipiq(api_key="pq_live_...")

@pq.track(agent_id="sales-bot", action_type="email_send")
def send_email(to: str) -> dict:
    ...
    return {"sent": True}
```
Precipiq client
Precipiq is the thread-safe synchronous entry point. It buffers decisions in memory and flushes them on a background thread, so your hot path never blocks on network I/O.
Constructor
| Argument | Type | Description |
|---|---|---|
| `api_key` | `str` | Raw API key issued by Precipiq (`pq_live_…` or `pq_test_…`). |
| `base_url` | `str \| None` | Override the default API root for self-hosted deployments. |
| `flush_threshold` | `int` | Number of queued items that triggers a forced flush. Default: `50`. |
| `flush_interval` | `float` | Seconds between background flushes. Default: `5`. |
| `enable_batching` | `bool` | When `False`, every `log_decision` ships synchronously — useful for tests and scripts. Default: `True`. |
| `raise_on_error` | `bool` | When `True`, surfaces API failures as exceptions instead of writing to the fallback log. Default: `False`. |
| `fallback_log_path` | `str \| None` | Override the rotating fallback log destination. Default: `~/.precipiq/fallback.log`. |
| `timeout` | `float \| None` | Per-request timeout in seconds. |
```python
pq = Precipiq(api_key="pq_live_...")

@pq.track(agent_id="sales-bot", action_type="email_send")
def send_email(to: str) -> dict:
    ...
    return {"sent": True}
```
log_decision
```python
Precipiq.log_decision(
    agent_id: str,
    action_type: str,
    inputs: dict[str, Any],
    outputs: dict[str, Any],
    confidence: float,
    alternatives: list[dict[str, Any]] | None = None,
    human_in_loop: bool = False,
    metadata: dict[str, Any] | None = None,
)
```
Queue one decision (or POST it immediately when `enable_batching=False`). Returns the server receipt when batching is disabled; returns `None` when the decision is queued for the next batched flush.
```python
receipt = pq.log_decision(
    agent_id="pricing-bot",
    action_type="discount_offer",
    inputs={"customer_id": "cust_123", "tier": "gold"},
    outputs={"discount_pct": 15},
    confidence=0.82,
    human_in_loop=False,
    metadata={"model_version": "v2.3.1"},
)
```
flush
Drain the buffer and ship every queued decision in order. A single-item drain goes through POST /api/v1/decisions; larger drains use the batch endpoint POST /api/v1/decisions/batch (up to 100 decisions per call), so N queued decisions cost ceil(N/100) round-trips instead of N. The server preserves hash-chain order inside each batch.
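The round-trip arithmetic is easy to check. A minimal sketch, with the batch size of 100 taken from the endpoint description above (`round_trips` is illustrative, not part of the SDK):

```python
import math

BATCH_LIMIT = 100  # documented max decisions per POST /api/v1/decisions/batch

def round_trips(queued: int) -> int:
    """HTTP calls needed to drain `queued` decisions on flush()."""
    if queued <= 0:
        return 0
    # A single-item drain goes through the single-decision endpoint;
    # either way the call count works out to ceil(N / 100).
    return math.ceil(queued / BATCH_LIMIT)

print(round_trips(1))    # 1 call (single-decision endpoint)
print(round_trips(100))  # 1 batch call
print(round_trips(250))  # 3 batch calls
```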
Call flush() during graceful shutdown to ensure no decisions are lost. Alternatively, use close() which flushes and then stops the background thread and closes the HTTP connection.
link_outcome
```python
Precipiq.link_outcome(
    decision_id: str | UUID,
    financial_event: str | UUID,
    *,
    correlation_strength: float,
    link_type: str,
    attribution_method: str = "manual",
    notes: str | None = None,
)
```
Link a previously logged decision to a financial event. decision_id and financial_event are positional so callers can write pq.link_outcome(decision_id, event_id, ...). correlation_strength and link_type are keyword-only so every link is explicit about its probabilistic weighting and economic character.
```python
pq.link_outcome(
    "d5e7a3b0-0000-0000-0000-000000000001",
    "fev_abc",
    correlation_strength=1.0,
    link_type="revenue",
    attribution_method="direct",
    notes="upsell offer accepted",
)
```
get_ai_pnl
```python
Precipiq.get_ai_pnl(
    start: datetime | str | None = None,
    end: datetime | str | None = None,
    agent_id: str | None = None,
)
```
Fetch the aggregated AI P&L for the authenticated org. Optionally filter by date range and agent.
```python
pnl = pq.get_ai_pnl(start="2026-04-01", end="2026-04-30")
if pnl is not None:
    print(pnl["total_revenue_attributed"], pnl["currency"])
    print(pnl["net_ai_impact"])
```
track
```python
Precipiq.track(
    agent_id: str,
    action_type: str,
    capture_inputs: bool = True,
    capture_outputs: bool = True,
    confidence: float = 1.0,
    **metadata: Any,
)
```
Decorator factory that logs a decision on every invocation of the wrapped function. The function’s identity (name, docstring, signature) is preserved via functools.wraps. Extra **metadata keyword arguments are stamped onto every decision produced by the decorated function, letting you attach deployment context (version, env, etc.) without a second call.
```python
@pq.track(agent_id="support-bot", action_type="reply", version="2.1")
async def reply(ticket_id: str) -> str:
    ...
```
track works on both regular and async def functions — coroutines are detected via inspect.iscoroutinefunction and wrapped accordingly.
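The mechanism is the standard decorator-factory pattern. Below is a simplified sketch of how a decorator can preserve identity and dispatch on sync vs. async; it is illustrative only, not the SDK's implementation, and `record` is a hypothetical stand-in for the real logging call:

```python
import functools
import inspect

def track(agent_id: str, action_type: str, **metadata):
    def decorator(fn):
        if inspect.iscoroutinefunction(fn):
            @functools.wraps(fn)
            async def async_wrapper(*args, **kwargs):
                result = await fn(*args, **kwargs)
                record(agent_id, action_type, result, metadata)
                return result
            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args, **kwargs):
            result = fn(*args, **kwargs)
            record(agent_id, action_type, result, metadata)
            return result
        return sync_wrapper
    return decorator

logged = []
def record(agent_id, action_type, outputs, metadata):
    # Stand-in for log_decision; collects what would be shipped.
    logged.append({"agent_id": agent_id, "action_type": action_type,
                   "outputs": outputs, **metadata})

@track(agent_id="support-bot", action_type="reply", version="2.1")
def reply(ticket_id: str) -> str:
    return f"re: {ticket_id}"

print(reply("t1"))           # re: t1
print(reply.__name__)        # reply  (preserved by functools.wraps)
print(logged[0]["version"])  # 2.1    (metadata stamped onto the decision)
```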
close
Stop the background flusher, drain any pending decisions, and close the HTTP connection. Call this when your application exits. (With AsyncPrecipiq you can instead rely on its context-manager support for automatic cleanup.)
AsyncPrecipiq client
AsyncPrecipiq is the async twin of Precipiq. All methods are coroutines and batching uses an asyncio.Lock with a background task instead of a thread, so concurrent log_decision calls cannot interleave writes into the buffer.
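The serialization guarantee can be pictured with a minimal lock-guarded buffer. This is a sketch of the pattern, not the SDK's internals; class and attribute names are illustrative:

```python
import asyncio

class BufferedWriter:
    """Appends to a shared buffer under an asyncio.Lock, so concurrent
    writers never interleave mid-append and no items are lost."""

    def __init__(self, flush_threshold: int = 50):
        self._buffer: list[dict] = []
        self._lock = asyncio.Lock()
        self.flush_threshold = flush_threshold
        self.flushed: list[list[dict]] = []

    async def log(self, item: dict) -> None:
        async with self._lock:
            self._buffer.append(item)
            if len(self._buffer) >= self.flush_threshold:
                self._flush_locked()

    def _flush_locked(self) -> None:
        # Called with the lock held; swaps out the full buffer.
        self.flushed.append(self._buffer)
        self._buffer = []

async def main():
    w = BufferedWriter(flush_threshold=10)
    # 30 concurrent writers -> exactly 3 flushes of 10 items each.
    await asyncio.gather(*(w.log({"i": i}) for i in range(30)))
    print(len(w.flushed), [len(b) for b in w.flushed])  # 3 [10, 10, 10]

asyncio.run(main())
```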
Use it as a context manager for automatic cleanup:
```python
from precipiq import AsyncPrecipiq

async with AsyncPrecipiq(api_key="pq_live_...") as pq:
    await pq.log_decision(
        agent_id="refund-bot",
        action_type="approve",
        inputs={"ticket_id": "t1"},
        outputs={"refund_amount": 100},
        confidence=0.95,
    )
```
The async client exposes the same methods as Precipiq — log_decision, flush, link_outcome, get_ai_pnl, track, and close — all as async def coroutines. See the sync client docs above for argument signatures and descriptions.
AsyncPrecipiq.flush() drains and ships queued decisions, batching when there is more than one item — the same ceil(N/100) round-trip logic as the sync client.
Exceptions
All SDK exceptions inherit from PrecipiqError so you can catch the base class when you want to handle all SDK failures uniformly.
| Exception | When raised |
|---|---|
| `PrecipiqError` | Base class for every exception raised by the SDK. |
| `PrecipiqAuthError` | The server returned 401 or 403. API keys are redacted in the message; the raw key is never stored on the exception instance. |
| `PrecipiqAPIError` | Any non-2xx response that is not 401/403. Carries `status_code` (HTTP status) and `body` (parsed or raw response body, truncated to 512 characters) attributes. |
| `PrecipiqTransportError` | The HTTP transport failed — DNS, connection error, or timeout. Wraps the underlying httpx exception. |
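The hierarchy means one `except PrecipiqError` clause covers every SDK failure. A minimal sketch of the documented shape (attribute names come from the table above; the class bodies are illustrative, not the SDK's source):

```python
class PrecipiqError(Exception):
    """Base class for all SDK errors."""

class PrecipiqAuthError(PrecipiqError):
    """401/403 from the server; the raw API key is never stored."""

class PrecipiqAPIError(PrecipiqError):
    def __init__(self, status_code: int, body: str):
        super().__init__(f"API error {status_code}")
        self.status_code = status_code
        self.body = body[:512]  # documented truncation to 512 characters

class PrecipiqTransportError(PrecipiqError):
    """DNS, connection, or timeout failure; wraps the httpx exception."""

# One handler catches every SDK failure uniformly:
try:
    raise PrecipiqAPIError(422, "x" * 1000)
except PrecipiqError as e:
    print(type(e).__name__, e.status_code, len(e.body))  # PrecipiqAPIError 422 512
```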
```python
from precipiq import Precipiq, PrecipiqAuthError, PrecipiqAPIError, PrecipiqTransportError

pq = Precipiq(api_key="pq_live_...", raise_on_error=True)

try:
    pq.log_decision(
        agent_id="pricing-bot",
        action_type="discount_offer",
        inputs={"customer_id": "cust_123"},
        outputs={"discount_pct": 15},
        confidence=0.82,
    )
except PrecipiqAuthError:
    # 401 / 403 — rotate or validate the API key
    pass
except PrecipiqAPIError as e:
    print(e.status_code, e.body)
except PrecipiqTransportError:
    # Network or timeout failure
    pass
```
Exceptions are only raised when raise_on_error=True. With the default raise_on_error=False, failures are written to the fallback log at ~/.precipiq/fallback.log and swallowed silently.
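The fallback path is the familiar catch-log-and-continue pattern. An illustrative sketch using the standard library's rotating handler — the SDK's actual file format and rotation policy may differ, and `safe_ship` is a hypothetical stand-in for the network call:

```python
import json
import logging
import logging.handlers
import tempfile
from pathlib import Path

# Stand-in for ~/.precipiq/fallback.log (a temp dir keeps this runnable).
log_path = Path(tempfile.mkdtemp()) / "fallback.log"

fallback = logging.getLogger("precipiq.fallback.demo")
fallback.addHandler(
    logging.handlers.RotatingFileHandler(log_path, maxBytes=1_000_000, backupCount=3)
)
fallback.setLevel(logging.WARNING)
fallback.propagate = False

def safe_ship(decision: dict, raise_on_error: bool = False) -> None:
    try:
        raise ConnectionError("simulated network failure")  # stand-in for the POST
    except Exception:
        if raise_on_error:
            raise  # raise_on_error=True surfaces the failure
        fallback.warning(json.dumps(decision))  # default: swallow and record

safe_ship({"agent_id": "pricing-bot", "action_type": "discount_offer"})
print(log_path.read_text().strip())
```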
Integrations
LangChain
PrecipiqLangChainCallback is a LangChain BaseCallbackHandler that automatically tracks LLM, tool, and chain calls. Streaming responses are handled correctly — tokens are aggregated per run_id and a single consolidated decision is shipped on on_llm_end, so you get one record per completion rather than one per token.
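The per-run aggregation can be pictured as a dict keyed by `run_id`. A sketch of the pattern only — class and method bodies are illustrative, not the handler's actual code:

```python
from collections import defaultdict

class TokenAggregator:
    """Collects streamed tokens per run_id; emits one record on run end."""

    def __init__(self):
        self._tokens: dict[str, list[str]] = defaultdict(list)
        self.records: list[dict] = []

    def on_llm_new_token(self, token: str, run_id: str) -> None:
        self._tokens[run_id].append(token)

    def on_llm_end(self, run_id: str) -> None:
        # One consolidated record per completion, not one per token.
        text = "".join(self._tokens.pop(run_id, []))
        self.records.append({"run_id": run_id, "output": text})

agg = TokenAggregator()
for tok in ["Hel", "lo", "!"]:
    agg.on_llm_new_token(tok, run_id="run-1")
agg.on_llm_end(run_id="run-1")
print(agg.records)  # [{'run_id': 'run-1', 'output': 'Hello!'}]
```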
```python
from langchain_openai import ChatOpenAI
from precipiq import Precipiq
from precipiq.integrations.langchain import PrecipiqLangChainCallback

pq = Precipiq(api_key="pq_live_...")
llm = ChatOpenAI(callbacks=[PrecipiqLangChainCallback(pq, agent_id="qa-bot")])
```
See the LangChain integration guide for full configuration options.
CrewAI
PrecipiqCrewAICallback tracks every agent task execution. Pass it as the step_callback on a crewai.Agent or crewai.Crew.
```python
from crewai import Agent
from precipiq import Precipiq
from precipiq.integrations.crewai import PrecipiqCrewAICallback

pq = Precipiq(api_key="pq_live_...")
agent = Agent(
    role="Researcher",
    step_callback=PrecipiqCrewAICallback(pq, agent_id="researcher"),
)
```
See the CrewAI integration guide for full configuration options.