1#![warn(missing_docs)]
2
3mod value;
21pub(crate) use value::Value;
22
23use async_trait::async_trait;
24use rusqlite::Connection as RusqliteConnection;
25use std::{
26 borrow::Cow,
27 path::{Path, PathBuf},
28 sync::Arc,
29};
30use toasty_core::{
31 Result, Schema,
32 driver::{
33 Capability, Driver, ExecResponse,
34 operation::{IsolationLevel, Operation, Transaction},
35 },
36 schema::{
37 db::{self, Migration, Table},
38 diff,
39 },
40 stmt,
41};
42use toasty_sql::{self as sql};
43use url::Url;
44
/// SQLite driver configuration: selects which database new connections are opened against.
#[derive(Debug)]
pub enum Sqlite {
    /// A database stored in the file at the given path.
    File(PathBuf),
    /// A database held in memory instead of in a file.
    InMemory,
}
61
62impl Sqlite {
63 pub fn new(url: impl Into<String>) -> Result<Self> {
65 let url_str = url.into();
66 let url = Url::parse(&url_str).map_err(toasty_core::Error::driver_operation_failed)?;
67
68 if url.scheme() != "sqlite" {
69 return Err(toasty_core::Error::invalid_connection_url(format!(
70 "connection URL does not have a `sqlite` scheme; url={}",
71 url_str
72 )));
73 }
74
75 if url.path() == ":memory:" {
76 Ok(Self::InMemory)
77 } else {
78 Ok(Self::File(PathBuf::from(url.path())))
79 }
80 }
81
82 pub fn in_memory() -> Self {
84 Self::InMemory
85 }
86
87 pub fn open<P: AsRef<Path>>(path: P) -> Self {
89 Self::File(path.as_ref().to_path_buf())
90 }
91}
92
93#[async_trait]
94impl Driver for Sqlite {
95 fn url(&self) -> Cow<'_, str> {
96 match self {
97 Sqlite::InMemory => Cow::Borrowed("sqlite::memory:"),
98 Sqlite::File(path) => Cow::Owned(format!("sqlite:{}", path.display())),
99 }
100 }
101
102 fn capability(&self) -> &'static Capability {
103 &Capability::SQLITE
104 }
105
106 async fn connect(&self) -> toasty_core::Result<Box<dyn toasty_core::Connection>> {
107 let connection = match self {
108 Sqlite::File(path) => Connection::open(path)?,
109 Sqlite::InMemory => Connection::in_memory(),
110 };
111 Ok(Box::new(connection))
112 }
113
114 fn max_connections(&self) -> Option<usize> {
115 matches!(self, Self::InMemory).then_some(1)
116 }
117
118 fn generate_migration(&self, schema_diff: &diff::Schema<'_>) -> Migration {
119 let statements = sql::MigrationStatement::from_diff(schema_diff, &Capability::SQLITE);
120
121 let sql_strings: Vec<String> = statements
122 .iter()
123 .map(|stmt| sql::Serializer::sqlite(stmt.schema()).serialize(stmt.statement()))
124 .collect();
125
126 Migration::new_sql_with_breakpoints(&sql_strings)
127 }
128
129 async fn reset_db(&self) -> toasty_core::Result<()> {
130 match self {
131 Sqlite::File(path) => {
132 if path.exists() {
134 std::fs::remove_file(path)
135 .map_err(toasty_core::Error::driver_operation_failed)?;
136 }
137 }
138 Sqlite::InMemory => {
139 }
141 }
142
143 Ok(())
144 }
145}
146
/// A single SQLite connection, wrapping a `rusqlite` connection handle.
#[derive(Debug)]
pub struct Connection {
    // Underlying `rusqlite` handle that every statement is executed on.
    connection: RusqliteConnection,
}
152
153impl Connection {
154 pub fn in_memory() -> Self {
156 let connection = RusqliteConnection::open_in_memory().unwrap();
157
158 Self { connection }
159 }
160
161 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
163 let connection =
164 RusqliteConnection::open(path).map_err(toasty_core::Error::driver_operation_failed)?;
165 let sqlite = Self { connection };
166 Ok(sqlite)
167 }
168}
169
170#[async_trait]
171impl toasty_core::driver::Connection for Connection {
172 async fn exec(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
173 tracing::trace!(driver = "sqlite", op = %op.name(), "driver exec");
174
175 let (sql, typed_params, ret_tys) = match op {
176 Operation::QuerySql(op) => {
177 assert!(
178 op.last_insert_id_hack.is_none(),
179 "last_insert_id_hack is MySQL-specific and should not be set for SQLite"
180 );
181 (sql::Statement::from(op.stmt), op.params, op.ret)
182 }
183 Operation::Transaction(mut op) => {
185 if let Transaction::Start { isolation, .. } = &mut op {
186 if !matches!(isolation, Some(IsolationLevel::Serializable) | None) {
187 return Err(toasty_core::Error::unsupported_feature(
188 "SQLite only supports Serializable isolation",
189 ));
190 }
191 *isolation = None;
192 }
193 let sql = sql::Serializer::sqlite(&schema.db).serialize_transaction(&op);
194 self.connection
195 .execute(&sql, [])
196 .map_err(toasty_core::Error::driver_operation_failed)?;
197 return Ok(ExecResponse::count(0));
198 }
199 _ => todo!("op={:#?}", op),
200 };
201
202 let sql_str = sql::Serializer::sqlite(&schema.db).serialize(&sql);
203
204 tracing::debug!(db.system = "sqlite", db.statement = %sql_str, params = typed_params.len(), "executing SQL");
205
206 let mut stmt = self.connection.prepare_cached(&sql_str).unwrap();
207
208 let width = match &sql {
209 sql::Statement::Query(stmt) => match &stmt.body {
210 stmt::ExprSet::Select(stmt) => {
211 Some(stmt.returning.as_project_unwrap().as_record_unwrap().len())
212 }
213 _ => todo!(),
214 },
215 sql::Statement::Insert(stmt) => stmt
216 .returning
217 .as_ref()
218 .map(|returning| returning.as_project_unwrap().as_record_unwrap().len()),
219 sql::Statement::Delete(stmt) => stmt
220 .returning
221 .as_ref()
222 .map(|returning| returning.as_project_unwrap().as_record_unwrap().len()),
223 sql::Statement::Update(stmt) => {
224 assert!(stmt.condition.is_none(), "stmt={stmt:#?}");
225 stmt.returning
226 .as_ref()
227 .map(|returning| returning.as_project_unwrap().as_record_unwrap().len())
228 }
229 _ => None,
230 };
231
232 let params = typed_params
233 .into_iter()
234 .map(|tv| Value::from(tv.value))
235 .collect::<Vec<_>>();
236
237 if width.is_none() {
238 let count = stmt
239 .execute(rusqlite::params_from_iter(params.iter()))
240 .map_err(toasty_core::Error::driver_operation_failed)?;
241
242 return Ok(ExecResponse::count(count as _));
243 }
244
245 let mut rows = stmt
246 .query(rusqlite::params_from_iter(params.iter()))
247 .unwrap();
248
249 let mut ret = vec![];
250
251 let ret_tys = &ret_tys.as_ref().unwrap();
252
253 loop {
254 match rows.next() {
255 Ok(Some(row)) => {
256 let mut items = vec![];
257
258 let width = width.unwrap();
259
260 for index in 0..width {
261 items.push(Value::from_sql(row, index, &ret_tys[index]).into_inner());
262 }
263
264 ret.push(stmt::ValueRecord::from_vec(items).into());
265 }
266 Ok(None) => break,
267 Err(err) => {
268 return Err(toasty_core::Error::driver_operation_failed(err));
269 }
270 }
271 }
272
273 Ok(ExecResponse::value_stream(stmt::ValueStream::from_vec(ret)))
274 }
275
276 async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
277 for table in &schema.db.tables {
278 tracing::debug!(table = %table.name, "creating table");
279 self.create_table(&schema.db, table)?;
280 }
281
282 Ok(())
283 }
284
285 async fn applied_migrations(
286 &mut self,
287 ) -> Result<Vec<toasty_core::schema::db::AppliedMigration>> {
288 self.connection
290 .execute(
291 "CREATE TABLE IF NOT EXISTS __toasty_migrations (
292 id INTEGER PRIMARY KEY,
293 name TEXT NOT NULL,
294 applied_at TEXT NOT NULL
295 )",
296 [],
297 )
298 .map_err(toasty_core::Error::driver_operation_failed)?;
299
300 let mut stmt = self
302 .connection
303 .prepare("SELECT id FROM __toasty_migrations ORDER BY applied_at")
304 .map_err(toasty_core::Error::driver_operation_failed)?;
305
306 let rows = stmt
307 .query_map([], |row| {
308 let id: i64 = row.get(0)?;
309 Ok(toasty_core::schema::db::AppliedMigration::new(id as u64))
310 })
311 .map_err(toasty_core::Error::driver_operation_failed)?;
312
313 rows.collect::<rusqlite::Result<Vec<_>>>()
314 .map_err(toasty_core::Error::driver_operation_failed)
315 }
316
317 async fn apply_migration(
318 &mut self,
319 id: u64,
320 name: &str,
321 migration: &toasty_core::schema::db::Migration,
322 ) -> Result<()> {
323 tracing::info!(id = id, name = %name, "applying migration");
324 self.connection
326 .execute(
327 "CREATE TABLE IF NOT EXISTS __toasty_migrations (
328 id INTEGER PRIMARY KEY,
329 name TEXT NOT NULL,
330 applied_at TEXT NOT NULL
331 )",
332 [],
333 )
334 .map_err(toasty_core::Error::driver_operation_failed)?;
335
336 self.connection
338 .execute("BEGIN", [])
339 .map_err(toasty_core::Error::driver_operation_failed)?;
340
341 for statement in migration.statements() {
343 if let Err(e) = self
344 .connection
345 .execute(statement, [])
346 .map_err(toasty_core::Error::driver_operation_failed)
347 {
348 self.connection
349 .execute("ROLLBACK", [])
350 .map_err(toasty_core::Error::driver_operation_failed)?;
351 return Err(e);
352 }
353 }
354
355 if let Err(e) = self.connection.execute(
357 "INSERT INTO __toasty_migrations (id, name, applied_at) VALUES (?1, ?2, datetime('now'))",
358 rusqlite::params![id as i64, name],
359 ).map_err(toasty_core::Error::driver_operation_failed) {
360 self.connection.execute("ROLLBACK", []).map_err(toasty_core::Error::driver_operation_failed)?;
361 return Err(e);
362 }
363
364 self.connection
366 .execute("COMMIT", [])
367 .map_err(toasty_core::Error::driver_operation_failed)?;
368 Ok(())
369 }
370}
371
372impl Connection {
373 fn create_table(&mut self, schema: &db::Schema, table: &Table) -> Result<()> {
374 let serializer = sql::Serializer::sqlite(schema);
375
376 let stmt = serializer.serialize(&sql::Statement::create_table(table, &Capability::SQLITE));
377
378 self.connection
379 .execute(&stmt, [])
380 .map_err(toasty_core::Error::driver_operation_failed)?;
381
382 for index in &table.indices {
384 if index.primary_key {
386 continue;
387 }
388
389 let stmt = serializer.serialize(&sql::Statement::create_index(index));
390
391 self.connection
392 .execute(&stmt, [])
393 .map_err(toasty_core::Error::driver_operation_failed)?;
394 }
395 Ok(())
396 }
397}