Skip to main content

toasty_driver_dynamodb/
lib.rs

1#![warn(missing_docs)]
2
3//! Toasty driver for [Amazon DynamoDB](https://aws.amazon.com/dynamodb/) using
4//! the [`aws-sdk-dynamodb`](https://docs.rs/aws-sdk-dynamodb) SDK.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! # async fn example() -> toasty_core::Result<()> {
10//! use toasty_driver_dynamodb::DynamoDb;
11//!
12//! let driver = DynamoDb::from_env("dynamodb://localhost".to_string()).await?;
13//! # Ok(())
14//! # }
15//! ```
16
17mod op;
18mod r#type;
19mod value;
20
21pub(crate) use r#type::TypeExt;
22pub(crate) use value::Value;
23
24use async_trait::async_trait;
25use toasty_core::{
26    Error, Result, Schema,
27    driver::{Capability, Driver, ExecResponse, operation::Operation},
28    schema::db::{self, Column, ColumnId, Migration, SchemaDiff, Table},
29    stmt::{self, ExprContext},
30};
31
32use aws_sdk_dynamodb::{
33    Client,
34    error::SdkError,
35    operation::transact_write_items::TransactWriteItemsError,
36    operation::update_item::UpdateItemError,
37    types::{
38        AttributeDefinition, AttributeValue, BillingMode, Delete, GlobalSecondaryIndex,
39        KeySchemaElement, KeyType, KeysAndAttributes, Projection, ProjectionType, Put, PutRequest,
40        ReturnValuesOnConditionCheckFailure, TransactWriteItem, Update, WriteRequest,
41    },
42};
43use std::{borrow::Cow, collections::HashMap, sync::Arc};
44
45/// A DynamoDB [`Driver`] backed by the AWS SDK.
46///
47/// Create one with [`DynamoDb::from_env`] to load AWS credentials and region
48/// from the environment, or [`DynamoDb::new`] / [`DynamoDb::with_sdk_config`]
49/// for manual setup.
50#[derive(Debug, Clone)]
51pub struct DynamoDb {
52    url: String,
53    client: Client,
54}
55
56impl DynamoDb {
57    /// Create driver with pre-built client (backward compatible, synchronous)
58    pub fn new(url: String, client: Client) -> Self {
59        Self { url, client }
60    }
61
62    /// Create driver loading AWS config from environment (async factory)
63    /// Reads: AWS_REGION, AWS_ENDPOINT_URL_DYNAMODB, AWS credentials, etc.
64    pub async fn from_env(url: String) -> Result<Self> {
65        use aws_config::BehaviorVersion;
66
67        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
68        let client = Client::new(&sdk_config);
69        Ok(Self::new(url, client))
70    }
71
72    /// Create driver with custom SdkConfig (synchronous)
73    pub fn with_sdk_config(url: String, sdk_config: &aws_config::SdkConfig) -> Self {
74        let client = Client::new(sdk_config);
75        Self::new(url, client)
76    }
77}
78
79#[async_trait]
80impl Driver for DynamoDb {
81    fn url(&self) -> Cow<'_, str> {
82        Cow::Borrowed(&self.url)
83    }
84
85    fn capability(&self) -> &'static Capability {
86        &Capability::DYNAMODB
87    }
88
89    async fn connect(&self) -> toasty_core::Result<Box<dyn toasty_core::driver::Connection>> {
90        // Clone the shared client - cheap operation (Client uses Arc internally)
91        Ok(Box::new(Connection::new(self.client.clone())))
92    }
93
94    fn generate_migration(&self, _schema_diff: &SchemaDiff<'_>) -> Migration {
95        unimplemented!(
96            "DynamoDB migrations are not yet supported. DynamoDB schema changes require manual table updates through the AWS console or SDK."
97        )
98    }
99
100    async fn reset_db(&self) -> toasty_core::Result<()> {
101        // Use shared client directly
102        let mut exclusive_start_table_name = None;
103        loop {
104            let mut req = self.client.list_tables();
105            if let Some(start) = &exclusive_start_table_name {
106                req = req.exclusive_start_table_name(start);
107            }
108
109            let resp = req
110                .send()
111                .await
112                .map_err(toasty_core::Error::driver_operation_failed)?;
113
114            if let Some(table_names) = &resp.table_names {
115                for table_name in table_names {
116                    self.client
117                        .delete_table()
118                        .table_name(table_name)
119                        .send()
120                        .await
121                        .map_err(toasty_core::Error::driver_operation_failed)?;
122                }
123            }
124
125            exclusive_start_table_name = resp.last_evaluated_table_name;
126            if exclusive_start_table_name.is_none() {
127                break;
128            }
129        }
130
131        Ok(())
132    }
133}
134
135/// An open connection to DynamoDB.
136#[derive(Debug)]
137pub struct Connection {
138    /// Handle to the AWS SDK client
139    client: Client,
140}
141
142impl Connection {
143    /// Wrap an existing [`aws_sdk_dynamodb::Client`] as a Toasty connection.
144    pub fn new(client: Client) -> Self {
145        Self { client }
146    }
147}
148
149#[async_trait]
150impl toasty_core::driver::Connection for Connection {
151    async fn exec(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
152        self.exec2(schema, op).await
153    }
154
155    async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
156        for table in &schema.db.tables {
157            tracing::debug!(table = %table.name, "creating table");
158            self.create_table(&schema.db, table, true).await?;
159        }
160        Ok(())
161    }
162
163    async fn applied_migrations(
164        &mut self,
165    ) -> Result<Vec<toasty_core::schema::db::AppliedMigration>> {
166        todo!("DynamoDB migrations are not yet implemented")
167    }
168
169    async fn apply_migration(
170        &mut self,
171        _id: u64,
172        _name: &str,
173        _migration: &toasty_core::schema::db::Migration,
174    ) -> Result<()> {
175        todo!("DynamoDB migrations are not yet implemented")
176    }
177}
178
179impl Connection {
180    async fn exec2(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
181        match op {
182            Operation::GetByKey(op) => self.exec_get_by_key(schema, op).await,
183            Operation::QueryPk(op) => self.exec_query_pk(schema, op).await,
184            Operation::DeleteByKey(op) => self.exec_delete_by_key(&schema.db, op).await,
185            Operation::UpdateByKey(op) => self.exec_update_by_key(&schema.db, op).await,
186            Operation::FindPkByIndex(op) => self.exec_find_pk_by_index(schema, op).await,
187            Operation::QuerySql(op) => {
188                assert!(
189                    op.last_insert_id_hack.is_none(),
190                    "last_insert_id_hack is MySQL-specific and should not be set for DynamoDB"
191                );
192                match op.stmt {
193                    stmt::Statement::Insert(insert) => self.exec_insert(&schema.db, insert).await,
194                    _ => todo!("op={:#?}", op.stmt),
195                }
196            }
197            Operation::Transaction(_) => Err(Error::unsupported_feature(
198                "transactions are not supported by the DynamoDB driver",
199            )),
200            _ => todo!("op={op:#?}"),
201        }
202    }
203}
204
205fn ddb_key(table: &Table, key: &stmt::Value) -> HashMap<String, AttributeValue> {
206    let mut ret = HashMap::new();
207
208    for (index, column) in table.primary_key_columns().enumerate() {
209        let value = match key {
210            stmt::Value::Record(record) => &record[index],
211            value => value,
212        };
213
214        ret.insert(column.name.clone(), Value::from(value.clone()).to_ddb());
215    }
216
217    ret
218}
219
220/// Convert a DynamoDB AttributeValue to stmt::Value (type-inferred).
221fn attr_value_to_stmt_value(attr: &AttributeValue) -> stmt::Value {
222    use AttributeValue as AV;
223
224    match attr {
225        AV::S(s) => stmt::Value::String(s.clone()),
226        AV::N(n) => {
227            // Try to parse as i64 first (most common), fallback to string
228            n.parse::<i64>()
229                .map(stmt::Value::I64)
230                .unwrap_or_else(|_| stmt::Value::String(n.clone()))
231        }
232        AV::Bool(b) => stmt::Value::Bool(*b),
233        AV::B(bytes) => stmt::Value::Bytes(bytes.clone().into_inner()),
234        AV::Null(_) => stmt::Value::Null,
235        // For complex types, convert to string representation
236        _ => stmt::Value::String(format!("{:?}", attr)),
237    }
238}
239
240/// Serialize a DynamoDB LastEvaluatedKey (for pagination) into stmt::Value.
241/// Format: flat record [name1, value1, name2, value2, ...]
242/// Example: { "pk": S("abc"), "sk": N("42") } → Record([String("pk"), String("abc"), String("sk"), I64(42)])
243fn serialize_ddb_cursor(last_key: &HashMap<String, AttributeValue>) -> stmt::Value {
244    let mut fields = Vec::with_capacity(last_key.len() * 2);
245
246    for (name, attr_value) in last_key {
247        fields.push(stmt::Value::String(name.clone()));
248        fields.push(attr_value_to_stmt_value(attr_value));
249    }
250
251    stmt::Value::Record(stmt::ValueRecord::from_vec(fields))
252}
253
254/// Deserialize a stmt::Value cursor into a DynamoDB ExclusiveStartKey.
255/// Expects flat record format: [name1, value1, name2, value2, ...]
256fn deserialize_ddb_cursor(cursor: &stmt::Value) -> HashMap<String, AttributeValue> {
257    let mut ret = HashMap::new();
258
259    if let stmt::Value::Record(fields) = cursor {
260        // Process pairs: [name, value, name, value, ...]
261        for chunk in fields.chunks(2) {
262            if chunk.len() == 2
263                && let (stmt::Value::String(name), value) = (&chunk[0], &chunk[1])
264            {
265                ret.insert(name.clone(), Value::from(value.clone()).to_ddb());
266            }
267        }
268    }
269
270    ret
271}
272
273fn ddb_key_schema(
274    partition_columns: &[&Column],
275    range_columns: &[&Column],
276) -> Vec<KeySchemaElement> {
277    let mut ks = vec![];
278
279    for col in partition_columns {
280        ks.push(
281            KeySchemaElement::builder()
282                .attribute_name(&col.name)
283                .key_type(KeyType::Hash)
284                .build()
285                .unwrap(),
286        );
287    }
288
289    for col in range_columns {
290        ks.push(
291            KeySchemaElement::builder()
292                .attribute_name(&col.name)
293                .key_type(KeyType::Range)
294                .build()
295                .unwrap(),
296        );
297    }
298
299    ks
300}
301
302fn item_to_record<'a, 'stmt>(
303    item: &HashMap<String, AttributeValue>,
304    columns: impl Iterator<Item = &'a Column>,
305) -> Result<stmt::ValueRecord> {
306    Ok(stmt::ValueRecord::from_vec(
307        columns
308            .map(|column| {
309                if let Some(value) = item.get(&column.name) {
310                    Value::from_ddb(&column.ty, value).into_inner()
311                } else {
312                    stmt::Value::Null
313                }
314            })
315            .collect(),
316    ))
317}
318
319fn ddb_expression(
320    cx: &ExprContext<'_, db::Schema>,
321    attrs: &mut ExprAttrs,
322    primary: bool,
323    expr: &stmt::Expr,
324) -> String {
325    match expr {
326        stmt::Expr::BinaryOp(expr_binary_op) => {
327            let lhs = ddb_expression(cx, attrs, primary, &expr_binary_op.lhs);
328            let rhs = ddb_expression(cx, attrs, primary, &expr_binary_op.rhs);
329
330            match expr_binary_op.op {
331                stmt::BinaryOp::Eq => format!("{lhs} = {rhs}"),
332                stmt::BinaryOp::Ne if primary => {
333                    todo!("!= conditions on primary key not supported")
334                }
335                stmt::BinaryOp::Ne => format!("{lhs} <> {rhs}"),
336                stmt::BinaryOp::Gt => format!("{lhs} > {rhs}"),
337                stmt::BinaryOp::Ge => format!("{lhs} >= {rhs}"),
338                stmt::BinaryOp::Lt => format!("{lhs} < {rhs}"),
339                stmt::BinaryOp::Le => format!("{lhs} <= {rhs}"),
340            }
341        }
342        stmt::Expr::Reference(expr_reference) => {
343            let column = cx.resolve_expr_reference(expr_reference).as_column_unwrap();
344            attrs.column(column).to_string()
345        }
346        stmt::Expr::Value(val) => attrs.value(val),
347        stmt::Expr::And(expr_and) => {
348            let operands = expr_and
349                .operands
350                .iter()
351                .map(|operand| ddb_expression(cx, attrs, primary, operand))
352                .collect::<Vec<_>>();
353            operands.join(" AND ")
354        }
355        stmt::Expr::Or(expr_or) => {
356            let operands = expr_or
357                .operands
358                .iter()
359                .map(|operand| ddb_expression(cx, attrs, primary, operand))
360                .collect::<Vec<_>>();
361            operands.join(" OR ")
362        }
363        stmt::Expr::InList(in_list) => {
364            let expr = ddb_expression(cx, attrs, primary, &in_list.expr);
365
366            // Extract the list items and create individual attribute values
367            let items = match &*in_list.list {
368                stmt::Expr::Value(stmt::Value::List(vals)) => vals
369                    .iter()
370                    .map(|val| attrs.value(val))
371                    .collect::<Vec<_>>()
372                    .join(", "),
373                _ => {
374                    // If it's not a literal list, treat it as a single expression
375                    ddb_expression(cx, attrs, primary, &in_list.list)
376                }
377            };
378
379            format!("{expr} IN ({items})")
380        }
381        stmt::Expr::IsNull(expr_is_null) => {
382            let inner = ddb_expression(cx, attrs, primary, &expr_is_null.expr);
383            format!("attribute_not_exists({inner})")
384        }
385        stmt::Expr::Not(expr_not) => {
386            let inner = ddb_expression(cx, attrs, primary, &expr_not.expr);
387            format!("(NOT {inner})")
388        }
389        stmt::Expr::StartsWith(expr_starts_with) => {
390            let expr = ddb_expression(cx, attrs, primary, &expr_starts_with.expr);
391            let prefix = ddb_expression(cx, attrs, primary, &expr_starts_with.prefix);
392            format!("begins_with({expr}, {prefix})")
393        }
394        stmt::Expr::Like(_) => {
395            panic!(
396                "LIKE is not supported by the DynamoDB driver; use starts_with for prefix matching"
397            )
398        }
399        _ => todo!("FILTER = {:#?}", expr),
400    }
401}
402
403#[derive(Default)]
404struct ExprAttrs {
405    columns: HashMap<ColumnId, String>,
406    attr_names: HashMap<String, String>,
407    attr_values: HashMap<String, AttributeValue>,
408}
409
410impl ExprAttrs {
411    fn column(&mut self, column: &Column) -> &str {
412        use std::collections::hash_map::Entry;
413
414        match self.columns.entry(column.id) {
415            Entry::Vacant(e) => {
416                let name = format!("#col_{}", column.id.index);
417                self.attr_names.insert(name.clone(), column.name.clone());
418                e.insert(name)
419            }
420            Entry::Occupied(e) => e.into_mut(),
421        }
422    }
423
424    fn value(&mut self, val: &stmt::Value) -> String {
425        self.ddb_value(Value::from(val.clone()).to_ddb())
426    }
427
428    fn ddb_value(&mut self, val: AttributeValue) -> String {
429        let i = self.attr_values.len();
430        let name = format!(":v_{i}");
431        self.attr_values.insert(name.clone(), val);
432        name
433    }
434}