1#![warn(missing_docs)]
2
3mod value;
21pub(crate) use value::Value;
22
23use async_trait::async_trait;
24use rusqlite::Connection as RusqliteConnection;
25use std::{
26 borrow::Cow,
27 path::{Path, PathBuf},
28 sync::Arc,
29};
30use toasty_core::{
31 Result, Schema,
32 driver::{
33 Capability, Driver, ExecResponse,
34 operation::{IsolationLevel, Operation, Transaction},
35 },
36 schema::db::{self, Migration, SchemaDiff, Table},
37 stmt,
38};
39use toasty_sql::{self as sql};
40use url::Url;
41
42#[derive(Debug)]
52pub enum Sqlite {
53 File(PathBuf),
55 InMemory,
57}
58
59impl Sqlite {
60 pub fn new(url: impl Into<String>) -> Result<Self> {
62 let url_str = url.into();
63 let url = Url::parse(&url_str).map_err(toasty_core::Error::driver_operation_failed)?;
64
65 if url.scheme() != "sqlite" {
66 return Err(toasty_core::Error::invalid_connection_url(format!(
67 "connection URL does not have a `sqlite` scheme; url={}",
68 url_str
69 )));
70 }
71
72 if url.path() == ":memory:" {
73 Ok(Self::InMemory)
74 } else {
75 Ok(Self::File(PathBuf::from(url.path())))
76 }
77 }
78
79 pub fn in_memory() -> Self {
81 Self::InMemory
82 }
83
84 pub fn open<P: AsRef<Path>>(path: P) -> Self {
86 Self::File(path.as_ref().to_path_buf())
87 }
88}
89
90#[async_trait]
91impl Driver for Sqlite {
92 fn url(&self) -> Cow<'_, str> {
93 match self {
94 Sqlite::InMemory => Cow::Borrowed("sqlite::memory:"),
95 Sqlite::File(path) => Cow::Owned(format!("sqlite:{}", path.display())),
96 }
97 }
98
99 fn capability(&self) -> &'static Capability {
100 &Capability::SQLITE
101 }
102
103 async fn connect(&self) -> toasty_core::Result<Box<dyn toasty_core::Connection>> {
104 let connection = match self {
105 Sqlite::File(path) => Connection::open(path)?,
106 Sqlite::InMemory => Connection::in_memory(),
107 };
108 Ok(Box::new(connection))
109 }
110
111 fn max_connections(&self) -> Option<usize> {
112 matches!(self, Self::InMemory).then_some(1)
113 }
114
115 fn generate_migration(&self, schema_diff: &SchemaDiff<'_>) -> Migration {
116 let statements = sql::MigrationStatement::from_diff(schema_diff, &Capability::SQLITE);
117
118 let sql_strings: Vec<String> = statements
119 .iter()
120 .map(|stmt| sql::Serializer::sqlite(stmt.schema()).serialize(stmt.statement()))
121 .collect();
122
123 Migration::new_sql_with_breakpoints(&sql_strings)
124 }
125
126 async fn reset_db(&self) -> toasty_core::Result<()> {
127 match self {
128 Sqlite::File(path) => {
129 if path.exists() {
131 std::fs::remove_file(path)
132 .map_err(toasty_core::Error::driver_operation_failed)?;
133 }
134 }
135 Sqlite::InMemory => {
136 }
138 }
139
140 Ok(())
141 }
142}
143
144#[derive(Debug)]
146pub struct Connection {
147 connection: RusqliteConnection,
148}
149
150impl Connection {
151 pub fn in_memory() -> Self {
153 let connection = RusqliteConnection::open_in_memory().unwrap();
154
155 Self { connection }
156 }
157
158 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
160 let connection =
161 RusqliteConnection::open(path).map_err(toasty_core::Error::driver_operation_failed)?;
162 let sqlite = Self { connection };
163 Ok(sqlite)
164 }
165}
166
167#[async_trait]
168impl toasty_core::driver::Connection for Connection {
169 async fn exec(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
170 tracing::trace!(driver = "sqlite", op = %op.name(), "driver exec");
171
172 let (sql, typed_params, ret_tys) = match op {
173 Operation::QuerySql(op) => {
174 assert!(
175 op.last_insert_id_hack.is_none(),
176 "last_insert_id_hack is MySQL-specific and should not be set for SQLite"
177 );
178 (sql::Statement::from(op.stmt), op.params, op.ret)
179 }
180 Operation::Transaction(mut op) => {
182 if let Transaction::Start { isolation, .. } = &mut op {
183 if !matches!(isolation, Some(IsolationLevel::Serializable) | None) {
184 return Err(toasty_core::Error::unsupported_feature(
185 "SQLite only supports Serializable isolation",
186 ));
187 }
188 *isolation = None;
189 }
190 let sql = sql::Serializer::sqlite(&schema.db).serialize_transaction(&op);
191 self.connection
192 .execute(&sql, [])
193 .map_err(toasty_core::Error::driver_operation_failed)?;
194 return Ok(ExecResponse::count(0));
195 }
196 _ => todo!("op={:#?}", op),
197 };
198
199 let sql_str = sql::Serializer::sqlite(&schema.db).serialize(&sql);
200
201 tracing::debug!(db.system = "sqlite", db.statement = %sql_str, params = typed_params.len(), "executing SQL");
202
203 let mut stmt = self.connection.prepare_cached(&sql_str).unwrap();
204
205 let width = match &sql {
206 sql::Statement::Query(stmt) => match &stmt.body {
207 stmt::ExprSet::Select(stmt) => {
208 Some(stmt.returning.as_project_unwrap().as_record_unwrap().len())
209 }
210 _ => todo!(),
211 },
212 sql::Statement::Insert(stmt) => stmt
213 .returning
214 .as_ref()
215 .map(|returning| returning.as_project_unwrap().as_record_unwrap().len()),
216 sql::Statement::Delete(stmt) => stmt
217 .returning
218 .as_ref()
219 .map(|returning| returning.as_project_unwrap().as_record_unwrap().len()),
220 sql::Statement::Update(stmt) => {
221 assert!(stmt.condition.is_none(), "stmt={stmt:#?}");
222 stmt.returning
223 .as_ref()
224 .map(|returning| returning.as_project_unwrap().as_record_unwrap().len())
225 }
226 _ => None,
227 };
228
229 let params = typed_params
230 .into_iter()
231 .map(|tv| Value::from(tv.value))
232 .collect::<Vec<_>>();
233
234 if width.is_none() {
235 let count = stmt
236 .execute(rusqlite::params_from_iter(params.iter()))
237 .map_err(toasty_core::Error::driver_operation_failed)?;
238
239 return Ok(ExecResponse::count(count as _));
240 }
241
242 let mut rows = stmt
243 .query(rusqlite::params_from_iter(params.iter()))
244 .unwrap();
245
246 let mut ret = vec![];
247
248 let ret_tys = &ret_tys.as_ref().unwrap();
249
250 loop {
251 match rows.next() {
252 Ok(Some(row)) => {
253 let mut items = vec![];
254
255 let width = width.unwrap();
256
257 for index in 0..width {
258 items.push(Value::from_sql(row, index, &ret_tys[index]).into_inner());
259 }
260
261 ret.push(stmt::ValueRecord::from_vec(items).into());
262 }
263 Ok(None) => break,
264 Err(err) => {
265 return Err(toasty_core::Error::driver_operation_failed(err));
266 }
267 }
268 }
269
270 Ok(ExecResponse::value_stream(stmt::ValueStream::from_vec(ret)))
271 }
272
273 async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
274 for table in &schema.db.tables {
275 tracing::debug!(table = %table.name, "creating table");
276 self.create_table(&schema.db, table)?;
277 }
278
279 Ok(())
280 }
281
282 async fn applied_migrations(
283 &mut self,
284 ) -> Result<Vec<toasty_core::schema::db::AppliedMigration>> {
285 self.connection
287 .execute(
288 "CREATE TABLE IF NOT EXISTS __toasty_migrations (
289 id INTEGER PRIMARY KEY,
290 name TEXT NOT NULL,
291 applied_at TEXT NOT NULL
292 )",
293 [],
294 )
295 .map_err(toasty_core::Error::driver_operation_failed)?;
296
297 let mut stmt = self
299 .connection
300 .prepare("SELECT id FROM __toasty_migrations ORDER BY applied_at")
301 .map_err(toasty_core::Error::driver_operation_failed)?;
302
303 let rows = stmt
304 .query_map([], |row| {
305 let id: i64 = row.get(0)?;
306 Ok(toasty_core::schema::db::AppliedMigration::new(id as u64))
307 })
308 .map_err(toasty_core::Error::driver_operation_failed)?;
309
310 rows.collect::<rusqlite::Result<Vec<_>>>()
311 .map_err(toasty_core::Error::driver_operation_failed)
312 }
313
314 async fn apply_migration(
315 &mut self,
316 id: u64,
317 name: &str,
318 migration: &toasty_core::schema::db::Migration,
319 ) -> Result<()> {
320 tracing::info!(id = id, name = %name, "applying migration");
321 self.connection
323 .execute(
324 "CREATE TABLE IF NOT EXISTS __toasty_migrations (
325 id INTEGER PRIMARY KEY,
326 name TEXT NOT NULL,
327 applied_at TEXT NOT NULL
328 )",
329 [],
330 )
331 .map_err(toasty_core::Error::driver_operation_failed)?;
332
333 self.connection
335 .execute("BEGIN", [])
336 .map_err(toasty_core::Error::driver_operation_failed)?;
337
338 for statement in migration.statements() {
340 if let Err(e) = self
341 .connection
342 .execute(statement, [])
343 .map_err(toasty_core::Error::driver_operation_failed)
344 {
345 self.connection
346 .execute("ROLLBACK", [])
347 .map_err(toasty_core::Error::driver_operation_failed)?;
348 return Err(e);
349 }
350 }
351
352 if let Err(e) = self.connection.execute(
354 "INSERT INTO __toasty_migrations (id, name, applied_at) VALUES (?1, ?2, datetime('now'))",
355 rusqlite::params![id as i64, name],
356 ).map_err(toasty_core::Error::driver_operation_failed) {
357 self.connection.execute("ROLLBACK", []).map_err(toasty_core::Error::driver_operation_failed)?;
358 return Err(e);
359 }
360
361 self.connection
363 .execute("COMMIT", [])
364 .map_err(toasty_core::Error::driver_operation_failed)?;
365 Ok(())
366 }
367}
368
369impl Connection {
370 fn create_table(&mut self, schema: &db::Schema, table: &Table) -> Result<()> {
371 let serializer = sql::Serializer::sqlite(schema);
372
373 let stmt = serializer.serialize(&sql::Statement::create_table(table, &Capability::SQLITE));
374
375 self.connection
376 .execute(&stmt, [])
377 .map_err(toasty_core::Error::driver_operation_failed)?;
378
379 for index in &table.indices {
381 if index.primary_key {
383 continue;
384 }
385
386 let stmt = serializer.serialize(&sql::Statement::create_index(index));
387
388 self.connection
389 .execute(&stmt, [])
390 .map_err(toasty_core::Error::driver_operation_failed)?;
391 }
392 Ok(())
393 }
394}