1#![warn(missing_docs)]
2
3mod value;
21pub(crate) use value::Value;
22
23use async_trait::async_trait;
24use rusqlite::Connection as RusqliteConnection;
25use std::{
26 borrow::Cow,
27 path::{Path, PathBuf},
28 sync::Arc,
29};
30use toasty_core::{
31 Result, Schema,
32 driver::{
33 Capability, Driver, ExecResponse,
34 operation::{IsolationLevel, Operation, Transaction},
35 },
36 schema::db::{self, Migration, SchemaDiff, Table},
37 stmt,
38};
39use toasty_sql::{self as sql, TypedValue};
40use url::Url;
41
/// Configuration for the SQLite driver: identifies which database to open.
///
/// The value itself holds no open handle; connections are created on demand
/// by the [`Driver`] implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Sqlite {
    /// A database stored in the file at the given path.
    File(PathBuf),
    /// A database that lives only in memory.
    InMemory,
}
58
59impl Sqlite {
60 pub fn new(url: impl Into<String>) -> Result<Self> {
62 let url_str = url.into();
63 let url = Url::parse(&url_str).map_err(toasty_core::Error::driver_operation_failed)?;
64
65 if url.scheme() != "sqlite" {
66 return Err(toasty_core::Error::invalid_connection_url(format!(
67 "connection URL does not have a `sqlite` scheme; url={}",
68 url_str
69 )));
70 }
71
72 if url.path() == ":memory:" {
73 Ok(Self::InMemory)
74 } else {
75 Ok(Self::File(PathBuf::from(url.path())))
76 }
77 }
78
79 pub fn in_memory() -> Self {
81 Self::InMemory
82 }
83
84 pub fn open<P: AsRef<Path>>(path: P) -> Self {
86 Self::File(path.as_ref().to_path_buf())
87 }
88}
89
90#[async_trait]
91impl Driver for Sqlite {
92 fn url(&self) -> Cow<'_, str> {
93 match self {
94 Sqlite::InMemory => Cow::Borrowed("sqlite::memory:"),
95 Sqlite::File(path) => Cow::Owned(format!("sqlite:{}", path.display())),
96 }
97 }
98
99 fn capability(&self) -> &'static Capability {
100 &Capability::SQLITE
101 }
102
103 async fn connect(&self) -> toasty_core::Result<Box<dyn toasty_core::Connection>> {
104 let connection = match self {
105 Sqlite::File(path) => Connection::open(path)?,
106 Sqlite::InMemory => Connection::in_memory(),
107 };
108 Ok(Box::new(connection))
109 }
110
111 fn max_connections(&self) -> Option<usize> {
112 matches!(self, Self::InMemory).then_some(1)
113 }
114
115 fn generate_migration(&self, schema_diff: &SchemaDiff<'_>) -> Migration {
116 let statements = sql::MigrationStatement::from_diff(schema_diff, &Capability::SQLITE);
117
118 let sql_strings: Vec<String> = statements
119 .iter()
120 .map(|stmt| {
121 let mut params = Vec::<TypedValue>::new();
122 let sql =
123 sql::Serializer::sqlite(stmt.schema()).serialize(stmt.statement(), &mut params);
124 assert!(
125 params.is_empty(),
126 "migration statements should not have parameters"
127 );
128 sql
129 })
130 .collect();
131
132 Migration::new_sql_with_breakpoints(&sql_strings)
133 }
134
135 async fn reset_db(&self) -> toasty_core::Result<()> {
136 match self {
137 Sqlite::File(path) => {
138 if path.exists() {
140 std::fs::remove_file(path)
141 .map_err(toasty_core::Error::driver_operation_failed)?;
142 }
143 }
144 Sqlite::InMemory => {
145 }
147 }
148
149 Ok(())
150 }
151}
152
/// An open SQLite database connection, wrapping a [`rusqlite::Connection`].
#[derive(Debug)]
pub struct Connection {
    /// The underlying rusqlite handle that every statement is executed on.
    connection: RusqliteConnection,
}
158
159impl Connection {
160 pub fn in_memory() -> Self {
162 let connection = RusqliteConnection::open_in_memory().unwrap();
163
164 Self { connection }
165 }
166
167 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
169 let connection =
170 RusqliteConnection::open(path).map_err(toasty_core::Error::driver_operation_failed)?;
171 let sqlite = Self { connection };
172 Ok(sqlite)
173 }
174}
175
176#[async_trait]
177impl toasty_core::driver::Connection for Connection {
178 async fn exec(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
179 tracing::trace!(driver = "sqlite", op = %op.name(), "driver exec");
180
181 let (sql, ret_tys): (sql::Statement, _) = match op {
182 Operation::QuerySql(op) => {
183 assert!(
184 op.last_insert_id_hack.is_none(),
185 "last_insert_id_hack is MySQL-specific and should not be set for SQLite"
186 );
187 (op.stmt.into(), op.ret)
188 }
189 Operation::Transaction(mut op) => {
191 if let Transaction::Start { isolation, .. } = &mut op {
192 if !matches!(isolation, Some(IsolationLevel::Serializable) | None) {
193 return Err(toasty_core::Error::unsupported_feature(
194 "SQLite only supports Serializable isolation",
195 ));
196 }
197 *isolation = None;
198 }
199 let sql = sql::Serializer::sqlite(&schema.db).serialize_transaction(&op);
200 self.connection
201 .execute(&sql, [])
202 .map_err(toasty_core::Error::driver_operation_failed)?;
203 return Ok(ExecResponse::count(0));
204 }
205 _ => todo!("op={:#?}", op),
206 };
207
208 let mut params: Vec<toasty_sql::TypedValue> = vec![];
209 let sql_str = sql::Serializer::sqlite(&schema.db).serialize(&sql, &mut params);
210
211 tracing::debug!(db.system = "sqlite", db.statement = %sql_str, params = params.len(), "executing SQL");
212
213 let mut stmt = self.connection.prepare_cached(&sql_str).unwrap();
214
215 let width = match &sql {
216 sql::Statement::Query(stmt) => match &stmt.body {
217 stmt::ExprSet::Select(stmt) => {
218 Some(stmt.returning.as_expr_unwrap().as_record_unwrap().len())
219 }
220 _ => todo!(),
221 },
222 sql::Statement::Insert(stmt) => stmt
223 .returning
224 .as_ref()
225 .map(|returning| returning.as_expr_unwrap().as_record_unwrap().len()),
226 sql::Statement::Delete(stmt) => stmt
227 .returning
228 .as_ref()
229 .map(|returning| returning.as_expr_unwrap().as_record_unwrap().len()),
230 sql::Statement::Update(stmt) => {
231 assert!(stmt.condition.is_none(), "stmt={stmt:#?}");
232 stmt.returning
233 .as_ref()
234 .map(|returning| returning.as_expr_unwrap().as_record_unwrap().len())
235 }
236 _ => None,
237 };
238
239 let params = params
240 .into_iter()
241 .map(|tv| Value::from(tv.value))
242 .collect::<Vec<_>>();
243
244 if width.is_none() {
245 let count = stmt
246 .execute(rusqlite::params_from_iter(params.iter()))
247 .map_err(toasty_core::Error::driver_operation_failed)?;
248
249 return Ok(ExecResponse::count(count as _));
250 }
251
252 let mut rows = stmt
253 .query(rusqlite::params_from_iter(params.iter()))
254 .unwrap();
255
256 let mut ret = vec![];
257
258 let ret_tys = &ret_tys.as_ref().unwrap();
259
260 loop {
261 match rows.next() {
262 Ok(Some(row)) => {
263 let mut items = vec![];
264
265 let width = width.unwrap();
266
267 for index in 0..width {
268 items.push(Value::from_sql(row, index, &ret_tys[index]).into_inner());
269 }
270
271 ret.push(stmt::ValueRecord::from_vec(items).into());
272 }
273 Ok(None) => break,
274 Err(err) => {
275 return Err(toasty_core::Error::driver_operation_failed(err));
276 }
277 }
278 }
279
280 Ok(ExecResponse::value_stream(stmt::ValueStream::from_vec(ret)))
281 }
282
283 async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
284 for table in &schema.db.tables {
285 tracing::debug!(table = %table.name, "creating table");
286 self.create_table(&schema.db, table)?;
287 }
288
289 Ok(())
290 }
291
292 async fn applied_migrations(
293 &mut self,
294 ) -> Result<Vec<toasty_core::schema::db::AppliedMigration>> {
295 self.connection
297 .execute(
298 "CREATE TABLE IF NOT EXISTS __toasty_migrations (
299 id INTEGER PRIMARY KEY,
300 name TEXT NOT NULL,
301 applied_at TEXT NOT NULL
302 )",
303 [],
304 )
305 .map_err(toasty_core::Error::driver_operation_failed)?;
306
307 let mut stmt = self
309 .connection
310 .prepare("SELECT id FROM __toasty_migrations ORDER BY applied_at")
311 .map_err(toasty_core::Error::driver_operation_failed)?;
312
313 let rows = stmt
314 .query_map([], |row| {
315 let id: i64 = row.get(0)?;
316 Ok(toasty_core::schema::db::AppliedMigration::new(id as u64))
317 })
318 .map_err(toasty_core::Error::driver_operation_failed)?;
319
320 rows.collect::<rusqlite::Result<Vec<_>>>()
321 .map_err(toasty_core::Error::driver_operation_failed)
322 }
323
324 async fn apply_migration(
325 &mut self,
326 id: u64,
327 name: &str,
328 migration: &toasty_core::schema::db::Migration,
329 ) -> Result<()> {
330 tracing::info!(id = id, name = %name, "applying migration");
331 self.connection
333 .execute(
334 "CREATE TABLE IF NOT EXISTS __toasty_migrations (
335 id INTEGER PRIMARY KEY,
336 name TEXT NOT NULL,
337 applied_at TEXT NOT NULL
338 )",
339 [],
340 )
341 .map_err(toasty_core::Error::driver_operation_failed)?;
342
343 self.connection
345 .execute("BEGIN", [])
346 .map_err(toasty_core::Error::driver_operation_failed)?;
347
348 for statement in migration.statements() {
350 if let Err(e) = self
351 .connection
352 .execute(statement, [])
353 .map_err(toasty_core::Error::driver_operation_failed)
354 {
355 self.connection
356 .execute("ROLLBACK", [])
357 .map_err(toasty_core::Error::driver_operation_failed)?;
358 return Err(e);
359 }
360 }
361
362 if let Err(e) = self.connection.execute(
364 "INSERT INTO __toasty_migrations (id, name, applied_at) VALUES (?1, ?2, datetime('now'))",
365 rusqlite::params![id as i64, name],
366 ).map_err(toasty_core::Error::driver_operation_failed) {
367 self.connection.execute("ROLLBACK", []).map_err(toasty_core::Error::driver_operation_failed)?;
368 return Err(e);
369 }
370
371 self.connection
373 .execute("COMMIT", [])
374 .map_err(toasty_core::Error::driver_operation_failed)?;
375 Ok(())
376 }
377}
378
379impl Connection {
380 fn create_table(&mut self, schema: &db::Schema, table: &Table) -> Result<()> {
381 let serializer = sql::Serializer::sqlite(schema);
382
383 let mut params: Vec<toasty_sql::TypedValue> = vec![];
384 let stmt = serializer.serialize(
385 &sql::Statement::create_table(table, &Capability::SQLITE),
386 &mut params,
387 );
388 assert!(params.is_empty());
389
390 self.connection
391 .execute(&stmt, [])
392 .map_err(toasty_core::Error::driver_operation_failed)?;
393
394 for index in &table.indices {
396 if index.primary_key {
398 continue;
399 }
400
401 let stmt = serializer.serialize(&sql::Statement::create_index(index), &mut params);
402 assert!(params.is_empty());
403
404 self.connection
405 .execute(&stmt, [])
406 .map_err(toasty_core::Error::driver_operation_failed)?;
407 }
408 Ok(())
409 }
410}