That sounds like fun. I've already built a few different wrappers while developing a self-loop for my AGI project, and I'm more than happy to share. Here's one of my scripts.
from __future__ import annotations
import json
import os
import re
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Protocol, Tuple
# ---------- Adapters ----------
class LLMAdapter(Protocol):
    """Minimal adapter for different LLM providers."""

    def call(
        self,
        messages: List[Dict[str, str]],
        max_tokens: int = 512,
        temperature: float = 0.2,
    ) -> str: ...
class CallableAdapter:
    """For tests/demos: uses a user-supplied Python callable(text) -> str."""

    def __init__(self, fn: Callable[[str], str]):
        self._fn = fn
def call(
self,
messages: List[Dict[str, str]],
max_tokens: int = 512,
temperature: float = 0.2,
) -> str:
# why: allow quick local demos with deterministic behavior
joined = "\n\n".join(f"{m['role'].upper()}: {m['content']}" for m in messages)
return self._fn(joined)[:max_tokens * 4] # rough char cap
class OpenAIAdapter:
    """Optional: calls OpenAI if openai package + API key are available."""

    def __init__(self, model: str = "gpt-4o-mini"):
        try:
            import openai  # type: ignore
        except Exception as exc:  # pragma: no cover
            raise RuntimeError("openai package not installed") from exc
        if not os.getenv("OPENAI_API_KEY"):  # pragma: no cover
            raise RuntimeError("OPENAI_API_KEY not set")
        self._model = model
        self._openai = openai
def call(
self,
messages: List[Dict[str, str]],
max_tokens: int = 512,
temperature: float = 0.2,
) -> str: # pragma: no cover
client = self._openai.OpenAI()
resp = client.chat.completions.create(
model=self._model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
)
return resp.choices[0].message.content or ""
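# Usage sketch for the adapter above (my assumption: the `openai` package is
# installed and OPENAI_API_KEY is exported; the model name is just an example):
#   adapter = OpenAIAdapter(model="gpt-4o-mini")
#   reply = adapter.call([{"role": "user", "content": "Say hello."}])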
# ---------- Stats / Utilities ----------
@dataclass
class RunStats:
    calls: int = 0
    latency_sec: float = 0.0
    prompt_tokens_est: int = 0
    completion_tokens_est: int = 0
def add(self, prompt: str, completion: str, latency: float) -> None:
self.calls += 1
self.latency_sec += latency
self.prompt_tokens_est += estimate_tokens(prompt)
self.completion_tokens_est += estimate_tokens(completion)
def estimate_tokens(text: str) -> int:
    """Cheap estimator; safe when tokenizer isn't available."""
    return max(1, int(len(text) / 4))
def timed_call(fn: Callable[[], str]) -> Tuple[str, float]:
    start = time.perf_counter()
    out = fn()
    return out, time.perf_counter() - start
# ---------- Checkpoints ----------
Checkpoint = Callable[[str], Tuple[bool, str]]
def NonEmptyCheckpoint() -> Checkpoint:
    def _check(text: str) -> Tuple[bool, str]:
        ok = bool(text.strip())
        return ok, "" if ok else "Empty response."
    return _check
def LengthBoundCheckpoint(max_chars: int = 5000) -> Checkpoint:
    def _check(text: str) -> Tuple[bool, str]:
        ok = len(text) <= max_chars
        return ok, "" if ok else f"Exceeded {max_chars} chars."
    return _check
def UnsafeContentCheckpoint(bad_keywords: Optional[Iterable[str]] = None) -> Checkpoint:
    bad = {  # editable list
        "dox", "self-harm", "bomb", "explosive", "child sexual", "credit card",
        "ssn", "social security number", "malware", "ransomware",
    }
    if bad_keywords:  # allow injection/extensibility
        bad |= set(map(str.lower, bad_keywords))
pattern = re.compile("|".join(map(re.escape, sorted(bad))), re.IGNORECASE)
def _check(text: str) -> Tuple[bool, str]:
hit = pattern.search(text)
return (hit is None), "" if hit is None else f"Unsafe keyword: {hit.group(0)!r}"
return _check
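# Any callable with the Checkpoint signature plugs in the same way; a hypothetical
# extra check, only to illustrate the shape:
#   def MaxLinesCheckpoint(max_lines: int = 80) -> Checkpoint:
#       def _check(text: str) -> Tuple[bool, str]:
#           ok = text.count("\n") < max_lines
#           return ok, "" if ok else f"More than {max_lines} lines."
#       return _check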
# ---------- Regret Store ----------
class RegretStore:
    """JSONL log of failed loops for later prompts/evals."""

    def __init__(self, path: str | Path = "regret_log.jsonl") -> None:
        self.path = Path(path)
def append(
self,
*,
question: str,
draft: str,
critique: Dict[str, object],
failed_checkpoints: List[str],
loop_index: int,
) -> None:
rec = {
"ts": datetime.now(timezone.utc).isoformat(),
"question": question,
"failed_checkpoints": failed_checkpoints,
"critique": critique,
"loop_index": loop_index,
"draft": draft,
}
self.path.parent.mkdir(parents=True, exist_ok=True)
with self.path.open("a", encoding="utf-8") as f:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
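# Replay sketch (field names match the record written above; path is the default):
#   for line in Path("regret_log.jsonl").read_text(encoding="utf-8").splitlines():
#       rec = json.loads(line)
#       print(rec["loop_index"], rec["failed_checkpoints"])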
# ---------- Cognitive Loop ----------
@dataclass
class CognitiveResult:
    answer: str
    plan: str
    critique: Dict[str, object]
    loops: int
    stats: RunStats
    passed_checkpoints: bool
class CognitiveLoop:
    """Plan -> Draft -> Self-check -> Revise loop with cost + checkpoints + regret."""

    def __init__(
        self,
        adapter: LLMAdapter,
        checkpoints: Optional[List[Checkpoint]] = None,
        regret_store: Optional[RegretStore] = None,
        max_loops: int = 2,
        temperature: float = 0.2,
        max_tokens: int = 512,
    ) -> None:
        self.adapter = adapter
        self.checkpoints = checkpoints or [
            NonEmptyCheckpoint(),
            LengthBoundCheckpoint(4000),
            UnsafeContentCheckpoint(),
        ]
        self.regret_store = regret_store or RegretStore()
        self.max_loops = max_loops
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.stats = RunStats()
# ---- prompts ----
@staticmethod
def _system() -> str:
return (
"You are a precise assistant. Be correct, concise, and cite assumptions. "
"Return exactly what is requested by the user."
)
@staticmethod
def _plan_prompt(question: str, context: str = "") -> str:
return (
"Task:\n"
f"{question}\n\n"
"Context (optional):\n"
f"{context}\n\n"
"Produce a numbered plan of steps to solve the task. Then list explicit assumptions.\n"
"Format:\n"
"Plan:\n1) ...\n2) ...\n\nAssumptions:\n- ...\n"
)
@staticmethod
def _draft_prompt(plan: str, question: str) -> str:
return (
"Follow this plan to produce the answer.\n\n"
f"Plan:\n{plan}\n\n"
f"User request:\n{question}\n\n"
"Write the final answer for the user. Be direct and complete. No meta commentary."
)
@staticmethod
def _check_prompt(plan: str, draft: str) -> str:
return (
"You are a critical reviewer. Assess the DRAFT against the PLAN.\n"
"List factual risks, unsupported claims, harmful/unsafe content, and compliance issues.\n"
"Return strict JSON only:\n"
"{\n"
' "pass": boolean,\n'
' "confidence": float, // 0..1\n'
' "issues": [ "..." ],\n'
' "required_fixes": [ "..." ]\n'
"}\n\n"
f"PLAN:\n{plan}\n\nDRAFT:\n{draft}\n"
)
@staticmethod
def _revise_prompt(question: str, draft: str, critique: Dict[str, object], failed: List[str]) -> str:
return (
"Revise the draft to address ALL critique items and checkpoint failures, "
"while preserving useful content.\n"
f"User request:\n{question}\n\n"
f"Draft:\n{draft}\n\n"
f"Critique JSON:\n{json.dumps(critique)}\n\n"
f"Failed checkpoints:\n{failed}\n\n"
"Return only the revised final answer."
)
# ---- steps ----
def plan(self, question: str, context: str = "") -> str:
sys_prompt = self._system()
user_prompt = self._plan_prompt(question, context)
def _do() -> str:
return self.adapter.call(
[{"role": "system", "content": sys_prompt},
{"role": "user", "content": user_prompt}],
temperature=self.temperature,
max_tokens=self.max_tokens,
)
out, dt = timed_call(_do)
self.stats.add(sys_prompt + "\n" + user_prompt, out, dt)
return out.strip()
def draft(self, plan: str, question: str) -> str:
sys_prompt = self._system()
user_prompt = self._draft_prompt(plan, question)
def _do() -> str:
return self.adapter.call(
[{"role": "system", "content": sys_prompt},
{"role": "user", "content": user_prompt}],
temperature=self.temperature,
max_tokens=self.max_tokens,
)
out, dt = timed_call(_do)
self.stats.add(sys_prompt + "\n" + user_prompt, out, dt)
return out.strip()
def self_check(self, plan: str, draft: str) -> Dict[str, object]:
sys_prompt = self._system()
user_prompt = self._check_prompt(plan, draft)
def _do() -> str:
return self.adapter.call(
[{"role": "system", "content": sys_prompt},
{"role": "user", "content": user_prompt}],
temperature=0.0,
max_tokens=self.max_tokens,
)
out, dt = timed_call(_do)
self.stats.add(sys_prompt + "\n" + user_prompt, out, dt)
try:
data = json.loads(out)
except json.JSONDecodeError:
# why: guard against malformed JSON from the model
data = {"pass": False, "confidence": 0.0, "issues": ["Malformed JSON"], "required_fixes": ["Return valid JSON"]}
# normalize
data.setdefault("pass", False)
data.setdefault("confidence", 0.0)
data.setdefault("issues", [])
data.setdefault("required_fixes", [])
return data
def run_checkpoints(self, text: str) -> List[str]:
failed: List[str] = []
for check in self.checkpoints:
ok, reason = check(text)
if not ok:
failed.append(reason or "Checkpoint failed")
return failed
def revise(
self,
question: str,
draft: str,
critique: Dict[str, object],
failed: List[str],
) -> str:
sys_prompt = self._system()
user_prompt = self._revise_prompt(question, draft, critique, failed)
def _do() -> str:
return self.adapter.call(
[{"role": "system", "content": sys_prompt},
{"role": "user", "content": user_prompt}],
temperature=self.temperature,
max_tokens=self.max_tokens,
)
out, dt = timed_call(_do)
self.stats.add(sys_prompt + "\n" + user_prompt, out, dt)
return out.strip()
# ---- main API ----
def run(self, question: str, *, context: str = "") -> CognitiveResult:
plan = self.plan(question, context)
answer = self.draft(plan, question)
loops = 0
critique = self.self_check(plan, answer)
failed = self.run_checkpoints(answer)
while (loops < self.max_loops) and (failed or not bool(critique.get("pass", False))):
self.regret_store.append(
question=question,
draft=answer,
critique=critique,
failed_checkpoints=failed,
loop_index=loops,
)
answer = self.revise(question, answer, critique, failed)
loops += 1
critique = self.self_check(plan, answer)
failed = self.run_checkpoints(answer)
return CognitiveResult(
answer=answer,
plan=plan,
critique=critique,
loops=loops,
stats=self.stats,
passed_checkpoints=not failed and bool(critique.get("pass", False)),
)
# ---------- Demo (no external deps) ----------
def _toy_fn(transcript: str) -> str:
    """Deterministic callable for demo/testing."""
    if "Produce a numbered plan" in transcript:
        return "Plan:\n1) Identify key facts.\n2) Provide a concise answer.\n\nAssumptions:\n- General knowledge is sufficient."
    if "Return strict JSON only" in transcript:
        # Naive 'pass' unless we see the word 'bomb' in the draft
        pass_flag = "bomb" not in transcript.lower()
        return json.dumps({
            "pass": pass_flag,
            "confidence": 0.7 if pass_flag else 0.2,
            "issues": [] if pass_flag else ["Unsafe content detected"],
            "required_fixes": [] if pass_flag else ["Remove unsafe content"],
        })
    # Default: echo a trimmed, friendly answer
    return "Here's a concise, safe answer based on the plan."
def main() -> None:
    adapter = CallableAdapter(_toy_fn)
    loop = CognitiveLoop(adapter, max_loops=1)
    result = loop.run("Explain photosynthesis briefly.", context="")
    print("=== PLAN ===\n", result.plan)
    print("\n=== ANSWER ===\n", result.answer)
    print("\n=== CRITIQUE ===\n", json.dumps(result.critique, indent=2))
    print("\n=== STATS ===")
    print(vars(result.stats))
    print("\nPassed checkpoints:", result.passed_checkpoints, " | loops:", result.loops)
if __name__ == "__main__":
    main()
# ---------- Basic tests (separate file; run with: pytest -q) ----------
import json

from marcognity_loop.wrapper import CallableAdapter, CognitiveLoop, _toy_fn
def test_loop_passes_by_default():
    loop = CognitiveLoop(CallableAdapter(_toy_fn), max_loops=1)
    res = loop.run("What is 2+2?")
    assert res.answer
    assert isinstance(res.critique, dict)
    assert res.stats.calls >= 3
def test_checkpoint_blocks_unsafe_word():
    def bad_fn(transcript: str) -> str:
        if "Produce a numbered plan" in transcript:
            return "Plan:\n1) ...\n\nAssumptions:\n- ..."
        if "Return strict JSON only" in transcript:
            return json.dumps({"pass": False, "confidence": 0.1, "issues": ["bad"], "required_fixes": ["remove"]})
        return "This mentions a bomb."

    loop = CognitiveLoop(CallableAdapter(bad_fn), max_loops=1)
    res = loop.run("Say something unsafe.")
    assert not res.passed_checkpoints
    assert res.loops == 1
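To point it at a real model instead of the toy callable, the wiring is identical. Rough sketch, assuming the openai package plus OPENAI_API_KEY are available and the script is importable as marcognity_loop.wrapper:

from marcognity_loop.wrapper import CognitiveLoop, OpenAIAdapter

loop = CognitiveLoop(OpenAIAdapter(model="gpt-4o-mini"), max_loops=2)
result = loop.run("Summarize the water cycle in three sentences.")
print(result.answer)
print(result.stats.calls, result.passed_checkpoints)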