1use anyhow::{Context, Result};
6use duckdb::Connection;
7use serde::{Deserialize, Serialize};
8use std::path::PathBuf;
9
10use crate::schema::init_schema;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct SessionRecord {
15 pub session_id: String,
16 pub task: String,
17 pub working_dir: String,
18 pub merkle_root: Option<Vec<u8>>,
19 pub detected_toolchain: Option<String>,
20 pub status: String,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct NodeStateRecord {
26 pub node_id: String,
27 pub session_id: String,
28 pub state: String,
29 pub v_total: f32,
30 pub merkle_hash: Option<Vec<u8>>,
31 pub attempt_count: i32,
32 pub node_class: Option<String>,
34 pub owner_plugin: Option<String>,
35 pub goal: Option<String>,
36 pub parent_id: Option<String>,
37 pub children: Option<String>,
39 pub last_error_type: Option<String>,
40 pub committed_at: Option<String>,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct EnergyRecord {
46 pub node_id: String,
47 pub session_id: String,
48 pub v_syn: f32,
49 pub v_str: f32,
50 pub v_log: f32,
51 pub v_boot: f32,
52 pub v_sheaf: f32,
53 pub v_total: f32,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct LlmRequestRecord {
59 pub session_id: String,
60 pub node_id: Option<String>,
61 pub model: String,
62 pub prompt: String,
63 pub response: String,
64 pub tokens_in: i32,
65 pub tokens_out: i32,
66 pub latency_ms: i32,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct StructuralDigestRecord {
72 pub digest_id: String,
73 pub session_id: String,
74 pub node_id: String,
75 pub source_path: String,
76 pub artifact_kind: String,
77 pub hash: Vec<u8>,
78 pub version: i32,
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct ContextProvenanceRecord {
84 pub session_id: String,
85 pub node_id: String,
86 pub context_package_id: String,
87 pub structural_hashes: String,
89 pub summary_hashes: String,
91 pub dependency_hashes: String,
93 pub included_file_count: i32,
94 pub total_bytes: i32,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct EscalationReportRecord {
100 pub session_id: String,
101 pub node_id: String,
102 pub category: String,
104 pub action: String,
106 pub energy_snapshot: String,
108 pub stage_outcomes: String,
110 pub evidence: String,
112 pub affected_node_ids: String,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct RewriteRecordRow {
119 pub session_id: String,
120 pub node_id: String,
121 pub action: String,
123 pub category: String,
125 pub requeued_nodes: String,
127 pub inserted_nodes: String,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct SheafValidationRow {
134 pub session_id: String,
135 pub node_id: String,
136 pub validator_class: String,
137 pub plugin_source: Option<String>,
138 pub passed: bool,
139 pub evidence_summary: String,
140 pub affected_files: String,
142 pub v_sheaf_contribution: f32,
143 pub requeue_targets: String,
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct ProvisionalBranchRow {
154 pub branch_id: String,
155 pub session_id: String,
156 pub node_id: String,
157 pub parent_node_id: String,
158 pub state: String,
159 pub parent_seal_hash: Option<Vec<u8>>,
160 pub sandbox_dir: Option<String>,
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct BranchLineageRow {
166 pub lineage_id: String,
167 pub parent_branch_id: String,
168 pub child_branch_id: String,
169 pub depends_on_seal: bool,
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct InterfaceSealRow {
175 pub seal_id: String,
176 pub session_id: String,
177 pub node_id: String,
178 pub sealed_path: String,
179 pub artifact_kind: String,
180 pub seal_hash: Vec<u8>,
181 pub version: i32,
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct BranchFlushRow {
187 pub flush_id: String,
188 pub session_id: String,
189 pub parent_node_id: String,
190 pub flushed_branch_ids: String,
192 pub requeue_node_ids: String,
194 pub reason: String,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct TaskGraphEdgeRow {
204 pub session_id: String,
205 pub parent_node_id: String,
206 pub child_node_id: String,
207 pub edge_type: String,
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct ReviewOutcomeRow {
213 pub session_id: String,
214 pub node_id: String,
215 pub outcome: String,
217 pub reviewer_note: Option<String>,
218 pub energy_at_review: Option<f64>,
220 pub degraded: Option<bool>,
222 pub escalation_category: Option<String>,
224}
225
226#[derive(Debug, Clone, Serialize, Deserialize)]
228pub struct VerificationResultRow {
229 pub session_id: String,
230 pub node_id: String,
231 pub result_json: String,
233 pub syntax_ok: bool,
235 pub build_ok: bool,
236 pub tests_ok: bool,
237 pub lint_ok: bool,
238 pub diagnostics_count: i32,
239 pub tests_passed: i32,
240 pub tests_failed: i32,
241 pub degraded: bool,
242 pub degraded_reason: Option<String>,
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct ArtifactBundleRow {
248 pub session_id: String,
249 pub node_id: String,
250 pub bundle_json: String,
252 pub artifact_count: i32,
253 pub command_count: i32,
254 pub touched_files: String,
256}
257
258#[derive(Debug, Clone)]
264pub struct FeatureCharterRow {
265 pub charter_id: String,
266 pub session_id: String,
267 pub scope_description: String,
268 pub max_modules: Option<i32>,
269 pub max_files: Option<i32>,
270 pub max_revisions: Option<i32>,
271 pub language_constraint: Option<String>,
272}
273
274#[derive(Debug, Clone)]
276pub struct PlanRevisionRow {
277 pub revision_id: String,
278 pub session_id: String,
279 pub sequence: i32,
280 pub plan_json: String,
281 pub reason: String,
282 pub supersedes: Option<String>,
283 pub status: String,
284}
285
286#[derive(Debug, Clone)]
288pub struct RepairFootprintRow {
289 pub footprint_id: String,
290 pub session_id: String,
291 pub node_id: String,
292 pub revision_id: String,
293 pub attempt: i32,
294 pub affected_files: String,
295 pub bundle_json: String,
296 pub diagnosis: String,
297 pub resolved: bool,
298}
299
300#[derive(Debug, Clone)]
302pub struct BudgetEnvelopeRow {
303 pub session_id: String,
304 pub max_steps: Option<i32>,
305 pub steps_used: i32,
306 pub max_revisions: Option<i32>,
307 pub revisions_used: i32,
308 pub max_cost_usd: Option<f64>,
309 pub cost_used_usd: f64,
310}
311
312#[derive(Debug, Clone, Serialize, Deserialize)]
318pub struct SrbnStepRecord {
319 pub session_id: String,
320 pub node_id: String,
321 pub step: String,
323 pub outcome: String,
325 pub energy_json: Option<String>,
327 pub parse_state: Option<String>,
329 pub retry_classification: Option<String>,
331 pub attempt_count: i32,
333 pub duration_ms: i32,
335}
336
337#[derive(Debug, Clone, Serialize, Deserialize)]
339pub struct CorrectionAttemptRow {
340 pub session_id: String,
341 pub node_id: String,
342 pub attempt: i32,
343 pub parse_state: String,
344 pub retry_classification: Option<String>,
345 pub response_fingerprint: String,
346 pub response_length: i32,
347 pub energy_json: Option<String>,
349 pub accepted: bool,
350 pub rejection_reason: Option<String>,
351 pub created_at: i64,
353}
354
355use std::sync::Mutex;
356
357pub struct SessionStore {
359 conn: Mutex<Connection>,
360}
361
362impl SessionStore {
363 pub fn new() -> Result<Self> {
365 let db_path = Self::default_db_path()?;
366 Self::open(&db_path)
367 }
368
369 pub fn open(path: &PathBuf) -> Result<Self> {
371 if let Some(parent) = path.parent() {
373 std::fs::create_dir_all(parent)?;
374 }
375
376 let conn = Connection::open(path).context("Failed to open DuckDB")?;
377 init_schema(&conn)?;
378
379 Ok(Self {
380 conn: Mutex::new(conn),
381 })
382 }
383
384 pub fn open_read_only(path: &std::path::Path) -> Result<Self> {
390 let config = duckdb::Config::default()
391 .access_mode(duckdb::AccessMode::ReadOnly)
392 .context("Failed to configure DuckDB read-only mode")?;
393 let conn = Connection::open_with_flags(path, config)
394 .context("Failed to open DuckDB in read-only mode")?;
395 Ok(Self {
396 conn: Mutex::new(conn),
397 })
398 }
399
400 pub fn default_db_path() -> Result<PathBuf> {
402 perspt_core::paths::database_path().context("Could not determine platform data directory")
403 }
404
405 pub fn create_session(&self, session: &SessionRecord) -> Result<()> {
407 self.conn.lock().unwrap().execute(
408 r#"
409 INSERT INTO sessions (session_id, task, working_dir, merkle_root, detected_toolchain, status)
410 VALUES (?, ?, ?, ?, ?, ?)
411 "#,
412 [
413 &session.session_id,
414 &session.task,
415 &session.working_dir,
416 &session.merkle_root.as_ref().map(hex::encode).unwrap_or_default(),
417 &session.detected_toolchain.clone().unwrap_or_default(),
418 &session.status,
419 ],
420 )?;
421 Ok(())
422 }
423
424 pub fn update_merkle_root(&self, session_id: &str, merkle_root: &[u8]) -> Result<()> {
426 self.conn.lock().unwrap().execute(
427 "UPDATE sessions SET merkle_root = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
428 [hex::encode(merkle_root), session_id.to_string()],
429 )?;
430 Ok(())
431 }
432
433 pub fn record_node_state(&self, record: &NodeStateRecord) -> Result<()> {
435 let v_total = record.v_total.to_string();
436 let merkle_hash = record
437 .merkle_hash
438 .as_ref()
439 .map(hex::encode)
440 .unwrap_or_default();
441 let attempt_count = record.attempt_count.to_string();
442 let node_class = record.node_class.clone().unwrap_or_default();
443 let owner_plugin = record.owner_plugin.clone().unwrap_or_default();
444 let goal = record.goal.clone().unwrap_or_default();
445 let parent_id = record.parent_id.clone().unwrap_or_default();
446 let children = record.children.clone().unwrap_or_default();
447 let last_error_type = record.last_error_type.clone().unwrap_or_default();
448 let committed_at = record.committed_at.clone().unwrap_or_default();
449
450 self.conn.lock().unwrap().execute(
451 r#"
452 INSERT INTO node_states (node_id, session_id, state, v_total, merkle_hash, attempt_count,
453 node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at)
454 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
455 "#,
456 [
457 &record.node_id,
458 &record.session_id,
459 &record.state,
460 &v_total,
461 &merkle_hash,
462 &attempt_count,
463 &node_class,
464 &owner_plugin,
465 &goal,
466 &parent_id,
467 &children,
468 &last_error_type,
469 &committed_at,
470 ],
471 )?;
472 Ok(())
473 }
474
475 pub fn record_energy(&self, record: &EnergyRecord) -> Result<()> {
477 self.conn.lock().unwrap().execute(
478 r#"
479 INSERT INTO energy_history (node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total)
480 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
481 "#,
482 [
483 &record.node_id,
484 &record.session_id,
485 &record.v_syn.to_string(),
486 &record.v_str.to_string(),
487 &record.v_log.to_string(),
488 &record.v_boot.to_string(),
489 &record.v_sheaf.to_string(),
490 &record.v_total.to_string(),
491 ],
492 )?;
493 Ok(())
494 }
495
496 pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>> {
498 let conn = self.conn.lock().unwrap();
499 let mut stmt = conn.prepare(
500 "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status FROM sessions WHERE session_id = ?"
501 )?;
502
503 let mut rows = stmt.query([session_id])?;
504 if let Some(row) = rows.next()? {
505 let merkle_root: Option<Vec<u8>> = row.get(3).ok();
508
509 Ok(Some(SessionRecord {
510 session_id: row.get(0)?,
511 task: row.get(1)?,
512 working_dir: row.get(2)?,
513 merkle_root,
514 detected_toolchain: row.get(4)?,
515 status: row.get(5)?,
516 }))
517 } else {
518 Ok(None)
519 }
520 }
521
522 pub fn get_session_dir(&self, session_id: &str) -> Result<PathBuf> {
524 let data_dir = dirs::data_local_dir()
525 .context("Could not find local data directory")?
526 .join("perspt")
527 .join("sessions")
528 .join(session_id);
529 Ok(data_dir)
530 }
531
532 pub fn create_session_dir(&self, session_id: &str) -> Result<PathBuf> {
534 let dir = self.get_session_dir(session_id)?;
535 if !dir.exists() {
536 std::fs::create_dir_all(&dir).context("Failed to create session directory")?;
537 }
538 Ok(dir)
539 }
540
541 pub fn get_energy_history(&self, session_id: &str, node_id: &str) -> Result<Vec<EnergyRecord>> {
543 let conn = self.conn.lock().unwrap();
544 let mut stmt = conn.prepare(
545 "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? AND node_id = ? ORDER BY timestamp"
546 )?;
547
548 let mut rows = stmt.query([session_id, node_id])?;
549 let mut records = Vec::new();
550
551 while let Some(row) = rows.next()? {
552 records.push(EnergyRecord {
553 node_id: row.get(0)?,
554 session_id: row.get(1)?,
555 v_syn: row.get::<_, f64>(2)? as f32,
556 v_str: row.get::<_, f64>(3)? as f32,
557 v_log: row.get::<_, f64>(4)? as f32,
558 v_boot: row.get::<_, f64>(5)? as f32,
559 v_sheaf: row.get::<_, f64>(6)? as f32,
560 v_total: row.get::<_, f64>(7)? as f32,
561 });
562 }
563
564 Ok(records)
565 }
566
567 pub fn get_session_energy_history(&self, session_id: &str) -> Result<Vec<EnergyRecord>> {
569 let conn = self.conn.lock().unwrap();
570 let mut stmt = conn.prepare(
571 "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? ORDER BY timestamp"
572 )?;
573
574 let mut rows = stmt.query([session_id])?;
575 let mut records = Vec::new();
576
577 while let Some(row) = rows.next()? {
578 records.push(EnergyRecord {
579 node_id: row.get(0)?,
580 session_id: row.get(1)?,
581 v_syn: row.get::<_, f64>(2)? as f32,
582 v_str: row.get::<_, f64>(3)? as f32,
583 v_log: row.get::<_, f64>(4)? as f32,
584 v_boot: row.get::<_, f64>(5)? as f32,
585 v_sheaf: row.get::<_, f64>(6)? as f32,
586 v_total: row.get::<_, f64>(7)? as f32,
587 });
588 }
589
590 Ok(records)
591 }
592
593 pub fn list_recent_sessions(&self, limit: usize) -> Result<Vec<SessionRecord>> {
595 self.list_sessions_paginated(limit, 0)
596 }
597
598 pub fn list_sessions_paginated(
600 &self,
601 limit: usize,
602 offset: usize,
603 ) -> Result<Vec<SessionRecord>> {
604 let conn = self.conn.lock().unwrap();
605 let mut stmt = conn.prepare(
606 "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status
607 FROM sessions ORDER BY created_at DESC LIMIT ? OFFSET ?",
608 )?;
609
610 let mut rows = stmt.query([limit.to_string(), offset.to_string()])?;
611 let mut records = Vec::new();
612
613 while let Some(row) = rows.next()? {
614 let merkle_root: Option<Vec<u8>> = row.get(3).ok();
615
616 records.push(SessionRecord {
617 session_id: row.get(0)?,
618 task: row.get(1)?,
619 working_dir: row.get(2)?,
620 merkle_root,
621 detected_toolchain: row.get(4)?,
622 status: row.get(5)?,
623 });
624 }
625
626 Ok(records)
627 }
628
629 pub fn count_sessions(&self) -> Result<usize> {
631 let conn = self.conn.lock().unwrap();
632 let mut stmt = conn.prepare("SELECT COUNT(*) FROM sessions")?;
633 let mut rows = stmt.query([])?;
634 if let Some(row) = rows.next()? {
635 let count: i64 = row.get(0)?;
636 Ok(count as usize)
637 } else {
638 Ok(0)
639 }
640 }
641
642 pub fn get_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
644 let conn = self.conn.lock().unwrap();
645 let mut stmt = conn.prepare(
646 "SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
647 node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
648 FROM node_states WHERE session_id = ? ORDER BY created_at",
649 )?;
650
651 let mut rows = stmt.query([session_id])?;
652 let mut records = Vec::new();
653
654 while let Some(row) = rows.next()? {
655 records.push(NodeStateRecord {
656 node_id: row.get(0)?,
657 session_id: row.get(1)?,
658 state: row.get(2)?,
659 v_total: row.get::<_, f64>(3)? as f32,
660 merkle_hash: row
661 .get::<_, Option<String>>(4)?
662 .and_then(|s| hex::decode(s).ok()),
663 attempt_count: row.get(5)?,
664 node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
665 owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
666 goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
667 parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
668 children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
669 last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
670 committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
671 });
672 }
673
674 Ok(records)
675 }
676
677 pub fn update_session_status(&self, session_id: &str, status: &str) -> Result<()> {
679 self.conn.lock().unwrap().execute(
680 "UPDATE sessions SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
681 [status, session_id],
682 )?;
683 Ok(())
684 }
685
686 pub fn record_llm_request(&self, record: &LlmRequestRecord) -> Result<()> {
688 let conn = self.conn.lock().unwrap();
689 conn.execute(
690 r#"
691 INSERT INTO llm_requests (session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms)
692 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
693 "#,
694 [
695 &record.session_id,
696 &record.node_id.clone().unwrap_or_default(),
697 &record.model,
698 &record.prompt,
699 &record.response,
700 &record.tokens_in.to_string(),
701 &record.tokens_out.to_string(),
702 &record.latency_ms.to_string(),
703 ],
704 )?;
705 Ok(())
706 }
707
708 pub fn get_llm_requests(&self, session_id: &str) -> Result<Vec<LlmRequestRecord>> {
710 let conn = self.conn.lock().unwrap();
711 let mut stmt = conn.prepare(
712 "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
713 FROM llm_requests WHERE session_id = ? ORDER BY timestamp",
714 )?;
715
716 let mut rows = stmt.query([session_id])?;
717 let mut records = Vec::new();
718
719 while let Some(row) = rows.next()? {
720 let node_id: Option<String> = row.get(1)?;
721 records.push(LlmRequestRecord {
722 session_id: row.get(0)?,
723 node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
724 None
725 } else {
726 node_id
727 },
728 model: row.get(2)?,
729 prompt: row.get(3)?,
730 response: row.get(4)?,
731 tokens_in: row.get(5)?,
732 tokens_out: row.get(6)?,
733 latency_ms: row.get(7)?,
734 });
735 }
736
737 Ok(records)
738 }
739
740 pub fn get_global_llm_summary(&self) -> Result<(i64, i64, i64, i64)> {
742 let conn = self.conn.lock().unwrap();
743 let mut stmt = conn.prepare(
744 "SELECT COUNT(*), \
745 COALESCE(SUM(CASE WHEN tokens_in > 0 THEN tokens_in ELSE (LENGTH(prompt) + 3) / 4 END), 0), \
746 COALESCE(SUM(CASE WHEN tokens_out > 0 THEN tokens_out ELSE (LENGTH(response) + 3) / 4 END), 0), \
747 COALESCE(MEDIAN(latency_ms), 0) \
748 FROM llm_requests",
749 )?;
750 let result = stmt.query_row([], |row| {
751 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
752 })?;
753 Ok(result)
754 }
755
756 pub fn record_structural_digest(&self, record: &StructuralDigestRecord) -> Result<()> {
762 self.conn.lock().unwrap().execute(
763 r#"
764 INSERT INTO structural_digests (digest_id, session_id, node_id, source_path, artifact_kind, hash, version)
765 VALUES (?, ?, ?, ?, ?, ?, ?)
766 "#,
767 [
768 &record.digest_id,
769 &record.session_id,
770 &record.node_id,
771 &record.source_path,
772 &record.artifact_kind,
773 &hex::encode(&record.hash),
774 &record.version.to_string(),
775 ],
776 )?;
777 Ok(())
778 }
779
780 pub fn get_structural_digests(
782 &self,
783 session_id: &str,
784 node_id: &str,
785 ) -> Result<Vec<StructuralDigestRecord>> {
786 let conn = self.conn.lock().unwrap();
787 let mut stmt = conn.prepare(
788 "SELECT digest_id, session_id, node_id, source_path, artifact_kind, hash, version
789 FROM structural_digests WHERE session_id = ? AND node_id = ? ORDER BY created_at",
790 )?;
791
792 let mut rows = stmt.query([session_id, node_id])?;
793 let mut records = Vec::new();
794
795 while let Some(row) = rows.next()? {
796 records.push(StructuralDigestRecord {
797 digest_id: row.get(0)?,
798 session_id: row.get(1)?,
799 node_id: row.get(2)?,
800 source_path: row.get(3)?,
801 artifact_kind: row.get(4)?,
802 hash: row
803 .get::<_, String>(5)
804 .ok()
805 .and_then(|s| hex::decode(s).ok())
806 .unwrap_or_default(),
807 version: row.get(5)?,
808 });
809 }
810
811 Ok(records)
812 }
813
814 pub fn record_context_provenance(&self, record: &ContextProvenanceRecord) -> Result<()> {
816 self.conn.lock().unwrap().execute(
817 r#"
818 INSERT INTO context_provenance (session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes)
819 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
820 "#,
821 [
822 &record.session_id,
823 &record.node_id,
824 &record.context_package_id,
825 &record.structural_hashes,
826 &record.summary_hashes,
827 &record.dependency_hashes,
828 &record.included_file_count.to_string(),
829 &record.total_bytes.to_string(),
830 ],
831 )?;
832 Ok(())
833 }
834
835 pub fn get_context_provenance(
837 &self,
838 session_id: &str,
839 node_id: &str,
840 ) -> Result<Option<ContextProvenanceRecord>> {
841 let conn = self.conn.lock().unwrap();
842 let mut stmt = conn.prepare(
843 "SELECT session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes
844 FROM context_provenance WHERE session_id = ? AND node_id = ? ORDER BY created_at DESC LIMIT 1",
845 )?;
846
847 let mut rows = stmt.query([session_id, node_id])?;
848 if let Some(row) = rows.next()? {
849 Ok(Some(ContextProvenanceRecord {
850 session_id: row.get(0)?,
851 node_id: row.get(1)?,
852 context_package_id: row.get(2)?,
853 structural_hashes: row.get(3)?,
854 summary_hashes: row.get(4)?,
855 dependency_hashes: row.get(5)?,
856 included_file_count: row.get(6)?,
857 total_bytes: row.get(7)?,
858 }))
859 } else {
860 Ok(None)
861 }
862 }
863
864 pub fn record_escalation_report(&self, record: &EscalationReportRecord) -> Result<()> {
870 self.conn.lock().unwrap().execute(
871 r#"
872 INSERT INTO escalation_reports (session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids)
873 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
874 "#,
875 [
876 &record.session_id,
877 &record.node_id,
878 &record.category,
879 &record.action,
880 &record.energy_snapshot,
881 &record.stage_outcomes,
882 &record.evidence,
883 &record.affected_node_ids,
884 ],
885 )?;
886 Ok(())
887 }
888
889 pub fn get_escalation_reports(&self, session_id: &str) -> Result<Vec<EscalationReportRecord>> {
891 let conn = self.conn.lock().unwrap();
892 let mut stmt = conn.prepare(
893 "SELECT session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids
894 FROM escalation_reports WHERE session_id = ? ORDER BY created_at",
895 )?;
896 let mut rows = stmt.query([session_id])?;
897 let mut records = Vec::new();
898 while let Some(row) = rows.next()? {
899 records.push(EscalationReportRecord {
900 session_id: row.get(0)?,
901 node_id: row.get(1)?,
902 category: row.get(2)?,
903 action: row.get(3)?,
904 energy_snapshot: row.get(4)?,
905 stage_outcomes: row.get(5)?,
906 evidence: row.get(6)?,
907 affected_node_ids: row.get(7)?,
908 });
909 }
910 Ok(records)
911 }
912
913 pub fn record_rewrite(&self, record: &RewriteRecordRow) -> Result<()> {
915 self.conn.lock().unwrap().execute(
916 r#"
917 INSERT INTO rewrite_records (session_id, node_id, action, category, requeued_nodes, inserted_nodes)
918 VALUES (?, ?, ?, ?, ?, ?)
919 "#,
920 [
921 &record.session_id,
922 &record.node_id,
923 &record.action,
924 &record.category,
925 &record.requeued_nodes,
926 &record.inserted_nodes,
927 ],
928 )?;
929 Ok(())
930 }
931
932 pub fn get_rewrite_records(&self, session_id: &str) -> Result<Vec<RewriteRecordRow>> {
934 let conn = self.conn.lock().unwrap();
935 let mut stmt = conn.prepare(
936 "SELECT session_id, node_id, action, category, requeued_nodes, inserted_nodes
937 FROM rewrite_records WHERE session_id = ? ORDER BY created_at",
938 )?;
939 let mut rows = stmt.query([session_id])?;
940 let mut records = Vec::new();
941 while let Some(row) = rows.next()? {
942 records.push(RewriteRecordRow {
943 session_id: row.get(0)?,
944 node_id: row.get(1)?,
945 action: row.get(2)?,
946 category: row.get(3)?,
947 requeued_nodes: row.get(4)?,
948 inserted_nodes: row.get(5)?,
949 });
950 }
951 Ok(records)
952 }
953
954 pub fn record_sheaf_validation(&self, record: &SheafValidationRow) -> Result<()> {
956 self.conn.lock().unwrap().execute(
957 r#"
958 INSERT INTO sheaf_validations (session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets)
959 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
960 "#,
961 [
962 &record.session_id,
963 &record.node_id,
964 &record.validator_class,
965 &record.plugin_source.clone().unwrap_or_default(),
966 &record.passed.to_string(),
967 &record.evidence_summary,
968 &record.affected_files,
969 &record.v_sheaf_contribution.to_string(),
970 &record.requeue_targets,
971 ],
972 )?;
973 Ok(())
974 }
975
976 pub fn get_sheaf_validations(
978 &self,
979 session_id: &str,
980 node_id: &str,
981 ) -> Result<Vec<SheafValidationRow>> {
982 let conn = self.conn.lock().unwrap();
983 let mut stmt = conn.prepare(
984 "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
985 FROM sheaf_validations WHERE session_id = ? AND node_id = ? ORDER BY created_at",
986 )?;
987 let mut rows = stmt.query([session_id, node_id])?;
988 let mut records = Vec::new();
989 while let Some(row) = rows.next()? {
990 records.push(SheafValidationRow {
991 session_id: row.get(0)?,
992 node_id: row.get(1)?,
993 validator_class: row.get(2)?,
994 plugin_source: row.get::<_, Option<String>>(3)?,
995 passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
996 evidence_summary: row.get(5)?,
997 affected_files: row.get(6)?,
998 v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
999 requeue_targets: row.get(8)?,
1000 });
1001 }
1002 Ok(records)
1003 }
1004
1005 pub fn get_all_sheaf_validations(&self, session_id: &str) -> Result<Vec<SheafValidationRow>> {
1007 let conn = self.conn.lock().unwrap();
1008 let mut stmt = conn.prepare(
1009 "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
1010 FROM sheaf_validations WHERE session_id = ? ORDER BY created_at",
1011 )?;
1012 let mut rows = stmt.query([session_id])?;
1013 let mut records = Vec::new();
1014 while let Some(row) = rows.next()? {
1015 records.push(SheafValidationRow {
1016 session_id: row.get(0)?,
1017 node_id: row.get(1)?,
1018 validator_class: row.get(2)?,
1019 plugin_source: row.get::<_, Option<String>>(3)?,
1020 passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
1021 evidence_summary: row.get(5)?,
1022 affected_files: row.get(6)?,
1023 v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
1024 requeue_targets: row.get(8)?,
1025 });
1026 }
1027 Ok(records)
1028 }
1029
1030 pub fn record_provisional_branch(&self, record: &ProvisionalBranchRow) -> Result<()> {
1036 self.conn.lock().unwrap().execute(
1037 r#"
1038 INSERT INTO provisional_branches (branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir)
1039 VALUES (?, ?, ?, ?, ?, ?, ?)
1040 "#,
1041 [
1042 &record.branch_id,
1043 &record.session_id,
1044 &record.node_id,
1045 &record.parent_node_id,
1046 &record.state,
1047 &record.parent_seal_hash.as_ref().map(hex::encode).unwrap_or_default(),
1048 &record.sandbox_dir.clone().unwrap_or_default(),
1049 ],
1050 )?;
1051 Ok(())
1052 }
1053
1054 pub fn update_branch_state(&self, branch_id: &str, new_state: &str) -> Result<()> {
1056 self.conn.lock().unwrap().execute(
1057 "UPDATE provisional_branches SET state = ?, updated_at = CURRENT_TIMESTAMP WHERE branch_id = ?",
1058 [new_state, branch_id],
1059 )?;
1060 Ok(())
1061 }
1062
1063 pub fn get_provisional_branches(&self, session_id: &str) -> Result<Vec<ProvisionalBranchRow>> {
1065 let conn = self.conn.lock().unwrap();
1066 let mut stmt = conn.prepare(
1067 "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
1068 FROM provisional_branches WHERE session_id = ? ORDER BY created_at",
1069 )?;
1070 let mut rows = stmt.query([session_id])?;
1071 let mut records = Vec::new();
1072 while let Some(row) = rows.next()? {
1073 let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
1075 records.push(ProvisionalBranchRow {
1076 branch_id: row.get(0)?,
1077 session_id: row.get(1)?,
1078 node_id: row.get(2)?,
1079 parent_node_id: row.get(3)?,
1080 state: row.get(4)?,
1081 parent_seal_hash,
1082 sandbox_dir: row.get::<_, Option<String>>(6)?,
1083 });
1084 }
1085 Ok(records)
1086 }
1087
1088 pub fn get_live_branches_for_parent(
1090 &self,
1091 session_id: &str,
1092 parent_node_id: &str,
1093 ) -> Result<Vec<ProvisionalBranchRow>> {
1094 let conn = self.conn.lock().unwrap();
1095 let mut stmt = conn.prepare(
1096 "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
1097 FROM provisional_branches
1098 WHERE session_id = ? AND parent_node_id = ? AND state IN ('active', 'sealed')
1099 ORDER BY created_at",
1100 )?;
1101 let mut rows = stmt.query([session_id, parent_node_id])?;
1102 let mut records = Vec::new();
1103 while let Some(row) = rows.next()? {
1104 let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
1106 records.push(ProvisionalBranchRow {
1107 branch_id: row.get(0)?,
1108 session_id: row.get(1)?,
1109 node_id: row.get(2)?,
1110 parent_node_id: row.get(3)?,
1111 state: row.get(4)?,
1112 parent_seal_hash,
1113 sandbox_dir: row.get::<_, Option<String>>(6)?,
1114 });
1115 }
1116 Ok(records)
1117 }
1118
1119 pub fn flush_branches_for_parent(
1121 &self,
1122 session_id: &str,
1123 parent_node_id: &str,
1124 ) -> Result<Vec<String>> {
1125 let live = self.get_live_branches_for_parent(session_id, parent_node_id)?;
1126 let branch_ids: Vec<String> = live.iter().map(|b| b.branch_id.clone()).collect();
1127 for bid in &branch_ids {
1128 self.update_branch_state(bid, "flushed")?;
1129 }
1130 Ok(branch_ids)
1131 }
1132
1133 pub fn record_branch_lineage(&self, record: &BranchLineageRow) -> Result<()> {
1139 self.conn.lock().unwrap().execute(
1140 r#"
1141 INSERT INTO branch_lineage (lineage_id, parent_branch_id, child_branch_id, depends_on_seal)
1142 VALUES (?, ?, ?, ?)
1143 "#,
1144 [
1145 &record.lineage_id,
1146 &record.parent_branch_id,
1147 &record.child_branch_id,
1148 &record.depends_on_seal.to_string(),
1149 ],
1150 )?;
1151 Ok(())
1152 }
1153
1154 pub fn get_child_branches(&self, parent_branch_id: &str) -> Result<Vec<String>> {
1156 let conn = self.conn.lock().unwrap();
1157 let mut stmt =
1158 conn.prepare("SELECT child_branch_id FROM branch_lineage WHERE parent_branch_id = ?")?;
1159 let mut rows = stmt.query([parent_branch_id])?;
1160 let mut ids = Vec::new();
1161 while let Some(row) = rows.next()? {
1162 ids.push(row.get(0)?);
1163 }
1164 Ok(ids)
1165 }
1166
1167 pub fn record_interface_seal(&self, record: &InterfaceSealRow) -> Result<()> {
1173 self.conn.lock().unwrap().execute(
1174 r#"
1175 INSERT INTO interface_seals (seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version)
1176 VALUES (?, ?, ?, ?, ?, ?, ?)
1177 "#,
1178 [
1179 &record.seal_id,
1180 &record.session_id,
1181 &record.node_id,
1182 &record.sealed_path,
1183 &record.artifact_kind,
1184 &hex::encode(&record.seal_hash),
1185 &record.version.to_string(),
1186 ],
1187 )?;
1188 Ok(())
1189 }
1190
1191 pub fn get_interface_seals(
1193 &self,
1194 session_id: &str,
1195 node_id: &str,
1196 ) -> Result<Vec<InterfaceSealRow>> {
1197 let conn = self.conn.lock().unwrap();
1198 let mut stmt = conn.prepare(
1199 "SELECT seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version
1200 FROM interface_seals WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1201 )?;
1202 let mut rows = stmt.query([session_id, node_id])?;
1203 let mut records = Vec::new();
1204 while let Some(row) = rows.next()? {
1205 records.push(InterfaceSealRow {
1206 seal_id: row.get(0)?,
1207 session_id: row.get(1)?,
1208 node_id: row.get(2)?,
1209 sealed_path: row.get(3)?,
1210 artifact_kind: row.get(4)?,
1211 seal_hash: row
1212 .get::<_, String>(5)
1213 .ok()
1214 .and_then(|h| hex::decode(h).ok())
1215 .unwrap_or_default(),
1216 version: row.get::<_, i32>(6)?,
1217 });
1218 }
1219 Ok(records)
1220 }
1221
1222 pub fn has_interface_seals(&self, session_id: &str, node_id: &str) -> Result<bool> {
1224 let conn = self.conn.lock().unwrap();
1225 let count: i64 = conn.query_row(
1226 "SELECT COUNT(*) FROM interface_seals WHERE session_id = ? AND node_id = ?",
1227 [session_id, node_id],
1228 |row| row.get(0),
1229 )?;
1230 Ok(count > 0)
1231 }
1232
1233 pub fn record_branch_flush(&self, record: &BranchFlushRow) -> Result<()> {
1239 self.conn.lock().unwrap().execute(
1240 r#"
1241 INSERT INTO branch_flushes (flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason)
1242 VALUES (?, ?, ?, ?, ?, ?)
1243 "#,
1244 [
1245 &record.flush_id,
1246 &record.session_id,
1247 &record.parent_node_id,
1248 &record.flushed_branch_ids,
1249 &record.requeue_node_ids,
1250 &record.reason,
1251 ],
1252 )?;
1253 Ok(())
1254 }
1255
1256 pub fn get_branch_flushes(&self, session_id: &str) -> Result<Vec<BranchFlushRow>> {
1258 let conn = self.conn.lock().unwrap();
1259 let mut stmt = conn.prepare(
1260 "SELECT flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason
1261 FROM branch_flushes WHERE session_id = ? ORDER BY created_at",
1262 )?;
1263 let mut rows = stmt.query([session_id])?;
1264 let mut records = Vec::new();
1265 while let Some(row) = rows.next()? {
1266 records.push(BranchFlushRow {
1267 flush_id: row.get(0)?,
1268 session_id: row.get(1)?,
1269 parent_node_id: row.get(2)?,
1270 flushed_branch_ids: row.get(3)?,
1271 requeue_node_ids: row.get(4)?,
1272 reason: row.get(5)?,
1273 });
1274 }
1275 Ok(records)
1276 }
1277
1278 pub fn get_latest_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
1286 let conn = self.conn.lock().unwrap();
1287 let mut stmt = conn.prepare(
1288 "WITH ranked AS ( \
1289 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1290 FROM node_states WHERE session_id = ? \
1291 ) \
1292 SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
1293 node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
1294 FROM ranked WHERE rn = 1 ORDER BY created_at",
1295 )?;
1296
1297 let mut rows = stmt.query([session_id])?;
1298 let mut records = Vec::new();
1299
1300 while let Some(row) = rows.next()? {
1301 records.push(NodeStateRecord {
1302 node_id: row.get(0)?,
1303 session_id: row.get(1)?,
1304 state: row.get(2)?,
1305 v_total: row.get::<_, f64>(3)? as f32,
1306 merkle_hash: row
1307 .get::<_, Option<String>>(4)?
1308 .and_then(|s| hex::decode(s).ok()),
1309 attempt_count: row.get(5)?,
1310 node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1311 owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
1312 goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
1313 parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
1314 children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
1315 last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1316 committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
1317 });
1318 }
1319
1320 Ok(records)
1321 }
1322
1323 pub fn record_task_graph_edge(&self, record: &TaskGraphEdgeRow) -> Result<()> {
1325 self.conn.lock().unwrap().execute(
1326 r#"
1327 INSERT INTO task_graph_edges (session_id, parent_node_id, child_node_id, edge_type)
1328 VALUES (?, ?, ?, ?)
1329 "#,
1330 [
1331 &record.session_id,
1332 &record.parent_node_id,
1333 &record.child_node_id,
1334 &record.edge_type,
1335 ],
1336 )?;
1337 Ok(())
1338 }
1339
1340 pub fn get_task_graph_edges(&self, session_id: &str) -> Result<Vec<TaskGraphEdgeRow>> {
1342 let conn = self.conn.lock().unwrap();
1343 let mut stmt = conn.prepare(
1344 "SELECT session_id, parent_node_id, child_node_id, edge_type \
1345 FROM task_graph_edges WHERE session_id = ? ORDER BY created_at",
1346 )?;
1347 let mut rows = stmt.query([session_id])?;
1348 let mut records = Vec::new();
1349 while let Some(row) = rows.next()? {
1350 records.push(TaskGraphEdgeRow {
1351 session_id: row.get(0)?,
1352 parent_node_id: row.get(1)?,
1353 child_node_id: row.get(2)?,
1354 edge_type: row.get(3)?,
1355 });
1356 }
1357 Ok(records)
1358 }
1359
1360 pub fn record_review_outcome(&self, record: &ReviewOutcomeRow) -> Result<()> {
1362 let reviewer_note = record.reviewer_note.clone().unwrap_or_default();
1363 let escalation_category = record.escalation_category.clone().unwrap_or_default();
1364 self.conn.lock().unwrap().execute(
1365 r#"
1366 INSERT INTO review_outcomes (session_id, node_id, outcome, reviewer_note,
1367 energy_at_review, degraded, escalation_category)
1368 VALUES (?, ?, ?, ?, ?, ?, ?)
1369 "#,
1370 duckdb::params![
1371 record.session_id,
1372 record.node_id,
1373 record.outcome,
1374 reviewer_note,
1375 record.energy_at_review.unwrap_or(0.0),
1376 record.degraded.unwrap_or(false),
1377 escalation_category,
1378 ],
1379 )?;
1380 Ok(())
1381 }
1382
1383 pub fn get_review_outcomes(
1385 &self,
1386 session_id: &str,
1387 node_id: &str,
1388 ) -> Result<Vec<ReviewOutcomeRow>> {
1389 let conn = self.conn.lock().unwrap();
1390 let mut stmt = conn.prepare(
1391 "SELECT session_id, node_id, outcome, reviewer_note, \
1392 energy_at_review, degraded, escalation_category \
1393 FROM review_outcomes WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1394 )?;
1395 let mut rows = stmt.query([session_id, node_id])?;
1396 let mut records = Vec::new();
1397 while let Some(row) = rows.next()? {
1398 records.push(ReviewOutcomeRow {
1399 session_id: row.get(0)?,
1400 node_id: row.get(1)?,
1401 outcome: row.get(2)?,
1402 reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1403 energy_at_review: row.get::<_, Option<f64>>(4)?,
1404 degraded: row.get::<_, Option<bool>>(5)?,
1405 escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1406 });
1407 }
1408 Ok(records)
1409 }
1410
1411 pub fn get_latest_review_outcome(
1413 &self,
1414 session_id: &str,
1415 node_id: &str,
1416 ) -> Result<Option<ReviewOutcomeRow>> {
1417 let conn = self.conn.lock().unwrap();
1418 let mut stmt = conn.prepare(
1419 "SELECT session_id, node_id, outcome, reviewer_note, \
1420 energy_at_review, degraded, escalation_category \
1421 FROM review_outcomes WHERE session_id = ? AND node_id = ? \
1422 ORDER BY created_at DESC LIMIT 1",
1423 )?;
1424 let mut rows = stmt.query([session_id, node_id])?;
1425 if let Some(row) = rows.next()? {
1426 Ok(Some(ReviewOutcomeRow {
1427 session_id: row.get(0)?,
1428 node_id: row.get(1)?,
1429 outcome: row.get(2)?,
1430 reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1431 energy_at_review: row.get::<_, Option<f64>>(4)?,
1432 degraded: row.get::<_, Option<bool>>(5)?,
1433 escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1434 }))
1435 } else {
1436 Ok(None)
1437 }
1438 }
1439
1440 pub fn get_all_review_outcomes(&self, session_id: &str) -> Result<Vec<ReviewOutcomeRow>> {
1442 let conn = self.conn.lock().unwrap();
1443 let mut stmt = conn.prepare(
1444 "SELECT session_id, node_id, outcome, reviewer_note, \
1445 energy_at_review, degraded, escalation_category \
1446 FROM review_outcomes WHERE session_id = ? ORDER BY created_at",
1447 )?;
1448 let mut rows = stmt.query([session_id])?;
1449 let mut records = Vec::new();
1450 while let Some(row) = rows.next()? {
1451 records.push(ReviewOutcomeRow {
1452 session_id: row.get(0)?,
1453 node_id: row.get(1)?,
1454 outcome: row.get(2)?,
1455 reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1456 energy_at_review: row.get::<_, Option<f64>>(4)?,
1457 degraded: row.get::<_, Option<bool>>(5)?,
1458 escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1459 });
1460 }
1461 Ok(records)
1462 }
1463
1464 pub fn record_verification_result(&self, record: &VerificationResultRow) -> Result<()> {
1470 let syntax_ok = record.syntax_ok.to_string();
1471 let build_ok = record.build_ok.to_string();
1472 let tests_ok = record.tests_ok.to_string();
1473 let lint_ok = record.lint_ok.to_string();
1474 let diagnostics_count = record.diagnostics_count.to_string();
1475 let tests_passed = record.tests_passed.to_string();
1476 let tests_failed = record.tests_failed.to_string();
1477 let degraded = record.degraded.to_string();
1478 let degraded_reason = record.degraded_reason.clone().unwrap_or_default();
1479
1480 self.conn.lock().unwrap().execute(
1481 r#"
1482 INSERT INTO verification_results (session_id, node_id, result_json,
1483 syntax_ok, build_ok, tests_ok, lint_ok,
1484 diagnostics_count, tests_passed, tests_failed, degraded, degraded_reason)
1485 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1486 "#,
1487 [
1488 &record.session_id,
1489 &record.node_id,
1490 &record.result_json,
1491 &syntax_ok,
1492 &build_ok,
1493 &tests_ok,
1494 &lint_ok,
1495 &diagnostics_count,
1496 &tests_passed,
1497 &tests_failed,
1498 °raded,
1499 °raded_reason,
1500 ],
1501 )?;
1502 Ok(())
1503 }
1504
1505 pub fn get_verification_result(
1507 &self,
1508 session_id: &str,
1509 node_id: &str,
1510 ) -> Result<Option<VerificationResultRow>> {
1511 let conn = self.conn.lock().unwrap();
1512 let mut stmt = conn.prepare(
1513 "SELECT session_id, node_id, result_json, \
1514 CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1515 diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1516 FROM verification_results \
1517 WHERE session_id = ? AND node_id = ? \
1518 ORDER BY created_at DESC LIMIT 1",
1519 )?;
1520 let mut rows = stmt.query([session_id, node_id])?;
1521 if let Some(row) = rows.next()? {
1522 Ok(Some(VerificationResultRow {
1523 session_id: row.get(0)?,
1524 node_id: row.get(1)?,
1525 result_json: row.get(2)?,
1526 syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1527 build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1528 tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1529 lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1530 diagnostics_count: row.get(7)?,
1531 tests_passed: row.get(8)?,
1532 tests_failed: row.get(9)?,
1533 degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1534 degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1535 }))
1536 } else {
1537 Ok(None)
1538 }
1539 }
1540
1541 pub fn get_all_verification_results(
1543 &self,
1544 session_id: &str,
1545 ) -> Result<Vec<VerificationResultRow>> {
1546 let conn = self.conn.lock().unwrap();
1547 let mut stmt = conn.prepare(
1548 "WITH ranked AS ( \
1549 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1550 FROM verification_results WHERE session_id = ? \
1551 ) \
1552 SELECT session_id, node_id, result_json, \
1553 CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1554 diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1555 FROM ranked WHERE rn = 1 ORDER BY created_at",
1556 )?;
1557 let mut rows = stmt.query([session_id])?;
1558 let mut records = Vec::new();
1559 while let Some(row) = rows.next()? {
1560 records.push(VerificationResultRow {
1561 session_id: row.get(0)?,
1562 node_id: row.get(1)?,
1563 result_json: row.get(2)?,
1564 syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1565 build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1566 tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1567 lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1568 diagnostics_count: row.get(7)?,
1569 tests_passed: row.get(8)?,
1570 tests_failed: row.get(9)?,
1571 degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1572 degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1573 });
1574 }
1575 Ok(records)
1576 }
1577
1578 pub fn record_artifact_bundle(&self, record: &ArtifactBundleRow) -> Result<()> {
1580 let artifact_count = record.artifact_count.to_string();
1581 let command_count = record.command_count.to_string();
1582
1583 self.conn.lock().unwrap().execute(
1584 r#"
1585 INSERT INTO artifact_bundles (session_id, node_id, bundle_json,
1586 artifact_count, command_count, touched_files)
1587 VALUES (?, ?, ?, ?, ?, ?)
1588 "#,
1589 [
1590 &record.session_id,
1591 &record.node_id,
1592 &record.bundle_json,
1593 &artifact_count,
1594 &command_count,
1595 &record.touched_files,
1596 ],
1597 )?;
1598 Ok(())
1599 }
1600
1601 pub fn get_artifact_bundle(
1603 &self,
1604 session_id: &str,
1605 node_id: &str,
1606 ) -> Result<Option<ArtifactBundleRow>> {
1607 let conn = self.conn.lock().unwrap();
1608 let mut stmt = conn.prepare(
1609 "SELECT session_id, node_id, bundle_json, artifact_count, command_count, touched_files \
1610 FROM artifact_bundles \
1611 WHERE session_id = ? AND node_id = ? \
1612 ORDER BY created_at DESC LIMIT 1",
1613 )?;
1614 let mut rows = stmt.query([session_id, node_id])?;
1615 if let Some(row) = rows.next()? {
1616 Ok(Some(ArtifactBundleRow {
1617 session_id: row.get(0)?,
1618 node_id: row.get(1)?,
1619 bundle_json: row.get(2)?,
1620 artifact_count: row.get(3)?,
1621 command_count: row.get(4)?,
1622 touched_files: row.get(5)?,
1623 }))
1624 } else {
1625 Ok(None)
1626 }
1627 }
1628}
1629
1630impl SessionStore {
1635 pub fn record_feature_charter(&self, row: &FeatureCharterRow) -> Result<()> {
1637 let conn = self.conn.lock().unwrap();
1638 conn.execute(
1639 "INSERT INTO feature_charters (charter_id, session_id, scope_description, max_modules, max_files, max_revisions, language_constraint) \
1640 VALUES (?, ?, ?, ?, ?, ?, ?)",
1641 duckdb::params![
1642 row.charter_id,
1643 row.session_id,
1644 row.scope_description,
1645 row.max_modules,
1646 row.max_files,
1647 row.max_revisions,
1648 row.language_constraint,
1649 ],
1650 )?;
1651 Ok(())
1652 }
1653
1654 pub fn get_feature_charter(&self, session_id: &str) -> Result<Option<FeatureCharterRow>> {
1656 let conn = self.conn.lock().unwrap();
1657 let mut stmt = conn.prepare(
1658 "SELECT charter_id, session_id, scope_description, max_modules, max_files, max_revisions, language_constraint \
1659 FROM feature_charters WHERE session_id = ? LIMIT 1",
1660 )?;
1661 let mut rows = stmt.query([session_id])?;
1662 if let Some(row) = rows.next()? {
1663 Ok(Some(FeatureCharterRow {
1664 charter_id: row.get(0)?,
1665 session_id: row.get(1)?,
1666 scope_description: row.get(2)?,
1667 max_modules: row.get(3)?,
1668 max_files: row.get(4)?,
1669 max_revisions: row.get(5)?,
1670 language_constraint: row.get(6)?,
1671 }))
1672 } else {
1673 Ok(None)
1674 }
1675 }
1676
1677 pub fn record_plan_revision(&self, row: &PlanRevisionRow) -> Result<()> {
1679 let conn = self.conn.lock().unwrap();
1680 conn.execute(
1681 "INSERT INTO plan_revisions (revision_id, session_id, sequence, plan_json, reason, supersedes, status) \
1682 VALUES (?, ?, ?, ?, ?, ?, ?)",
1683 duckdb::params![
1684 row.revision_id,
1685 row.session_id,
1686 row.sequence,
1687 row.plan_json,
1688 row.reason,
1689 row.supersedes,
1690 row.status,
1691 ],
1692 )?;
1693 Ok(())
1694 }
1695
1696 pub fn get_active_plan_revision(&self, session_id: &str) -> Result<Option<PlanRevisionRow>> {
1698 let conn = self.conn.lock().unwrap();
1699 let mut stmt = conn.prepare(
1700 "SELECT revision_id, session_id, sequence, plan_json, reason, supersedes, status \
1701 FROM plan_revisions WHERE session_id = ? AND status = 'active' \
1702 ORDER BY sequence DESC LIMIT 1",
1703 )?;
1704 let mut rows = stmt.query([session_id])?;
1705 if let Some(row) = rows.next()? {
1706 Ok(Some(PlanRevisionRow {
1707 revision_id: row.get(0)?,
1708 session_id: row.get(1)?,
1709 sequence: row.get(2)?,
1710 plan_json: row.get(3)?,
1711 reason: row.get(4)?,
1712 supersedes: row.get(5)?,
1713 status: row.get(6)?,
1714 }))
1715 } else {
1716 Ok(None)
1717 }
1718 }
1719
1720 pub fn get_plan_revisions(&self, session_id: &str) -> Result<Vec<PlanRevisionRow>> {
1722 let conn = self.conn.lock().unwrap();
1723 let mut stmt = conn.prepare(
1724 "SELECT revision_id, session_id, sequence, plan_json, reason, supersedes, status \
1725 FROM plan_revisions WHERE session_id = ? ORDER BY sequence ASC",
1726 )?;
1727 let mut rows = stmt.query([session_id])?;
1728 let mut results = Vec::new();
1729 while let Some(row) = rows.next()? {
1730 results.push(PlanRevisionRow {
1731 revision_id: row.get(0)?,
1732 session_id: row.get(1)?,
1733 sequence: row.get(2)?,
1734 plan_json: row.get(3)?,
1735 reason: row.get(4)?,
1736 supersedes: row.get(5)?,
1737 status: row.get(6)?,
1738 });
1739 }
1740 Ok(results)
1741 }
1742
1743 pub fn supersede_plan_revision(&self, revision_id: &str) -> Result<()> {
1745 let conn = self.conn.lock().unwrap();
1746 conn.execute(
1747 "UPDATE plan_revisions SET status = 'superseded' WHERE revision_id = ?",
1748 [revision_id],
1749 )?;
1750 Ok(())
1751 }
1752
1753 pub fn record_repair_footprint(&self, row: &RepairFootprintRow) -> Result<()> {
1755 let conn = self.conn.lock().unwrap();
1756 conn.execute(
1757 "INSERT INTO repair_footprints (footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved) \
1758 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
1759 duckdb::params![
1760 row.footprint_id,
1761 row.session_id,
1762 row.node_id,
1763 row.revision_id,
1764 row.attempt,
1765 row.affected_files,
1766 row.bundle_json,
1767 row.diagnosis,
1768 row.resolved,
1769 ],
1770 )?;
1771 Ok(())
1772 }
1773
1774 pub fn get_repair_footprints(
1776 &self,
1777 session_id: &str,
1778 node_id: &str,
1779 ) -> Result<Vec<RepairFootprintRow>> {
1780 let conn = self.conn.lock().unwrap();
1781 let mut stmt = conn.prepare(
1782 "SELECT footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved \
1783 FROM repair_footprints WHERE session_id = ? AND node_id = ? ORDER BY attempt ASC",
1784 )?;
1785 let mut rows = stmt.query([session_id, node_id])?;
1786 let mut results = Vec::new();
1787 while let Some(row) = rows.next()? {
1788 results.push(RepairFootprintRow {
1789 footprint_id: row.get(0)?,
1790 session_id: row.get(1)?,
1791 node_id: row.get(2)?,
1792 revision_id: row.get(3)?,
1793 attempt: row.get(4)?,
1794 affected_files: row.get(5)?,
1795 bundle_json: row.get(6)?,
1796 diagnosis: row.get(7)?,
1797 resolved: row.get(8)?,
1798 });
1799 }
1800 Ok(results)
1801 }
1802
1803 pub fn get_all_repair_footprints(&self, session_id: &str) -> Result<Vec<RepairFootprintRow>> {
1805 let conn = self.conn.lock().unwrap();
1806 let mut stmt = conn.prepare(
1807 "SELECT footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved \
1808 FROM repair_footprints WHERE session_id = ? ORDER BY attempt ASC",
1809 )?;
1810 let mut rows = stmt.query([session_id])?;
1811 let mut results = Vec::new();
1812 while let Some(row) = rows.next()? {
1813 results.push(RepairFootprintRow {
1814 footprint_id: row.get(0)?,
1815 session_id: row.get(1)?,
1816 node_id: row.get(2)?,
1817 revision_id: row.get(3)?,
1818 attempt: row.get(4)?,
1819 affected_files: row.get(5)?,
1820 bundle_json: row.get(6)?,
1821 diagnosis: row.get(7)?,
1822 resolved: row.get(8)?,
1823 });
1824 }
1825 Ok(results)
1826 }
1827
1828 pub fn resolve_repair_footprint(&self, footprint_id: &str) -> Result<()> {
1830 let conn = self.conn.lock().unwrap();
1831 conn.execute(
1832 "UPDATE repair_footprints SET resolved = true WHERE footprint_id = ?",
1833 [footprint_id],
1834 )?;
1835 Ok(())
1836 }
1837
1838 pub fn upsert_budget_envelope(&self, row: &BudgetEnvelopeRow) -> Result<()> {
1840 let conn = self.conn.lock().unwrap();
1841 conn.execute(
1843 "INSERT INTO budget_envelopes (session_id, max_steps, steps_used, max_revisions, revisions_used, max_cost_usd, cost_used_usd) \
1844 VALUES (?, ?, ?, ?, ?, ?, ?) \
1845 ON CONFLICT (session_id) DO UPDATE SET \
1846 max_steps = EXCLUDED.max_steps, steps_used = EXCLUDED.steps_used, \
1847 max_revisions = EXCLUDED.max_revisions, revisions_used = EXCLUDED.revisions_used, \
1848 max_cost_usd = EXCLUDED.max_cost_usd, cost_used_usd = EXCLUDED.cost_used_usd",
1849 duckdb::params![
1850 row.session_id,
1851 row.max_steps,
1852 row.steps_used,
1853 row.max_revisions,
1854 row.revisions_used,
1855 row.max_cost_usd,
1856 row.cost_used_usd,
1857 ],
1858 )?;
1859 Ok(())
1860 }
1861
1862 pub fn get_budget_envelope(&self, session_id: &str) -> Result<Option<BudgetEnvelopeRow>> {
1864 let conn = self.conn.lock().unwrap();
1865 let mut stmt = conn.prepare(
1866 "SELECT session_id, max_steps, steps_used, max_revisions, revisions_used, max_cost_usd, cost_used_usd \
1867 FROM budget_envelopes WHERE session_id = ?",
1868 )?;
1869 let mut rows = stmt.query([session_id])?;
1870 if let Some(row) = rows.next()? {
1871 Ok(Some(BudgetEnvelopeRow {
1872 session_id: row.get(0)?,
1873 max_steps: row.get(1)?,
1874 steps_used: row.get(2)?,
1875 max_revisions: row.get(3)?,
1876 revisions_used: row.get(4)?,
1877 max_cost_usd: row.get(5)?,
1878 cost_used_usd: row.get(6)?,
1879 }))
1880 } else {
1881 Ok(None)
1882 }
1883 }
1884}
1885
1886impl SessionStore {
1891 pub fn record_step(&self, record: &SrbnStepRecord) -> Result<()> {
1893 let conn = self.conn.lock().unwrap();
1894 conn.execute(
1895 r#"INSERT INTO srbn_step_records
1896 (session_id, node_id, step, outcome, energy_json,
1897 parse_state, retry_classification, attempt_count, duration_ms)
1898 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
1899 duckdb::params![
1900 record.session_id,
1901 record.node_id,
1902 record.step,
1903 record.outcome,
1904 record.energy_json,
1905 record.parse_state,
1906 record.retry_classification,
1907 record.attempt_count,
1908 record.duration_ms,
1909 ],
1910 )?;
1911 Ok(())
1912 }
1913
1914 pub fn get_step_timeline(
1916 &self,
1917 session_id: &str,
1918 node_id: &str,
1919 ) -> Result<Vec<SrbnStepRecord>> {
1920 let conn = self.conn.lock().unwrap();
1921 let mut stmt = conn.prepare(
1922 r#"SELECT session_id, node_id, step, outcome, energy_json,
1923 parse_state, retry_classification, attempt_count, duration_ms
1924 FROM srbn_step_records
1925 WHERE session_id = ? AND node_id = ?
1926 ORDER BY id ASC"#,
1927 )?;
1928 let rows = stmt.query_map(duckdb::params![session_id, node_id], |row| {
1929 Ok(SrbnStepRecord {
1930 session_id: row.get(0)?,
1931 node_id: row.get(1)?,
1932 step: row.get(2)?,
1933 outcome: row.get(3)?,
1934 energy_json: row.get(4)?,
1935 parse_state: row.get(5)?,
1936 retry_classification: row.get(6)?,
1937 attempt_count: row.get(7)?,
1938 duration_ms: row.get(8)?,
1939 })
1940 })?;
1941 let mut results = Vec::new();
1942 for row in rows {
1943 results.push(row?);
1944 }
1945 Ok(results)
1946 }
1947
1948 pub fn get_session_steps(&self, session_id: &str) -> Result<Vec<SrbnStepRecord>> {
1950 let conn = self.conn.lock().unwrap();
1951 let mut stmt = conn.prepare(
1952 r#"SELECT session_id, node_id, step, outcome, energy_json,
1953 parse_state, retry_classification, attempt_count, duration_ms
1954 FROM srbn_step_records
1955 WHERE session_id = ?
1956 ORDER BY id ASC"#,
1957 )?;
1958 let rows = stmt.query_map(duckdb::params![session_id], |row| {
1959 Ok(SrbnStepRecord {
1960 session_id: row.get(0)?,
1961 node_id: row.get(1)?,
1962 step: row.get(2)?,
1963 outcome: row.get(3)?,
1964 energy_json: row.get(4)?,
1965 parse_state: row.get(5)?,
1966 retry_classification: row.get(6)?,
1967 attempt_count: row.get(7)?,
1968 duration_ms: row.get(8)?,
1969 })
1970 })?;
1971 let mut results = Vec::new();
1972 for row in rows {
1973 results.push(row?);
1974 }
1975 Ok(results)
1976 }
1977
1978 pub fn record_correction_attempt(&self, record: &CorrectionAttemptRow) -> Result<()> {
1980 let conn = self.conn.lock().unwrap();
1981 conn.execute(
1982 r#"INSERT INTO correction_attempts
1983 (session_id, node_id, attempt, parse_state, retry_classification,
1984 response_fingerprint, response_length, energy_json,
1985 accepted, rejection_reason, created_at)
1986 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
1987 duckdb::params![
1988 record.session_id,
1989 record.node_id,
1990 record.attempt,
1991 record.parse_state,
1992 record.retry_classification,
1993 record.response_fingerprint,
1994 record.response_length,
1995 record.energy_json,
1996 record.accepted,
1997 record.rejection_reason,
1998 record.created_at,
1999 ],
2000 )?;
2001 Ok(())
2002 }
2003
2004 pub fn get_correction_attempts(
2006 &self,
2007 session_id: &str,
2008 node_id: &str,
2009 ) -> Result<Vec<CorrectionAttemptRow>> {
2010 let conn = self.conn.lock().unwrap();
2011 let mut stmt = conn.prepare(
2012 r#"SELECT session_id, node_id, attempt, parse_state, retry_classification,
2013 response_fingerprint, response_length, energy_json,
2014 accepted, rejection_reason, created_at
2015 FROM correction_attempts
2016 WHERE session_id = ? AND node_id = ?
2017 ORDER BY attempt ASC"#,
2018 )?;
2019 let rows = stmt.query_map(duckdb::params![session_id, node_id], |row| {
2020 Ok(CorrectionAttemptRow {
2021 session_id: row.get(0)?,
2022 node_id: row.get(1)?,
2023 attempt: row.get(2)?,
2024 parse_state: row.get(3)?,
2025 retry_classification: row.get(4)?,
2026 response_fingerprint: row.get(5)?,
2027 response_length: row.get(6)?,
2028 energy_json: row.get(7)?,
2029 accepted: row.get(8)?,
2030 rejection_reason: row.get(9)?,
2031 created_at: row.get(10)?,
2032 })
2033 })?;
2034 let mut results = Vec::new();
2035 for row in rows {
2036 results.push(row?);
2037 }
2038 Ok(results)
2039 }
2040
2041 pub fn get_session_correction_attempts(
2043 &self,
2044 session_id: &str,
2045 ) -> Result<Vec<CorrectionAttemptRow>> {
2046 let conn = self.conn.lock().unwrap();
2047 let mut stmt = conn.prepare(
2048 r#"SELECT session_id, node_id, attempt, parse_state, retry_classification,
2049 response_fingerprint, response_length, energy_json,
2050 accepted, rejection_reason, created_at
2051 FROM correction_attempts
2052 WHERE session_id = ?
2053 ORDER BY node_id ASC, attempt ASC"#,
2054 )?;
2055 let rows = stmt.query_map(duckdb::params![session_id], |row| {
2056 Ok(CorrectionAttemptRow {
2057 session_id: row.get(0)?,
2058 node_id: row.get(1)?,
2059 attempt: row.get(2)?,
2060 parse_state: row.get(3)?,
2061 retry_classification: row.get(4)?,
2062 response_fingerprint: row.get(5)?,
2063 response_length: row.get(6)?,
2064 energy_json: row.get(7)?,
2065 accepted: row.get(8)?,
2066 rejection_reason: row.get(9)?,
2067 created_at: row.get(10)?,
2068 })
2069 })?;
2070 let mut results = Vec::new();
2071 for row in rows {
2072 results.push(row?);
2073 }
2074 Ok(results)
2075 }
2076}
2077
2078#[cfg(test)]
2079mod tests {
2080 use super::*;
2081
2082 fn test_store() -> SessionStore {
2084 let temp_dir = std::env::temp_dir();
2085 let db_path = temp_dir.join(format!("perspt_test_{}.db", uuid::Uuid::new_v4()));
2086 SessionStore::open(&db_path).expect("Failed to create test store")
2087 }
2088
2089 fn seed_session(store: &SessionStore, session_id: &str) {
2090 let record = SessionRecord {
2091 session_id: session_id.to_string(),
2092 task: "test task".to_string(),
2093 working_dir: "/tmp/test".to_string(),
2094 merkle_root: None,
2095 detected_toolchain: None,
2096 status: "RUNNING".to_string(),
2097 };
2098 store.create_session(&record).unwrap();
2099 }
2100
2101 #[test]
2102 fn test_node_state_phase8_roundtrip() {
2103 let store = test_store();
2104 let sid = "test-sess-1";
2105 seed_session(&store, sid);
2106
2107 let record = NodeStateRecord {
2108 node_id: "node-1".to_string(),
2109 session_id: sid.to_string(),
2110 state: "Completed".to_string(),
2111 v_total: 0.42,
2112 merkle_hash: Some(vec![0xab; 32]),
2113 attempt_count: 3,
2114 node_class: Some("Interface".to_string()),
2115 owner_plugin: Some("rust".to_string()),
2116 goal: Some("Implement API".to_string()),
2117 parent_id: Some("root".to_string()),
2118 children: Some(r#"["child-a","child-b"]"#.to_string()),
2119 last_error_type: Some("CompilationError".to_string()),
2120 committed_at: Some("2025-01-01T00:00:00Z".to_string()),
2121 };
2122
2123 store.record_node_state(&record).unwrap();
2124
2125 let states = store.get_latest_node_states(sid).unwrap();
2126 assert_eq!(states.len(), 1);
2127 let r = &states[0];
2128 assert_eq!(r.node_id, "node-1");
2129 assert_eq!(r.state, "Completed");
2130 assert_eq!(r.attempt_count, 3);
2131 assert_eq!(r.node_class.as_deref(), Some("Interface"));
2132 assert_eq!(r.owner_plugin.as_deref(), Some("rust"));
2133 assert_eq!(r.goal.as_deref(), Some("Implement API"));
2134 assert_eq!(r.parent_id.as_deref(), Some("root"));
2135 assert!(r.children.is_some());
2136 assert_eq!(r.last_error_type.as_deref(), Some("CompilationError"));
2137 assert_eq!(r.committed_at.as_deref(), Some("2025-01-01T00:00:00Z"));
2138 }
2139
2140 #[test]
2141 fn test_task_graph_edge_roundtrip() {
2142 let store = test_store();
2143 let sid = "test-graph-1";
2144 seed_session(&store, sid);
2145
2146 let edge = TaskGraphEdgeRow {
2147 session_id: sid.to_string(),
2148 parent_node_id: "parent-1".to_string(),
2149 child_node_id: "child-1".to_string(),
2150 edge_type: "depends_on".to_string(),
2151 };
2152 store.record_task_graph_edge(&edge).unwrap();
2153
2154 let edges = store.get_task_graph_edges(sid).unwrap();
2155 assert_eq!(edges.len(), 1);
2156 assert_eq!(edges[0].parent_node_id, "parent-1");
2157 assert_eq!(edges[0].child_node_id, "child-1");
2158 assert_eq!(edges[0].edge_type, "depends_on");
2159 }
2160
2161 #[test]
2162 fn test_verification_result_roundtrip() {
2163 let store = test_store();
2164 let sid = "test-vr-1";
2165 seed_session(&store, sid);
2166
2167 let row = VerificationResultRow {
2168 session_id: sid.to_string(),
2169 node_id: "node-v".to_string(),
2170 result_json: r#"{"syntax_ok":true}"#.to_string(),
2171 syntax_ok: true,
2172 build_ok: true,
2173 tests_ok: false,
2174 lint_ok: true,
2175 diagnostics_count: 2,
2176 tests_passed: 5,
2177 tests_failed: 1,
2178 degraded: false,
2179 degraded_reason: None,
2180 };
2181 store.record_verification_result(&row).unwrap();
2182
2183 let got = store.get_verification_result(sid, "node-v").unwrap();
2184 assert!(got.is_some());
2185 let got = got.unwrap();
2186 assert!(got.syntax_ok);
2187 assert!(got.build_ok);
2188 assert!(!got.tests_ok);
2189 assert_eq!(got.tests_passed, 5);
2190 assert_eq!(got.tests_failed, 1);
2191 assert!(!got.degraded);
2192 }
2193
2194 #[test]
2195 fn test_verification_result_degraded() {
2196 let store = test_store();
2197 let sid = "test-vr-deg";
2198 seed_session(&store, sid);
2199
2200 let row = VerificationResultRow {
2201 session_id: sid.to_string(),
2202 node_id: "node-d".to_string(),
2203 result_json: "{}".to_string(),
2204 syntax_ok: true,
2205 build_ok: false,
2206 tests_ok: false,
2207 lint_ok: false,
2208 diagnostics_count: 0,
2209 tests_passed: 0,
2210 tests_failed: 0,
2211 degraded: true,
2212 degraded_reason: Some("LSP unavailable".to_string()),
2213 };
2214 store.record_verification_result(&row).unwrap();
2215
2216 let got = store
2217 .get_verification_result(sid, "node-d")
2218 .unwrap()
2219 .unwrap();
2220 assert!(got.degraded);
2221 assert_eq!(got.degraded_reason.as_deref(), Some("LSP unavailable"));
2222 }
2223
2224 #[test]
2225 fn test_artifact_bundle_roundtrip() {
2226 let store = test_store();
2227 let sid = "test-ab-1";
2228 seed_session(&store, sid);
2229
2230 let row = ArtifactBundleRow {
2231 session_id: sid.to_string(),
2232 node_id: "node-a".to_string(),
2233 bundle_json: r#"{"artifacts":[],"commands":[]}"#.to_string(),
2234 artifact_count: 3,
2235 command_count: 1,
2236 touched_files: r#"["src/main.rs","src/lib.rs","tests/test.rs"]"#.to_string(),
2237 };
2238 store.record_artifact_bundle(&row).unwrap();
2239
2240 let got = store.get_artifact_bundle(sid, "node-a").unwrap();
2241 assert!(got.is_some());
2242 let got = got.unwrap();
2243 assert_eq!(got.artifact_count, 3);
2244 assert_eq!(got.command_count, 1);
2245 assert!(got.touched_files.contains("main.rs"));
2246 }
2247
2248 #[test]
2249 fn test_latest_node_states_dedup() {
2250 let store = test_store();
2251 let sid = "test-dedup";
2252 seed_session(&store, sid);
2253
2254 let r1 = NodeStateRecord {
2256 node_id: "node-x".to_string(),
2257 session_id: sid.to_string(),
2258 state: "Coding".to_string(),
2259 v_total: 0.5,
2260 merkle_hash: None,
2261 attempt_count: 1,
2262 node_class: None,
2263 owner_plugin: None,
2264 goal: None,
2265 parent_id: None,
2266 children: None,
2267 last_error_type: None,
2268 committed_at: None,
2269 };
2270 store.record_node_state(&r1).unwrap();
2271
2272 let r2 = NodeStateRecord {
2273 node_id: "node-x".to_string(),
2274 session_id: sid.to_string(),
2275 state: "Completed".to_string(),
2276 v_total: 0.3,
2277 merkle_hash: None,
2278 attempt_count: 2,
2279 node_class: Some("Implementation".to_string()),
2280 owner_plugin: None,
2281 goal: Some("Updated goal".to_string()),
2282 parent_id: None,
2283 children: None,
2284 last_error_type: None,
2285 committed_at: Some("2025-01-02T00:00:00Z".to_string()),
2286 };
2287 store.record_node_state(&r2).unwrap();
2288
2289 let latest = store.get_latest_node_states(sid).unwrap();
2291 assert_eq!(latest.len(), 1);
2292 assert_eq!(latest[0].state, "Completed");
2293 assert_eq!(latest[0].attempt_count, 2);
2294 assert_eq!(latest[0].goal.as_deref(), Some("Updated goal"));
2295 }
2296
2297 #[test]
2298 fn test_backward_compat_empty_phase8_fields() {
2299 let store = test_store();
2300 let sid = "test-compat";
2301 seed_session(&store, sid);
2302
2303 let r = NodeStateRecord {
2305 node_id: "old-node".to_string(),
2306 session_id: sid.to_string(),
2307 state: "COMPLETED".to_string(),
2308 v_total: 1.0,
2309 merkle_hash: None,
2310 attempt_count: 1,
2311 node_class: None,
2312 owner_plugin: None,
2313 goal: None,
2314 parent_id: None,
2315 children: None,
2316 last_error_type: None,
2317 committed_at: None,
2318 };
2319 store.record_node_state(&r).unwrap();
2320
2321 let latest = store.get_latest_node_states(sid).unwrap();
2322 assert_eq!(latest.len(), 1);
2323 assert!(latest[0].node_class.is_none());
2324 assert!(latest[0].goal.is_none());
2325 assert!(latest[0].committed_at.is_none());
2326
2327 let vr = store.get_verification_result(sid, "old-node").unwrap();
2329 assert!(vr.is_none());
2330 let ab = store.get_artifact_bundle(sid, "old-node").unwrap();
2331 assert!(ab.is_none());
2332 }
2333
2334 #[test]
2335 fn test_review_outcome_roundtrip() {
2336 let store = test_store();
2337 let sid = "test-review";
2338 seed_session(&store, sid);
2339
2340 let row = ReviewOutcomeRow {
2341 session_id: sid.to_string(),
2342 node_id: "node-r".to_string(),
2343 outcome: "approved".to_string(),
2344 reviewer_note: Some("LGTM".to_string()),
2345 energy_at_review: None,
2346 degraded: None,
2347 escalation_category: None,
2348 };
2349 store.record_review_outcome(&row).unwrap();
2350
2351 let outcomes = store.get_review_outcomes(sid, "node-r").unwrap();
2352 assert_eq!(outcomes.len(), 1);
2353 assert_eq!(outcomes[0].outcome, "approved");
2354 assert_eq!(outcomes[0].reviewer_note.as_deref(), Some("LGTM"));
2355 }
2356
2357 #[test]
2358 fn test_review_outcome_with_audit_fields() {
2359 let store = test_store();
2360 let sid = "test-review-audit";
2361 seed_session(&store, sid);
2362
2363 let row = ReviewOutcomeRow {
2364 session_id: sid.to_string(),
2365 node_id: "node-a".to_string(),
2366 outcome: "rejected".to_string(),
2367 reviewer_note: Some("Needs rework".to_string()),
2368 energy_at_review: Some(0.42),
2369 degraded: Some(true),
2370 escalation_category: Some("complexity".to_string()),
2371 };
2372 store.record_review_outcome(&row).unwrap();
2373
2374 let outcomes = store.get_review_outcomes(sid, "node-a").unwrap();
2375 assert_eq!(outcomes.len(), 1);
2376 assert_eq!(outcomes[0].outcome, "rejected");
2377 assert_eq!(outcomes[0].energy_at_review, Some(0.42));
2378 assert_eq!(outcomes[0].degraded, Some(true));
2379 assert_eq!(
2380 outcomes[0].escalation_category.as_deref(),
2381 Some("complexity")
2382 );
2383 }
2384
2385 #[test]
2386 fn test_get_all_review_outcomes() {
2387 let store = test_store();
2388 let sid = "test-review-all";
2389 seed_session(&store, sid);
2390
2391 for (node, outcome) in &[("n1", "approved"), ("n2", "rejected"), ("n1", "approved")] {
2392 let row = ReviewOutcomeRow {
2393 session_id: sid.to_string(),
2394 node_id: node.to_string(),
2395 outcome: outcome.to_string(),
2396 reviewer_note: None,
2397 energy_at_review: None,
2398 degraded: None,
2399 escalation_category: None,
2400 };
2401 store.record_review_outcome(&row).unwrap();
2402 }
2403
2404 let all = store.get_all_review_outcomes(sid).unwrap();
2405 assert_eq!(all.len(), 3);
2406 }
2407
2408 #[test]
2409 fn test_feature_charter_roundtrip() {
2410 let store = test_store();
2411 let sid = "test-charter";
2412 seed_session(&store, sid);
2413
2414 let row = FeatureCharterRow {
2415 charter_id: "ch-1".to_string(),
2416 session_id: sid.to_string(),
2417 scope_description: "Add authentication module".to_string(),
2418 max_modules: Some(3),
2419 max_files: Some(10),
2420 max_revisions: Some(5),
2421 language_constraint: Some("rust".to_string()),
2422 };
2423 store.record_feature_charter(&row).unwrap();
2424
2425 let got = store.get_feature_charter(sid).unwrap();
2426 assert!(got.is_some());
2427 let got = got.unwrap();
2428 assert_eq!(got.charter_id, "ch-1");
2429 assert_eq!(got.scope_description, "Add authentication module");
2430 assert_eq!(got.max_modules, Some(3));
2431 assert_eq!(got.language_constraint.as_deref(), Some("rust"));
2432 }
2433
2434 #[test]
2435 fn test_feature_charter_returns_none_for_missing() {
2436 let store = test_store();
2437 let sid = "test-charter-miss";
2438 seed_session(&store, sid);
2439
2440 let got = store.get_feature_charter(sid).unwrap();
2441 assert!(got.is_none());
2442 }
2443
2444 #[test]
2445 fn test_plan_revision_roundtrip() {
2446 let store = test_store();
2447 let sid = "test-rev";
2448 seed_session(&store, sid);
2449
2450 let row = PlanRevisionRow {
2451 revision_id: "rev-1".to_string(),
2452 session_id: sid.to_string(),
2453 sequence: 1,
2454 plan_json: r#"{"tasks":[]}"#.to_string(),
2455 reason: "initial plan".to_string(),
2456 supersedes: None,
2457 status: "active".to_string(),
2458 };
2459 store.record_plan_revision(&row).unwrap();
2460
2461 let active = store.get_active_plan_revision(sid).unwrap();
2462 assert!(active.is_some());
2463 let active = active.unwrap();
2464 assert_eq!(active.revision_id, "rev-1");
2465 assert_eq!(active.sequence, 1);
2466 assert_eq!(active.status, "active");
2467 }
2468
2469 #[test]
2470 fn test_plan_revision_supersede() {
2471 let store = test_store();
2472 let sid = "test-rev-sup";
2473 seed_session(&store, sid);
2474
2475 let r1 = PlanRevisionRow {
2476 revision_id: "rev-1".to_string(),
2477 session_id: sid.to_string(),
2478 sequence: 1,
2479 plan_json: "{}".to_string(),
2480 reason: "initial".to_string(),
2481 supersedes: None,
2482 status: "active".to_string(),
2483 };
2484 store.record_plan_revision(&r1).unwrap();
2485
2486 store.supersede_plan_revision("rev-1").unwrap();
2488
2489 let r2 = PlanRevisionRow {
2490 revision_id: "rev-2".to_string(),
2491 session_id: sid.to_string(),
2492 sequence: 2,
2493 plan_json: r#"{"tasks":["a"]}"#.to_string(),
2494 reason: "verifier feedback".to_string(),
2495 supersedes: Some("rev-1".to_string()),
2496 status: "active".to_string(),
2497 };
2498 store.record_plan_revision(&r2).unwrap();
2499
2500 let active = store.get_active_plan_revision(sid).unwrap().unwrap();
2502 assert_eq!(active.revision_id, "rev-2");
2503
2504 let all = store.get_plan_revisions(sid).unwrap();
2506 assert_eq!(all.len(), 2);
2507 assert_eq!(all[0].status, "superseded");
2508 assert_eq!(all[1].status, "active");
2509 }
2510
2511 #[test]
2512 fn test_repair_footprint_roundtrip() {
2513 let store = test_store();
2514 let sid = "test-repair";
2515 seed_session(&store, sid);
2516
2517 let row = RepairFootprintRow {
2518 footprint_id: "fp-1".to_string(),
2519 session_id: sid.to_string(),
2520 node_id: "node-a".to_string(),
2521 revision_id: "rev-1".to_string(),
2522 attempt: 1,
2523 affected_files: r#"["src/main.rs"]"#.to_string(),
2524 bundle_json: "{}".to_string(),
2525 diagnosis: "missing import".to_string(),
2526 resolved: false,
2527 };
2528 store.record_repair_footprint(&row).unwrap();
2529
2530 let footprints = store.get_repair_footprints(sid, "node-a").unwrap();
2531 assert_eq!(footprints.len(), 1);
2532 assert_eq!(footprints[0].footprint_id, "fp-1");
2533 assert_eq!(footprints[0].diagnosis, "missing import");
2534 assert!(!footprints[0].resolved);
2535 }
2536
2537 #[test]
2538 fn test_repair_footprint_resolve() {
2539 let store = test_store();
2540 let sid = "test-repair-res";
2541 seed_session(&store, sid);
2542
2543 let row = RepairFootprintRow {
2544 footprint_id: "fp-2".to_string(),
2545 session_id: sid.to_string(),
2546 node_id: "node-b".to_string(),
2547 revision_id: "rev-1".to_string(),
2548 attempt: 1,
2549 affected_files: "[]".to_string(),
2550 bundle_json: "{}".to_string(),
2551 diagnosis: "type error".to_string(),
2552 resolved: false,
2553 };
2554 store.record_repair_footprint(&row).unwrap();
2555
2556 store.resolve_repair_footprint("fp-2").unwrap();
2557
2558 let footprints = store.get_repair_footprints(sid, "node-b").unwrap();
2559 assert_eq!(footprints.len(), 1);
2560 assert!(footprints[0].resolved);
2561 }
2562
2563 #[test]
2564 fn test_budget_envelope_upsert_and_get() {
2565 let store = test_store();
2566 let sid = "test-budget";
2567 seed_session(&store, sid);
2568
2569 let row = BudgetEnvelopeRow {
2570 session_id: sid.to_string(),
2571 max_steps: Some(100),
2572 steps_used: 5,
2573 max_revisions: Some(10),
2574 revisions_used: 1,
2575 max_cost_usd: Some(5.0),
2576 cost_used_usd: 0.25,
2577 };
2578 store.upsert_budget_envelope(&row).unwrap();
2579
2580 let got = store.get_budget_envelope(sid).unwrap();
2581 assert!(got.is_some());
2582 let got = got.unwrap();
2583 assert_eq!(got.max_steps, Some(100));
2584 assert_eq!(got.steps_used, 5);
2585 assert_eq!(got.cost_used_usd, 0.25);
2586 }
2587
2588 #[test]
2589 fn test_budget_envelope_upsert_updates() {
2590 let store = test_store();
2591 let sid = "test-budget-up";
2592 seed_session(&store, sid);
2593
2594 let row1 = BudgetEnvelopeRow {
2595 session_id: sid.to_string(),
2596 max_steps: Some(100),
2597 steps_used: 0,
2598 max_revisions: None,
2599 revisions_used: 0,
2600 max_cost_usd: None,
2601 cost_used_usd: 0.0,
2602 };
2603 store.upsert_budget_envelope(&row1).unwrap();
2604
2605 let row2 = BudgetEnvelopeRow {
2607 session_id: sid.to_string(),
2608 max_steps: Some(100),
2609 steps_used: 42,
2610 max_revisions: Some(5),
2611 revisions_used: 3,
2612 max_cost_usd: Some(10.0),
2613 cost_used_usd: 4.5,
2614 };
2615 store.upsert_budget_envelope(&row2).unwrap();
2616
2617 let got = store.get_budget_envelope(sid).unwrap().unwrap();
2618 assert_eq!(got.steps_used, 42);
2619 assert_eq!(got.revisions_used, 3);
2620 assert_eq!(got.cost_used_usd, 4.5);
2621 }
2622
2623 #[test]
2624 fn test_budget_envelope_missing_returns_none() {
2625 let store = test_store();
2626 let sid = "test-budget-miss";
2627 seed_session(&store, sid);
2628
2629 let got = store.get_budget_envelope(sid).unwrap();
2630 assert!(got.is_none());
2631 }
2632
2633 #[test]
2634 fn test_read_only_store_queries_work() {
2635 let temp_dir = std::env::temp_dir();
2636 let db_path = temp_dir.join(format!("perspt_ro_test_{}.db", uuid::Uuid::new_v4()));
2637
2638 {
2640 let store = SessionStore::open(&db_path).unwrap();
2641 seed_session(&store, "ro-test");
2642 }
2643
2644 let ro = SessionStore::open_read_only(&db_path).unwrap();
2646 let sessions = ro.list_recent_sessions(10).unwrap();
2647 assert_eq!(sessions.len(), 1);
2648 assert_eq!(sessions[0].session_id, "ro-test");
2649 }
2650
2651 #[test]
2652 fn test_read_only_store_rejects_writes() {
2653 let temp_dir = std::env::temp_dir();
2654 let db_path = temp_dir.join(format!("perspt_ro_wr_{}.db", uuid::Uuid::new_v4()));
2655
2656 {
2658 let _store = SessionStore::open(&db_path).unwrap();
2659 }
2660
2661 let ro = SessionStore::open_read_only(&db_path).unwrap();
2663 let record = SessionRecord {
2664 session_id: "should-fail".to_string(),
2665 task: "test".to_string(),
2666 working_dir: "/tmp".to_string(),
2667 merkle_root: None,
2668 detected_toolchain: None,
2669 status: "RUNNING".to_string(),
2670 };
2671 assert!(ro.create_session(&record).is_err());
2672 }
2673
2674 #[test]
2679 fn test_srbn_step_record_roundtrip() {
2680 let store = test_store();
2681 let sid = "step-sess";
2682 seed_session(&store, sid);
2683
2684 let r1 = SrbnStepRecord {
2685 session_id: sid.to_string(),
2686 node_id: "n1".to_string(),
2687 step: "speculate".to_string(),
2688 outcome: "ok".to_string(),
2689 energy_json: Some(r#"{"v_syn":0.0}"#.to_string()),
2690 parse_state: Some("FullyParsed".to_string()),
2691 retry_classification: None,
2692 attempt_count: 0,
2693 duration_ms: 120,
2694 };
2695 let r2 = SrbnStepRecord {
2696 session_id: sid.to_string(),
2697 node_id: "n1".to_string(),
2698 step: "verify".to_string(),
2699 outcome: "retry".to_string(),
2700 energy_json: Some(r#"{"v_syn":5.0}"#.to_string()),
2701 parse_state: None,
2702 retry_classification: Some("MalformedResponse".to_string()),
2703 attempt_count: 1,
2704 duration_ms: 300,
2705 };
2706
2707 store.record_step(&r1).unwrap();
2708 store.record_step(&r2).unwrap();
2709
2710 let timeline = store.get_step_timeline(sid, "n1").unwrap();
2711 assert_eq!(timeline.len(), 2);
2712 assert_eq!(timeline[0].step, "speculate");
2713 assert_eq!(timeline[0].outcome, "ok");
2714 assert_eq!(timeline[0].duration_ms, 120);
2715 assert_eq!(timeline[1].step, "verify");
2716 assert_eq!(
2717 timeline[1].retry_classification.as_deref(),
2718 Some("MalformedResponse")
2719 );
2720
2721 let all = store.get_session_steps(sid).unwrap();
2722 assert_eq!(all.len(), 2);
2723 }
2724
2725 #[test]
2726 fn test_correction_attempt_roundtrip() {
2727 let store = test_store();
2728 let sid = "corr-sess";
2729 seed_session(&store, sid);
2730
2731 let a1 = CorrectionAttemptRow {
2732 session_id: sid.to_string(),
2733 node_id: "n1".to_string(),
2734 attempt: 1,
2735 parse_state: "FullyParsed".to_string(),
2736 retry_classification: None,
2737 response_fingerprint: "abc123".to_string(),
2738 response_length: 4096,
2739 energy_json: Some(r#"{"v_syn":2.0}"#.to_string()),
2740 accepted: false,
2741 rejection_reason: Some("v_syn too high".to_string()),
2742 created_at: 1700000000,
2743 };
2744 let a2 = CorrectionAttemptRow {
2745 session_id: sid.to_string(),
2746 node_id: "n1".to_string(),
2747 attempt: 2,
2748 parse_state: "FullyParsed".to_string(),
2749 retry_classification: None,
2750 response_fingerprint: "def456".to_string(),
2751 response_length: 3800,
2752 energy_json: Some(r#"{"v_syn":0.0}"#.to_string()),
2753 accepted: true,
2754 rejection_reason: None,
2755 created_at: 1700000010,
2756 };
2757
2758 store.record_correction_attempt(&a1).unwrap();
2759 store.record_correction_attempt(&a2).unwrap();
2760
2761 let attempts = store.get_correction_attempts(sid, "n1").unwrap();
2762 assert_eq!(attempts.len(), 2);
2763 assert_eq!(attempts[0].attempt, 1);
2764 assert!(!attempts[0].accepted);
2765 assert_eq!(
2766 attempts[0].rejection_reason.as_deref(),
2767 Some("v_syn too high")
2768 );
2769 assert_eq!(attempts[1].attempt, 2);
2770 assert!(attempts[1].accepted);
2771 assert!(attempts[1].rejection_reason.is_none());
2772 }
2773
2774 #[test]
2775 fn test_step_timeline_filters_by_node() {
2776 let store = test_store();
2777 let sid = "filter-sess";
2778 seed_session(&store, sid);
2779
2780 for (node, step) in [("n1", "speculate"), ("n2", "speculate"), ("n1", "verify")] {
2781 store
2782 .record_step(&SrbnStepRecord {
2783 session_id: sid.to_string(),
2784 node_id: node.to_string(),
2785 step: step.to_string(),
2786 outcome: "ok".to_string(),
2787 energy_json: None,
2788 parse_state: None,
2789 retry_classification: None,
2790 attempt_count: 0,
2791 duration_ms: 0,
2792 })
2793 .unwrap();
2794 }
2795
2796 assert_eq!(store.get_step_timeline(sid, "n1").unwrap().len(), 2);
2797 assert_eq!(store.get_step_timeline(sid, "n2").unwrap().len(), 1);
2798 assert_eq!(store.get_session_steps(sid).unwrap().len(), 3);
2799 }
2800}