Skip to main content

toasty_driver_dynamodb/
lib.rs

1#![warn(missing_docs)]
2
3//! Toasty driver for [Amazon DynamoDB](https://aws.amazon.com/dynamodb/) using
4//! the [`aws-sdk-dynamodb`](https://docs.rs/aws-sdk-dynamodb) SDK.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! # async fn example() -> toasty_core::Result<()> {
10//! use toasty_driver_dynamodb::DynamoDb;
11//!
12//! let driver = DynamoDb::from_env("dynamodb://localhost".to_string()).await?;
13//! # Ok(())
14//! # }
15//! ```
16
17mod op;
18mod r#type;
19mod value;
20
21pub(crate) use r#type::TypeExt;
22pub(crate) use value::Value;
23
24use async_trait::async_trait;
25use toasty_core::{
26    Error, Result, Schema,
27    driver::{Capability, Driver, ExecResponse, operation::Operation},
28    schema::{
29        db::{self, Column, ColumnId, Migration, Table},
30        diff,
31    },
32    stmt::{self, ExprContext},
33};
34
35use aws_sdk_dynamodb::{
36    Client,
37    error::SdkError,
38    operation::transact_write_items::TransactWriteItemsError,
39    operation::update_item::UpdateItemError,
40    types::{
41        AttributeDefinition, AttributeValue, BillingMode, Delete, GlobalSecondaryIndex,
42        KeySchemaElement, KeyType, KeysAndAttributes, Projection, ProjectionType, Put, PutRequest,
43        ReturnValuesOnConditionCheckFailure, TransactWriteItem, Update, WriteRequest,
44    },
45};
46use std::{borrow::Cow, collections::HashMap, sync::Arc};
47
48/// A DynamoDB [`Driver`] backed by the AWS SDK.
49///
50/// Create one with [`DynamoDb::from_env`] to load AWS credentials and region
51/// from the environment, or [`DynamoDb::new`] / [`DynamoDb::with_sdk_config`]
52/// for manual setup.
53#[derive(Debug, Clone)]
54pub struct DynamoDb {
55    url: String,
56    client: Client,
57}
58
59impl DynamoDb {
60    /// Create driver with pre-built client (backward compatible, synchronous)
61    pub fn new(url: String, client: Client) -> Self {
62        Self { url, client }
63    }
64
65    /// Create driver loading AWS config from environment (async factory)
66    /// Reads: AWS_REGION, AWS_ENDPOINT_URL_DYNAMODB, AWS credentials, etc.
67    pub async fn from_env(url: String) -> Result<Self> {
68        use aws_config::BehaviorVersion;
69
70        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
71        let client = Client::new(&sdk_config);
72        Ok(Self::new(url, client))
73    }
74
75    /// Create driver with custom SdkConfig (synchronous)
76    pub fn with_sdk_config(url: String, sdk_config: &aws_config::SdkConfig) -> Self {
77        let client = Client::new(sdk_config);
78        Self::new(url, client)
79    }
80}
81
82#[async_trait]
83impl Driver for DynamoDb {
84    fn url(&self) -> Cow<'_, str> {
85        Cow::Borrowed(&self.url)
86    }
87
88    fn capability(&self) -> &'static Capability {
89        &Capability::DYNAMODB
90    }
91
92    async fn connect(&self) -> toasty_core::Result<Box<dyn toasty_core::driver::Connection>> {
93        // Clone the shared client - cheap operation (Client uses Arc internally)
94        Ok(Box::new(Connection::new(self.client.clone())))
95    }
96
97    fn generate_migration(&self, _schema_diff: &diff::Schema<'_>) -> Migration {
98        unimplemented!(
99            "DynamoDB migrations are not yet supported. DynamoDB schema changes require manual table updates through the AWS console or SDK."
100        )
101    }
102
103    async fn reset_db(&self) -> toasty_core::Result<()> {
104        // Use shared client directly
105        let mut exclusive_start_table_name = None;
106        loop {
107            let mut req = self.client.list_tables();
108            if let Some(start) = &exclusive_start_table_name {
109                req = req.exclusive_start_table_name(start);
110            }
111
112            let resp = req
113                .send()
114                .await
115                .map_err(toasty_core::Error::driver_operation_failed)?;
116
117            if let Some(table_names) = &resp.table_names {
118                for table_name in table_names {
119                    self.client
120                        .delete_table()
121                        .table_name(table_name)
122                        .send()
123                        .await
124                        .map_err(toasty_core::Error::driver_operation_failed)?;
125                }
126            }
127
128            exclusive_start_table_name = resp.last_evaluated_table_name;
129            if exclusive_start_table_name.is_none() {
130                break;
131            }
132        }
133
134        Ok(())
135    }
136}
137
138/// An open connection to DynamoDB.
139#[derive(Debug)]
140pub struct Connection {
141    /// Handle to the AWS SDK client
142    client: Client,
143}
144
145impl Connection {
146    /// Wrap an existing [`aws_sdk_dynamodb::Client`] as a Toasty connection.
147    pub fn new(client: Client) -> Self {
148        Self { client }
149    }
150}
151
152#[async_trait]
153impl toasty_core::driver::Connection for Connection {
154    async fn exec(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
155        self.exec2(schema, op).await
156    }
157
158    async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
159        for table in &schema.db.tables {
160            tracing::debug!(table = %table.name, "creating table");
161            self.create_table(&schema.db, table, true).await?;
162        }
163        Ok(())
164    }
165
166    async fn applied_migrations(
167        &mut self,
168    ) -> Result<Vec<toasty_core::schema::db::AppliedMigration>> {
169        todo!("DynamoDB migrations are not yet implemented")
170    }
171
172    async fn apply_migration(
173        &mut self,
174        _id: u64,
175        _name: &str,
176        _migration: &toasty_core::schema::db::Migration,
177    ) -> Result<()> {
178        todo!("DynamoDB migrations are not yet implemented")
179    }
180}
181
182impl Connection {
183    async fn exec2(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
184        match op {
185            Operation::GetByKey(op) => self.exec_get_by_key(schema, op).await,
186            Operation::QueryPk(op) => self.exec_query_pk(schema, op).await,
187            Operation::DeleteByKey(op) => self.exec_delete_by_key(&schema.db, op).await,
188            Operation::UpdateByKey(op) => self.exec_update_by_key(&schema.db, op).await,
189            Operation::FindPkByIndex(op) => self.exec_find_pk_by_index(schema, op).await,
190            Operation::QuerySql(op) => {
191                assert!(
192                    op.last_insert_id_hack.is_none(),
193                    "last_insert_id_hack is MySQL-specific and should not be set for DynamoDB"
194                );
195                match op.stmt {
196                    stmt::Statement::Insert(insert) => self.exec_insert(&schema.db, insert).await,
197                    _ => todo!("op={:#?}", op.stmt),
198                }
199            }
200            Operation::Scan(op) => self.exec_scan(schema, op).await,
201            Operation::Transaction(_) => Err(Error::unsupported_feature(
202                "transactions are not supported by the DynamoDB driver",
203            )),
204            _ => todo!("op={op:#?}"),
205        }
206    }
207}
208
209fn ddb_key(table: &Table, key: &stmt::Value) -> HashMap<String, AttributeValue> {
210    let mut ret = HashMap::new();
211
212    for (index, column) in table.primary_key_columns().enumerate() {
213        let value = match key {
214            stmt::Value::Record(record) => &record[index],
215            value => value,
216        };
217
218        ret.insert(column.name.clone(), Value::from(value.clone()).to_ddb());
219    }
220
221    ret
222}
223
224/// Convert a DynamoDB AttributeValue to stmt::Value (type-inferred).
225fn attr_value_to_stmt_value(attr: &AttributeValue) -> stmt::Value {
226    use AttributeValue as AV;
227
228    match attr {
229        AV::S(s) => stmt::Value::String(s.clone()),
230        AV::N(n) => {
231            // Try to parse as i64 first (most common), fallback to string
232            n.parse::<i64>()
233                .map(stmt::Value::I64)
234                .unwrap_or_else(|_| stmt::Value::String(n.clone()))
235        }
236        AV::Bool(b) => stmt::Value::Bool(*b),
237        AV::B(bytes) => stmt::Value::Bytes(bytes.clone().into_inner()),
238        AV::Null(_) => stmt::Value::Null,
239        // For complex types, convert to string representation
240        _ => stmt::Value::String(format!("{:?}", attr)),
241    }
242}
243
244/// Serialize a DynamoDB LastEvaluatedKey (for pagination) into stmt::Value.
245/// Format: flat record [name1, value1, name2, value2, ...]
246/// Example: { "pk": S("abc"), "sk": N("42") } → Record([String("pk"), String("abc"), String("sk"), I64(42)])
247fn serialize_ddb_cursor(last_key: &HashMap<String, AttributeValue>) -> stmt::Value {
248    let mut fields = Vec::with_capacity(last_key.len() * 2);
249
250    for (name, attr_value) in last_key {
251        fields.push(stmt::Value::String(name.clone()));
252        fields.push(attr_value_to_stmt_value(attr_value));
253    }
254
255    stmt::Value::Record(stmt::ValueRecord::from_vec(fields))
256}
257
258/// Deserialize a stmt::Value cursor into a DynamoDB ExclusiveStartKey.
259/// Expects flat record format: [name1, value1, name2, value2, ...]
260fn deserialize_ddb_cursor(cursor: &stmt::Value) -> HashMap<String, AttributeValue> {
261    let mut ret = HashMap::new();
262
263    if let stmt::Value::Record(fields) = cursor {
264        // Process pairs: [name, value, name, value, ...]
265        for chunk in fields.chunks(2) {
266            if chunk.len() == 2
267                && let (stmt::Value::String(name), value) = (&chunk[0], &chunk[1])
268            {
269                ret.insert(name.clone(), Value::from(value.clone()).to_ddb());
270            }
271        }
272    }
273
274    ret
275}
276
277fn ddb_key_schema(
278    partition_columns: &[&Column],
279    range_columns: &[&Column],
280) -> Vec<KeySchemaElement> {
281    let mut ks = vec![];
282
283    for col in partition_columns {
284        ks.push(
285            KeySchemaElement::builder()
286                .attribute_name(&col.name)
287                .key_type(KeyType::Hash)
288                .build()
289                .unwrap(),
290        );
291    }
292
293    for col in range_columns {
294        ks.push(
295            KeySchemaElement::builder()
296                .attribute_name(&col.name)
297                .key_type(KeyType::Range)
298                .build()
299                .unwrap(),
300        );
301    }
302
303    ks
304}
305
306fn item_to_record<'a, 'stmt>(
307    item: &HashMap<String, AttributeValue>,
308    columns: impl Iterator<Item = &'a Column>,
309) -> Result<stmt::ValueRecord> {
310    Ok(stmt::ValueRecord::from_vec(
311        columns
312            .map(|column| {
313                if let Some(value) = item.get(&column.name) {
314                    Value::from_ddb(&column.ty, value).into_inner()
315                } else {
316                    stmt::Value::Null
317                }
318            })
319            .collect(),
320    ))
321}
322
323fn ddb_expression(
324    cx: &ExprContext<'_, db::Schema>,
325    attrs: &mut ExprAttrs,
326    primary: bool,
327    expr: &stmt::Expr,
328) -> String {
329    match expr {
330        stmt::Expr::BinaryOp(expr_binary_op) => {
331            let lhs = ddb_expression(cx, attrs, primary, &expr_binary_op.lhs);
332            let rhs = ddb_expression(cx, attrs, primary, &expr_binary_op.rhs);
333
334            match expr_binary_op.op {
335                stmt::BinaryOp::Eq => format!("{lhs} = {rhs}"),
336                stmt::BinaryOp::Ne if primary => {
337                    todo!("!= conditions on primary key not supported")
338                }
339                stmt::BinaryOp::Ne => format!("{lhs} <> {rhs}"),
340                stmt::BinaryOp::Gt => format!("{lhs} > {rhs}"),
341                stmt::BinaryOp::Ge => format!("{lhs} >= {rhs}"),
342                stmt::BinaryOp::Lt => format!("{lhs} < {rhs}"),
343                stmt::BinaryOp::Le => format!("{lhs} <= {rhs}"),
344            }
345        }
346        stmt::Expr::Reference(expr_reference) => {
347            let column = cx.resolve_expr_reference(expr_reference).as_column_unwrap();
348            let is_bool = column.ty.is_bool();
349            let col_alias = attrs.column(column).to_string();
350            // A bare boolean column reference used as a predicate (result of
351            // `field = true` simplification) needs an explicit equality check.
352            if is_bool {
353                let true_val = attrs.ddb_value(aws_sdk_dynamodb::types::AttributeValue::Bool(true));
354                format!("{col_alias} = {true_val}")
355            } else {
356                col_alias
357            }
358        }
359        stmt::Expr::Value(val) => attrs.value(val),
360        stmt::Expr::And(expr_and) => {
361            let operands = expr_and
362                .operands
363                .iter()
364                .map(|operand| ddb_expression(cx, attrs, primary, operand))
365                .collect::<Vec<_>>();
366            operands.join(" AND ")
367        }
368        stmt::Expr::Or(expr_or) => {
369            let operands = expr_or
370                .operands
371                .iter()
372                .map(|operand| ddb_expression(cx, attrs, primary, operand))
373                .collect::<Vec<_>>();
374            format!("({})", operands.join(" OR "))
375        }
376        stmt::Expr::InList(in_list) => {
377            let expr = ddb_expression(cx, attrs, primary, &in_list.expr);
378
379            // Extract the list items and create individual attribute values
380            let items = match &*in_list.list {
381                stmt::Expr::Value(stmt::Value::List(vals)) => vals
382                    .iter()
383                    .map(|val| attrs.value(val))
384                    .collect::<Vec<_>>()
385                    .join(", "),
386                _ => {
387                    // If it's not a literal list, treat it as a single expression
388                    ddb_expression(cx, attrs, primary, &in_list.list)
389                }
390            };
391
392            format!("{expr} IN ({items})")
393        }
394        stmt::Expr::IsNull(expr_is_null) => {
395            let inner = ddb_expression(cx, attrs, primary, &expr_is_null.expr);
396            format!("attribute_not_exists({inner})")
397        }
398        stmt::Expr::Not(expr_not) => {
399            let inner = ddb_expression(cx, attrs, primary, &expr_not.expr);
400            format!("(NOT {inner})")
401        }
402        stmt::Expr::StartsWith(expr_starts_with) => {
403            let expr = ddb_expression(cx, attrs, primary, &expr_starts_with.expr);
404            let prefix = ddb_expression(cx, attrs, primary, &expr_starts_with.prefix);
405            format!("begins_with({expr}, {prefix})")
406        }
407        stmt::Expr::Like(_) => {
408            panic!(
409                "LIKE is not supported by the DynamoDB driver; use starts_with for prefix matching"
410            )
411        }
412        stmt::Expr::AnyOp(any) if matches!(any.op, stmt::BinaryOp::Eq) => {
413            // `Path::contains(value)` lowers to `value = ANY(col)`. On
414            // DynamoDB that's `contains(path, value)` — the standard List
415            // membership filter.
416            let value = ddb_expression(cx, attrs, primary, &any.lhs);
417            let path = ddb_expression(cx, attrs, primary, &any.rhs);
418            format!("contains({path}, {value})")
419        }
420        stmt::Expr::Length(expr) => {
421            let inner = ddb_expression(cx, attrs, primary, &expr.expr);
422            format!("size({inner})")
423        }
424        _ => todo!("FILTER = {:#?}", expr),
425    }
426}
427
428#[derive(Default)]
429struct ExprAttrs {
430    columns: HashMap<ColumnId, String>,
431    attr_names: HashMap<String, String>,
432    attr_values: HashMap<String, AttributeValue>,
433}
434
435impl ExprAttrs {
436    fn column(&mut self, column: &Column) -> &str {
437        use std::collections::hash_map::Entry;
438
439        match self.columns.entry(column.id) {
440            Entry::Vacant(e) => {
441                let name = format!("#col_{}", column.id.index);
442                self.attr_names.insert(name.clone(), column.name.clone());
443                e.insert(name)
444            }
445            Entry::Occupied(e) => e.into_mut(),
446        }
447    }
448
449    fn value(&mut self, val: &stmt::Value) -> String {
450        self.ddb_value(Value::from(val.clone()).to_ddb())
451    }
452
453    fn ddb_value(&mut self, val: AttributeValue) -> String {
454        let i = self.attr_values.len();
455        let name = format!(":v_{i}");
456        self.attr_values.insert(name.clone(), val);
457        name
458    }
459}