The Precipiq Python SDK gives you a thread-safe sync client (Precipiq) and an async twin (AsyncPrecipiq) that buffer decisions locally and flush them to the ledger in the background. Both clients share the same public surface, so you can switch between them without rewriting your integration logic.

Install

pip install precipiq
The version documented on this page is 0.1.0.

Overview

The public surface of the precipiq package:
  • Precipiq / AsyncPrecipiq — sync and async clients
  • precipiq.integrations.langchain — LangChain callback handler
  • precipiq.integrations.crewai — CrewAI callback handler
Both integration modules are import-time optional. You never pay the cost of importing LangChain or CrewAI unless you explicitly import the adapter yourself.
from precipiq import Precipiq

pq = Precipiq(api_key="pq_live_...")

@pq.track(agent_id="sales-bot", action_type="email_send")
def send_email(to: str) -> dict:
    ...
    return {"sent": True}
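One common way to keep an integration import-time optional is to defer the heavy import until the adapter is actually requested. The sketch below illustrates that general pattern with the standard library only. `load_optional` is a hypothetical helper, not part of the precipiq package, and the SDK may implement this differently.

```python
import importlib
import importlib.util
from types import ModuleType
from typing import Optional


def load_optional(module_name: str) -> Optional[ModuleType]:
    """Import a top-level module only if it is installed; otherwise return None."""
    # find_spec reports availability without executing the module body.
    if importlib.util.find_spec(module_name) is None:
        return None
    return importlib.import_module(module_name)


# "json" ships with Python, so it is always found.
json_module = load_optional("json")
# A made-up name that is not installed falls through to None.
missing_module = load_optional("precipiq_sketch_missing_dependency")
```

With this shape, code that never asks for the optional dependency never triggers its import.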

Precipiq client

Precipiq is the thread-safe synchronous entry point. It buffers decisions in memory and flushes them on a background thread, so your hot path never blocks on network I/O.

Constructor

| Argument | Type | Description |
| --- | --- | --- |
| api_key | str | Raw API key issued by Precipiq (pq_live_… or pq_test_…). |
| base_url | str \| None | Override the default API root for self-hosted deployments. |
| flush_threshold | int | Number of queued items that trigger a forced flush. Default: 50. |
| flush_interval | float | Seconds between background flushes. Default: 5. |
| enable_batching | bool | When False, every log_decision ships synchronously, which is useful for tests and scripts. Default: True. |
| raise_on_error | bool | When True, surfaces API failures as exceptions instead of writing to the fallback log. Default: False. |
| fallback_log_path | str \| None | Override the rotating fallback log destination. Default: ~/.precipiq/fallback.log. |
| timeout | float \| None | Per-request timeout in seconds. |
pq = Precipiq(api_key="pq_live_...")

@pq.track(agent_id="sales-bot", action_type="email_send")
def send_email(to: str) -> dict:
    ...
    return {"sent": True}
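To make the flush_threshold behavior concrete, here is a minimal sketch of threshold-based buffering behind a lock. It is an illustration only, not the SDK's implementation: `DecisionBuffer` and its `ship` callback are hypothetical names, the flush runs inline rather than on a background thread, and the flush_interval timer is omitted.

```python
import threading
from typing import Any, Callable, Dict, List

Decision = Dict[str, Any]


class DecisionBuffer:
    """Illustrative stand-in for threshold-based batching; not the SDK's code."""

    def __init__(self, ship: Callable[[List[Decision]], None], flush_threshold: int = 50) -> None:
        self._ship = ship
        self._flush_threshold = flush_threshold
        self._lock = threading.Lock()
        self._items: List[Decision] = []

    def add(self, decision: Decision) -> None:
        # Append under the lock so concurrent callers cannot corrupt the list.
        with self._lock:
            self._items.append(decision)
            should_flush = len(self._items) >= self._flush_threshold
        if should_flush:
            self.flush()

    def flush(self) -> None:
        # Swap the list out under the lock, then ship outside it.
        with self._lock:
            pending, self._items = self._items, []
        if pending:
            self._ship(pending)


shipped: List[List[Decision]] = []
buffer = DecisionBuffer(ship=shipped.append, flush_threshold=3)
for i in range(7):
    buffer.add({"n": i})
buffer.flush()  # drain the remainder, as one would at shutdown
```

Seven decisions with a threshold of 3 produce two forced flushes plus one final drain.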

log_decision

Precipiq.log_decision(
    agent_id: str,
    action_type: str,
    inputs: dict[str, Any],
    outputs: dict[str, Any],
    confidence: float,
    alternatives: list[dict[str, Any]] | None = None,
    human_in_loop: bool = False,
    metadata: dict[str, Any] | None = None,
)
Queue one decision (or POST it immediately when enable_batching=False). Returns the server receipt when batching is disabled; returns None when the decision is queued for a later batched ship.
receipt = pq.log_decision(
    agent_id="pricing-bot",
    action_type="discount_offer",
    inputs={"customer_id": "cust_123", "tier": "gold"},
    outputs={"discount_pct": 15},
    confidence=0.82,
    human_in_loop=False,
    metadata={"model_version": "v2.3.1"},
)

flush

Precipiq.flush()
Drain the buffer and ship every queued decision in order. A single-item drain goes through POST /api/v1/decisions; larger drains use the batch endpoint POST /api/v1/decisions/batch (up to 100 decisions per call), so N queued decisions cost ceil(N/100) round-trips instead of N. The server preserves hash-chain order inside each batch.
Call flush() during graceful shutdown to ensure no decisions are lost. Alternatively, use close(), which flushes, stops the background thread, and closes the HTTP connection.
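The round-trip arithmetic can be sketched as a small planning function. This is a hypothetical helper written for illustration (`plan_flush` is not an SDK function). It assumes that any drain larger than one item is split into consecutive chunks of at most 100, and that a trailing chunk of size one still goes to the batch endpoint, which the text above does not specify.

```python
from math import ceil
from typing import List, Tuple

SINGLE_ENDPOINT = "/api/v1/decisions"
BATCH_ENDPOINT = "/api/v1/decisions/batch"
MAX_BATCH_SIZE = 100  # per-call limit stated in the flush description


def plan_flush(queued: int) -> List[Tuple[str, int]]:
    """Return (endpoint, item_count) pairs for draining `queued` decisions in order."""
    if queued <= 0:
        return []
    if queued == 1:
        return [(SINGLE_ENDPOINT, 1)]
    plan: List[Tuple[str, int]] = []
    for start in range(0, queued, MAX_BATCH_SIZE):
        # Assumption: every chunk of a multi-item drain uses the batch endpoint.
        plan.append((BATCH_ENDPOINT, min(MAX_BATCH_SIZE, queued - start)))
    return plan


plan = plan_flush(250)
round_trips = len(plan)
```

For 250 queued decisions the plan contains three batch calls of 100, 100, and 50 items, matching ceil(250/100).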

link_outcome

Precipiq.link_outcome(
    decision_id: str | UUID,
    financial_event: str | UUID,
    *,
    correlation_strength: float,
    link_type: str,
    attribution_method: str = "manual",
    notes: str | None = None,
)
Link a previously logged decision to a financial event. decision_id and financial_event are positional so callers can write pq.link_outcome(decision_id, event_id, ...). correlation_strength and link_type are keyword-only so every link is explicit about its probabilistic weighting and economic character.
pq.link_outcome(
    "d5e7a3b0-0000-0000-0000-000000000001",
    "fev_abc",
    correlation_strength=1.0,
    link_type="revenue",
    attribution_method="direct",
    notes="upsell offer accepted",
)
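In Python, keyword-only parameters are declared after a bare `*` in the signature. The stub below is a hypothetical stand-in (it is not the SDK method and sends nothing) that shows what the keyword-only rule described above means for callers: passing correlation_strength or link_type positionally fails with a TypeError.

```python
from typing import Any, Dict, Optional


def link_outcome_stub(
    decision_id: str,
    financial_event: str,
    *,  # everything after this marker must be passed by keyword
    correlation_strength: float,
    link_type: str,
    attribution_method: str = "manual",
    notes: Optional[str] = None,
) -> Dict[str, Any]:
    """Return the arguments as a dict; a real client would send them to the API."""
    return {
        "decision_id": decision_id,
        "financial_event": financial_event,
        "correlation_strength": correlation_strength,
        "link_type": link_type,
        "attribution_method": attribution_method,
        "notes": notes,
    }


try:
    # Positional weighting and link type are rejected by the interpreter.
    link_outcome_stub("d1", "fev_abc", 1.0, "revenue")
    positional_call_rejected = False
except TypeError:
    positional_call_rejected = True

link = link_outcome_stub("d1", "fev_abc", correlation_strength=1.0, link_type="revenue")
```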

get_ai_pnl

Precipiq.get_ai_pnl(
    start: datetime | str | None = None,
    end: datetime | str | None = None,
    agent_id: str | None = None,
)
Fetch the aggregated AI P&L for the authenticated org. Optionally filter by date range and agent.
pnl = pq.get_ai_pnl(start="2026-04-01", end="2026-04-30")
if pnl is not None:
    print(pnl["total_revenue_attributed"], pnl["currency"])
    print(pnl["net_ai_impact"])
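When reporting by calendar month, the start and end strings can be computed rather than typed by hand. The helper below is a hypothetical convenience, not part of the SDK. It assumes ISO 8601 date strings are accepted, as in the example above, and that the end date is inclusive, which the text does not state.

```python
import calendar
from datetime import date
from typing import Tuple


def month_bounds(year: int, month: int) -> Tuple[str, str]:
    """Return the first and last day of a month as ISO 8601 date strings."""
    # monthrange returns (weekday of the first day, number of days in the month).
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, 1).isoformat(), date(year, month, last_day).isoformat()


start, end = month_bounds(2026, 4)
leap_start, leap_end = month_bounds(2024, 2)
```

The resulting strings can then be passed as the start and end arguments of get_ai_pnl.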

track

Precipiq.track(
    agent_id: str,
    action_type: str,
    capture_inputs: bool = True,
    capture_outputs: bool = True,
    confidence: float = 1.0,
    **metadata: Any,
)
Decorator factory that logs a decision on every invocation of the wrapped function. The function’s identity (name, docstring, signature) is preserved via functools.wraps. Extra **metadata keyword arguments are stamped onto every decision produced by the decorated function, letting you attach deployment context (version, env, etc.) without a second call.
@pq.track(agent_id="support-bot", action_type="reply", version="2.1")
async def reply(ticket_id: str) -> str:
    ...
track works on both regular and async def functions — coroutines are detected via inspect.iscoroutinefunction and wrapped accordingly.
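The mechanism described above (a decorator factory that preserves function identity and branches on coroutine functions) can be sketched as follows. This is a simplified illustration, not the SDK's code: `track_sketch` and the `logged` list are hypothetical, input capture and confidence are omitted, and a plain list stands in for the decision buffer.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, Dict, List

logged: List[Dict[str, Any]] = []  # stand-in for the SDK's decision buffer


def track_sketch(agent_id: str, action_type: str, **metadata: Any) -> Callable:
    def decorator(func: Callable) -> Callable:
        def record(result: Any) -> None:
            # The same metadata is stamped onto every recorded call.
            logged.append({"agent_id": agent_id, "action_type": action_type,
                           "outputs": result, "metadata": metadata})

        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                result = await func(*args, **kwargs)
                record(result)
                return result
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            result = func(*args, **kwargs)
            record(result)
            return result
        return sync_wrapper
    return decorator


@track_sketch(agent_id="support-bot", action_type="reply", version="2.1")
async def reply(ticket_id: str) -> str:
    return f"reply to {ticket_id}"


@track_sketch(agent_id="sales-bot", action_type="email_send")
def send_email(to: str) -> dict:
    return {"sent": True}


reply_result = asyncio.run(reply("t1"))
email_result = send_email("a@example.com")
```

Because the async branch returns an `async def` wrapper, the decorated function is still a coroutine function and can be awaited as before.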

close

Precipiq.close()
Stop the background flusher, drain any pending decisions, and close the HTTP connection. Call this when your application exits. With AsyncPrecipiq, using the client as a context manager performs this cleanup automatically.
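This page does not say whether the sync client itself is a context manager. One standard-library option that works for any object with a close() method is contextlib.closing, which calls close() on exit even if the body raises. The sketch below uses a hypothetical stand-in class in place of a real client so that it runs without network access.

```python
from contextlib import closing
from typing import List


class ClientStandIn:
    """Hypothetical stand-in with the same close() contract as a real client."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def log(self, name: str) -> None:
        self.events.append(f"log:{name}")

    def close(self) -> None:
        # A real client would drain its buffer and close the connection here.
        self.events.append("close")


client = ClientStandIn()
try:
    with closing(client) as pq:
        pq.log("before-failure")
        raise RuntimeError("simulated crash")
except RuntimeError:
    pass  # close() has already run by the time the exception reaches this point
```

A try/finally block that calls close() in the finally clause achieves the same guarantee.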

AsyncPrecipiq client

AsyncPrecipiq is the async twin of Precipiq. All methods are coroutines and batching uses an asyncio.Lock with a background task instead of a thread, so concurrent log_decision calls cannot interleave writes into the buffer. Use it as a context manager for automatic cleanup:
async with AsyncPrecipiq(api_key="pq_live_...") as pq:
    await pq.log_decision(
        agent_id="refund-bot",
        action_type="approve",
        inputs={"ticket_id": "t1"},
        outputs={"refund_amount": 100},
        confidence=0.95,
    )
The async client exposes the same methods as Precipiq (log_decision, flush, link_outcome, get_ai_pnl, track, and close), all as async def coroutines. See the sync client docs above for argument signatures and descriptions. AsyncPrecipiq.flush() drains and ships queued decisions, batching when more than one item is queued, with the same ceil(N/100) round-trip logic as the sync client.
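The role of the asyncio.Lock can be illustrated with a small buffer that many coroutines write to concurrently. This is a sketch under stated assumptions, not the SDK's implementation: `AsyncBufferSketch` is a hypothetical class, the network call is replaced by asyncio.sleep(0), and the background flush task is omitted.

```python
import asyncio
from typing import Any, Dict, List

Decision = Dict[str, Any]


class AsyncBufferSketch:
    """Illustrative lock-guarded buffer; the real client also runs a background task."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._items: List[Decision] = []
        self.shipped: List[List[Decision]] = []

    async def log_decision(self, decision: Decision) -> None:
        # Only one coroutine at a time may mutate the buffer.
        async with self._lock:
            self._items.append(decision)

    async def flush(self) -> None:
        # Swap the buffer out under the lock, then ship outside it.
        async with self._lock:
            pending, self._items = self._items, []
        if pending:
            await asyncio.sleep(0)  # stand-in for the HTTP request
            self.shipped.append(pending)


async def main() -> List[List[Decision]]:
    buf = AsyncBufferSketch()
    # Ten concurrent writers, then a single drain.
    await asyncio.gather(*(buf.log_decision({"n": i}) for i in range(10)))
    await buf.flush()
    return buf.shipped


shipped_batches = asyncio.run(main())
```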

Exceptions

All SDK exceptions inherit from PrecipiqError so you can catch the base class when you want to handle all SDK failures uniformly.
| Exception | When raised |
| --- | --- |
| PrecipiqError | Base class for every exception raised by the SDK. |
| PrecipiqAuthError | The server returned 401 or 403. API keys are redacted in the message; the raw key is never stored on the exception instance. |
| PrecipiqAPIError | Any non-2xx response that is not 401/403. Has status_code (HTTP status) and body (parsed or raw response body, truncated to 512 characters) attributes. |
| PrecipiqTransportError | The HTTP transport failed: DNS, connection error, or timeout. Wraps the underlying httpx exception. |
from precipiq import Precipiq, PrecipiqAuthError, PrecipiqAPIError, PrecipiqTransportError

pq = Precipiq(api_key="pq_live_...", raise_on_error=True)

try:
    pq.log_decision(
        agent_id="pricing-bot",
        action_type="discount_offer",
        inputs={"customer_id": "cust_123"},
        outputs={"discount_pct": 15},
        confidence=0.82,
    )
except PrecipiqAuthError:
    # 401 / 403 — rotate or validate the API key
    pass
except PrecipiqAPIError as e:
    print(e.status_code, e.body)
except PrecipiqTransportError:
    # Network or timeout failure
    pass
Exceptions are only raised when raise_on_error=True. With the default raise_on_error=False, failures are written to the fallback log at ~/.precipiq/fallback.log and swallowed silently.
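The raise-or-log switch can be pictured roughly as follows. This is a hedged sketch, not the SDK's code: `SketchError` stands in for PrecipiqError, `ship` and `make_fallback_logger` are hypothetical helpers, and the rotation settings (1 MB, 3 backups) are invented for the example. It writes to a temporary directory rather than ~/.precipiq/fallback.log.

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler
from typing import Any, Callable


class SketchError(Exception):
    """Stand-in for PrecipiqError."""


def make_fallback_logger(path: str) -> logging.Logger:
    logger = logging.getLogger("precipiq_fallback_sketch")
    logger.setLevel(logging.ERROR)
    logger.propagate = False  # keep fallback entries out of the root logger
    # Assumed rotation policy; the SDK's actual limits are not documented here.
    logger.addHandler(RotatingFileHandler(path, maxBytes=1_000_000, backupCount=3))
    return logger


def ship(send: Callable[[], Any], *, raise_on_error: bool, fallback: logging.Logger) -> Any:
    try:
        return send()
    except SketchError as exc:
        if raise_on_error:
            raise  # surface the failure to the caller
        fallback.error("decision not shipped: %s", exc)
        return None  # swallowed after being written to the fallback log


def failing_send() -> None:
    raise SketchError("simulated 500 response")


log_path = os.path.join(tempfile.mkdtemp(), "fallback.log")
fallback_logger = make_fallback_logger(log_path)

quiet_result = ship(failing_send, raise_on_error=False, fallback=fallback_logger)

try:
    ship(failing_send, raise_on_error=True, fallback=fallback_logger)
    strict_mode_raised = False
except SketchError:
    strict_mode_raised = True

for handler in fallback_logger.handlers:
    handler.flush()
with open(log_path) as fh:
    fallback_contents = fh.read()
```

In the quiet mode the only trace of the failure is the fallback log entry, so it is worth monitoring that file in production.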

Integrations

LangChain

PrecipiqLangChainCallback is a LangChain BaseCallbackHandler that automatically tracks LLM, tool, and chain calls. Streaming responses are handled correctly — tokens are aggregated per run_id and a single consolidated decision is shipped on on_llm_end, so you get one record per completion rather than one per token.
from langchain_openai import ChatOpenAI
from precipiq import Precipiq
from precipiq.integrations.langchain import PrecipiqLangChainCallback

pq = Precipiq(api_key="pq_live_...")
llm = ChatOpenAI(callbacks=[PrecipiqLangChainCallback(pq, agent_id="qa-bot")])
See the LangChain integration guide for full configuration options.
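The per-run_id aggregation described above can be sketched without LangChain installed. The class below is a plain-Python illustration, not the actual PrecipiqLangChainCallback: the method names mirror LangChain's on_llm_new_token and on_llm_end hooks, but the signatures are simplified and the `decisions` list is a hypothetical stand-in for shipping to the ledger.

```python
from collections import defaultdict
from typing import Dict, List


class TokenAggregatorSketch:
    """Collect streamed tokens per run and emit one record when the run ends."""

    def __init__(self) -> None:
        self._tokens: Dict[str, List[str]] = defaultdict(list)
        self.decisions: List[Dict[str, str]] = []

    def on_llm_new_token(self, token: str, run_id: str) -> None:
        # Tokens from interleaved runs are kept apart by run_id.
        self._tokens[run_id].append(token)

    def on_llm_end(self, run_id: str) -> None:
        # One consolidated record per completion, then forget the run.
        text = "".join(self._tokens.pop(run_id, []))
        self.decisions.append({"run_id": run_id, "output": text})


agg = TokenAggregatorSketch()
# Two runs streaming at the same time, with their tokens interleaved.
for run_id, token in [("a", "Hel"), ("b", "Good"), ("a", "lo"), ("b", "bye")]:
    agg.on_llm_new_token(token, run_id)
agg.on_llm_end("a")
agg.on_llm_end("b")
```

Four streamed tokens across two runs yield exactly two records, one per completion.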

CrewAI

PrecipiqCrewAICallback tracks every agent task execution. Pass it as the step_callback on a crewai.Agent or crewai.Crew.
from crewai import Agent
from precipiq import Precipiq
from precipiq.integrations.crewai import PrecipiqCrewAICallback

pq = Precipiq(api_key="pq_live_...")
agent = Agent(
    role="Researcher",
    step_callback=PrecipiqCrewAICallback(pq, agent_id="researcher"),
)
See the CrewAI integration guide for full configuration options.