Human-in-the-loop mutation approval
execute_stepwise() pauses before every read_only=False primitive and yields a checkpoint. Inspect the pending call, ask the user, then resume or abandon. The primitive only runs if approved.
Before you start
Track 26 blocked a call automatically with a policy function. The one new
thing here: execute_stepwise() pauses execution before every read_only=False
primitive and hands control back to the caller. You inspect the pending call,
ask a human, then either resume or abandon. The primitive only runs if you
resume with approval.
The agent#
Two primitives: a read-only compose step and a mutating send step.
# emailer.py
from pydantic import BaseModel
from opensymbolicai.blueprints import PlanExecute
from opensymbolicai.core import primitive
class Email(BaseModel):
to: str
subject: str
body: str
class EmailAgent(PlanExecute):
@primitive(read_only=True)
def compose_email(self, to: str, subject: str, body: str) -> Email:
"""Compose a draft email. Does not send; returns an Email object."""
return Email(to=to, subject=subject, body=body)
@primitive(read_only=False)
def send_email(self, email: Email) -> str:
"""Send the email and return a confirmation string."""
return f"Sent to {email.to}: '{email.subject}'"compose_email runs uninterrupted. send_email is where execution pauses.
The approval loop#
from opensymbolicai.checkpoint import CheckpointStatus, SerializerRegistry
from opensymbolicai.models import PlanExecuteConfig
_serializer = SerializerRegistry()
_serializer.register(
Email,
serializer=lambda e: e.model_dump(),
deserializer=lambda d: Email(**d),
)
config = PlanExecuteConfig(require_mutation_approval=True)
agent = EmailAgent(llm=llm, config=config)
for cp in agent.execute_stepwise(task, serializer=_serializer):
if cp.status != CheckpointStatus.AWAITING_APPROVAL:
continue
print(f"Pending: {cp.pending_mutation.statement}")
answer = input("Approve? [y/n]: ").strip().lower()
if answer == "y":
for cp2 in agent.resume_from_checkpoint(
cp, approve_mutation=True, serializer=_serializer
):
if cp2.status == CheckpointStatus.COMPLETED and cp2.result_value:
print(f"Result: {cp2.result_value.data}")
else:
print("Rejected. Email not sent.")
breakrequire_mutation_approval=True enables the pause. execute_stepwise() is a
generator that yields a checkpoint each time execution stops. When the status
is AWAITING_APPROVAL, cp.pending_mutation.statement shows the exact plan
line that is waiting, for example confirmation = send_email(email).
To approve: call resume_from_checkpoint(cp, approve_mutation=True), also a
generator, and iterate it to completion. To reject: just don't call it.
Execution is abandoned and the primitive never runs.
The SerializerRegistry tells the framework how to save and restore the
Email object in the namespace snapshot between the pause and the resume.
Without it, the framework cannot deserialize the variable when picking up
where it left off.
Run two tasks#
uv run main.pyOutput (first task approved, second rejected):
Task: Compose and send a project status update to alice@example.com.
Pending: confirmation = send_email(email)
Approve? [y/n]: y
Result: Sent to alice@example.com: 'Project Status Update'
Task: Compose and send a meeting cancellation to team@example.com.
Pending: confirmation = send_email(email)
Approve? [y/n]: n
Rejected. Email not sent.The plan is the same for both tasks. compose_email runs first and builds the
Email object. Execution pauses at send_email. The approval decision
determines whether it ever runs.
What to notice#
compose_emailnever pauses. It isread_only=True, sorequire_mutation_approvalignores it. Onlysend_emailtriggers the pause.- Rejecting is passive. You do not call any cancellation method. Simply not resuming is enough: the primitive is never invoked.
- The plan does not change. Both tasks produce a plan with the same two lines. The human decision happens outside the plan, not inside it.