toasty_driver_integration_suite/
logging_driver.rs1use std::{
2 borrow::Cow,
3 sync::{Arc, Mutex},
4};
5use toasty_core::{
6 async_trait,
7 driver::{Capability, Connection, Driver, Operation, Response, Rows},
8 schema::db::{AppliedMigration, Migration, SchemaDiff},
9 Result, Schema,
10};
11
12#[derive(Debug)]
13pub struct LoggingDriver {
14 inner: Box<dyn Driver>,
15
16 ops_log: Arc<Mutex<Vec<DriverOp>>>,
19}
20
21impl LoggingDriver {
22 pub fn new(driver: Box<dyn Driver>) -> Self {
23 Self {
24 inner: driver,
25 ops_log: Arc::new(Mutex::new(Vec::new())),
26 }
27 }
28
29 pub fn ops_log_handle(&self) -> Arc<Mutex<Vec<DriverOp>>> {
31 self.ops_log.clone()
32 }
33}
34
35#[async_trait]
36impl Driver for LoggingDriver {
37 fn url(&self) -> Cow<'_, str> {
38 self.inner.url()
39 }
40
41 fn capability(&self) -> &'static Capability {
42 self.inner.capability()
43 }
44
45 async fn connect(&self) -> Result<Box<dyn Connection>> {
46 Ok(Box::new(LoggingConnection {
47 inner: self.inner.connect().await?,
48 ops_log: self.ops_log_handle(),
49 }))
50 }
51
52 fn generate_migration(&self, schema_diff: &SchemaDiff<'_>) -> Migration {
53 self.inner.generate_migration(schema_diff)
54 }
55
56 async fn reset_db(&self) -> Result<()> {
57 self.inner.reset_db().await
58 }
59}
60
61#[derive(Debug)]
62pub struct DriverOp {
63 pub operation: Operation,
64 pub response: Response,
65}
66
67#[derive(Debug)]
69pub struct LoggingConnection {
70 inner: Box<dyn Connection>,
72
73 ops_log: Arc<Mutex<Vec<DriverOp>>>,
76}
77
78#[async_trait]
79impl Connection for LoggingConnection {
80 async fn exec(&mut self, schema: &Arc<Schema>, operation: Operation) -> Result<Response> {
81 let operation_clone = operation.clone();
83
84 let mut response = self.inner.exec(schema, operation).await?;
86
87 let duplicated_response = duplicate_response_mut(&mut response).await?;
89
90 let driver_op = DriverOp {
92 operation: operation_clone,
93 response: duplicated_response,
94 };
95
96 self.ops_log
97 .lock()
98 .expect("Failed to acquire ops log lock")
99 .push(driver_op);
100
101 Ok(response)
102 }
103
104 async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
105 self.inner.push_schema(schema).await
106 }
107
108 async fn applied_migrations(&mut self) -> Result<Vec<AppliedMigration>> {
109 self.inner.applied_migrations().await
110 }
111
112 async fn apply_migration(
113 &mut self,
114 id: u64,
115 name: String,
116 migration: &Migration,
117 ) -> Result<()> {
118 self.inner.apply_migration(id, name, migration).await
119 }
120}
121
122async fn duplicate_response_mut(response: &mut Response) -> Result<Response> {
125 let rows = match &mut response.rows {
126 Rows::Count(count) => Rows::Count(*count),
127 Rows::Value(_) => todo!(),
128 Rows::Stream(stream) => {
129 let duplicated_stream = stream.dup().await?;
131 Rows::Stream(duplicated_stream)
132 }
133 };
134
135 Ok(Response { rows })
136}