Skip to main content

perspt_store/
store.rs

1//! Session Store Implementation
2//!
3//! Provides CRUD operations for SRBN sessions, node states, and energy history.
4
5use anyhow::{Context, Result};
6use duckdb::Connection;
7use serde::{Deserialize, Serialize};
8use std::path::PathBuf;
9
10use crate::schema::init_schema;
11
/// Record for a session.
///
/// Inserted into the `sessions` table by `SessionStore::create_session` and
/// returned by `get_session` / `list_sessions_paginated`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionRecord {
    /// Key used to look the session up (`WHERE session_id = ?`).
    pub session_id: String,
    /// Task text stored with the session.
    pub task: String,
    /// Working directory stored with the session.
    pub working_dir: String,
    /// Optional merkle root bytes. `create_session` binds them hex-encoded,
    /// or as an empty string when `None`.
    pub merkle_root: Option<Vec<u8>>,
    /// Optional toolchain label. `create_session` binds an empty string when `None`.
    pub detected_toolchain: Option<String>,
    /// Status string; rewritten by `update_session_status`.
    pub status: String,
}
22
/// Record for node state.
///
/// Inserted into the `node_states` table by `SessionStore::record_node_state`
/// and read back by `get_node_states`. Every optional text field is bound as
/// an empty string when `None`; on read, empty strings come back as `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStateRecord {
    /// Identifier of the node this row describes.
    pub node_id: String,
    /// Session the node belongs to.
    pub session_id: String,
    /// Node state, carried as a string.
    pub state: String,
    /// Bound as a decimal string on insert; read back as `f64` narrowed to `f32`.
    pub v_total: f32,
    /// Optional hash bytes; hex-encoded on insert and hex-decoded on read.
    pub merkle_hash: Option<Vec<u8>>,
    /// Attempt counter, bound as a decimal string on insert.
    pub attempt_count: i32,
    // PSP-5 Phase 8: Richer node snapshot for resume reconstruction
    /// Optional node class label.
    pub node_class: Option<String>,
    /// Optional owning plugin name.
    pub owner_plugin: Option<String>,
    /// Optional goal text.
    pub goal: Option<String>,
    /// Optional parent node identifier.
    pub parent_id: Option<String>,
    /// JSON-serialized `Vec<String>`
    pub children: Option<String>,
    /// Optional label for the last error type.
    pub last_error_type: Option<String>,
    /// Optional commit time, carried as a string (format not visible here).
    pub committed_at: Option<String>,
}
42
/// Record for energy history.
///
/// One measurement row for the `energy_history` table, written by
/// `SessionStore::record_energy`. Each `v_*` value is bound as its decimal
/// string on insert and read back as `f64` narrowed to `f32`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnergyRecord {
    /// Node the measurement belongs to.
    pub node_id: String,
    /// Session the measurement belongs to.
    pub session_id: String,
    /// Value of the `v_syn` column.
    pub v_syn: f32,
    /// Value of the `v_str` column.
    pub v_str: f32,
    /// Value of the `v_log` column.
    pub v_log: f32,
    /// Value of the `v_boot` column.
    pub v_boot: f32,
    /// Value of the `v_sheaf` column.
    pub v_sheaf: f32,
    /// Value of the `v_total` column (stored as given; not recomputed here).
    pub v_total: f32,
}
55
/// Record for LLM request/response logging.
///
/// Inserted into the `llm_requests` table by
/// `SessionStore::record_llm_request` and returned by `get_llm_requests`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmRequestRecord {
    /// Session the request was made in.
    pub session_id: String,
    /// Optional node identifier. Bound as an empty string when `None`;
    /// `get_llm_requests` maps NULL or empty back to `None`.
    pub node_id: Option<String>,
    /// Model name stored with the request.
    pub model: String,
    /// Prompt text.
    pub prompt: String,
    /// Response text.
    pub response: String,
    /// Input token count, bound as a decimal string on insert.
    pub tokens_in: i32,
    /// Output token count, bound as a decimal string on insert.
    pub tokens_out: i32,
    /// Latency in milliseconds, bound as a decimal string on insert.
    pub latency_ms: i32,
}
68
/// PSP-5 Phase 3: Record for structural digest persistence.
///
/// Inserted into the `structural_digests` table by
/// `SessionStore::record_structural_digest` and returned by
/// `get_structural_digests`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StructuralDigestRecord {
    /// Identifier of this digest row.
    pub digest_id: String,
    /// Session the digest belongs to.
    pub session_id: String,
    /// Node the digest belongs to.
    pub node_id: String,
    /// Path of the digested source.
    pub source_path: String,
    /// Artifact kind, carried as a string.
    pub artifact_kind: String,
    /// Digest bytes; hex-encoded on insert.
    pub hash: Vec<u8>,
    /// Version number, bound as a decimal string on insert.
    pub version: i32,
}
80
/// PSP-5 Phase 3: Record for context provenance persistence.
///
/// Inserted into the `context_provenance` table by
/// `SessionStore::record_context_provenance`; `get_context_provenance`
/// returns the most recently created row for a session/node pair.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextProvenanceRecord {
    /// Session the provenance belongs to.
    pub session_id: String,
    /// Node the provenance belongs to.
    pub node_id: String,
    /// Identifier of the context package.
    pub context_package_id: String,
    /// JSON-serialized structural digest hashes
    pub structural_hashes: String,
    /// JSON-serialized summary hashes
    pub summary_hashes: String,
    /// JSON-serialized dependency commit hashes
    pub dependency_hashes: String,
    /// Number of included files, bound as a decimal string on insert.
    pub included_file_count: i32,
    /// Total byte count, bound as a decimal string on insert.
    pub total_bytes: i32,
}
96
/// PSP-5 Phase 5: Record for escalation report persistence.
///
/// Structured values are carried as pre-serialized strings, as noted on each
/// field; this type does no serialization of its own.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EscalationReportRecord {
    /// Session the report belongs to.
    pub session_id: String,
    /// Node the report belongs to.
    pub node_id: String,
    /// Serialized EscalationCategory
    pub category: String,
    /// JSON-serialized RewriteAction
    pub action: String,
    /// JSON-serialized EnergyComponents
    pub energy_snapshot: String,
    /// JSON-serialized `Vec<StageOutcome>`
    pub stage_outcomes: String,
    /// Human-readable evidence
    pub evidence: String,
    /// JSON-serialized `Vec<String>`
    pub affected_node_ids: String,
}
115
/// PSP-5 Phase 5: Record for local graph rewrite persistence.
///
/// Structured values are carried as pre-serialized strings, as noted on each
/// field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RewriteRecordRow {
    /// Session the rewrite belongs to.
    pub session_id: String,
    /// Node the rewrite belongs to.
    pub node_id: String,
    /// JSON-serialized RewriteAction
    pub action: String,
    /// Serialized EscalationCategory
    pub category: String,
    /// JSON-serialized `Vec<String>`
    pub requeued_nodes: String,
    /// JSON-serialized `Vec<String>`
    pub inserted_nodes: String,
}
130
/// PSP-5 Phase 5: Record for sheaf validation result persistence.
///
/// List-valued fields are carried as JSON strings, as noted on each field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SheafValidationRow {
    /// Session the validation belongs to.
    pub session_id: String,
    /// Node the validation belongs to.
    pub node_id: String,
    /// Validator class, carried as a string.
    pub validator_class: String,
    /// Optional plugin source name.
    pub plugin_source: Option<String>,
    /// Pass/fail flag for the validation.
    pub passed: bool,
    /// Summary of the evidence, as text.
    pub evidence_summary: String,
    /// JSON-serialized `Vec<String>`
    pub affected_files: String,
    /// Value recorded as this validation's `v_sheaf` contribution.
    pub v_sheaf_contribution: f32,
    /// JSON-serialized `Vec<String>`
    pub requeue_targets: String,
}
146
147// =============================================================================
148// PSP-5 Phase 6: Provisional Branch, Interface Seal, Branch Flush Records
149// =============================================================================
150
/// PSP-5 Phase 6: Record for provisional branch persistence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvisionalBranchRow {
    /// Identifier of the branch.
    pub branch_id: String,
    /// Session the branch belongs to.
    pub session_id: String,
    /// Node the branch is attached to.
    pub node_id: String,
    /// Identifier of the parent node.
    pub parent_node_id: String,
    /// Branch state, carried as a string.
    pub state: String,
    /// Optional seal hash bytes associated with the parent.
    pub parent_seal_hash: Option<Vec<u8>>,
    /// Optional sandbox directory, carried as a string.
    pub sandbox_dir: Option<String>,
}
162
/// PSP-5 Phase 6: Record for branch lineage persistence.
///
/// Links one parent branch identifier to one child branch identifier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BranchLineageRow {
    /// Identifier of this lineage row.
    pub lineage_id: String,
    /// Identifier of the parent branch.
    pub parent_branch_id: String,
    /// Identifier of the child branch.
    pub child_branch_id: String,
    /// Flag recorded with the link; presumably marks a dependency on the
    /// parent's interface seal — confirm against the writer.
    pub depends_on_seal: bool,
}
171
/// PSP-5 Phase 6: Record for interface seal persistence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterfaceSealRow {
    /// Identifier of the seal.
    pub seal_id: String,
    /// Session the seal belongs to.
    pub session_id: String,
    /// Node the seal belongs to.
    pub node_id: String,
    /// Path of the sealed item.
    pub sealed_path: String,
    /// Artifact kind, carried as a string.
    pub artifact_kind: String,
    /// Seal hash bytes.
    pub seal_hash: Vec<u8>,
    /// Version number of the seal.
    pub version: i32,
}
183
/// PSP-5 Phase 6: Record for branch flush decision persistence.
///
/// List-valued fields are carried as JSON strings, as noted on each field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BranchFlushRow {
    /// Identifier of the flush decision.
    pub flush_id: String,
    /// Session the flush belongs to.
    pub session_id: String,
    /// Identifier of the parent node.
    pub parent_node_id: String,
    /// JSON-serialized `Vec<String>`
    pub flushed_branch_ids: String,
    /// JSON-serialized `Vec<String>`
    pub requeue_node_ids: String,
    /// Reason text recorded with the decision.
    pub reason: String,
}
196
197// =============================================================================
198// PSP-5 Phase 8: Task Graph and Review Outcome Records
199// =============================================================================
200
/// PSP-5 Phase 8: Record for task graph edges (DAG reconstruction on resume).
///
/// Each row names one parent/child pair within a session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskGraphEdgeRow {
    /// Session the edge belongs to.
    pub session_id: String,
    /// Identifier of the parent node.
    pub parent_node_id: String,
    /// Identifier of the child node.
    pub child_node_id: String,
    /// Edge type, carried as a string (allowed values not visible here).
    pub edge_type: String,
}
209
/// PSP-5 Phase 8: Record for review outcome persistence.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReviewOutcomeRow {
    /// Session the review belongs to.
    pub session_id: String,
    /// Node that was reviewed.
    pub node_id: String,
    /// One of: "approved", "rejected", "edit_requested", "correction_requested", "skipped"
    pub outcome: String,
    /// Optional free-text note from the reviewer.
    pub reviewer_note: Option<String>,
    /// Energy at time of review decision
    pub energy_at_review: Option<f64>,
    /// Whether verification was degraded when decision was made
    pub degraded: Option<bool>,
    /// Escalation category if the node had been classified
    pub escalation_category: Option<String>,
}
225
/// PSP-5 Phase 8: Record for verification result snapshot persistence.
///
/// Carries the full result as JSON plus a set of flat summary fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationResultRow {
    /// Session the result belongs to.
    pub session_id: String,
    /// Node the result belongs to.
    pub node_id: String,
    /// JSON-serialized VerificationResult (full data for resume reconstruction)
    pub result_json: String,
    // Query-friendly summary fields
    /// Summary flag for the syntax stage.
    pub syntax_ok: bool,
    /// Summary flag for the build stage.
    pub build_ok: bool,
    /// Summary flag for the test stage.
    pub tests_ok: bool,
    /// Summary flag for the lint stage.
    pub lint_ok: bool,
    /// Number of diagnostics recorded.
    pub diagnostics_count: i32,
    /// Number of tests that passed.
    pub tests_passed: i32,
    /// Number of tests that failed.
    pub tests_failed: i32,
    /// Flag marking the verification as degraded.
    pub degraded: bool,
    /// Optional explanation accompanying `degraded`.
    pub degraded_reason: Option<String>,
}
244
/// PSP-5 Phase 8: Record for artifact bundle snapshot persistence.
///
/// Carries the full bundle as JSON plus flat summary counts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArtifactBundleRow {
    /// Session the bundle belongs to.
    pub session_id: String,
    /// Node the bundle belongs to.
    pub node_id: String,
    /// JSON-serialized ArtifactBundle (full data for resume reconstruction)
    pub bundle_json: String,
    /// Number of artifacts recorded for the bundle.
    pub artifact_count: i32,
    /// Number of commands recorded for the bundle.
    pub command_count: i32,
    /// JSON-serialized `Vec<String>` of touched file paths
    pub touched_files: String,
}
257
258// =========================================================================
259// Plan Revision, Feature Charter, and Repair Footprint Row Types
260// =========================================================================
261
/// Row type for feature_charters table.
///
/// Unlike the record types earlier in this file, this row derives only
/// `Debug` and `Clone` (no serde traits).
#[derive(Debug, Clone)]
pub struct FeatureCharterRow {
    /// Identifier of the charter.
    pub charter_id: String,
    /// Session the charter belongs to.
    pub session_id: String,
    /// Scope description text.
    pub scope_description: String,
    /// Optional module limit; `None` presumably means unlimited — confirm.
    pub max_modules: Option<i32>,
    /// Optional file limit; `None` presumably means unlimited — confirm.
    pub max_files: Option<i32>,
    /// Optional revision limit; `None` presumably means unlimited — confirm.
    pub max_revisions: Option<i32>,
    /// Optional language constraint, carried as a string.
    pub language_constraint: Option<String>,
}
273
/// Row type for plan_revisions table.
///
/// Derives only `Debug` and `Clone` (no serde traits).
#[derive(Debug, Clone)]
pub struct PlanRevisionRow {
    /// Identifier of the revision.
    pub revision_id: String,
    /// Session the revision belongs to.
    pub session_id: String,
    /// Sequence number of the revision.
    pub sequence: i32,
    /// Plan payload; presumably JSON given the name — confirm against the writer.
    pub plan_json: String,
    /// Reason text recorded for the revision.
    pub reason: String,
    /// Optional identifier of a superseded revision.
    pub supersedes: Option<String>,
    /// Revision status, carried as a string.
    pub status: String,
}
285
/// Row type for repair_footprints table.
///
/// Derives only `Debug` and `Clone` (no serde traits).
#[derive(Debug, Clone)]
pub struct RepairFootprintRow {
    /// Identifier of the footprint.
    pub footprint_id: String,
    /// Session the footprint belongs to.
    pub session_id: String,
    /// Node the footprint belongs to.
    pub node_id: String,
    /// Identifier of the associated plan revision.
    pub revision_id: String,
    /// Attempt number recorded for this footprint.
    pub attempt: i32,
    /// Affected files, carried as a string; presumably JSON like the other
    /// list-valued fields in this file — confirm.
    pub affected_files: String,
    /// Bundle payload; presumably JSON given the name — confirm.
    pub bundle_json: String,
    /// Diagnosis text.
    pub diagnosis: String,
    /// Flag marking the footprint as resolved.
    pub resolved: bool,
}
299
/// Row type for budget_envelopes table.
///
/// Pairs each optional limit with a running usage counter. Derives only
/// `Debug` and `Clone` (no serde traits).
#[derive(Debug, Clone)]
pub struct BudgetEnvelopeRow {
    /// Session the budget belongs to.
    pub session_id: String,
    /// Optional step limit; `None` presumably means unlimited — confirm.
    pub max_steps: Option<i32>,
    /// Steps used so far.
    pub steps_used: i32,
    /// Optional revision limit; `None` presumably means unlimited — confirm.
    pub max_revisions: Option<i32>,
    /// Revisions used so far.
    pub revisions_used: i32,
    /// Optional cost limit in USD; `None` presumably means unlimited — confirm.
    pub max_cost_usd: Option<f64>,
    /// Cost used so far, in USD.
    pub cost_used_usd: f64,
}
311
312// =========================================================================
313// PSP-7: SRBN Step Records and Correction Attempt Row Types
314// =========================================================================
315
/// PSP-7: Record for a single orchestration step transition.
///
/// Optional fields are populated only for steps where they apply, as noted on
/// each field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SrbnStepRecord {
    /// Session the step belongs to.
    pub session_id: String,
    /// Node the step belongs to.
    pub node_id: String,
    /// Pipeline stage name (e.g. "speculate", "verify", "converge", "commit").
    pub step: String,
    /// Outcome of the step (e.g. "ok", "retry", "escalated", "failed").
    pub outcome: String,
    /// JSON-serialized EnergyComponents snapshot (if available).
    pub energy_json: Option<String>,
    /// ParseResultState as string (if this step involved parsing).
    pub parse_state: Option<String>,
    /// RetryClassification as string (if this step triggered a retry).
    pub retry_classification: Option<String>,
    /// Attempt count at the time of recording.
    pub attempt_count: i32,
    /// Wall-clock duration of the step in milliseconds.
    pub duration_ms: i32,
}
336
/// PSP-7: Record for a single correction attempt within a convergence loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CorrectionAttemptRow {
    /// Session the attempt belongs to.
    pub session_id: String,
    /// Node the attempt belongs to.
    pub node_id: String,
    /// Attempt number.
    pub attempt: i32,
    /// Parse state, carried as a string.
    pub parse_state: String,
    /// Optional retry classification, carried as a string.
    pub retry_classification: Option<String>,
    /// Fingerprint of the response (algorithm not visible here).
    pub response_fingerprint: String,
    /// Length of the response (unit not visible here — bytes or chars, confirm).
    pub response_length: i32,
    /// JSON-serialized EnergyComponents snapshot (if available).
    pub energy_json: Option<String>,
    /// Flag marking the attempt as accepted.
    pub accepted: bool,
    /// Optional reason text; presumably set when `accepted` is false — confirm.
    pub rejection_reason: Option<String>,
    /// Epoch seconds.
    pub created_at: i64,
}
354
355use std::sync::Mutex;
356
/// Session store for SRBN persistence.
///
/// Owns a single DuckDB [`Connection`] guarded by a [`Mutex`]; the database
/// methods lock it around each call.
pub struct SessionStore {
    /// The guarded connection. Methods use `lock().unwrap()`, so a poisoned
    /// mutex panics instead of returning an error.
    conn: Mutex<Connection>,
}
361
362impl SessionStore {
363    /// Create a new session store with default path
364    pub fn new() -> Result<Self> {
365        let db_path = Self::default_db_path()?;
366        Self::open(&db_path)
367    }
368
369    /// Open a session store at the given path
370    pub fn open(path: &PathBuf) -> Result<Self> {
371        // Ensure parent directory exists
372        if let Some(parent) = path.parent() {
373            std::fs::create_dir_all(parent)?;
374        }
375
376        let conn = Connection::open(path).context("Failed to open DuckDB")?;
377        init_schema(&conn)?;
378
379        Ok(Self {
380            conn: Mutex::new(conn),
381        })
382    }
383
384    /// Open a session store in read-only mode for concurrent dashboard reads.
385    ///
386    /// Uses `AccessMode::ReadOnly` so the dashboard can read alongside the
387    /// agent's write lock. Does **not** call `init_schema()` (a write op).
388    /// The database file must already exist.
389    pub fn open_read_only(path: &std::path::Path) -> Result<Self> {
390        let config = duckdb::Config::default()
391            .access_mode(duckdb::AccessMode::ReadOnly)
392            .context("Failed to configure DuckDB read-only mode")?;
393        let conn = Connection::open_with_flags(path, config)
394            .context("Failed to open DuckDB in read-only mode")?;
395        Ok(Self {
396            conn: Mutex::new(conn),
397        })
398    }
399
400    /// Get the default database path (~/.local/share/perspt/perspt.db or similar)
401    pub fn default_db_path() -> Result<PathBuf> {
402        perspt_core::paths::database_path().context("Could not determine platform data directory")
403    }
404
405    /// Create a new session
406    pub fn create_session(&self, session: &SessionRecord) -> Result<()> {
407        self.conn.lock().unwrap().execute(
408            r#"
409            INSERT INTO sessions (session_id, task, working_dir, merkle_root, detected_toolchain, status)
410            VALUES (?, ?, ?, ?, ?, ?)
411            "#,
412            [
413                &session.session_id,
414                &session.task,
415                &session.working_dir,
416                &session.merkle_root.as_ref().map(hex::encode).unwrap_or_default(),
417                &session.detected_toolchain.clone().unwrap_or_default(),
418                &session.status,
419            ],
420        )?;
421        Ok(())
422    }
423
424    /// Update session merkle root
425    pub fn update_merkle_root(&self, session_id: &str, merkle_root: &[u8]) -> Result<()> {
426        self.conn.lock().unwrap().execute(
427            "UPDATE sessions SET merkle_root = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
428            [hex::encode(merkle_root), session_id.to_string()],
429        )?;
430        Ok(())
431    }
432
433    /// Record node state
434    pub fn record_node_state(&self, record: &NodeStateRecord) -> Result<()> {
435        let v_total = record.v_total.to_string();
436        let merkle_hash = record
437            .merkle_hash
438            .as_ref()
439            .map(hex::encode)
440            .unwrap_or_default();
441        let attempt_count = record.attempt_count.to_string();
442        let node_class = record.node_class.clone().unwrap_or_default();
443        let owner_plugin = record.owner_plugin.clone().unwrap_or_default();
444        let goal = record.goal.clone().unwrap_or_default();
445        let parent_id = record.parent_id.clone().unwrap_or_default();
446        let children = record.children.clone().unwrap_or_default();
447        let last_error_type = record.last_error_type.clone().unwrap_or_default();
448        let committed_at = record.committed_at.clone().unwrap_or_default();
449
450        self.conn.lock().unwrap().execute(
451            r#"
452            INSERT INTO node_states (node_id, session_id, state, v_total, merkle_hash, attempt_count,
453                                     node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at)
454            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
455            "#,
456            [
457                &record.node_id,
458                &record.session_id,
459                &record.state,
460                &v_total,
461                &merkle_hash,
462                &attempt_count,
463                &node_class,
464                &owner_plugin,
465                &goal,
466                &parent_id,
467                &children,
468                &last_error_type,
469                &committed_at,
470            ],
471        )?;
472        Ok(())
473    }
474
475    /// Record energy measurement
476    pub fn record_energy(&self, record: &EnergyRecord) -> Result<()> {
477        self.conn.lock().unwrap().execute(
478            r#"
479            INSERT INTO energy_history (node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total)
480            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
481            "#,
482            [
483                &record.node_id,
484                &record.session_id,
485                &record.v_syn.to_string(),
486                &record.v_str.to_string(),
487                &record.v_log.to_string(),
488                &record.v_boot.to_string(),
489                &record.v_sheaf.to_string(),
490                &record.v_total.to_string(),
491            ],
492        )?;
493        Ok(())
494    }
495
496    /// Get session by ID
497    pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>> {
498        let conn = self.conn.lock().unwrap();
499        let mut stmt = conn.prepare(
500            "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status FROM sessions WHERE session_id = ?"
501        )?;
502
503        let mut rows = stmt.query([session_id])?;
504        if let Some(row) = rows.next()? {
505            // merkle_root is stored as BLOB; read directly as Option<Vec<u8>>
506            // to match list_recent_sessions and avoid type mismatch on Blob columns.
507            let merkle_root: Option<Vec<u8>> = row.get(3).ok();
508
509            Ok(Some(SessionRecord {
510                session_id: row.get(0)?,
511                task: row.get(1)?,
512                working_dir: row.get(2)?,
513                merkle_root,
514                detected_toolchain: row.get(4)?,
515                status: row.get(5)?,
516            }))
517        } else {
518            Ok(None)
519        }
520    }
521
522    /// Get the directory for session artifacts (`~/.local/share/perspt/sessions/<id>`)
523    pub fn get_session_dir(&self, session_id: &str) -> Result<PathBuf> {
524        let data_dir = dirs::data_local_dir()
525            .context("Could not find local data directory")?
526            .join("perspt")
527            .join("sessions")
528            .join(session_id);
529        Ok(data_dir)
530    }
531
532    /// Ensure a session directory exists and return the path
533    pub fn create_session_dir(&self, session_id: &str) -> Result<PathBuf> {
534        let dir = self.get_session_dir(session_id)?;
535        if !dir.exists() {
536            std::fs::create_dir_all(&dir).context("Failed to create session directory")?;
537        }
538        Ok(dir)
539    }
540
541    /// Get energy history for a node (query)
542    pub fn get_energy_history(&self, session_id: &str, node_id: &str) -> Result<Vec<EnergyRecord>> {
543        let conn = self.conn.lock().unwrap();
544        let mut stmt = conn.prepare(
545            "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? AND node_id = ? ORDER BY timestamp"
546        )?;
547
548        let mut rows = stmt.query([session_id, node_id])?;
549        let mut records = Vec::new();
550
551        while let Some(row) = rows.next()? {
552            records.push(EnergyRecord {
553                node_id: row.get(0)?,
554                session_id: row.get(1)?,
555                v_syn: row.get::<_, f64>(2)? as f32,
556                v_str: row.get::<_, f64>(3)? as f32,
557                v_log: row.get::<_, f64>(4)? as f32,
558                v_boot: row.get::<_, f64>(5)? as f32,
559                v_sheaf: row.get::<_, f64>(6)? as f32,
560                v_total: row.get::<_, f64>(7)? as f32,
561            });
562        }
563
564        Ok(records)
565    }
566
567    /// Get all energy history for a session (all nodes)
568    pub fn get_session_energy_history(&self, session_id: &str) -> Result<Vec<EnergyRecord>> {
569        let conn = self.conn.lock().unwrap();
570        let mut stmt = conn.prepare(
571            "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? ORDER BY timestamp"
572        )?;
573
574        let mut rows = stmt.query([session_id])?;
575        let mut records = Vec::new();
576
577        while let Some(row) = rows.next()? {
578            records.push(EnergyRecord {
579                node_id: row.get(0)?,
580                session_id: row.get(1)?,
581                v_syn: row.get::<_, f64>(2)? as f32,
582                v_str: row.get::<_, f64>(3)? as f32,
583                v_log: row.get::<_, f64>(4)? as f32,
584                v_boot: row.get::<_, f64>(5)? as f32,
585                v_sheaf: row.get::<_, f64>(6)? as f32,
586                v_total: row.get::<_, f64>(7)? as f32,
587            });
588        }
589
590        Ok(records)
591    }
592
593    /// List recent sessions (newest first)
594    pub fn list_recent_sessions(&self, limit: usize) -> Result<Vec<SessionRecord>> {
595        self.list_sessions_paginated(limit, 0)
596    }
597
598    /// List sessions with pagination (most recent first).
599    pub fn list_sessions_paginated(
600        &self,
601        limit: usize,
602        offset: usize,
603    ) -> Result<Vec<SessionRecord>> {
604        let conn = self.conn.lock().unwrap();
605        let mut stmt = conn.prepare(
606            "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status
607             FROM sessions ORDER BY created_at DESC LIMIT ? OFFSET ?",
608        )?;
609
610        let mut rows = stmt.query([limit.to_string(), offset.to_string()])?;
611        let mut records = Vec::new();
612
613        while let Some(row) = rows.next()? {
614            let merkle_root: Option<Vec<u8>> = row.get(3).ok();
615
616            records.push(SessionRecord {
617                session_id: row.get(0)?,
618                task: row.get(1)?,
619                working_dir: row.get(2)?,
620                merkle_root,
621                detected_toolchain: row.get(4)?,
622                status: row.get(5)?,
623            });
624        }
625
626        Ok(records)
627    }
628
629    /// Count total number of sessions.
630    pub fn count_sessions(&self) -> Result<usize> {
631        let conn = self.conn.lock().unwrap();
632        let mut stmt = conn.prepare("SELECT COUNT(*) FROM sessions")?;
633        let mut rows = stmt.query([])?;
634        if let Some(row) = rows.next()? {
635            let count: i64 = row.get(0)?;
636            Ok(count as usize)
637        } else {
638            Ok(0)
639        }
640    }
641
642    /// Get all node states for a session
643    pub fn get_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
644        let conn = self.conn.lock().unwrap();
645        let mut stmt = conn.prepare(
646            "SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
647                    node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
648             FROM node_states WHERE session_id = ? ORDER BY created_at",
649        )?;
650
651        let mut rows = stmt.query([session_id])?;
652        let mut records = Vec::new();
653
654        while let Some(row) = rows.next()? {
655            records.push(NodeStateRecord {
656                node_id: row.get(0)?,
657                session_id: row.get(1)?,
658                state: row.get(2)?,
659                v_total: row.get::<_, f64>(3)? as f32,
660                merkle_hash: row
661                    .get::<_, Option<String>>(4)?
662                    .and_then(|s| hex::decode(s).ok()),
663                attempt_count: row.get(5)?,
664                node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
665                owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
666                goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
667                parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
668                children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
669                last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
670                committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
671            });
672        }
673
674        Ok(records)
675    }
676
677    /// Update session status
678    pub fn update_session_status(&self, session_id: &str, status: &str) -> Result<()> {
679        self.conn.lock().unwrap().execute(
680            "UPDATE sessions SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
681            [status, session_id],
682        )?;
683        Ok(())
684    }
685
686    /// Record an LLM request/response
687    pub fn record_llm_request(&self, record: &LlmRequestRecord) -> Result<()> {
688        let conn = self.conn.lock().unwrap();
689        conn.execute(
690            r#"
691            INSERT INTO llm_requests (session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms)
692            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
693            "#,
694            [
695                &record.session_id,
696                &record.node_id.clone().unwrap_or_default(),
697                &record.model,
698                &record.prompt,
699                &record.response,
700                &record.tokens_in.to_string(),
701                &record.tokens_out.to_string(),
702                &record.latency_ms.to_string(),
703            ],
704        )?;
705        Ok(())
706    }
707
708    /// Get LLM requests for a session
709    pub fn get_llm_requests(&self, session_id: &str) -> Result<Vec<LlmRequestRecord>> {
710        let conn = self.conn.lock().unwrap();
711        let mut stmt = conn.prepare(
712            "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
713             FROM llm_requests WHERE session_id = ? ORDER BY timestamp",
714        )?;
715
716        let mut rows = stmt.query([session_id])?;
717        let mut records = Vec::new();
718
719        while let Some(row) = rows.next()? {
720            let node_id: Option<String> = row.get(1)?;
721            records.push(LlmRequestRecord {
722                session_id: row.get(0)?,
723                node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
724                    None
725                } else {
726                    node_id
727                },
728                model: row.get(2)?,
729                prompt: row.get(3)?,
730                response: row.get(4)?,
731                tokens_in: row.get(5)?,
732                tokens_out: row.get(6)?,
733                latency_ms: row.get(7)?,
734            });
735        }
736
737        Ok(records)
738    }
739
740    /// Aggregate LLM statistics across all sessions: (count, sum_tokens_in, sum_tokens_out, sum_latency_ms)
741    pub fn get_global_llm_summary(&self) -> Result<(i64, i64, i64, i64)> {
742        let conn = self.conn.lock().unwrap();
743        let mut stmt = conn.prepare(
744            "SELECT COUNT(*), \
745             COALESCE(SUM(CASE WHEN tokens_in > 0 THEN tokens_in ELSE (LENGTH(prompt) + 3) / 4 END), 0), \
746             COALESCE(SUM(CASE WHEN tokens_out > 0 THEN tokens_out ELSE (LENGTH(response) + 3) / 4 END), 0), \
747             COALESCE(MEDIAN(latency_ms), 0) \
748             FROM llm_requests",
749        )?;
750        let result = stmt.query_row([], |row| {
751            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
752        })?;
753        Ok(result)
754    }
755
756    // =========================================================================
757    // PSP-5 Phase 3: Structural Digest & Context Provenance Persistence
758    // =========================================================================
759
760    /// Record a structural digest
761    pub fn record_structural_digest(&self, record: &StructuralDigestRecord) -> Result<()> {
762        self.conn.lock().unwrap().execute(
763            r#"
764            INSERT INTO structural_digests (digest_id, session_id, node_id, source_path, artifact_kind, hash, version)
765            VALUES (?, ?, ?, ?, ?, ?, ?)
766            "#,
767            [
768                &record.digest_id,
769                &record.session_id,
770                &record.node_id,
771                &record.source_path,
772                &record.artifact_kind,
773                &hex::encode(&record.hash),
774                &record.version.to_string(),
775            ],
776        )?;
777        Ok(())
778    }
779
780    /// Get structural digests for a session and node
781    pub fn get_structural_digests(
782        &self,
783        session_id: &str,
784        node_id: &str,
785    ) -> Result<Vec<StructuralDigestRecord>> {
786        let conn = self.conn.lock().unwrap();
787        let mut stmt = conn.prepare(
788            "SELECT digest_id, session_id, node_id, source_path, artifact_kind, hash, version
789             FROM structural_digests WHERE session_id = ? AND node_id = ? ORDER BY created_at",
790        )?;
791
792        let mut rows = stmt.query([session_id, node_id])?;
793        let mut records = Vec::new();
794
795        while let Some(row) = rows.next()? {
796            records.push(StructuralDigestRecord {
797                digest_id: row.get(0)?,
798                session_id: row.get(1)?,
799                node_id: row.get(2)?,
800                source_path: row.get(3)?,
801                artifact_kind: row.get(4)?,
802                hash: row
803                    .get::<_, String>(5)
804                    .ok()
805                    .and_then(|s| hex::decode(s).ok())
806                    .unwrap_or_default(),
807                version: row.get(5)?,
808            });
809        }
810
811        Ok(records)
812    }
813
814    /// Record context provenance for a node
815    pub fn record_context_provenance(&self, record: &ContextProvenanceRecord) -> Result<()> {
816        self.conn.lock().unwrap().execute(
817            r#"
818            INSERT INTO context_provenance (session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes)
819            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
820            "#,
821            [
822                &record.session_id,
823                &record.node_id,
824                &record.context_package_id,
825                &record.structural_hashes,
826                &record.summary_hashes,
827                &record.dependency_hashes,
828                &record.included_file_count.to_string(),
829                &record.total_bytes.to_string(),
830            ],
831        )?;
832        Ok(())
833    }
834
835    /// Get context provenance for a session and node
836    pub fn get_context_provenance(
837        &self,
838        session_id: &str,
839        node_id: &str,
840    ) -> Result<Option<ContextProvenanceRecord>> {
841        let conn = self.conn.lock().unwrap();
842        let mut stmt = conn.prepare(
843            "SELECT session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes
844             FROM context_provenance WHERE session_id = ? AND node_id = ? ORDER BY created_at DESC LIMIT 1",
845        )?;
846
847        let mut rows = stmt.query([session_id, node_id])?;
848        if let Some(row) = rows.next()? {
849            Ok(Some(ContextProvenanceRecord {
850                session_id: row.get(0)?,
851                node_id: row.get(1)?,
852                context_package_id: row.get(2)?,
853                structural_hashes: row.get(3)?,
854                summary_hashes: row.get(4)?,
855                dependency_hashes: row.get(5)?,
856                included_file_count: row.get(6)?,
857                total_bytes: row.get(7)?,
858            }))
859        } else {
860            Ok(None)
861        }
862    }
863
864    // =========================================================================
865    // PSP-5 Phase 5: Escalation, Rewrite, and Sheaf Validation Persistence
866    // =========================================================================
867
868    /// Record an escalation report
869    pub fn record_escalation_report(&self, record: &EscalationReportRecord) -> Result<()> {
870        self.conn.lock().unwrap().execute(
871            r#"
872            INSERT INTO escalation_reports (session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids)
873            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
874            "#,
875            [
876                &record.session_id,
877                &record.node_id,
878                &record.category,
879                &record.action,
880                &record.energy_snapshot,
881                &record.stage_outcomes,
882                &record.evidence,
883                &record.affected_node_ids,
884            ],
885        )?;
886        Ok(())
887    }
888
889    /// Get escalation reports for a session
890    pub fn get_escalation_reports(&self, session_id: &str) -> Result<Vec<EscalationReportRecord>> {
891        let conn = self.conn.lock().unwrap();
892        let mut stmt = conn.prepare(
893            "SELECT session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids
894             FROM escalation_reports WHERE session_id = ? ORDER BY created_at",
895        )?;
896        let mut rows = stmt.query([session_id])?;
897        let mut records = Vec::new();
898        while let Some(row) = rows.next()? {
899            records.push(EscalationReportRecord {
900                session_id: row.get(0)?,
901                node_id: row.get(1)?,
902                category: row.get(2)?,
903                action: row.get(3)?,
904                energy_snapshot: row.get(4)?,
905                stage_outcomes: row.get(5)?,
906                evidence: row.get(6)?,
907                affected_node_ids: row.get(7)?,
908            });
909        }
910        Ok(records)
911    }
912
913    /// Record a local graph rewrite
914    pub fn record_rewrite(&self, record: &RewriteRecordRow) -> Result<()> {
915        self.conn.lock().unwrap().execute(
916            r#"
917            INSERT INTO rewrite_records (session_id, node_id, action, category, requeued_nodes, inserted_nodes)
918            VALUES (?, ?, ?, ?, ?, ?)
919            "#,
920            [
921                &record.session_id,
922                &record.node_id,
923                &record.action,
924                &record.category,
925                &record.requeued_nodes,
926                &record.inserted_nodes,
927            ],
928        )?;
929        Ok(())
930    }
931
932    /// Get rewrite records for a session
933    pub fn get_rewrite_records(&self, session_id: &str) -> Result<Vec<RewriteRecordRow>> {
934        let conn = self.conn.lock().unwrap();
935        let mut stmt = conn.prepare(
936            "SELECT session_id, node_id, action, category, requeued_nodes, inserted_nodes
937             FROM rewrite_records WHERE session_id = ? ORDER BY created_at",
938        )?;
939        let mut rows = stmt.query([session_id])?;
940        let mut records = Vec::new();
941        while let Some(row) = rows.next()? {
942            records.push(RewriteRecordRow {
943                session_id: row.get(0)?,
944                node_id: row.get(1)?,
945                action: row.get(2)?,
946                category: row.get(3)?,
947                requeued_nodes: row.get(4)?,
948                inserted_nodes: row.get(5)?,
949            });
950        }
951        Ok(records)
952    }
953
954    /// Record a sheaf validation result
955    pub fn record_sheaf_validation(&self, record: &SheafValidationRow) -> Result<()> {
956        self.conn.lock().unwrap().execute(
957            r#"
958            INSERT INTO sheaf_validations (session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets)
959            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
960            "#,
961            [
962                &record.session_id,
963                &record.node_id,
964                &record.validator_class,
965                &record.plugin_source.clone().unwrap_or_default(),
966                &record.passed.to_string(),
967                &record.evidence_summary,
968                &record.affected_files,
969                &record.v_sheaf_contribution.to_string(),
970                &record.requeue_targets,
971            ],
972        )?;
973        Ok(())
974    }
975
976    /// Get sheaf validation results for a session and node
977    pub fn get_sheaf_validations(
978        &self,
979        session_id: &str,
980        node_id: &str,
981    ) -> Result<Vec<SheafValidationRow>> {
982        let conn = self.conn.lock().unwrap();
983        let mut stmt = conn.prepare(
984            "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
985             FROM sheaf_validations WHERE session_id = ? AND node_id = ? ORDER BY created_at",
986        )?;
987        let mut rows = stmt.query([session_id, node_id])?;
988        let mut records = Vec::new();
989        while let Some(row) = rows.next()? {
990            records.push(SheafValidationRow {
991                session_id: row.get(0)?,
992                node_id: row.get(1)?,
993                validator_class: row.get(2)?,
994                plugin_source: row.get::<_, Option<String>>(3)?,
995                passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
996                evidence_summary: row.get(5)?,
997                affected_files: row.get(6)?,
998                v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
999                requeue_targets: row.get(8)?,
1000            });
1001        }
1002        Ok(records)
1003    }
1004
1005    /// Get all sheaf validations for a session (all nodes).
1006    pub fn get_all_sheaf_validations(&self, session_id: &str) -> Result<Vec<SheafValidationRow>> {
1007        let conn = self.conn.lock().unwrap();
1008        let mut stmt = conn.prepare(
1009            "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
1010             FROM sheaf_validations WHERE session_id = ? ORDER BY created_at",
1011        )?;
1012        let mut rows = stmt.query([session_id])?;
1013        let mut records = Vec::new();
1014        while let Some(row) = rows.next()? {
1015            records.push(SheafValidationRow {
1016                session_id: row.get(0)?,
1017                node_id: row.get(1)?,
1018                validator_class: row.get(2)?,
1019                plugin_source: row.get::<_, Option<String>>(3)?,
1020                passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
1021                evidence_summary: row.get(5)?,
1022                affected_files: row.get(6)?,
1023                v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
1024                requeue_targets: row.get(8)?,
1025            });
1026        }
1027        Ok(records)
1028    }
1029
1030    // =========================================================================
1031    // PSP-5 Phase 6: Provisional Branch CRUD
1032    // =========================================================================
1033
1034    /// Record a new provisional branch
1035    pub fn record_provisional_branch(&self, record: &ProvisionalBranchRow) -> Result<()> {
1036        self.conn.lock().unwrap().execute(
1037            r#"
1038            INSERT INTO provisional_branches (branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir)
1039            VALUES (?, ?, ?, ?, ?, ?, ?)
1040            "#,
1041            [
1042                &record.branch_id,
1043                &record.session_id,
1044                &record.node_id,
1045                &record.parent_node_id,
1046                &record.state,
1047                &record.parent_seal_hash.as_ref().map(hex::encode).unwrap_or_default(),
1048                &record.sandbox_dir.clone().unwrap_or_default(),
1049            ],
1050        )?;
1051        Ok(())
1052    }
1053
1054    /// Update a provisional branch state
1055    pub fn update_branch_state(&self, branch_id: &str, new_state: &str) -> Result<()> {
1056        self.conn.lock().unwrap().execute(
1057            "UPDATE provisional_branches SET state = ?, updated_at = CURRENT_TIMESTAMP WHERE branch_id = ?",
1058            [new_state, branch_id],
1059        )?;
1060        Ok(())
1061    }
1062
1063    /// Get all provisional branches for a session
1064    pub fn get_provisional_branches(&self, session_id: &str) -> Result<Vec<ProvisionalBranchRow>> {
1065        let conn = self.conn.lock().unwrap();
1066        let mut stmt = conn.prepare(
1067            "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
1068             FROM provisional_branches WHERE session_id = ? ORDER BY created_at",
1069        )?;
1070        let mut rows = stmt.query([session_id])?;
1071        let mut records = Vec::new();
1072        while let Some(row) = rows.next()? {
1073            // parent_seal_hash is BLOB; read directly as Option<Vec<u8>>
1074            let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
1075            records.push(ProvisionalBranchRow {
1076                branch_id: row.get(0)?,
1077                session_id: row.get(1)?,
1078                node_id: row.get(2)?,
1079                parent_node_id: row.get(3)?,
1080                state: row.get(4)?,
1081                parent_seal_hash,
1082                sandbox_dir: row.get::<_, Option<String>>(6)?,
1083            });
1084        }
1085        Ok(records)
1086    }
1087
1088    /// Get live (active/sealed) provisional branches depending on a parent node
1089    pub fn get_live_branches_for_parent(
1090        &self,
1091        session_id: &str,
1092        parent_node_id: &str,
1093    ) -> Result<Vec<ProvisionalBranchRow>> {
1094        let conn = self.conn.lock().unwrap();
1095        let mut stmt = conn.prepare(
1096            "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
1097             FROM provisional_branches
1098             WHERE session_id = ? AND parent_node_id = ? AND state IN ('active', 'sealed')
1099             ORDER BY created_at",
1100        )?;
1101        let mut rows = stmt.query([session_id, parent_node_id])?;
1102        let mut records = Vec::new();
1103        while let Some(row) = rows.next()? {
1104            // parent_seal_hash is BLOB; read directly as Option<Vec<u8>>
1105            let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
1106            records.push(ProvisionalBranchRow {
1107                branch_id: row.get(0)?,
1108                session_id: row.get(1)?,
1109                node_id: row.get(2)?,
1110                parent_node_id: row.get(3)?,
1111                state: row.get(4)?,
1112                parent_seal_hash,
1113                sandbox_dir: row.get::<_, Option<String>>(6)?,
1114            });
1115        }
1116        Ok(records)
1117    }
1118
1119    /// Mark all live branches for a parent as flushed
1120    pub fn flush_branches_for_parent(
1121        &self,
1122        session_id: &str,
1123        parent_node_id: &str,
1124    ) -> Result<Vec<String>> {
1125        let live = self.get_live_branches_for_parent(session_id, parent_node_id)?;
1126        let branch_ids: Vec<String> = live.iter().map(|b| b.branch_id.clone()).collect();
1127        for bid in &branch_ids {
1128            self.update_branch_state(bid, "flushed")?;
1129        }
1130        Ok(branch_ids)
1131    }
1132
1133    // =========================================================================
1134    // PSP-5 Phase 6: Branch Lineage CRUD
1135    // =========================================================================
1136
1137    /// Record a branch lineage edge
1138    pub fn record_branch_lineage(&self, record: &BranchLineageRow) -> Result<()> {
1139        self.conn.lock().unwrap().execute(
1140            r#"
1141            INSERT INTO branch_lineage (lineage_id, parent_branch_id, child_branch_id, depends_on_seal)
1142            VALUES (?, ?, ?, ?)
1143            "#,
1144            [
1145                &record.lineage_id,
1146                &record.parent_branch_id,
1147                &record.child_branch_id,
1148                &record.depends_on_seal.to_string(),
1149            ],
1150        )?;
1151        Ok(())
1152    }
1153
1154    /// Get child branch IDs for a parent branch
1155    pub fn get_child_branches(&self, parent_branch_id: &str) -> Result<Vec<String>> {
1156        let conn = self.conn.lock().unwrap();
1157        let mut stmt =
1158            conn.prepare("SELECT child_branch_id FROM branch_lineage WHERE parent_branch_id = ?")?;
1159        let mut rows = stmt.query([parent_branch_id])?;
1160        let mut ids = Vec::new();
1161        while let Some(row) = rows.next()? {
1162            ids.push(row.get(0)?);
1163        }
1164        Ok(ids)
1165    }
1166
1167    // =========================================================================
1168    // PSP-5 Phase 6: Interface Seal CRUD
1169    // =========================================================================
1170
1171    /// Record an interface seal
1172    pub fn record_interface_seal(&self, record: &InterfaceSealRow) -> Result<()> {
1173        self.conn.lock().unwrap().execute(
1174            r#"
1175            INSERT INTO interface_seals (seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version)
1176            VALUES (?, ?, ?, ?, ?, ?, ?)
1177            "#,
1178            [
1179                &record.seal_id,
1180                &record.session_id,
1181                &record.node_id,
1182                &record.sealed_path,
1183                &record.artifact_kind,
1184                &hex::encode(&record.seal_hash),
1185                &record.version.to_string(),
1186            ],
1187        )?;
1188        Ok(())
1189    }
1190
1191    /// Get all interface seals for a node
1192    pub fn get_interface_seals(
1193        &self,
1194        session_id: &str,
1195        node_id: &str,
1196    ) -> Result<Vec<InterfaceSealRow>> {
1197        let conn = self.conn.lock().unwrap();
1198        let mut stmt = conn.prepare(
1199            "SELECT seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version
1200             FROM interface_seals WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1201        )?;
1202        let mut rows = stmt.query([session_id, node_id])?;
1203        let mut records = Vec::new();
1204        while let Some(row) = rows.next()? {
1205            records.push(InterfaceSealRow {
1206                seal_id: row.get(0)?,
1207                session_id: row.get(1)?,
1208                node_id: row.get(2)?,
1209                sealed_path: row.get(3)?,
1210                artifact_kind: row.get(4)?,
1211                seal_hash: row
1212                    .get::<_, String>(5)
1213                    .ok()
1214                    .and_then(|h| hex::decode(h).ok())
1215                    .unwrap_or_default(),
1216                version: row.get::<_, i32>(6)?,
1217            });
1218        }
1219        Ok(records)
1220    }
1221
1222    /// Check whether a node has any interface seals
1223    pub fn has_interface_seals(&self, session_id: &str, node_id: &str) -> Result<bool> {
1224        let conn = self.conn.lock().unwrap();
1225        let count: i64 = conn.query_row(
1226            "SELECT COUNT(*) FROM interface_seals WHERE session_id = ? AND node_id = ?",
1227            [session_id, node_id],
1228            |row| row.get(0),
1229        )?;
1230        Ok(count > 0)
1231    }
1232
1233    // =========================================================================
1234    // PSP-5 Phase 6: Branch Flush CRUD
1235    // =========================================================================
1236
1237    /// Record a branch flush decision
1238    pub fn record_branch_flush(&self, record: &BranchFlushRow) -> Result<()> {
1239        self.conn.lock().unwrap().execute(
1240            r#"
1241            INSERT INTO branch_flushes (flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason)
1242            VALUES (?, ?, ?, ?, ?, ?)
1243            "#,
1244            [
1245                &record.flush_id,
1246                &record.session_id,
1247                &record.parent_node_id,
1248                &record.flushed_branch_ids,
1249                &record.requeue_node_ids,
1250                &record.reason,
1251            ],
1252        )?;
1253        Ok(())
1254    }
1255
1256    /// Get all branch flush records for a session
1257    pub fn get_branch_flushes(&self, session_id: &str) -> Result<Vec<BranchFlushRow>> {
1258        let conn = self.conn.lock().unwrap();
1259        let mut stmt = conn.prepare(
1260            "SELECT flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason
1261             FROM branch_flushes WHERE session_id = ? ORDER BY created_at",
1262        )?;
1263        let mut rows = stmt.query([session_id])?;
1264        let mut records = Vec::new();
1265        while let Some(row) = rows.next()? {
1266            records.push(BranchFlushRow {
1267                flush_id: row.get(0)?,
1268                session_id: row.get(1)?,
1269                parent_node_id: row.get(2)?,
1270                flushed_branch_ids: row.get(3)?,
1271                requeue_node_ids: row.get(4)?,
1272                reason: row.get(5)?,
1273            });
1274        }
1275        Ok(records)
1276    }
1277
1278    // =========================================================================
1279    // PSP-5 Phase 8: Node Snapshot, Task Graph, and Review Outcome Persistence
1280    // =========================================================================
1281
1282    /// Get the latest node state snapshot per node for a session (for resume reconstruction).
1283    ///
1284    /// Returns at most one record per node_id, picking the most recently created row.
1285    pub fn get_latest_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
1286        let conn = self.conn.lock().unwrap();
1287        let mut stmt = conn.prepare(
1288            "WITH ranked AS ( \
1289                 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1290                 FROM node_states WHERE session_id = ? \
1291             ) \
1292             SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
1293                    node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
1294             FROM ranked WHERE rn = 1 ORDER BY created_at",
1295        )?;
1296
1297        let mut rows = stmt.query([session_id])?;
1298        let mut records = Vec::new();
1299
1300        while let Some(row) = rows.next()? {
1301            records.push(NodeStateRecord {
1302                node_id: row.get(0)?,
1303                session_id: row.get(1)?,
1304                state: row.get(2)?,
1305                v_total: row.get::<_, f64>(3)? as f32,
1306                merkle_hash: row
1307                    .get::<_, Option<String>>(4)?
1308                    .and_then(|s| hex::decode(s).ok()),
1309                attempt_count: row.get(5)?,
1310                node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1311                owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
1312                goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
1313                parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
1314                children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
1315                last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1316                committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
1317            });
1318        }
1319
1320        Ok(records)
1321    }
1322
1323    /// Record a task graph edge (parent→child dependency)
1324    pub fn record_task_graph_edge(&self, record: &TaskGraphEdgeRow) -> Result<()> {
1325        self.conn.lock().unwrap().execute(
1326            r#"
1327            INSERT INTO task_graph_edges (session_id, parent_node_id, child_node_id, edge_type)
1328            VALUES (?, ?, ?, ?)
1329            "#,
1330            [
1331                &record.session_id,
1332                &record.parent_node_id,
1333                &record.child_node_id,
1334                &record.edge_type,
1335            ],
1336        )?;
1337        Ok(())
1338    }
1339
1340    /// Get all task graph edges for a session
1341    pub fn get_task_graph_edges(&self, session_id: &str) -> Result<Vec<TaskGraphEdgeRow>> {
1342        let conn = self.conn.lock().unwrap();
1343        let mut stmt = conn.prepare(
1344            "SELECT session_id, parent_node_id, child_node_id, edge_type \
1345             FROM task_graph_edges WHERE session_id = ? ORDER BY created_at",
1346        )?;
1347        let mut rows = stmt.query([session_id])?;
1348        let mut records = Vec::new();
1349        while let Some(row) = rows.next()? {
1350            records.push(TaskGraphEdgeRow {
1351                session_id: row.get(0)?,
1352                parent_node_id: row.get(1)?,
1353                child_node_id: row.get(2)?,
1354                edge_type: row.get(3)?,
1355            });
1356        }
1357        Ok(records)
1358    }
1359
1360    /// Record a review outcome (approval, rejection, edit request)
1361    pub fn record_review_outcome(&self, record: &ReviewOutcomeRow) -> Result<()> {
1362        let reviewer_note = record.reviewer_note.clone().unwrap_or_default();
1363        let escalation_category = record.escalation_category.clone().unwrap_or_default();
1364        self.conn.lock().unwrap().execute(
1365            r#"
1366            INSERT INTO review_outcomes (session_id, node_id, outcome, reviewer_note,
1367                                         energy_at_review, degraded, escalation_category)
1368            VALUES (?, ?, ?, ?, ?, ?, ?)
1369            "#,
1370            duckdb::params![
1371                record.session_id,
1372                record.node_id,
1373                record.outcome,
1374                reviewer_note,
1375                record.energy_at_review.unwrap_or(0.0),
1376                record.degraded.unwrap_or(false),
1377                escalation_category,
1378            ],
1379        )?;
1380        Ok(())
1381    }
1382
1383    /// Get all review outcomes for a node
1384    pub fn get_review_outcomes(
1385        &self,
1386        session_id: &str,
1387        node_id: &str,
1388    ) -> Result<Vec<ReviewOutcomeRow>> {
1389        let conn = self.conn.lock().unwrap();
1390        let mut stmt = conn.prepare(
1391            "SELECT session_id, node_id, outcome, reviewer_note, \
1392             energy_at_review, degraded, escalation_category \
1393             FROM review_outcomes WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1394        )?;
1395        let mut rows = stmt.query([session_id, node_id])?;
1396        let mut records = Vec::new();
1397        while let Some(row) = rows.next()? {
1398            records.push(ReviewOutcomeRow {
1399                session_id: row.get(0)?,
1400                node_id: row.get(1)?,
1401                outcome: row.get(2)?,
1402                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1403                energy_at_review: row.get::<_, Option<f64>>(4)?,
1404                degraded: row.get::<_, Option<bool>>(5)?,
1405                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1406            });
1407        }
1408        Ok(records)
1409    }
1410
1411    /// Get the most recent review outcome for a node
1412    pub fn get_latest_review_outcome(
1413        &self,
1414        session_id: &str,
1415        node_id: &str,
1416    ) -> Result<Option<ReviewOutcomeRow>> {
1417        let conn = self.conn.lock().unwrap();
1418        let mut stmt = conn.prepare(
1419            "SELECT session_id, node_id, outcome, reviewer_note, \
1420             energy_at_review, degraded, escalation_category \
1421             FROM review_outcomes WHERE session_id = ? AND node_id = ? \
1422             ORDER BY created_at DESC LIMIT 1",
1423        )?;
1424        let mut rows = stmt.query([session_id, node_id])?;
1425        if let Some(row) = rows.next()? {
1426            Ok(Some(ReviewOutcomeRow {
1427                session_id: row.get(0)?,
1428                node_id: row.get(1)?,
1429                outcome: row.get(2)?,
1430                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1431                energy_at_review: row.get::<_, Option<f64>>(4)?,
1432                degraded: row.get::<_, Option<bool>>(5)?,
1433                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1434            }))
1435        } else {
1436            Ok(None)
1437        }
1438    }
1439
1440    /// Get all review outcomes for a session (across all nodes).
1441    pub fn get_all_review_outcomes(&self, session_id: &str) -> Result<Vec<ReviewOutcomeRow>> {
1442        let conn = self.conn.lock().unwrap();
1443        let mut stmt = conn.prepare(
1444            "SELECT session_id, node_id, outcome, reviewer_note, \
1445             energy_at_review, degraded, escalation_category \
1446             FROM review_outcomes WHERE session_id = ? ORDER BY created_at",
1447        )?;
1448        let mut rows = stmt.query([session_id])?;
1449        let mut records = Vec::new();
1450        while let Some(row) = rows.next()? {
1451            records.push(ReviewOutcomeRow {
1452                session_id: row.get(0)?,
1453                node_id: row.get(1)?,
1454                outcome: row.get(2)?,
1455                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1456                energy_at_review: row.get::<_, Option<f64>>(4)?,
1457                degraded: row.get::<_, Option<bool>>(5)?,
1458                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1459            });
1460        }
1461        Ok(records)
1462    }
1463
1464    // =========================================================================
1465    // PSP-5 Phase 8: Verification Result and Artifact Bundle Persistence
1466    // =========================================================================
1467
1468    /// Record a verification result snapshot for a node
1469    pub fn record_verification_result(&self, record: &VerificationResultRow) -> Result<()> {
1470        let syntax_ok = record.syntax_ok.to_string();
1471        let build_ok = record.build_ok.to_string();
1472        let tests_ok = record.tests_ok.to_string();
1473        let lint_ok = record.lint_ok.to_string();
1474        let diagnostics_count = record.diagnostics_count.to_string();
1475        let tests_passed = record.tests_passed.to_string();
1476        let tests_failed = record.tests_failed.to_string();
1477        let degraded = record.degraded.to_string();
1478        let degraded_reason = record.degraded_reason.clone().unwrap_or_default();
1479
1480        self.conn.lock().unwrap().execute(
1481            r#"
1482            INSERT INTO verification_results (session_id, node_id, result_json,
1483                syntax_ok, build_ok, tests_ok, lint_ok,
1484                diagnostics_count, tests_passed, tests_failed, degraded, degraded_reason)
1485            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1486            "#,
1487            [
1488                &record.session_id,
1489                &record.node_id,
1490                &record.result_json,
1491                &syntax_ok,
1492                &build_ok,
1493                &tests_ok,
1494                &lint_ok,
1495                &diagnostics_count,
1496                &tests_passed,
1497                &tests_failed,
1498                &degraded,
1499                &degraded_reason,
1500            ],
1501        )?;
1502        Ok(())
1503    }
1504
1505    /// Get the latest verification result for a node
1506    pub fn get_verification_result(
1507        &self,
1508        session_id: &str,
1509        node_id: &str,
1510    ) -> Result<Option<VerificationResultRow>> {
1511        let conn = self.conn.lock().unwrap();
1512        let mut stmt = conn.prepare(
1513            "SELECT session_id, node_id, result_json, \
1514                    CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1515                    diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1516             FROM verification_results \
1517             WHERE session_id = ? AND node_id = ? \
1518             ORDER BY created_at DESC LIMIT 1",
1519        )?;
1520        let mut rows = stmt.query([session_id, node_id])?;
1521        if let Some(row) = rows.next()? {
1522            Ok(Some(VerificationResultRow {
1523                session_id: row.get(0)?,
1524                node_id: row.get(1)?,
1525                result_json: row.get(2)?,
1526                syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1527                build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1528                tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1529                lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1530                diagnostics_count: row.get(7)?,
1531                tests_passed: row.get(8)?,
1532                tests_failed: row.get(9)?,
1533                degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1534                degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1535            }))
1536        } else {
1537            Ok(None)
1538        }
1539    }
1540
1541    /// Get all verification results for a session (for status display)
1542    pub fn get_all_verification_results(
1543        &self,
1544        session_id: &str,
1545    ) -> Result<Vec<VerificationResultRow>> {
1546        let conn = self.conn.lock().unwrap();
1547        let mut stmt = conn.prepare(
1548            "WITH ranked AS ( \
1549                 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1550                 FROM verification_results WHERE session_id = ? \
1551             ) \
1552             SELECT session_id, node_id, result_json, \
1553                    CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1554                    diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1555             FROM ranked WHERE rn = 1 ORDER BY created_at",
1556        )?;
1557        let mut rows = stmt.query([session_id])?;
1558        let mut records = Vec::new();
1559        while let Some(row) = rows.next()? {
1560            records.push(VerificationResultRow {
1561                session_id: row.get(0)?,
1562                node_id: row.get(1)?,
1563                result_json: row.get(2)?,
1564                syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1565                build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1566                tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1567                lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1568                diagnostics_count: row.get(7)?,
1569                tests_passed: row.get(8)?,
1570                tests_failed: row.get(9)?,
1571                degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1572                degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1573            });
1574        }
1575        Ok(records)
1576    }
1577
1578    /// Record an artifact bundle snapshot for a node
1579    pub fn record_artifact_bundle(&self, record: &ArtifactBundleRow) -> Result<()> {
1580        let artifact_count = record.artifact_count.to_string();
1581        let command_count = record.command_count.to_string();
1582
1583        self.conn.lock().unwrap().execute(
1584            r#"
1585            INSERT INTO artifact_bundles (session_id, node_id, bundle_json,
1586                artifact_count, command_count, touched_files)
1587            VALUES (?, ?, ?, ?, ?, ?)
1588            "#,
1589            [
1590                &record.session_id,
1591                &record.node_id,
1592                &record.bundle_json,
1593                &artifact_count,
1594                &command_count,
1595                &record.touched_files,
1596            ],
1597        )?;
1598        Ok(())
1599    }
1600
1601    /// Get the latest artifact bundle for a node
1602    pub fn get_artifact_bundle(
1603        &self,
1604        session_id: &str,
1605        node_id: &str,
1606    ) -> Result<Option<ArtifactBundleRow>> {
1607        let conn = self.conn.lock().unwrap();
1608        let mut stmt = conn.prepare(
1609            "SELECT session_id, node_id, bundle_json, artifact_count, command_count, touched_files \
1610             FROM artifact_bundles \
1611             WHERE session_id = ? AND node_id = ? \
1612             ORDER BY created_at DESC LIMIT 1",
1613        )?;
1614        let mut rows = stmt.query([session_id, node_id])?;
1615        if let Some(row) = rows.next()? {
1616            Ok(Some(ArtifactBundleRow {
1617                session_id: row.get(0)?,
1618                node_id: row.get(1)?,
1619                bundle_json: row.get(2)?,
1620                artifact_count: row.get(3)?,
1621                command_count: row.get(4)?,
1622                touched_files: row.get(5)?,
1623            }))
1624        } else {
1625            Ok(None)
1626        }
1627    }
1628}
1629
1630// =========================================================================
// Plan Revision, Feature Charter, Repair Footprint, and Budget Envelope Methods
1632// =========================================================================
1633
1634impl SessionStore {
1635    /// Record a feature charter for a session.
1636    pub fn record_feature_charter(&self, row: &FeatureCharterRow) -> Result<()> {
1637        let conn = self.conn.lock().unwrap();
1638        conn.execute(
1639            "INSERT INTO feature_charters (charter_id, session_id, scope_description, max_modules, max_files, max_revisions, language_constraint) \
1640             VALUES (?, ?, ?, ?, ?, ?, ?)",
1641            duckdb::params![
1642                row.charter_id,
1643                row.session_id,
1644                row.scope_description,
1645                row.max_modules,
1646                row.max_files,
1647                row.max_revisions,
1648                row.language_constraint,
1649            ],
1650        )?;
1651        Ok(())
1652    }
1653
1654    /// Get the feature charter for a session.
1655    pub fn get_feature_charter(&self, session_id: &str) -> Result<Option<FeatureCharterRow>> {
1656        let conn = self.conn.lock().unwrap();
1657        let mut stmt = conn.prepare(
1658            "SELECT charter_id, session_id, scope_description, max_modules, max_files, max_revisions, language_constraint \
1659             FROM feature_charters WHERE session_id = ? LIMIT 1",
1660        )?;
1661        let mut rows = stmt.query([session_id])?;
1662        if let Some(row) = rows.next()? {
1663            Ok(Some(FeatureCharterRow {
1664                charter_id: row.get(0)?,
1665                session_id: row.get(1)?,
1666                scope_description: row.get(2)?,
1667                max_modules: row.get(3)?,
1668                max_files: row.get(4)?,
1669                max_revisions: row.get(5)?,
1670                language_constraint: row.get(6)?,
1671            }))
1672        } else {
1673            Ok(None)
1674        }
1675    }
1676
1677    /// Record a plan revision.
1678    pub fn record_plan_revision(&self, row: &PlanRevisionRow) -> Result<()> {
1679        let conn = self.conn.lock().unwrap();
1680        conn.execute(
1681            "INSERT INTO plan_revisions (revision_id, session_id, sequence, plan_json, reason, supersedes, status) \
1682             VALUES (?, ?, ?, ?, ?, ?, ?)",
1683            duckdb::params![
1684                row.revision_id,
1685                row.session_id,
1686                row.sequence,
1687                row.plan_json,
1688                row.reason,
1689                row.supersedes,
1690                row.status,
1691            ],
1692        )?;
1693        Ok(())
1694    }
1695
1696    /// Get the active plan revision for a session.
1697    pub fn get_active_plan_revision(&self, session_id: &str) -> Result<Option<PlanRevisionRow>> {
1698        let conn = self.conn.lock().unwrap();
1699        let mut stmt = conn.prepare(
1700            "SELECT revision_id, session_id, sequence, plan_json, reason, supersedes, status \
1701             FROM plan_revisions WHERE session_id = ? AND status = 'active' \
1702             ORDER BY sequence DESC LIMIT 1",
1703        )?;
1704        let mut rows = stmt.query([session_id])?;
1705        if let Some(row) = rows.next()? {
1706            Ok(Some(PlanRevisionRow {
1707                revision_id: row.get(0)?,
1708                session_id: row.get(1)?,
1709                sequence: row.get(2)?,
1710                plan_json: row.get(3)?,
1711                reason: row.get(4)?,
1712                supersedes: row.get(5)?,
1713                status: row.get(6)?,
1714            }))
1715        } else {
1716            Ok(None)
1717        }
1718    }
1719
1720    /// Get all plan revisions for a session, ordered by sequence.
1721    pub fn get_plan_revisions(&self, session_id: &str) -> Result<Vec<PlanRevisionRow>> {
1722        let conn = self.conn.lock().unwrap();
1723        let mut stmt = conn.prepare(
1724            "SELECT revision_id, session_id, sequence, plan_json, reason, supersedes, status \
1725             FROM plan_revisions WHERE session_id = ? ORDER BY sequence ASC",
1726        )?;
1727        let mut rows = stmt.query([session_id])?;
1728        let mut results = Vec::new();
1729        while let Some(row) = rows.next()? {
1730            results.push(PlanRevisionRow {
1731                revision_id: row.get(0)?,
1732                session_id: row.get(1)?,
1733                sequence: row.get(2)?,
1734                plan_json: row.get(3)?,
1735                reason: row.get(4)?,
1736                supersedes: row.get(5)?,
1737                status: row.get(6)?,
1738            });
1739        }
1740        Ok(results)
1741    }
1742
1743    /// Supersede a plan revision (set status to 'superseded').
1744    pub fn supersede_plan_revision(&self, revision_id: &str) -> Result<()> {
1745        let conn = self.conn.lock().unwrap();
1746        conn.execute(
1747            "UPDATE plan_revisions SET status = 'superseded' WHERE revision_id = ?",
1748            [revision_id],
1749        )?;
1750        Ok(())
1751    }
1752
1753    /// Record a repair footprint.
1754    pub fn record_repair_footprint(&self, row: &RepairFootprintRow) -> Result<()> {
1755        let conn = self.conn.lock().unwrap();
1756        conn.execute(
1757            "INSERT INTO repair_footprints (footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved) \
1758             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
1759            duckdb::params![
1760                row.footprint_id,
1761                row.session_id,
1762                row.node_id,
1763                row.revision_id,
1764                row.attempt,
1765                row.affected_files,
1766                row.bundle_json,
1767                row.diagnosis,
1768                row.resolved,
1769            ],
1770        )?;
1771        Ok(())
1772    }
1773
1774    /// Get repair footprints for a node, ordered by attempt.
1775    pub fn get_repair_footprints(
1776        &self,
1777        session_id: &str,
1778        node_id: &str,
1779    ) -> Result<Vec<RepairFootprintRow>> {
1780        let conn = self.conn.lock().unwrap();
1781        let mut stmt = conn.prepare(
1782            "SELECT footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved \
1783             FROM repair_footprints WHERE session_id = ? AND node_id = ? ORDER BY attempt ASC",
1784        )?;
1785        let mut rows = stmt.query([session_id, node_id])?;
1786        let mut results = Vec::new();
1787        while let Some(row) = rows.next()? {
1788            results.push(RepairFootprintRow {
1789                footprint_id: row.get(0)?,
1790                session_id: row.get(1)?,
1791                node_id: row.get(2)?,
1792                revision_id: row.get(3)?,
1793                attempt: row.get(4)?,
1794                affected_files: row.get(5)?,
1795                bundle_json: row.get(6)?,
1796                diagnosis: row.get(7)?,
1797                resolved: row.get(8)?,
1798            });
1799        }
1800        Ok(results)
1801    }
1802
1803    /// Get all repair footprints for a session (all nodes).
1804    pub fn get_all_repair_footprints(&self, session_id: &str) -> Result<Vec<RepairFootprintRow>> {
1805        let conn = self.conn.lock().unwrap();
1806        let mut stmt = conn.prepare(
1807            "SELECT footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved \
1808             FROM repair_footprints WHERE session_id = ? ORDER BY attempt ASC",
1809        )?;
1810        let mut rows = stmt.query([session_id])?;
1811        let mut results = Vec::new();
1812        while let Some(row) = rows.next()? {
1813            results.push(RepairFootprintRow {
1814                footprint_id: row.get(0)?,
1815                session_id: row.get(1)?,
1816                node_id: row.get(2)?,
1817                revision_id: row.get(3)?,
1818                attempt: row.get(4)?,
1819                affected_files: row.get(5)?,
1820                bundle_json: row.get(6)?,
1821                diagnosis: row.get(7)?,
1822                resolved: row.get(8)?,
1823            });
1824        }
1825        Ok(results)
1826    }
1827
1828    /// Mark a repair footprint as resolved.
1829    pub fn resolve_repair_footprint(&self, footprint_id: &str) -> Result<()> {
1830        let conn = self.conn.lock().unwrap();
1831        conn.execute(
1832            "UPDATE repair_footprints SET resolved = true WHERE footprint_id = ?",
1833            [footprint_id],
1834        )?;
1835        Ok(())
1836    }
1837
1838    /// Record or update a budget envelope for a session.
1839    pub fn upsert_budget_envelope(&self, row: &BudgetEnvelopeRow) -> Result<()> {
1840        let conn = self.conn.lock().unwrap();
1841        // Try insert first, update on conflict
1842        conn.execute(
1843            "INSERT INTO budget_envelopes (session_id, max_steps, steps_used, max_revisions, revisions_used, max_cost_usd, cost_used_usd) \
1844             VALUES (?, ?, ?, ?, ?, ?, ?) \
1845             ON CONFLICT (session_id) DO UPDATE SET \
1846             max_steps = EXCLUDED.max_steps, steps_used = EXCLUDED.steps_used, \
1847             max_revisions = EXCLUDED.max_revisions, revisions_used = EXCLUDED.revisions_used, \
1848             max_cost_usd = EXCLUDED.max_cost_usd, cost_used_usd = EXCLUDED.cost_used_usd",
1849            duckdb::params![
1850                row.session_id,
1851                row.max_steps,
1852                row.steps_used,
1853                row.max_revisions,
1854                row.revisions_used,
1855                row.max_cost_usd,
1856                row.cost_used_usd,
1857            ],
1858        )?;
1859        Ok(())
1860    }
1861
1862    /// Get the budget envelope for a session.
1863    pub fn get_budget_envelope(&self, session_id: &str) -> Result<Option<BudgetEnvelopeRow>> {
1864        let conn = self.conn.lock().unwrap();
1865        let mut stmt = conn.prepare(
1866            "SELECT session_id, max_steps, steps_used, max_revisions, revisions_used, max_cost_usd, cost_used_usd \
1867             FROM budget_envelopes WHERE session_id = ?",
1868        )?;
1869        let mut rows = stmt.query([session_id])?;
1870        if let Some(row) = rows.next()? {
1871            Ok(Some(BudgetEnvelopeRow {
1872                session_id: row.get(0)?,
1873                max_steps: row.get(1)?,
1874                steps_used: row.get(2)?,
1875                max_revisions: row.get(3)?,
1876                revisions_used: row.get(4)?,
1877                max_cost_usd: row.get(5)?,
1878                cost_used_usd: row.get(6)?,
1879            }))
1880        } else {
1881            Ok(None)
1882        }
1883    }
1884}
1885
1886// =========================================================================
1887// PSP-7: SRBN Step Records and Correction Attempts
1888// =========================================================================
1889
1890impl SessionStore {
1891    /// Record an orchestration step transition.
1892    pub fn record_step(&self, record: &SrbnStepRecord) -> Result<()> {
1893        let conn = self.conn.lock().unwrap();
1894        conn.execute(
1895            r#"INSERT INTO srbn_step_records
1896               (session_id, node_id, step, outcome, energy_json,
1897                parse_state, retry_classification, attempt_count, duration_ms)
1898               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
1899            duckdb::params![
1900                record.session_id,
1901                record.node_id,
1902                record.step,
1903                record.outcome,
1904                record.energy_json,
1905                record.parse_state,
1906                record.retry_classification,
1907                record.attempt_count,
1908                record.duration_ms,
1909            ],
1910        )?;
1911        Ok(())
1912    }
1913
1914    /// Retrieve the step timeline for a given node in chronological order.
1915    pub fn get_step_timeline(
1916        &self,
1917        session_id: &str,
1918        node_id: &str,
1919    ) -> Result<Vec<SrbnStepRecord>> {
1920        let conn = self.conn.lock().unwrap();
1921        let mut stmt = conn.prepare(
1922            r#"SELECT session_id, node_id, step, outcome, energy_json,
1923                      parse_state, retry_classification, attempt_count, duration_ms
1924               FROM srbn_step_records
1925               WHERE session_id = ? AND node_id = ?
1926               ORDER BY id ASC"#,
1927        )?;
1928        let rows = stmt.query_map(duckdb::params![session_id, node_id], |row| {
1929            Ok(SrbnStepRecord {
1930                session_id: row.get(0)?,
1931                node_id: row.get(1)?,
1932                step: row.get(2)?,
1933                outcome: row.get(3)?,
1934                energy_json: row.get(4)?,
1935                parse_state: row.get(5)?,
1936                retry_classification: row.get(6)?,
1937                attempt_count: row.get(7)?,
1938                duration_ms: row.get(8)?,
1939            })
1940        })?;
1941        let mut results = Vec::new();
1942        for row in rows {
1943            results.push(row?);
1944        }
1945        Ok(results)
1946    }
1947
1948    /// Retrieve all step records for a session, ordered by id.
1949    pub fn get_session_steps(&self, session_id: &str) -> Result<Vec<SrbnStepRecord>> {
1950        let conn = self.conn.lock().unwrap();
1951        let mut stmt = conn.prepare(
1952            r#"SELECT session_id, node_id, step, outcome, energy_json,
1953                      parse_state, retry_classification, attempt_count, duration_ms
1954               FROM srbn_step_records
1955               WHERE session_id = ?
1956               ORDER BY id ASC"#,
1957        )?;
1958        let rows = stmt.query_map(duckdb::params![session_id], |row| {
1959            Ok(SrbnStepRecord {
1960                session_id: row.get(0)?,
1961                node_id: row.get(1)?,
1962                step: row.get(2)?,
1963                outcome: row.get(3)?,
1964                energy_json: row.get(4)?,
1965                parse_state: row.get(5)?,
1966                retry_classification: row.get(6)?,
1967                attempt_count: row.get(7)?,
1968                duration_ms: row.get(8)?,
1969            })
1970        })?;
1971        let mut results = Vec::new();
1972        for row in rows {
1973            results.push(row?);
1974        }
1975        Ok(results)
1976    }
1977
1978    /// Record a correction attempt within a convergence loop.
1979    pub fn record_correction_attempt(&self, record: &CorrectionAttemptRow) -> Result<()> {
1980        let conn = self.conn.lock().unwrap();
1981        conn.execute(
1982            r#"INSERT INTO correction_attempts
1983               (session_id, node_id, attempt, parse_state, retry_classification,
1984                response_fingerprint, response_length, energy_json,
1985                accepted, rejection_reason, created_at)
1986               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
1987            duckdb::params![
1988                record.session_id,
1989                record.node_id,
1990                record.attempt,
1991                record.parse_state,
1992                record.retry_classification,
1993                record.response_fingerprint,
1994                record.response_length,
1995                record.energy_json,
1996                record.accepted,
1997                record.rejection_reason,
1998                record.created_at,
1999            ],
2000        )?;
2001        Ok(())
2002    }
2003
2004    /// Retrieve all correction attempts for a node, ordered by attempt number.
2005    pub fn get_correction_attempts(
2006        &self,
2007        session_id: &str,
2008        node_id: &str,
2009    ) -> Result<Vec<CorrectionAttemptRow>> {
2010        let conn = self.conn.lock().unwrap();
2011        let mut stmt = conn.prepare(
2012            r#"SELECT session_id, node_id, attempt, parse_state, retry_classification,
2013                      response_fingerprint, response_length, energy_json,
2014                      accepted, rejection_reason, created_at
2015               FROM correction_attempts
2016               WHERE session_id = ? AND node_id = ?
2017               ORDER BY attempt ASC"#,
2018        )?;
2019        let rows = stmt.query_map(duckdb::params![session_id, node_id], |row| {
2020            Ok(CorrectionAttemptRow {
2021                session_id: row.get(0)?,
2022                node_id: row.get(1)?,
2023                attempt: row.get(2)?,
2024                parse_state: row.get(3)?,
2025                retry_classification: row.get(4)?,
2026                response_fingerprint: row.get(5)?,
2027                response_length: row.get(6)?,
2028                energy_json: row.get(7)?,
2029                accepted: row.get(8)?,
2030                rejection_reason: row.get(9)?,
2031                created_at: row.get(10)?,
2032            })
2033        })?;
2034        let mut results = Vec::new();
2035        for row in rows {
2036            results.push(row?);
2037        }
2038        Ok(results)
2039    }
2040
2041    /// Retrieve all correction attempts for a session, ordered by node then attempt.
2042    pub fn get_session_correction_attempts(
2043        &self,
2044        session_id: &str,
2045    ) -> Result<Vec<CorrectionAttemptRow>> {
2046        let conn = self.conn.lock().unwrap();
2047        let mut stmt = conn.prepare(
2048            r#"SELECT session_id, node_id, attempt, parse_state, retry_classification,
2049                      response_fingerprint, response_length, energy_json,
2050                      accepted, rejection_reason, created_at
2051               FROM correction_attempts
2052               WHERE session_id = ?
2053               ORDER BY node_id ASC, attempt ASC"#,
2054        )?;
2055        let rows = stmt.query_map(duckdb::params![session_id], |row| {
2056            Ok(CorrectionAttemptRow {
2057                session_id: row.get(0)?,
2058                node_id: row.get(1)?,
2059                attempt: row.get(2)?,
2060                parse_state: row.get(3)?,
2061                retry_classification: row.get(4)?,
2062                response_fingerprint: row.get(5)?,
2063                response_length: row.get(6)?,
2064                energy_json: row.get(7)?,
2065                accepted: row.get(8)?,
2066                rejection_reason: row.get(9)?,
2067                created_at: row.get(10)?,
2068            })
2069        })?;
2070        let mut results = Vec::new();
2071        for row in rows {
2072            results.push(row?);
2073        }
2074        Ok(results)
2075    }
2076}
2077
2078#[cfg(test)]
2079mod tests {
2080    use super::*;
2081
2082    /// Create an in-memory store for testing
2083    fn test_store() -> SessionStore {
2084        let temp_dir = std::env::temp_dir();
2085        let db_path = temp_dir.join(format!("perspt_test_{}.db", uuid::Uuid::new_v4()));
2086        SessionStore::open(&db_path).expect("Failed to create test store")
2087    }
2088
2089    fn seed_session(store: &SessionStore, session_id: &str) {
2090        let record = SessionRecord {
2091            session_id: session_id.to_string(),
2092            task: "test task".to_string(),
2093            working_dir: "/tmp/test".to_string(),
2094            merkle_root: None,
2095            detected_toolchain: None,
2096            status: "RUNNING".to_string(),
2097        };
2098        store.create_session(&record).unwrap();
2099    }
2100
2101    #[test]
2102    fn test_node_state_phase8_roundtrip() {
2103        let store = test_store();
2104        let sid = "test-sess-1";
2105        seed_session(&store, sid);
2106
2107        let record = NodeStateRecord {
2108            node_id: "node-1".to_string(),
2109            session_id: sid.to_string(),
2110            state: "Completed".to_string(),
2111            v_total: 0.42,
2112            merkle_hash: Some(vec![0xab; 32]),
2113            attempt_count: 3,
2114            node_class: Some("Interface".to_string()),
2115            owner_plugin: Some("rust".to_string()),
2116            goal: Some("Implement API".to_string()),
2117            parent_id: Some("root".to_string()),
2118            children: Some(r#"["child-a","child-b"]"#.to_string()),
2119            last_error_type: Some("CompilationError".to_string()),
2120            committed_at: Some("2025-01-01T00:00:00Z".to_string()),
2121        };
2122
2123        store.record_node_state(&record).unwrap();
2124
2125        let states = store.get_latest_node_states(sid).unwrap();
2126        assert_eq!(states.len(), 1);
2127        let r = &states[0];
2128        assert_eq!(r.node_id, "node-1");
2129        assert_eq!(r.state, "Completed");
2130        assert_eq!(r.attempt_count, 3);
2131        assert_eq!(r.node_class.as_deref(), Some("Interface"));
2132        assert_eq!(r.owner_plugin.as_deref(), Some("rust"));
2133        assert_eq!(r.goal.as_deref(), Some("Implement API"));
2134        assert_eq!(r.parent_id.as_deref(), Some("root"));
2135        assert!(r.children.is_some());
2136        assert_eq!(r.last_error_type.as_deref(), Some("CompilationError"));
2137        assert_eq!(r.committed_at.as_deref(), Some("2025-01-01T00:00:00Z"));
2138    }
2139
2140    #[test]
2141    fn test_task_graph_edge_roundtrip() {
2142        let store = test_store();
2143        let sid = "test-graph-1";
2144        seed_session(&store, sid);
2145
2146        let edge = TaskGraphEdgeRow {
2147            session_id: sid.to_string(),
2148            parent_node_id: "parent-1".to_string(),
2149            child_node_id: "child-1".to_string(),
2150            edge_type: "depends_on".to_string(),
2151        };
2152        store.record_task_graph_edge(&edge).unwrap();
2153
2154        let edges = store.get_task_graph_edges(sid).unwrap();
2155        assert_eq!(edges.len(), 1);
2156        assert_eq!(edges[0].parent_node_id, "parent-1");
2157        assert_eq!(edges[0].child_node_id, "child-1");
2158        assert_eq!(edges[0].edge_type, "depends_on");
2159    }
2160
2161    #[test]
2162    fn test_verification_result_roundtrip() {
2163        let store = test_store();
2164        let sid = "test-vr-1";
2165        seed_session(&store, sid);
2166
2167        let row = VerificationResultRow {
2168            session_id: sid.to_string(),
2169            node_id: "node-v".to_string(),
2170            result_json: r#"{"syntax_ok":true}"#.to_string(),
2171            syntax_ok: true,
2172            build_ok: true,
2173            tests_ok: false,
2174            lint_ok: true,
2175            diagnostics_count: 2,
2176            tests_passed: 5,
2177            tests_failed: 1,
2178            degraded: false,
2179            degraded_reason: None,
2180        };
2181        store.record_verification_result(&row).unwrap();
2182
2183        let got = store.get_verification_result(sid, "node-v").unwrap();
2184        assert!(got.is_some());
2185        let got = got.unwrap();
2186        assert!(got.syntax_ok);
2187        assert!(got.build_ok);
2188        assert!(!got.tests_ok);
2189        assert_eq!(got.tests_passed, 5);
2190        assert_eq!(got.tests_failed, 1);
2191        assert!(!got.degraded);
2192    }
2193
2194    #[test]
2195    fn test_verification_result_degraded() {
2196        let store = test_store();
2197        let sid = "test-vr-deg";
2198        seed_session(&store, sid);
2199
2200        let row = VerificationResultRow {
2201            session_id: sid.to_string(),
2202            node_id: "node-d".to_string(),
2203            result_json: "{}".to_string(),
2204            syntax_ok: true,
2205            build_ok: false,
2206            tests_ok: false,
2207            lint_ok: false,
2208            diagnostics_count: 0,
2209            tests_passed: 0,
2210            tests_failed: 0,
2211            degraded: true,
2212            degraded_reason: Some("LSP unavailable".to_string()),
2213        };
2214        store.record_verification_result(&row).unwrap();
2215
2216        let got = store
2217            .get_verification_result(sid, "node-d")
2218            .unwrap()
2219            .unwrap();
2220        assert!(got.degraded);
2221        assert_eq!(got.degraded_reason.as_deref(), Some("LSP unavailable"));
2222    }
2223
2224    #[test]
2225    fn test_artifact_bundle_roundtrip() {
2226        let store = test_store();
2227        let sid = "test-ab-1";
2228        seed_session(&store, sid);
2229
2230        let row = ArtifactBundleRow {
2231            session_id: sid.to_string(),
2232            node_id: "node-a".to_string(),
2233            bundle_json: r#"{"artifacts":[],"commands":[]}"#.to_string(),
2234            artifact_count: 3,
2235            command_count: 1,
2236            touched_files: r#"["src/main.rs","src/lib.rs","tests/test.rs"]"#.to_string(),
2237        };
2238        store.record_artifact_bundle(&row).unwrap();
2239
2240        let got = store.get_artifact_bundle(sid, "node-a").unwrap();
2241        assert!(got.is_some());
2242        let got = got.unwrap();
2243        assert_eq!(got.artifact_count, 3);
2244        assert_eq!(got.command_count, 1);
2245        assert!(got.touched_files.contains("main.rs"));
2246    }
2247
2248    #[test]
2249    fn test_latest_node_states_dedup() {
2250        let store = test_store();
2251        let sid = "test-dedup";
2252        seed_session(&store, sid);
2253
2254        // Insert two states for the same node
2255        let r1 = NodeStateRecord {
2256            node_id: "node-x".to_string(),
2257            session_id: sid.to_string(),
2258            state: "Coding".to_string(),
2259            v_total: 0.5,
2260            merkle_hash: None,
2261            attempt_count: 1,
2262            node_class: None,
2263            owner_plugin: None,
2264            goal: None,
2265            parent_id: None,
2266            children: None,
2267            last_error_type: None,
2268            committed_at: None,
2269        };
2270        store.record_node_state(&r1).unwrap();
2271
2272        let r2 = NodeStateRecord {
2273            node_id: "node-x".to_string(),
2274            session_id: sid.to_string(),
2275            state: "Completed".to_string(),
2276            v_total: 0.3,
2277            merkle_hash: None,
2278            attempt_count: 2,
2279            node_class: Some("Implementation".to_string()),
2280            owner_plugin: None,
2281            goal: Some("Updated goal".to_string()),
2282            parent_id: None,
2283            children: None,
2284            last_error_type: None,
2285            committed_at: Some("2025-01-02T00:00:00Z".to_string()),
2286        };
2287        store.record_node_state(&r2).unwrap();
2288
2289        // get_latest should return only the last entry
2290        let latest = store.get_latest_node_states(sid).unwrap();
2291        assert_eq!(latest.len(), 1);
2292        assert_eq!(latest[0].state, "Completed");
2293        assert_eq!(latest[0].attempt_count, 2);
2294        assert_eq!(latest[0].goal.as_deref(), Some("Updated goal"));
2295    }
2296
2297    #[test]
2298    fn test_backward_compat_empty_phase8_fields() {
2299        let store = test_store();
2300        let sid = "test-compat";
2301        seed_session(&store, sid);
2302
2303        // Insert a node with all Phase 8 fields as None (pre-Phase-8 session)
2304        let r = NodeStateRecord {
2305            node_id: "old-node".to_string(),
2306            session_id: sid.to_string(),
2307            state: "COMPLETED".to_string(),
2308            v_total: 1.0,
2309            merkle_hash: None,
2310            attempt_count: 1,
2311            node_class: None,
2312            owner_plugin: None,
2313            goal: None,
2314            parent_id: None,
2315            children: None,
2316            last_error_type: None,
2317            committed_at: None,
2318        };
2319        store.record_node_state(&r).unwrap();
2320
2321        let latest = store.get_latest_node_states(sid).unwrap();
2322        assert_eq!(latest.len(), 1);
2323        assert!(latest[0].node_class.is_none());
2324        assert!(latest[0].goal.is_none());
2325        assert!(latest[0].committed_at.is_none());
2326
2327        // Verification and artifact lookups should return None
2328        let vr = store.get_verification_result(sid, "old-node").unwrap();
2329        assert!(vr.is_none());
2330        let ab = store.get_artifact_bundle(sid, "old-node").unwrap();
2331        assert!(ab.is_none());
2332    }
2333
2334    #[test]
2335    fn test_review_outcome_roundtrip() {
2336        let store = test_store();
2337        let sid = "test-review";
2338        seed_session(&store, sid);
2339
2340        let row = ReviewOutcomeRow {
2341            session_id: sid.to_string(),
2342            node_id: "node-r".to_string(),
2343            outcome: "approved".to_string(),
2344            reviewer_note: Some("LGTM".to_string()),
2345            energy_at_review: None,
2346            degraded: None,
2347            escalation_category: None,
2348        };
2349        store.record_review_outcome(&row).unwrap();
2350
2351        let outcomes = store.get_review_outcomes(sid, "node-r").unwrap();
2352        assert_eq!(outcomes.len(), 1);
2353        assert_eq!(outcomes[0].outcome, "approved");
2354        assert_eq!(outcomes[0].reviewer_note.as_deref(), Some("LGTM"));
2355    }
2356
2357    #[test]
2358    fn test_review_outcome_with_audit_fields() {
2359        let store = test_store();
2360        let sid = "test-review-audit";
2361        seed_session(&store, sid);
2362
2363        let row = ReviewOutcomeRow {
2364            session_id: sid.to_string(),
2365            node_id: "node-a".to_string(),
2366            outcome: "rejected".to_string(),
2367            reviewer_note: Some("Needs rework".to_string()),
2368            energy_at_review: Some(0.42),
2369            degraded: Some(true),
2370            escalation_category: Some("complexity".to_string()),
2371        };
2372        store.record_review_outcome(&row).unwrap();
2373
2374        let outcomes = store.get_review_outcomes(sid, "node-a").unwrap();
2375        assert_eq!(outcomes.len(), 1);
2376        assert_eq!(outcomes[0].outcome, "rejected");
2377        assert_eq!(outcomes[0].energy_at_review, Some(0.42));
2378        assert_eq!(outcomes[0].degraded, Some(true));
2379        assert_eq!(
2380            outcomes[0].escalation_category.as_deref(),
2381            Some("complexity")
2382        );
2383    }
2384
2385    #[test]
2386    fn test_get_all_review_outcomes() {
2387        let store = test_store();
2388        let sid = "test-review-all";
2389        seed_session(&store, sid);
2390
2391        for (node, outcome) in &[("n1", "approved"), ("n2", "rejected"), ("n1", "approved")] {
2392            let row = ReviewOutcomeRow {
2393                session_id: sid.to_string(),
2394                node_id: node.to_string(),
2395                outcome: outcome.to_string(),
2396                reviewer_note: None,
2397                energy_at_review: None,
2398                degraded: None,
2399                escalation_category: None,
2400            };
2401            store.record_review_outcome(&row).unwrap();
2402        }
2403
2404        let all = store.get_all_review_outcomes(sid).unwrap();
2405        assert_eq!(all.len(), 3);
2406    }
2407
2408    #[test]
2409    fn test_feature_charter_roundtrip() {
2410        let store = test_store();
2411        let sid = "test-charter";
2412        seed_session(&store, sid);
2413
2414        let row = FeatureCharterRow {
2415            charter_id: "ch-1".to_string(),
2416            session_id: sid.to_string(),
2417            scope_description: "Add authentication module".to_string(),
2418            max_modules: Some(3),
2419            max_files: Some(10),
2420            max_revisions: Some(5),
2421            language_constraint: Some("rust".to_string()),
2422        };
2423        store.record_feature_charter(&row).unwrap();
2424
2425        let got = store.get_feature_charter(sid).unwrap();
2426        assert!(got.is_some());
2427        let got = got.unwrap();
2428        assert_eq!(got.charter_id, "ch-1");
2429        assert_eq!(got.scope_description, "Add authentication module");
2430        assert_eq!(got.max_modules, Some(3));
2431        assert_eq!(got.language_constraint.as_deref(), Some("rust"));
2432    }
2433
2434    #[test]
2435    fn test_feature_charter_returns_none_for_missing() {
2436        let store = test_store();
2437        let sid = "test-charter-miss";
2438        seed_session(&store, sid);
2439
2440        let got = store.get_feature_charter(sid).unwrap();
2441        assert!(got.is_none());
2442    }
2443
2444    #[test]
2445    fn test_plan_revision_roundtrip() {
2446        let store = test_store();
2447        let sid = "test-rev";
2448        seed_session(&store, sid);
2449
2450        let row = PlanRevisionRow {
2451            revision_id: "rev-1".to_string(),
2452            session_id: sid.to_string(),
2453            sequence: 1,
2454            plan_json: r#"{"tasks":[]}"#.to_string(),
2455            reason: "initial plan".to_string(),
2456            supersedes: None,
2457            status: "active".to_string(),
2458        };
2459        store.record_plan_revision(&row).unwrap();
2460
2461        let active = store.get_active_plan_revision(sid).unwrap();
2462        assert!(active.is_some());
2463        let active = active.unwrap();
2464        assert_eq!(active.revision_id, "rev-1");
2465        assert_eq!(active.sequence, 1);
2466        assert_eq!(active.status, "active");
2467    }
2468
2469    #[test]
2470    fn test_plan_revision_supersede() {
2471        let store = test_store();
2472        let sid = "test-rev-sup";
2473        seed_session(&store, sid);
2474
2475        let r1 = PlanRevisionRow {
2476            revision_id: "rev-1".to_string(),
2477            session_id: sid.to_string(),
2478            sequence: 1,
2479            plan_json: "{}".to_string(),
2480            reason: "initial".to_string(),
2481            supersedes: None,
2482            status: "active".to_string(),
2483        };
2484        store.record_plan_revision(&r1).unwrap();
2485
2486        // Supersede rev-1
2487        store.supersede_plan_revision("rev-1").unwrap();
2488
2489        let r2 = PlanRevisionRow {
2490            revision_id: "rev-2".to_string(),
2491            session_id: sid.to_string(),
2492            sequence: 2,
2493            plan_json: r#"{"tasks":["a"]}"#.to_string(),
2494            reason: "verifier feedback".to_string(),
2495            supersedes: Some("rev-1".to_string()),
2496            status: "active".to_string(),
2497        };
2498        store.record_plan_revision(&r2).unwrap();
2499
2500        // Only rev-2 should be active
2501        let active = store.get_active_plan_revision(sid).unwrap().unwrap();
2502        assert_eq!(active.revision_id, "rev-2");
2503
2504        // All revisions returned in order
2505        let all = store.get_plan_revisions(sid).unwrap();
2506        assert_eq!(all.len(), 2);
2507        assert_eq!(all[0].status, "superseded");
2508        assert_eq!(all[1].status, "active");
2509    }
2510
2511    #[test]
2512    fn test_repair_footprint_roundtrip() {
2513        let store = test_store();
2514        let sid = "test-repair";
2515        seed_session(&store, sid);
2516
2517        let row = RepairFootprintRow {
2518            footprint_id: "fp-1".to_string(),
2519            session_id: sid.to_string(),
2520            node_id: "node-a".to_string(),
2521            revision_id: "rev-1".to_string(),
2522            attempt: 1,
2523            affected_files: r#"["src/main.rs"]"#.to_string(),
2524            bundle_json: "{}".to_string(),
2525            diagnosis: "missing import".to_string(),
2526            resolved: false,
2527        };
2528        store.record_repair_footprint(&row).unwrap();
2529
2530        let footprints = store.get_repair_footprints(sid, "node-a").unwrap();
2531        assert_eq!(footprints.len(), 1);
2532        assert_eq!(footprints[0].footprint_id, "fp-1");
2533        assert_eq!(footprints[0].diagnosis, "missing import");
2534        assert!(!footprints[0].resolved);
2535    }
2536
2537    #[test]
2538    fn test_repair_footprint_resolve() {
2539        let store = test_store();
2540        let sid = "test-repair-res";
2541        seed_session(&store, sid);
2542
2543        let row = RepairFootprintRow {
2544            footprint_id: "fp-2".to_string(),
2545            session_id: sid.to_string(),
2546            node_id: "node-b".to_string(),
2547            revision_id: "rev-1".to_string(),
2548            attempt: 1,
2549            affected_files: "[]".to_string(),
2550            bundle_json: "{}".to_string(),
2551            diagnosis: "type error".to_string(),
2552            resolved: false,
2553        };
2554        store.record_repair_footprint(&row).unwrap();
2555
2556        store.resolve_repair_footprint("fp-2").unwrap();
2557
2558        let footprints = store.get_repair_footprints(sid, "node-b").unwrap();
2559        assert_eq!(footprints.len(), 1);
2560        assert!(footprints[0].resolved);
2561    }
2562
2563    #[test]
2564    fn test_budget_envelope_upsert_and_get() {
2565        let store = test_store();
2566        let sid = "test-budget";
2567        seed_session(&store, sid);
2568
2569        let row = BudgetEnvelopeRow {
2570            session_id: sid.to_string(),
2571            max_steps: Some(100),
2572            steps_used: 5,
2573            max_revisions: Some(10),
2574            revisions_used: 1,
2575            max_cost_usd: Some(5.0),
2576            cost_used_usd: 0.25,
2577        };
2578        store.upsert_budget_envelope(&row).unwrap();
2579
2580        let got = store.get_budget_envelope(sid).unwrap();
2581        assert!(got.is_some());
2582        let got = got.unwrap();
2583        assert_eq!(got.max_steps, Some(100));
2584        assert_eq!(got.steps_used, 5);
2585        assert_eq!(got.cost_used_usd, 0.25);
2586    }
2587
2588    #[test]
2589    fn test_budget_envelope_upsert_updates() {
2590        let store = test_store();
2591        let sid = "test-budget-up";
2592        seed_session(&store, sid);
2593
2594        let row1 = BudgetEnvelopeRow {
2595            session_id: sid.to_string(),
2596            max_steps: Some(100),
2597            steps_used: 0,
2598            max_revisions: None,
2599            revisions_used: 0,
2600            max_cost_usd: None,
2601            cost_used_usd: 0.0,
2602        };
2603        store.upsert_budget_envelope(&row1).unwrap();
2604
2605        // Update with new values
2606        let row2 = BudgetEnvelopeRow {
2607            session_id: sid.to_string(),
2608            max_steps: Some(100),
2609            steps_used: 42,
2610            max_revisions: Some(5),
2611            revisions_used: 3,
2612            max_cost_usd: Some(10.0),
2613            cost_used_usd: 4.5,
2614        };
2615        store.upsert_budget_envelope(&row2).unwrap();
2616
2617        let got = store.get_budget_envelope(sid).unwrap().unwrap();
2618        assert_eq!(got.steps_used, 42);
2619        assert_eq!(got.revisions_used, 3);
2620        assert_eq!(got.cost_used_usd, 4.5);
2621    }
2622
2623    #[test]
2624    fn test_budget_envelope_missing_returns_none() {
2625        let store = test_store();
2626        let sid = "test-budget-miss";
2627        seed_session(&store, sid);
2628
2629        let got = store.get_budget_envelope(sid).unwrap();
2630        assert!(got.is_none());
2631    }
2632
2633    #[test]
2634    fn test_read_only_store_queries_work() {
2635        let temp_dir = std::env::temp_dir();
2636        let db_path = temp_dir.join(format!("perspt_ro_test_{}.db", uuid::Uuid::new_v4()));
2637
2638        // Create and seed a normal store
2639        {
2640            let store = SessionStore::open(&db_path).unwrap();
2641            seed_session(&store, "ro-test");
2642        }
2643
2644        // Open read-only and verify queries work
2645        let ro = SessionStore::open_read_only(&db_path).unwrap();
2646        let sessions = ro.list_recent_sessions(10).unwrap();
2647        assert_eq!(sessions.len(), 1);
2648        assert_eq!(sessions[0].session_id, "ro-test");
2649    }
2650
2651    #[test]
2652    fn test_read_only_store_rejects_writes() {
2653        let temp_dir = std::env::temp_dir();
2654        let db_path = temp_dir.join(format!("perspt_ro_wr_{}.db", uuid::Uuid::new_v4()));
2655
2656        // Create the DB first
2657        {
2658            let _store = SessionStore::open(&db_path).unwrap();
2659        }
2660
2661        // Open read-only and verify writes fail
2662        let ro = SessionStore::open_read_only(&db_path).unwrap();
2663        let record = SessionRecord {
2664            session_id: "should-fail".to_string(),
2665            task: "test".to_string(),
2666            working_dir: "/tmp".to_string(),
2667            merkle_root: None,
2668            detected_toolchain: None,
2669            status: "RUNNING".to_string(),
2670        };
2671        assert!(ro.create_session(&record).is_err());
2672    }
2673
2674    // =================================================================
2675    // PSP-7: SRBN step records and correction attempts round-trip tests
2676    // =================================================================
2677
2678    #[test]
2679    fn test_srbn_step_record_roundtrip() {
2680        let store = test_store();
2681        let sid = "step-sess";
2682        seed_session(&store, sid);
2683
2684        let r1 = SrbnStepRecord {
2685            session_id: sid.to_string(),
2686            node_id: "n1".to_string(),
2687            step: "speculate".to_string(),
2688            outcome: "ok".to_string(),
2689            energy_json: Some(r#"{"v_syn":0.0}"#.to_string()),
2690            parse_state: Some("FullyParsed".to_string()),
2691            retry_classification: None,
2692            attempt_count: 0,
2693            duration_ms: 120,
2694        };
2695        let r2 = SrbnStepRecord {
2696            session_id: sid.to_string(),
2697            node_id: "n1".to_string(),
2698            step: "verify".to_string(),
2699            outcome: "retry".to_string(),
2700            energy_json: Some(r#"{"v_syn":5.0}"#.to_string()),
2701            parse_state: None,
2702            retry_classification: Some("MalformedResponse".to_string()),
2703            attempt_count: 1,
2704            duration_ms: 300,
2705        };
2706
2707        store.record_step(&r1).unwrap();
2708        store.record_step(&r2).unwrap();
2709
2710        let timeline = store.get_step_timeline(sid, "n1").unwrap();
2711        assert_eq!(timeline.len(), 2);
2712        assert_eq!(timeline[0].step, "speculate");
2713        assert_eq!(timeline[0].outcome, "ok");
2714        assert_eq!(timeline[0].duration_ms, 120);
2715        assert_eq!(timeline[1].step, "verify");
2716        assert_eq!(
2717            timeline[1].retry_classification.as_deref(),
2718            Some("MalformedResponse")
2719        );
2720
2721        let all = store.get_session_steps(sid).unwrap();
2722        assert_eq!(all.len(), 2);
2723    }
2724
2725    #[test]
2726    fn test_correction_attempt_roundtrip() {
2727        let store = test_store();
2728        let sid = "corr-sess";
2729        seed_session(&store, sid);
2730
2731        let a1 = CorrectionAttemptRow {
2732            session_id: sid.to_string(),
2733            node_id: "n1".to_string(),
2734            attempt: 1,
2735            parse_state: "FullyParsed".to_string(),
2736            retry_classification: None,
2737            response_fingerprint: "abc123".to_string(),
2738            response_length: 4096,
2739            energy_json: Some(r#"{"v_syn":2.0}"#.to_string()),
2740            accepted: false,
2741            rejection_reason: Some("v_syn too high".to_string()),
2742            created_at: 1700000000,
2743        };
2744        let a2 = CorrectionAttemptRow {
2745            session_id: sid.to_string(),
2746            node_id: "n1".to_string(),
2747            attempt: 2,
2748            parse_state: "FullyParsed".to_string(),
2749            retry_classification: None,
2750            response_fingerprint: "def456".to_string(),
2751            response_length: 3800,
2752            energy_json: Some(r#"{"v_syn":0.0}"#.to_string()),
2753            accepted: true,
2754            rejection_reason: None,
2755            created_at: 1700000010,
2756        };
2757
2758        store.record_correction_attempt(&a1).unwrap();
2759        store.record_correction_attempt(&a2).unwrap();
2760
2761        let attempts = store.get_correction_attempts(sid, "n1").unwrap();
2762        assert_eq!(attempts.len(), 2);
2763        assert_eq!(attempts[0].attempt, 1);
2764        assert!(!attempts[0].accepted);
2765        assert_eq!(
2766            attempts[0].rejection_reason.as_deref(),
2767            Some("v_syn too high")
2768        );
2769        assert_eq!(attempts[1].attempt, 2);
2770        assert!(attempts[1].accepted);
2771        assert!(attempts[1].rejection_reason.is_none());
2772    }
2773
2774    #[test]
2775    fn test_step_timeline_filters_by_node() {
2776        let store = test_store();
2777        let sid = "filter-sess";
2778        seed_session(&store, sid);
2779
2780        for (node, step) in [("n1", "speculate"), ("n2", "speculate"), ("n1", "verify")] {
2781            store
2782                .record_step(&SrbnStepRecord {
2783                    session_id: sid.to_string(),
2784                    node_id: node.to_string(),
2785                    step: step.to_string(),
2786                    outcome: "ok".to_string(),
2787                    energy_json: None,
2788                    parse_state: None,
2789                    retry_classification: None,
2790                    attempt_count: 0,
2791                    duration_ms: 0,
2792                })
2793                .unwrap();
2794        }
2795
2796        assert_eq!(store.get_step_timeline(sid, "n1").unwrap().len(), 2);
2797        assert_eq!(store.get_step_timeline(sid, "n2").unwrap().len(), 1);
2798        assert_eq!(store.get_session_steps(sid).unwrap().len(), 3);
2799    }
2800}