toasty_driver_dynamodb/
lib.rs

1mod op;
2mod r#type;
3mod value;
4
5pub(crate) use r#type::TypeExt;
6pub(crate) use value::Value;
7
8use toasty_core::{
9    async_trait,
10    driver::{operation::Operation, Capability, Driver, Response},
11    schema::db::{self, Column, ColumnId, Migration, SchemaDiff, Table},
12    stmt::{self, ExprContext},
13    Error, Result, Schema,
14};
15
16use aws_sdk_dynamodb::{
17    error::SdkError,
18    operation::update_item::UpdateItemError,
19    types::{
20        AttributeDefinition, AttributeValue, Delete, GlobalSecondaryIndex, KeySchemaElement,
21        KeyType, KeysAndAttributes, Projection, ProjectionType, ProvisionedThroughput, Put,
22        PutRequest, ReturnValuesOnConditionCheckFailure, TransactWriteItem, Update, WriteRequest,
23    },
24    Client,
25};
26use std::{borrow::Cow, collections::HashMap, sync::Arc};
27
28#[derive(Debug, Clone)]
29pub struct DynamoDb {
30    url: String,
31    client: Client,
32}
33
34impl DynamoDb {
35    /// Create driver with pre-built client (backward compatible, synchronous)
36    pub fn new(url: String, client: Client) -> Self {
37        Self { url, client }
38    }
39
40    /// Create driver loading AWS config from environment (async factory)
41    /// Reads: AWS_REGION, AWS_ENDPOINT_URL_DYNAMODB, AWS credentials, etc.
42    pub async fn from_env(url: String) -> Result<Self> {
43        use aws_config::BehaviorVersion;
44
45        let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
46        let client = Client::new(&sdk_config);
47        Ok(Self::new(url, client))
48    }
49
50    /// Create driver with custom SdkConfig (synchronous)
51    pub fn with_sdk_config(url: String, sdk_config: &aws_config::SdkConfig) -> Self {
52        let client = Client::new(sdk_config);
53        Self::new(url, client)
54    }
55}
56
57#[async_trait]
58impl Driver for DynamoDb {
59    fn url(&self) -> Cow<'_, str> {
60        Cow::Borrowed(&self.url)
61    }
62
63    fn capability(&self) -> &'static Capability {
64        &Capability::DYNAMODB
65    }
66
67    async fn connect(&self) -> toasty_core::Result<Box<dyn toasty_core::driver::Connection>> {
68        // Clone the shared client - cheap operation (Client uses Arc internally)
69        Ok(Box::new(Connection::new(self.client.clone())))
70    }
71
72    fn generate_migration(&self, _schema_diff: &SchemaDiff<'_>) -> Migration {
73        unimplemented!("DynamoDB migrations are not yet supported. DynamoDB schema changes require manual table updates through the AWS console or SDK.")
74    }
75
76    async fn reset_db(&self) -> toasty_core::Result<()> {
77        // Use shared client directly
78        let mut exclusive_start_table_name = None;
79        loop {
80            let mut req = self.client.list_tables();
81            if let Some(start) = &exclusive_start_table_name {
82                req = req.exclusive_start_table_name(start);
83            }
84
85            let resp = req
86                .send()
87                .await
88                .map_err(toasty_core::Error::driver_operation_failed)?;
89
90            if let Some(table_names) = &resp.table_names {
91                for table_name in table_names {
92                    self.client
93                        .delete_table()
94                        .table_name(table_name)
95                        .send()
96                        .await
97                        .map_err(toasty_core::Error::driver_operation_failed)?;
98                }
99            }
100
101            exclusive_start_table_name = resp.last_evaluated_table_name;
102            if exclusive_start_table_name.is_none() {
103                break;
104            }
105        }
106
107        Ok(())
108    }
109}
110
111#[derive(Debug)]
112pub struct Connection {
113    /// Handle to the AWS SDK client
114    client: Client,
115}
116
117impl Connection {
118    pub fn new(client: Client) -> Self {
119        Self { client }
120    }
121}
122
123#[async_trait]
124impl toasty_core::driver::Connection for Connection {
125    async fn exec(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<Response> {
126        self.exec2(schema, op).await
127    }
128
129    async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
130        for table in &schema.db.tables {
131            self.create_table(&schema.db, table, true).await?;
132        }
133        Ok(())
134    }
135
136    async fn applied_migrations(
137        &mut self,
138    ) -> Result<Vec<toasty_core::schema::db::AppliedMigration>> {
139        todo!("DynamoDB migrations are not yet implemented")
140    }
141
142    async fn apply_migration(
143        &mut self,
144        _id: u64,
145        _name: String,
146        _migration: &toasty_core::schema::db::Migration,
147    ) -> Result<()> {
148        todo!("DynamoDB migrations are not yet implemented")
149    }
150}
151
152impl Connection {
153    async fn exec2(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<Response> {
154        match op {
155            Operation::GetByKey(op) => self.exec_get_by_key(schema, op).await,
156            Operation::QueryPk(op) => self.exec_query_pk(schema, op).await,
157            Operation::DeleteByKey(op) => self.exec_delete_by_key(&schema.db, op).await,
158            Operation::UpdateByKey(op) => self.exec_update_by_key(&schema.db, op).await,
159            Operation::FindPkByIndex(op) => self.exec_find_pk_by_index(schema, op).await,
160            Operation::QuerySql(op) => {
161                assert!(
162                    op.last_insert_id_hack.is_none(),
163                    "last_insert_id_hack is MySQL-specific and should not be set for DynamoDB"
164                );
165                match op.stmt {
166                    stmt::Statement::Insert(op) => self.exec_insert(&schema.db, op).await,
167                    _ => todo!("op={:#?}", op),
168                }
169            }
170            Operation::Transaction(_) => Err(Error::unsupported_feature(
171                "transactions are not supported by the DynamoDB driver",
172            )),
173            _ => todo!("op={op:#?}"),
174        }
175    }
176}
177
178fn ddb_key(table: &Table, key: &stmt::Value) -> HashMap<String, AttributeValue> {
179    let mut ret = HashMap::new();
180
181    for (index, column) in table.primary_key_columns().enumerate() {
182        let value = match key {
183            stmt::Value::Record(record) => &record[index],
184            value => value,
185        };
186
187        ret.insert(column.name.clone(), Value::from(value.clone()).to_ddb());
188    }
189
190    ret
191}
192
193fn ddb_key_schema(partition: &Column, range: Option<&Column>) -> Vec<KeySchemaElement> {
194    let mut ks = vec![];
195
196    ks.push(
197        KeySchemaElement::builder()
198            .attribute_name(&partition.name)
199            .key_type(KeyType::Hash)
200            .build()
201            .unwrap(),
202    );
203
204    if let Some(range) = range {
205        ks.push(
206            KeySchemaElement::builder()
207                .attribute_name(&range.name)
208                .key_type(KeyType::Range)
209                .build()
210                .unwrap(),
211        );
212    }
213
214    ks
215}
216
217fn item_to_record<'a, 'stmt>(
218    item: &HashMap<String, AttributeValue>,
219    columns: impl Iterator<Item = &'a Column>,
220) -> Result<stmt::ValueRecord> {
221    Ok(stmt::ValueRecord::from_vec(
222        columns
223            .map(|column| {
224                if let Some(value) = item.get(&column.name) {
225                    Value::from_ddb(&column.ty, value).into_inner()
226                } else {
227                    stmt::Value::Null
228                }
229            })
230            .collect(),
231    ))
232}
233
234fn ddb_expression(
235    cx: &ExprContext<'_, db::Schema>,
236    attrs: &mut ExprAttrs,
237    primary: bool,
238    expr: &stmt::Expr,
239) -> String {
240    match expr {
241        stmt::Expr::BinaryOp(expr_binary_op) => {
242            let lhs = ddb_expression(cx, attrs, primary, &expr_binary_op.lhs);
243            let rhs = ddb_expression(cx, attrs, primary, &expr_binary_op.rhs);
244
245            match expr_binary_op.op {
246                stmt::BinaryOp::Eq => format!("{lhs} = {rhs}"),
247                stmt::BinaryOp::Ne if primary => {
248                    todo!("!= conditions on primary key not supported")
249                }
250                stmt::BinaryOp::Ne => format!("{lhs} <> {rhs}"),
251                stmt::BinaryOp::Gt => format!("{lhs} > {rhs}"),
252                stmt::BinaryOp::Ge => format!("{lhs} >= {rhs}"),
253                stmt::BinaryOp::Lt => format!("{lhs} < {rhs}"),
254                stmt::BinaryOp::Le => format!("{lhs} <= {rhs}"),
255            }
256        }
257        stmt::Expr::Reference(expr_reference) => {
258            let column = cx.resolve_expr_reference(expr_reference).expect_column();
259            attrs.column(column).to_string()
260        }
261        stmt::Expr::Value(val) => attrs.value(val),
262        stmt::Expr::And(expr_and) => {
263            let operands = expr_and
264                .operands
265                .iter()
266                .map(|operand| ddb_expression(cx, attrs, primary, operand))
267                .collect::<Vec<_>>();
268            operands.join(" AND ")
269        }
270        stmt::Expr::Or(expr_or) => {
271            let operands = expr_or
272                .operands
273                .iter()
274                .map(|operand| ddb_expression(cx, attrs, primary, operand))
275                .collect::<Vec<_>>();
276            operands.join(" OR ")
277        }
278        stmt::Expr::InList(in_list) => {
279            let expr = ddb_expression(cx, attrs, primary, &in_list.expr);
280
281            // Extract the list items and create individual attribute values
282            let items = match &*in_list.list {
283                stmt::Expr::Value(stmt::Value::List(vals)) => vals
284                    .iter()
285                    .map(|val| attrs.value(val))
286                    .collect::<Vec<_>>()
287                    .join(", "),
288                _ => {
289                    // If it's not a literal list, treat it as a single expression
290                    ddb_expression(cx, attrs, primary, &in_list.list)
291                }
292            };
293
294            format!("{expr} IN ({items})")
295        }
296        stmt::Expr::IsNull(expr_is_null) => {
297            let inner = ddb_expression(cx, attrs, primary, &expr_is_null.expr);
298            format!("attribute_not_exists({inner})")
299        }
300        stmt::Expr::Not(expr_not) => {
301            let inner = ddb_expression(cx, attrs, primary, &expr_not.expr);
302            format!("(NOT {inner})")
303        }
304        _ => todo!("FILTER = {:#?}", expr),
305    }
306}
307
308#[derive(Default)]
309struct ExprAttrs {
310    columns: HashMap<ColumnId, String>,
311    attr_names: HashMap<String, String>,
312    attr_values: HashMap<String, AttributeValue>,
313}
314
315impl ExprAttrs {
316    fn column(&mut self, column: &Column) -> &str {
317        use std::collections::hash_map::Entry;
318
319        match self.columns.entry(column.id) {
320            Entry::Vacant(e) => {
321                let name = format!("#col_{}", column.id.index);
322                self.attr_names.insert(name.clone(), column.name.clone());
323                e.insert(name)
324            }
325            Entry::Occupied(e) => e.into_mut(),
326        }
327    }
328
329    fn value(&mut self, val: &stmt::Value) -> String {
330        self.ddb_value(Value::from(val.clone()).to_ddb())
331    }
332
333    fn ddb_value(&mut self, val: AttributeValue) -> String {
334        let i = self.attr_values.len();
335        let name = format!(":v_{i}");
336        self.attr_values.insert(name.clone(), val);
337        name
338    }
339}