1#![warn(missing_docs)]
2
3mod op;
18mod r#type;
19mod value;
20
21pub(crate) use r#type::TypeExt;
22pub(crate) use value::Value;
23
24use async_trait::async_trait;
25use toasty_core::{
26 Error, Result, Schema,
27 driver::{Capability, Driver, ExecResponse, operation::Operation},
28 schema::db::{self, Column, ColumnId, Migration, SchemaDiff, Table},
29 stmt::{self, ExprContext},
30};
31
32use aws_sdk_dynamodb::{
33 Client,
34 error::SdkError,
35 operation::transact_write_items::TransactWriteItemsError,
36 operation::update_item::UpdateItemError,
37 types::{
38 AttributeDefinition, AttributeValue, BillingMode, Delete, GlobalSecondaryIndex,
39 KeySchemaElement, KeyType, KeysAndAttributes, Projection, ProjectionType, Put, PutRequest,
40 ReturnValuesOnConditionCheckFailure, TransactWriteItem, Update, WriteRequest,
41 },
42};
43use std::{borrow::Cow, collections::HashMap, sync::Arc};
44
/// A Toasty [`Driver`] implementation backed by Amazon DynamoDB.
///
/// The driver stores the URL it was created with and an AWS SDK [`Client`];
/// every connection it hands out receives a clone of that client.
#[derive(Debug, Clone)]
pub struct DynamoDb {
    /// URL handed back verbatim by [`Driver::url`].
    url: String,
    /// AWS SDK client used for table management and cloned into each [`Connection`].
    client: Client,
}
55
56impl DynamoDb {
57 pub fn new(url: String, client: Client) -> Self {
59 Self { url, client }
60 }
61
62 pub async fn from_env(url: String) -> Result<Self> {
65 use aws_config::BehaviorVersion;
66
67 let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
68 let client = Client::new(&sdk_config);
69 Ok(Self::new(url, client))
70 }
71
72 pub fn with_sdk_config(url: String, sdk_config: &aws_config::SdkConfig) -> Self {
74 let client = Client::new(sdk_config);
75 Self::new(url, client)
76 }
77}
78
79#[async_trait]
80impl Driver for DynamoDb {
81 fn url(&self) -> Cow<'_, str> {
82 Cow::Borrowed(&self.url)
83 }
84
85 fn capability(&self) -> &'static Capability {
86 &Capability::DYNAMODB
87 }
88
89 async fn connect(&self) -> toasty_core::Result<Box<dyn toasty_core::driver::Connection>> {
90 Ok(Box::new(Connection::new(self.client.clone())))
92 }
93
94 fn generate_migration(&self, _schema_diff: &SchemaDiff<'_>) -> Migration {
95 unimplemented!(
96 "DynamoDB migrations are not yet supported. DynamoDB schema changes require manual table updates through the AWS console or SDK."
97 )
98 }
99
100 async fn reset_db(&self) -> toasty_core::Result<()> {
101 let mut exclusive_start_table_name = None;
103 loop {
104 let mut req = self.client.list_tables();
105 if let Some(start) = &exclusive_start_table_name {
106 req = req.exclusive_start_table_name(start);
107 }
108
109 let resp = req
110 .send()
111 .await
112 .map_err(toasty_core::Error::driver_operation_failed)?;
113
114 if let Some(table_names) = &resp.table_names {
115 for table_name in table_names {
116 self.client
117 .delete_table()
118 .table_name(table_name)
119 .send()
120 .await
121 .map_err(toasty_core::Error::driver_operation_failed)?;
122 }
123 }
124
125 exclusive_start_table_name = resp.last_evaluated_table_name;
126 if exclusive_start_table_name.is_none() {
127 break;
128 }
129 }
130
131 Ok(())
132 }
133}
134
/// A single DynamoDB connection: a thin wrapper around an AWS SDK [`Client`].
#[derive(Debug)]
pub struct Connection {
    /// SDK client used to issue requests on behalf of this connection.
    client: Client,
}
141
142impl Connection {
143 pub fn new(client: Client) -> Self {
145 Self { client }
146 }
147}
148
149#[async_trait]
150impl toasty_core::driver::Connection for Connection {
151 async fn exec(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
152 self.exec2(schema, op).await
153 }
154
155 async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
156 for table in &schema.db.tables {
157 tracing::debug!(table = %table.name, "creating table");
158 self.create_table(&schema.db, table, true).await?;
159 }
160 Ok(())
161 }
162
163 async fn applied_migrations(
164 &mut self,
165 ) -> Result<Vec<toasty_core::schema::db::AppliedMigration>> {
166 todo!("DynamoDB migrations are not yet implemented")
167 }
168
169 async fn apply_migration(
170 &mut self,
171 _id: u64,
172 _name: &str,
173 _migration: &toasty_core::schema::db::Migration,
174 ) -> Result<()> {
175 todo!("DynamoDB migrations are not yet implemented")
176 }
177}
178
179impl Connection {
180 async fn exec2(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
181 match op {
182 Operation::GetByKey(op) => self.exec_get_by_key(schema, op).await,
183 Operation::QueryPk(op) => self.exec_query_pk(schema, op).await,
184 Operation::DeleteByKey(op) => self.exec_delete_by_key(&schema.db, op).await,
185 Operation::UpdateByKey(op) => self.exec_update_by_key(&schema.db, op).await,
186 Operation::FindPkByIndex(op) => self.exec_find_pk_by_index(schema, op).await,
187 Operation::QuerySql(op) => {
188 assert!(
189 op.last_insert_id_hack.is_none(),
190 "last_insert_id_hack is MySQL-specific and should not be set for DynamoDB"
191 );
192 match op.stmt {
193 stmt::Statement::Insert(insert) => self.exec_insert(&schema.db, insert).await,
194 _ => todo!("op={:#?}", op.stmt),
195 }
196 }
197 Operation::Transaction(_) => Err(Error::unsupported_feature(
198 "transactions are not supported by the DynamoDB driver",
199 )),
200 _ => todo!("op={op:#?}"),
201 }
202 }
203}
204
205fn ddb_key(table: &Table, key: &stmt::Value) -> HashMap<String, AttributeValue> {
206 let mut ret = HashMap::new();
207
208 for (index, column) in table.primary_key_columns().enumerate() {
209 let value = match key {
210 stmt::Value::Record(record) => &record[index],
211 value => value,
212 };
213
214 ret.insert(column.name.clone(), Value::from(value.clone()).to_ddb());
215 }
216
217 ret
218}
219
220fn attr_value_to_stmt_value(attr: &AttributeValue) -> stmt::Value {
222 use AttributeValue as AV;
223
224 match attr {
225 AV::S(s) => stmt::Value::String(s.clone()),
226 AV::N(n) => {
227 n.parse::<i64>()
229 .map(stmt::Value::I64)
230 .unwrap_or_else(|_| stmt::Value::String(n.clone()))
231 }
232 AV::Bool(b) => stmt::Value::Bool(*b),
233 AV::B(bytes) => stmt::Value::Bytes(bytes.clone().into_inner()),
234 AV::Null(_) => stmt::Value::Null,
235 _ => stmt::Value::String(format!("{:?}", attr)),
237 }
238}
239
240fn serialize_ddb_cursor(last_key: &HashMap<String, AttributeValue>) -> stmt::Value {
244 let mut fields = Vec::with_capacity(last_key.len() * 2);
245
246 for (name, attr_value) in last_key {
247 fields.push(stmt::Value::String(name.clone()));
248 fields.push(attr_value_to_stmt_value(attr_value));
249 }
250
251 stmt::Value::Record(stmt::ValueRecord::from_vec(fields))
252}
253
254fn deserialize_ddb_cursor(cursor: &stmt::Value) -> HashMap<String, AttributeValue> {
257 let mut ret = HashMap::new();
258
259 if let stmt::Value::Record(fields) = cursor {
260 for chunk in fields.chunks(2) {
262 if chunk.len() == 2
263 && let (stmt::Value::String(name), value) = (&chunk[0], &chunk[1])
264 {
265 ret.insert(name.clone(), Value::from(value.clone()).to_ddb());
266 }
267 }
268 }
269
270 ret
271}
272
273fn ddb_key_schema(
274 partition_columns: &[&Column],
275 range_columns: &[&Column],
276) -> Vec<KeySchemaElement> {
277 let mut ks = vec![];
278
279 for col in partition_columns {
280 ks.push(
281 KeySchemaElement::builder()
282 .attribute_name(&col.name)
283 .key_type(KeyType::Hash)
284 .build()
285 .unwrap(),
286 );
287 }
288
289 for col in range_columns {
290 ks.push(
291 KeySchemaElement::builder()
292 .attribute_name(&col.name)
293 .key_type(KeyType::Range)
294 .build()
295 .unwrap(),
296 );
297 }
298
299 ks
300}
301
302fn item_to_record<'a, 'stmt>(
303 item: &HashMap<String, AttributeValue>,
304 columns: impl Iterator<Item = &'a Column>,
305) -> Result<stmt::ValueRecord> {
306 Ok(stmt::ValueRecord::from_vec(
307 columns
308 .map(|column| {
309 if let Some(value) = item.get(&column.name) {
310 Value::from_ddb(&column.ty, value).into_inner()
311 } else {
312 stmt::Value::Null
313 }
314 })
315 .collect(),
316 ))
317}
318
319fn ddb_expression(
320 cx: &ExprContext<'_, db::Schema>,
321 attrs: &mut ExprAttrs,
322 primary: bool,
323 expr: &stmt::Expr,
324) -> String {
325 match expr {
326 stmt::Expr::BinaryOp(expr_binary_op) => {
327 let lhs = ddb_expression(cx, attrs, primary, &expr_binary_op.lhs);
328 let rhs = ddb_expression(cx, attrs, primary, &expr_binary_op.rhs);
329
330 match expr_binary_op.op {
331 stmt::BinaryOp::Eq => format!("{lhs} = {rhs}"),
332 stmt::BinaryOp::Ne if primary => {
333 todo!("!= conditions on primary key not supported")
334 }
335 stmt::BinaryOp::Ne => format!("{lhs} <> {rhs}"),
336 stmt::BinaryOp::Gt => format!("{lhs} > {rhs}"),
337 stmt::BinaryOp::Ge => format!("{lhs} >= {rhs}"),
338 stmt::BinaryOp::Lt => format!("{lhs} < {rhs}"),
339 stmt::BinaryOp::Le => format!("{lhs} <= {rhs}"),
340 }
341 }
342 stmt::Expr::Reference(expr_reference) => {
343 let column = cx.resolve_expr_reference(expr_reference).as_column_unwrap();
344 attrs.column(column).to_string()
345 }
346 stmt::Expr::Value(val) => attrs.value(val),
347 stmt::Expr::And(expr_and) => {
348 let operands = expr_and
349 .operands
350 .iter()
351 .map(|operand| ddb_expression(cx, attrs, primary, operand))
352 .collect::<Vec<_>>();
353 operands.join(" AND ")
354 }
355 stmt::Expr::Or(expr_or) => {
356 let operands = expr_or
357 .operands
358 .iter()
359 .map(|operand| ddb_expression(cx, attrs, primary, operand))
360 .collect::<Vec<_>>();
361 operands.join(" OR ")
362 }
363 stmt::Expr::InList(in_list) => {
364 let expr = ddb_expression(cx, attrs, primary, &in_list.expr);
365
366 let items = match &*in_list.list {
368 stmt::Expr::Value(stmt::Value::List(vals)) => vals
369 .iter()
370 .map(|val| attrs.value(val))
371 .collect::<Vec<_>>()
372 .join(", "),
373 _ => {
374 ddb_expression(cx, attrs, primary, &in_list.list)
376 }
377 };
378
379 format!("{expr} IN ({items})")
380 }
381 stmt::Expr::IsNull(expr_is_null) => {
382 let inner = ddb_expression(cx, attrs, primary, &expr_is_null.expr);
383 format!("attribute_not_exists({inner})")
384 }
385 stmt::Expr::Not(expr_not) => {
386 let inner = ddb_expression(cx, attrs, primary, &expr_not.expr);
387 format!("(NOT {inner})")
388 }
389 stmt::Expr::StartsWith(expr_starts_with) => {
390 let expr = ddb_expression(cx, attrs, primary, &expr_starts_with.expr);
391 let prefix = ddb_expression(cx, attrs, primary, &expr_starts_with.prefix);
392 format!("begins_with({expr}, {prefix})")
393 }
394 stmt::Expr::Like(_) => {
395 panic!(
396 "LIKE is not supported by the DynamoDB driver; use starts_with for prefix matching"
397 )
398 }
399 _ => todo!("FILTER = {:#?}", expr),
400 }
401}
402
/// Accumulates the placeholder tables built up while rendering an expression.
///
/// Presumably `attr_names` and `attr_values` are later passed to DynamoDB as
/// the expression attribute names/values of a request — confirm at the call
/// sites, which are outside this part of the file.
#[derive(Default)]
struct ExprAttrs {
    /// Placeholder (`#col_N`) already assigned to each column, keyed by column id.
    columns: HashMap<ColumnId, String>,
    /// Placeholder (`#col_N`) -> actual column name.
    attr_names: HashMap<String, String>,
    /// Placeholder (`:v_N`) -> attribute value.
    attr_values: HashMap<String, AttributeValue>,
}
409
410impl ExprAttrs {
411 fn column(&mut self, column: &Column) -> &str {
412 use std::collections::hash_map::Entry;
413
414 match self.columns.entry(column.id) {
415 Entry::Vacant(e) => {
416 let name = format!("#col_{}", column.id.index);
417 self.attr_names.insert(name.clone(), column.name.clone());
418 e.insert(name)
419 }
420 Entry::Occupied(e) => e.into_mut(),
421 }
422 }
423
424 fn value(&mut self, val: &stmt::Value) -> String {
425 self.ddb_value(Value::from(val.clone()).to_ddb())
426 }
427
428 fn ddb_value(&mut self, val: AttributeValue) -> String {
429 let i = self.attr_values.len();
430 let name = format!(":v_{i}");
431 self.attr_values.insert(name.clone(), val);
432 name
433 }
434}