1#![warn(missing_docs)]
2
3mod op;
18mod r#type;
19mod value;
20
21pub(crate) use r#type::TypeExt;
22pub(crate) use value::Value;
23
24use async_trait::async_trait;
25use toasty_core::{
26 Error, Result, Schema,
27 driver::{Capability, Driver, ExecResponse, operation::Operation},
28 schema::{
29 db::{self, Column, ColumnId, Migration, Table},
30 diff,
31 },
32 stmt::{self, ExprContext},
33};
34
35use aws_sdk_dynamodb::{
36 Client,
37 error::SdkError,
38 operation::transact_write_items::TransactWriteItemsError,
39 operation::update_item::UpdateItemError,
40 types::{
41 AttributeDefinition, AttributeValue, BillingMode, Delete, GlobalSecondaryIndex,
42 KeySchemaElement, KeyType, KeysAndAttributes, Projection, ProjectionType, Put, PutRequest,
43 ReturnValuesOnConditionCheckFailure, TransactWriteItem, Update, WriteRequest,
44 },
45};
46use std::{borrow::Cow, collections::HashMap, sync::Arc};
47
48#[derive(Debug, Clone)]
54pub struct DynamoDb {
55 url: String,
56 client: Client,
57}
58
59impl DynamoDb {
60 pub fn new(url: String, client: Client) -> Self {
62 Self { url, client }
63 }
64
65 pub async fn from_env(url: String) -> Result<Self> {
68 use aws_config::BehaviorVersion;
69
70 let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
71 let client = Client::new(&sdk_config);
72 Ok(Self::new(url, client))
73 }
74
75 pub fn with_sdk_config(url: String, sdk_config: &aws_config::SdkConfig) -> Self {
77 let client = Client::new(sdk_config);
78 Self::new(url, client)
79 }
80}
81
82#[async_trait]
83impl Driver for DynamoDb {
84 fn url(&self) -> Cow<'_, str> {
85 Cow::Borrowed(&self.url)
86 }
87
88 fn capability(&self) -> &'static Capability {
89 &Capability::DYNAMODB
90 }
91
92 async fn connect(&self) -> toasty_core::Result<Box<dyn toasty_core::driver::Connection>> {
93 Ok(Box::new(Connection::new(self.client.clone())))
95 }
96
97 fn generate_migration(&self, _schema_diff: &diff::Schema<'_>) -> Migration {
98 unimplemented!(
99 "DynamoDB migrations are not yet supported. DynamoDB schema changes require manual table updates through the AWS console or SDK."
100 )
101 }
102
103 async fn reset_db(&self) -> toasty_core::Result<()> {
104 let mut exclusive_start_table_name = None;
106 loop {
107 let mut req = self.client.list_tables();
108 if let Some(start) = &exclusive_start_table_name {
109 req = req.exclusive_start_table_name(start);
110 }
111
112 let resp = req
113 .send()
114 .await
115 .map_err(toasty_core::Error::driver_operation_failed)?;
116
117 if let Some(table_names) = &resp.table_names {
118 for table_name in table_names {
119 self.client
120 .delete_table()
121 .table_name(table_name)
122 .send()
123 .await
124 .map_err(toasty_core::Error::driver_operation_failed)?;
125 }
126 }
127
128 exclusive_start_table_name = resp.last_evaluated_table_name;
129 if exclusive_start_table_name.is_none() {
130 break;
131 }
132 }
133
134 Ok(())
135 }
136}
137
138#[derive(Debug)]
140pub struct Connection {
141 client: Client,
143}
144
145impl Connection {
146 pub fn new(client: Client) -> Self {
148 Self { client }
149 }
150}
151
152#[async_trait]
153impl toasty_core::driver::Connection for Connection {
154 async fn exec(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
155 self.exec2(schema, op).await
156 }
157
158 async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
159 for table in &schema.db.tables {
160 tracing::debug!(table = %table.name, "creating table");
161 self.create_table(&schema.db, table, true).await?;
162 }
163 Ok(())
164 }
165
166 async fn applied_migrations(
167 &mut self,
168 ) -> Result<Vec<toasty_core::schema::db::AppliedMigration>> {
169 todo!("DynamoDB migrations are not yet implemented")
170 }
171
172 async fn apply_migration(
173 &mut self,
174 _id: u64,
175 _name: &str,
176 _migration: &toasty_core::schema::db::Migration,
177 ) -> Result<()> {
178 todo!("DynamoDB migrations are not yet implemented")
179 }
180}
181
182impl Connection {
183 async fn exec2(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
184 match op {
185 Operation::GetByKey(op) => self.exec_get_by_key(schema, op).await,
186 Operation::QueryPk(op) => self.exec_query_pk(schema, op).await,
187 Operation::DeleteByKey(op) => self.exec_delete_by_key(&schema.db, op).await,
188 Operation::UpdateByKey(op) => self.exec_update_by_key(&schema.db, op).await,
189 Operation::FindPkByIndex(op) => self.exec_find_pk_by_index(schema, op).await,
190 Operation::QuerySql(op) => {
191 assert!(
192 op.last_insert_id_hack.is_none(),
193 "last_insert_id_hack is MySQL-specific and should not be set for DynamoDB"
194 );
195 match op.stmt {
196 stmt::Statement::Insert(insert) => self.exec_insert(&schema.db, insert).await,
197 _ => todo!("op={:#?}", op.stmt),
198 }
199 }
200 Operation::Scan(op) => self.exec_scan(schema, op).await,
201 Operation::Transaction(_) => Err(Error::unsupported_feature(
202 "transactions are not supported by the DynamoDB driver",
203 )),
204 _ => todo!("op={op:#?}"),
205 }
206 }
207}
208
209fn ddb_key(table: &Table, key: &stmt::Value) -> HashMap<String, AttributeValue> {
210 let mut ret = HashMap::new();
211
212 for (index, column) in table.primary_key_columns().enumerate() {
213 let value = match key {
214 stmt::Value::Record(record) => &record[index],
215 value => value,
216 };
217
218 ret.insert(column.name.clone(), Value::from(value.clone()).to_ddb());
219 }
220
221 ret
222}
223
224fn attr_value_to_stmt_value(attr: &AttributeValue) -> stmt::Value {
226 use AttributeValue as AV;
227
228 match attr {
229 AV::S(s) => stmt::Value::String(s.clone()),
230 AV::N(n) => {
231 n.parse::<i64>()
233 .map(stmt::Value::I64)
234 .unwrap_or_else(|_| stmt::Value::String(n.clone()))
235 }
236 AV::Bool(b) => stmt::Value::Bool(*b),
237 AV::B(bytes) => stmt::Value::Bytes(bytes.clone().into_inner()),
238 AV::Null(_) => stmt::Value::Null,
239 _ => stmt::Value::String(format!("{:?}", attr)),
241 }
242}
243
244fn serialize_ddb_cursor(last_key: &HashMap<String, AttributeValue>) -> stmt::Value {
248 let mut fields = Vec::with_capacity(last_key.len() * 2);
249
250 for (name, attr_value) in last_key {
251 fields.push(stmt::Value::String(name.clone()));
252 fields.push(attr_value_to_stmt_value(attr_value));
253 }
254
255 stmt::Value::Record(stmt::ValueRecord::from_vec(fields))
256}
257
258fn deserialize_ddb_cursor(cursor: &stmt::Value) -> HashMap<String, AttributeValue> {
261 let mut ret = HashMap::new();
262
263 if let stmt::Value::Record(fields) = cursor {
264 for chunk in fields.chunks(2) {
266 if chunk.len() == 2
267 && let (stmt::Value::String(name), value) = (&chunk[0], &chunk[1])
268 {
269 ret.insert(name.clone(), Value::from(value.clone()).to_ddb());
270 }
271 }
272 }
273
274 ret
275}
276
277fn ddb_key_schema(
278 partition_columns: &[&Column],
279 range_columns: &[&Column],
280) -> Vec<KeySchemaElement> {
281 let mut ks = vec![];
282
283 for col in partition_columns {
284 ks.push(
285 KeySchemaElement::builder()
286 .attribute_name(&col.name)
287 .key_type(KeyType::Hash)
288 .build()
289 .unwrap(),
290 );
291 }
292
293 for col in range_columns {
294 ks.push(
295 KeySchemaElement::builder()
296 .attribute_name(&col.name)
297 .key_type(KeyType::Range)
298 .build()
299 .unwrap(),
300 );
301 }
302
303 ks
304}
305
306fn item_to_record<'a, 'stmt>(
307 item: &HashMap<String, AttributeValue>,
308 columns: impl Iterator<Item = &'a Column>,
309) -> Result<stmt::ValueRecord> {
310 Ok(stmt::ValueRecord::from_vec(
311 columns
312 .map(|column| {
313 if let Some(value) = item.get(&column.name) {
314 Value::from_ddb(&column.ty, value).into_inner()
315 } else {
316 stmt::Value::Null
317 }
318 })
319 .collect(),
320 ))
321}
322
323fn ddb_expression(
324 cx: &ExprContext<'_, db::Schema>,
325 attrs: &mut ExprAttrs,
326 primary: bool,
327 expr: &stmt::Expr,
328) -> String {
329 match expr {
330 stmt::Expr::BinaryOp(expr_binary_op) => {
331 let lhs = ddb_expression(cx, attrs, primary, &expr_binary_op.lhs);
332 let rhs = ddb_expression(cx, attrs, primary, &expr_binary_op.rhs);
333
334 match expr_binary_op.op {
335 stmt::BinaryOp::Eq => format!("{lhs} = {rhs}"),
336 stmt::BinaryOp::Ne if primary => {
337 todo!("!= conditions on primary key not supported")
338 }
339 stmt::BinaryOp::Ne => format!("{lhs} <> {rhs}"),
340 stmt::BinaryOp::Gt => format!("{lhs} > {rhs}"),
341 stmt::BinaryOp::Ge => format!("{lhs} >= {rhs}"),
342 stmt::BinaryOp::Lt => format!("{lhs} < {rhs}"),
343 stmt::BinaryOp::Le => format!("{lhs} <= {rhs}"),
344 }
345 }
346 stmt::Expr::Reference(expr_reference) => {
347 let column = cx.resolve_expr_reference(expr_reference).as_column_unwrap();
348 let is_bool = column.ty.is_bool();
349 let col_alias = attrs.column(column).to_string();
350 if is_bool {
353 let true_val = attrs.ddb_value(aws_sdk_dynamodb::types::AttributeValue::Bool(true));
354 format!("{col_alias} = {true_val}")
355 } else {
356 col_alias
357 }
358 }
359 stmt::Expr::Value(val) => attrs.value(val),
360 stmt::Expr::And(expr_and) => {
361 let operands = expr_and
362 .operands
363 .iter()
364 .map(|operand| ddb_expression(cx, attrs, primary, operand))
365 .collect::<Vec<_>>();
366 operands.join(" AND ")
367 }
368 stmt::Expr::Or(expr_or) => {
369 let operands = expr_or
370 .operands
371 .iter()
372 .map(|operand| ddb_expression(cx, attrs, primary, operand))
373 .collect::<Vec<_>>();
374 format!("({})", operands.join(" OR "))
375 }
376 stmt::Expr::InList(in_list) => {
377 let expr = ddb_expression(cx, attrs, primary, &in_list.expr);
378
379 let items = match &*in_list.list {
381 stmt::Expr::Value(stmt::Value::List(vals)) => vals
382 .iter()
383 .map(|val| attrs.value(val))
384 .collect::<Vec<_>>()
385 .join(", "),
386 _ => {
387 ddb_expression(cx, attrs, primary, &in_list.list)
389 }
390 };
391
392 format!("{expr} IN ({items})")
393 }
394 stmt::Expr::IsNull(expr_is_null) => {
395 let inner = ddb_expression(cx, attrs, primary, &expr_is_null.expr);
396 format!("attribute_not_exists({inner})")
397 }
398 stmt::Expr::Not(expr_not) => {
399 let inner = ddb_expression(cx, attrs, primary, &expr_not.expr);
400 format!("(NOT {inner})")
401 }
402 stmt::Expr::StartsWith(expr_starts_with) => {
403 let expr = ddb_expression(cx, attrs, primary, &expr_starts_with.expr);
404 let prefix = ddb_expression(cx, attrs, primary, &expr_starts_with.prefix);
405 format!("begins_with({expr}, {prefix})")
406 }
407 stmt::Expr::Like(_) => {
408 panic!(
409 "LIKE is not supported by the DynamoDB driver; use starts_with for prefix matching"
410 )
411 }
412 stmt::Expr::AnyOp(any) if matches!(any.op, stmt::BinaryOp::Eq) => {
413 let value = ddb_expression(cx, attrs, primary, &any.lhs);
417 let path = ddb_expression(cx, attrs, primary, &any.rhs);
418 format!("contains({path}, {value})")
419 }
420 stmt::Expr::Length(expr) => {
421 let inner = ddb_expression(cx, attrs, primary, &expr.expr);
422 format!("size({inner})")
423 }
424 _ => todo!("FILTER = {:#?}", expr),
425 }
426}
427
428#[derive(Default)]
429struct ExprAttrs {
430 columns: HashMap<ColumnId, String>,
431 attr_names: HashMap<String, String>,
432 attr_values: HashMap<String, AttributeValue>,
433}
434
435impl ExprAttrs {
436 fn column(&mut self, column: &Column) -> &str {
437 use std::collections::hash_map::Entry;
438
439 match self.columns.entry(column.id) {
440 Entry::Vacant(e) => {
441 let name = format!("#col_{}", column.id.index);
442 self.attr_names.insert(name.clone(), column.name.clone());
443 e.insert(name)
444 }
445 Entry::Occupied(e) => e.into_mut(),
446 }
447 }
448
449 fn value(&mut self, val: &stmt::Value) -> String {
450 self.ddb_value(Value::from(val.clone()).to_ddb())
451 }
452
453 fn ddb_value(&mut self, val: AttributeValue) -> String {
454 let i = self.attr_values.len();
455 let name = format!(":v_{i}");
456 self.attr_values.insert(name.clone(), val);
457 name
458 }
459}