toasty_core/stmt/
value_stream.rs1use crate::stmt::Type;
2
3use super::Value;
4
5use std::{
6 collections::VecDeque,
7 fmt, mem,
8 panic::Location,
9 pin::Pin,
10 task::{Context, Poll},
11};
12use tokio_stream::{Stream, StreamExt};
13
14#[derive(Default)]
35pub struct ValueStream {
36 buffer: Buffer,
37 stream: Option<DynStream>,
38
39 ty: Option<(Type, &'static Location<'static>)>,
41}
42
/// Adapter that lets a plain iterator be driven as a stream (see the `Stream`
/// implementation for `Iter` in this file).
#[derive(Debug)]
struct Iter<I> {
    /// The wrapped iterator; each call to `poll_next` pulls one item from it.
    iter: I,
}
47
48#[derive(Clone, Default)]
49enum Buffer {
50 #[default]
51 Empty,
52 One(Value),
53 Many(VecDeque<Value>),
54}
55
56type DynStream = Pin<Box<dyn Stream<Item = crate::Result<Value>> + Send + 'static>>;
57
58impl ValueStream {
59 pub fn from_value(value: impl Into<Value>) -> Self {
61 Self {
62 buffer: Buffer::One(value.into()),
63 stream: None,
64 ty: None,
65 }
66 }
67
68 pub fn from_stream<T: Stream<Item = crate::Result<Value>> + Send + 'static>(stream: T) -> Self {
70 Self {
71 buffer: Buffer::Empty,
72 stream: Some(Box::pin(stream)),
73 ty: None,
74 }
75 }
76
77 pub fn from_vec(records: Vec<Value>) -> Self {
79 Self {
80 buffer: Buffer::Many(records.into()),
81 stream: None,
82 ty: None,
83 }
84 }
85
86 #[allow(clippy::should_implement_trait)]
88 pub fn from_iter<T, I>(iter: I) -> Self
89 where
90 T: Into<Value>,
91 I: Iterator<Item = crate::Result<T>> + Send + 'static,
92 {
93 Self::from_stream(Iter { iter })
94 }
95
96 pub async fn next(&mut self) -> Option<crate::Result<Value>> {
98 StreamExt::next(self).await
99 }
100
101 pub async fn peek(&mut self) -> Option<crate::Result<&Value>> {
103 if self.buffer.is_empty() {
104 match self.next().await {
105 Some(Ok(value)) => self.buffer.push(value),
106 Some(Err(e)) => return Some(Err(e)),
107 None => return None,
108 }
109 }
110
111 self.buffer.first().map(Ok)
112 }
113
114 pub async fn tap(&mut self) -> crate::Result<()> {
117 if let Some(Err(e)) = self.peek().await {
118 Err(e)
119 } else {
120 Ok(())
121 }
122 }
123
124 pub fn min_len(&self) -> usize {
129 let (ret, _) = self.size_hint();
130 ret
131 }
132
133 pub async fn collect(mut self) -> crate::Result<Vec<Value>> {
135 let mut ret = Vec::with_capacity(self.min_len());
136
137 while let Some(res) = self.next().await {
138 ret.push(res?);
139 }
140
141 Ok(ret)
142 }
143
144 pub async fn dup(&mut self) -> crate::Result<Self> {
149 self.buffer().await?;
150
151 Ok(Self {
152 buffer: self.buffer.clone(),
153 stream: None,
154 ty: self.ty.clone(),
155 })
156 }
157
158 pub fn try_clone(&self) -> Option<Self> {
161 if self.stream.is_some() {
162 return None;
163 }
164
165 Some(Self {
166 buffer: self.buffer.clone(),
167 stream: None,
168 ty: self.ty.clone(),
169 })
170 }
171
172 pub async fn buffer(&mut self) -> crate::Result<()> {
177 if let Some(stream) = &mut self.stream {
178 while let Some(res) = stream.next().await {
179 let value = res?;
180
181 if let Some((ty, location)) = &self.ty {
182 assert!(
183 value.is_a(ty),
184 "expected `{ty:?}`; was={value:#?}; origin={location}"
185 );
186 }
187
188 self.buffer.push(value);
189 }
190 }
191
192 Ok(())
193 }
194
195 pub fn is_buffered(&self) -> bool {
197 self.stream.is_none()
198 }
199
200 pub fn buffered_to_vec(&self) -> Vec<Value> {
203 match &self.buffer {
204 Buffer::Empty => Vec::new(),
205 Buffer::One(value) => vec![value.clone()],
206 Buffer::Many(values) => values.iter().cloned().collect(),
207 }
208 }
209
210 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Value> {
218 assert!(self.stream.is_none());
219
220 match &mut self.buffer {
222 Buffer::Empty => Box::new(None.into_iter()),
223 Buffer::One(v) => Box::new(Some(v).into_iter()),
224 Buffer::Many(v) => Box::new(v.iter_mut()) as Box<dyn Iterator<Item = &mut Value>>,
225 }
226 }
227
228 #[track_caller]
239 pub fn typed(mut self, ty: Type) -> ValueStream {
240 let location = Location::caller();
241
242 match &self.ty {
243 Some((prev, _)) => assert_eq!(*prev, ty),
244 None => {
245 match &self.buffer {
246 Buffer::One(value) => assert!(
247 value.is_a(&ty),
248 "expected `{ty:?}`; was={value:#?}; origin={location}"
249 ),
250 Buffer::Many(values) => {
251 for value in values {
252 assert!(
253 value.is_a(&ty),
254 "expected `{ty:?}`; was={value:#?}; origin={location}"
255 );
256 }
257 }
258 _ => {}
259 }
260
261 self.ty = Some((ty, location));
262 }
263 }
264
265 self
266 }
267}
268
269impl Stream for ValueStream {
270 type Item = crate::Result<Value>;
271
272 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
273 if let Some(next) = self.buffer.next() {
274 Poll::Ready(Some(Ok(next)))
275 } else if let Some(stream) = self.stream.as_mut() {
276 let next = Pin::new(stream).poll_next(cx);
277 if let Poll::Ready(Some(Ok(value))) = &next
278 && let Some((ty, location)) = &self.ty
279 {
280 assert!(
281 value.is_a(ty),
282 "expected `{ty:?}`; was={value:#?}; origin={location}"
283 );
284 }
285 next
286 } else {
287 Poll::Ready(None)
288 }
289 }
290
291 fn size_hint(&self) -> (usize, Option<usize>) {
292 let (mut low, mut high) = match &self.stream {
293 Some(stream) => stream.size_hint(),
294 None => (0, Some(0)),
295 };
296
297 let buffered = self.buffer.len();
298
299 low += buffered;
300
301 if let Some(high) = high.as_mut() {
302 *high += buffered;
303 }
304
305 (low, high)
306 }
307}
308
309impl From<Value> for ValueStream {
310 fn from(src: Value) -> Self {
311 Self {
312 buffer: Buffer::One(src),
313 stream: None,
314 ty: None,
315 }
316 }
317}
318
319impl From<Vec<Value>> for ValueStream {
320 fn from(value: Vec<Value>) -> Self {
321 Self::from_vec(value)
322 }
323}
324
325impl<I> Unpin for Iter<I> {}
326
327impl<T, I> Stream for Iter<I>
328where
329 I: Iterator<Item = crate::Result<T>>,
330 T: Into<Value>,
331{
332 type Item = crate::Result<Value>;
333
334 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
335 Poll::Ready(self.iter.next().map(|res| res.map(|item| item.into())))
336 }
337
338 fn size_hint(&self) -> (usize, Option<usize>) {
339 self.iter.size_hint()
340 }
341}
342
343impl fmt::Debug for ValueStream {
344 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345 f.debug_struct("RecordStream").finish()
346 }
347}
348
349impl Buffer {
350 fn is_empty(&self) -> bool {
351 self.len() == 0
352 }
353
354 fn len(&self) -> usize {
355 match self {
356 Self::Empty => 0,
357 Self::One(_) => 1,
358 Self::Many(v) => v.len(),
359 }
360 }
361
362 fn first(&self) -> Option<&Value> {
363 match self {
364 Self::Empty => None,
365 Self::One(value) => Some(value),
366 Self::Many(values) => values.front(),
367 }
368 }
369
370 fn next(&mut self) -> Option<Value> {
371 match self {
372 Self::Empty => None,
373 Self::One(_) => {
374 let Self::One(value) = mem::take(self) else {
375 panic!()
376 };
377 Some(value)
378 }
379 Self::Many(values) => values.pop_front(),
380 }
381 }
382
383 fn push(&mut self, value: Value) {
384 match self {
385 Self::Empty => {
386 *self = Self::One(value);
387 }
388 Self::One(_) => {
389 let Self::One(first) = mem::replace(self, Self::Many(VecDeque::with_capacity(2)))
390 else {
391 panic!()
392 };
393
394 let Self::Many(values) = self else { panic!() };
395
396 values.push_back(first);
397 values.push_back(value);
398 }
399 Self::Many(values) => {
400 values.push_back(value);
401 }
402 }
403 }
404}