1use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
36 ValidIdempotenceFor, ValidMutBorrowCommutativityFor, ValidMutBorrowIdempotenceFor,
37 ValidMutCommutativityFor, ValidMutIdempotenceFor,
38};
39
40pub mod networking;
41
42#[sealed::sealed]
44pub trait Ordering:
45 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
46{
47 const ORDERING_KIND: StreamOrder;
49}
50
51pub enum TotalOrder {}
55
56#[sealed::sealed]
57impl Ordering for TotalOrder {
58 const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
59}
60
61pub enum NoOrder {}
67
68#[sealed::sealed]
69impl Ordering for NoOrder {
70 const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
71}
72
73#[sealed::sealed]
77pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
78#[sealed::sealed]
79impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
80
81#[sealed::sealed]
83pub trait MinOrder<Other: ?Sized> {
84 type Min: Ordering;
86}
87
88#[sealed::sealed]
89impl<O: Ordering> MinOrder<O> for TotalOrder {
90 type Min = O;
91}
92
93#[sealed::sealed]
94impl<O: Ordering> MinOrder<O> for NoOrder {
95 type Min = NoOrder;
96}
97
98#[sealed::sealed]
100pub trait Retries:
101 MinRetries<Self, Min = Self>
102 + MinRetries<ExactlyOnce, Min = Self>
103 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
104{
105 const RETRIES_KIND: StreamRetry;
107}
108
109pub enum ExactlyOnce {}
112
113#[sealed::sealed]
114impl Retries for ExactlyOnce {
115 const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
116}
117
118pub enum AtLeastOnce {}
121
122#[sealed::sealed]
123impl Retries for AtLeastOnce {
124 const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
125}
126
127#[sealed::sealed]
131pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
132#[sealed::sealed]
133impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
134
135#[sealed::sealed]
137pub trait MinRetries<Other: ?Sized> {
138 type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
140}
141
142#[sealed::sealed]
143impl<R: Retries> MinRetries<R> for ExactlyOnce {
144 type Min = R;
145}
146
147#[sealed::sealed]
148impl<R: Retries> MinRetries<R> for AtLeastOnce {
149 type Min = AtLeastOnce;
150}
151
152#[sealed::sealed]
153#[diagnostic::on_unimplemented(
154 message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
155 label = "required here",
156 note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
157)]
158pub trait IsOrdered: Ordering {}
160
161#[sealed::sealed]
162#[diagnostic::do_not_recommend]
163impl IsOrdered for TotalOrder {}
164
165#[sealed::sealed]
166#[diagnostic::on_unimplemented(
167 message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
168 label = "required here",
169 note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
170)]
171pub trait IsExactlyOnce: Retries {}
173
174#[sealed::sealed]
175#[diagnostic::do_not_recommend]
176impl IsExactlyOnce for ExactlyOnce {}
177
178pub struct Stream<
198 Type,
199 Loc,
200 Bound: Boundedness = Unbounded,
201 Order: Ordering = TotalOrder,
202 Retry: Retries = ExactlyOnce,
203> {
204 pub(crate) location: Loc,
205 pub(crate) ir_node: RefCell<HydroNode>,
206 pub(crate) flow_state: FlowState,
207
208 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
209}
210
211impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
212 fn drop(&mut self) {
213 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
214 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
215 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
216 input: Box::new(ir_node),
217 op_metadata: HydroIrOpMetadata::new(),
218 });
219 }
220 }
221}
222
223impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
224 for Stream<T, L, Unbounded, O, R>
225where
226 L: Location<'a>,
227{
228 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
229 let new_meta = stream
230 .location
231 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
232
233 Stream {
234 location: stream.location.clone(),
235 flow_state: stream.flow_state.clone(),
236 ir_node: RefCell::new(HydroNode::Cast {
237 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
238 metadata: new_meta,
239 }),
240 _phantom: PhantomData,
241 }
242 }
243}
244
245impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
246 for Stream<T, L, B, NoOrder, R>
247where
248 L: Location<'a>,
249{
250 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
251 stream.weaken_ordering()
252 }
253}
254
255impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
256 for Stream<T, L, B, O, AtLeastOnce>
257where
258 L: Location<'a>,
259{
260 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
261 stream.weaken_retries()
262 }
263}
264
265impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
266where
267 L: Location<'a>,
268{
269 fn defer_tick(self) -> Self {
270 Stream::defer_tick(self)
271 }
272}
273
274impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
275 for Stream<T, Tick<L>, Bounded, O, R>
276where
277 L: Location<'a>,
278{
279 type Location = Tick<L>;
280
281 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
282 Stream::new(
283 location.clone(),
284 HydroNode::CycleSource {
285 cycle_id,
286 metadata: location.new_node_metadata(Self::collection_kind()),
287 },
288 )
289 }
290}
291
292impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
293 for Stream<T, Tick<L>, Bounded, O, R>
294where
295 L: Location<'a>,
296{
297 type Location = Tick<L>;
298
299 fn location(&self) -> &Self::Location {
300 self.location()
301 }
302
303 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
304 let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
305 location.clone(),
306 HydroNode::DeferTick {
307 input: Box::new(HydroNode::CycleSource {
308 cycle_id,
309 metadata: location.new_node_metadata(Self::collection_kind()),
310 }),
311 metadata: location.new_node_metadata(Self::collection_kind()),
312 },
313 );
314
315 from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
316 }
317}
318
319impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
320 for Stream<T, Tick<L>, Bounded, O, R>
321where
322 L: Location<'a>,
323{
324 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
325 assert_eq!(
326 Location::id(&self.location),
327 expected_location,
328 "locations do not match"
329 );
330 self.location
331 .flow_state()
332 .borrow_mut()
333 .push_root(HydroRoot::CycleSink {
334 cycle_id,
335 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
336 op_metadata: HydroIrOpMetadata::new(),
337 });
338 }
339}
340
341impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
342 for Stream<T, L, B, O, R>
343where
344 L: Location<'a>,
345{
346 type Location = L;
347
348 fn create_source(cycle_id: CycleId, location: L) -> Self {
349 Stream::new(
350 location.clone(),
351 HydroNode::CycleSource {
352 cycle_id,
353 metadata: location.new_node_metadata(Self::collection_kind()),
354 },
355 )
356 }
357}
358
359impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
360 for Stream<T, L, B, O, R>
361where
362 L: Location<'a>,
363{
364 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
365 assert_eq!(
366 Location::id(&self.location),
367 expected_location,
368 "locations do not match"
369 );
370 self.location
371 .flow_state()
372 .borrow_mut()
373 .push_root(HydroRoot::CycleSink {
374 cycle_id,
375 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
376 op_metadata: HydroIrOpMetadata::new(),
377 });
378 }
379}
380
381impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
382where
383 T: Clone,
384 L: Location<'a>,
385{
386 fn clone(&self) -> Self {
387 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
388 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
389 *self.ir_node.borrow_mut() = HydroNode::Tee {
390 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
391 metadata: self.location.new_node_metadata(Self::collection_kind()),
392 };
393 }
394
395 let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
396 unreachable!()
397 };
398 Stream {
399 location: self.location.clone(),
400 flow_state: self.flow_state.clone(),
401 ir_node: HydroNode::Tee {
402 inner: SharedNode(inner.0.clone()),
403 metadata: metadata.clone(),
404 }
405 .into(),
406 _phantom: PhantomData,
407 }
408 }
409}
410
411impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
412where
413 L: Location<'a>,
414{
415 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
416 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
417 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
418
419 let flow_state = location.flow_state().clone();
420 Stream {
421 location,
422 flow_state,
423 ir_node: RefCell::new(ir_node),
424 _phantom: PhantomData,
425 }
426 }
427
428 pub fn location(&self) -> &L {
430 &self.location
431 }
432
433 pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L>
438 where
439 B: IsBounded,
440 {
441 crate::handoff_ref::StreamRef::new(&self.ir_node)
442 }
443
444 pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L>
447 where
448 B: IsBounded,
449 {
450 crate::handoff_ref::StreamMut::new(&self.ir_node)
451 }
452
453 pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
456 where
457 L: Location<'a>,
458 {
459 if L::consistency()
460 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
461 {
462 Stream::new(
464 self.location.drop_consistency(),
465 self.ir_node.replace(HydroNode::Placeholder),
466 )
467 } else {
468 Stream::new(
469 self.location.drop_consistency(),
470 HydroNode::Cast {
471 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
472 metadata: self.location.drop_consistency().new_node_metadata(Stream::<
473 T,
474 L::DropConsistency,
475 B,
476 O,
477 R,
478 >::collection_kind(
479 )),
480 },
481 )
482 }
483 }
484
485 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
489 self,
490 _proof: impl crate::properties::ConsistencyProof,
491 ) -> Stream<T, L2, B, O, R>
492 where
493 L: Location<'a>,
494 {
495 if L::consistency() == L2::consistency() {
496 Stream::new(
497 self.location.with_consistency_of(),
498 self.ir_node.replace(HydroNode::Placeholder),
499 )
500 } else {
501 Stream::new(
502 self.location.with_consistency_of(),
503 HydroNode::AssertIsConsistent {
504 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
505 trusted: false,
506 metadata: self
507 .location
508 .clone()
509 .with_consistency_of::<L2>()
510 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
511 },
512 )
513 }
514 }
515
516 pub(crate) fn assert_has_consistency_of_trusted<
517 L2: Location<'a, DropConsistency = L::DropConsistency>,
518 >(
519 self,
520 _proof: impl crate::properties::ConsistencyProof,
521 ) -> Stream<T, L2, B, O, R>
522 where
523 L: Location<'a>,
524 {
525 if L::consistency() == L2::consistency() {
526 Stream::new(
527 self.location.with_consistency_of(),
528 self.ir_node.replace(HydroNode::Placeholder),
529 )
530 } else {
531 Stream::new(
532 self.location.with_consistency_of(),
533 HydroNode::AssertIsConsistent {
534 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
535 trusted: true,
536 metadata: self
537 .location
538 .clone()
539 .with_consistency_of::<L2>()
540 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
541 },
542 )
543 }
544 }
545
546 pub(crate) fn collection_kind() -> CollectionKind {
547 CollectionKind::Stream {
548 bound: B::BOUND_KIND,
549 order: O::ORDERING_KIND,
550 retry: R::RETRIES_KIND,
551 element_type: quote_type::<T>().into(),
552 }
553 }
554
555 pub fn map<U, F, C, I, const WAS_MUT: bool>(
575 self,
576 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
577 ) -> Stream<U, L, B, O, R>
578 where
579 F: FnMut(T) -> U + 'a,
580 C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
581 I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
582 {
583 let f = crate::handoff_ref::with_ref_capture(|| {
584 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
585 proof.register_proof(&expr);
586 expr.into()
587 });
588 Stream::new(
589 self.location.clone(),
590 HydroNode::Map {
591 f,
592 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
593 metadata: self
594 .location
595 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
596 },
597 )
598 }
599
600 pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
625 self,
626 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
627 ) -> Stream<U, L, B, O, R>
628 where
629 I: IntoIterator<Item = U>,
630 F: FnMut(T) -> I + 'a,
631 C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
632 Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
633 {
634 let f = crate::handoff_ref::with_ref_capture(|| {
635 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
636 proof.register_proof(&expr);
637 expr.into()
638 });
639 Stream::new(
640 self.location.clone(),
641 HydroNode::FlatMap {
642 f,
643 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
644 metadata: self
645 .location
646 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
647 },
648 )
649 }
650
651 pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
678 self,
679 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
680 ) -> Stream<U, L, B, NoOrder, R>
681 where
682 I: IntoIterator<Item = U>,
683 F: FnMut(T) -> I + 'a,
684 C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
685 Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
686 {
687 let f = crate::handoff_ref::with_ref_capture(|| {
688 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
689 proof.register_proof(&expr);
690 expr.into()
691 });
692 Stream::new(
693 self.location.clone(),
694 HydroNode::FlatMap {
695 f,
696 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
697 metadata: self
698 .location
699 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
700 },
701 )
702 }
703
704 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
727 where
728 T: IntoIterator<Item = U>,
729 {
730 self.flat_map_ordered(q!(|d| d))
731 }
732
733 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
760 where
761 T: IntoIterator<Item = U>,
762 {
763 self.flat_map_unordered(q!(|d| d))
764 }
765
766 pub fn flat_map_stream_blocking<U, S, F, C, Idemp, const WAS_MUT: bool>(
770 self,
771 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
772 ) -> Stream<U, L, B, O, R>
773 where
774 S: futures::Stream<Item = U>,
775 F: FnMut(T) -> S + 'a,
776 C: ValidMutCommutativityFor<F, T, S, O, WAS_MUT>,
777 Idemp: ValidMutIdempotenceFor<F, T, S, R, WAS_MUT>,
778 {
779 let f = crate::handoff_ref::with_ref_capture(|| {
780 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
781 proof.register_proof(&expr);
782 expr.into()
783 });
784 Stream::new(
785 self.location.clone(),
786 HydroNode::FlatMapStreamBlocking {
787 f,
788 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
789 metadata: self
790 .location
791 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
792 },
793 )
794 }
795
796 pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
800 where
801 T: futures::Stream<Item = U>,
802 {
803 self.flat_map_stream_blocking(q!(|d| d))
804 }
805
806 pub fn filter<F, C, Idemp, const WAS_MUT: bool>(
831 self,
832 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
833 ) -> Self
834 where
835 F: FnMut(&T) -> bool + 'a,
836 C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
837 Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
838 {
839 let f = crate::handoff_ref::with_ref_capture(|| {
840 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
841 proof.register_proof(&expr);
842 expr.into()
843 });
844 Stream::new(
845 self.location.clone(),
846 HydroNode::Filter {
847 f,
848 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
849 metadata: self.location.new_node_metadata(Self::collection_kind()),
850 },
851 )
852 }
853
854 pub fn partition<F, C, Idemp, const WAS_MUT: bool>(
889 self,
890 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
891 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
892 where
893 F: FnMut(&T) -> bool + 'a,
894 C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
895 Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
896 {
897 let f = crate::handoff_ref::with_ref_capture(|| {
898 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
899 proof.register_proof(&expr);
900 expr.into()
901 });
902 let shared = SharedNode(Rc::new(RefCell::new(
903 self.ir_node.replace(HydroNode::Placeholder),
904 )));
905
906 let true_stream = Stream::new(
907 self.location.clone(),
908 HydroNode::Partition {
909 inner: SharedNode(shared.0.clone()),
910 f: f.clone(),
911 is_true: true,
912 metadata: self.location.new_node_metadata(Self::collection_kind()),
913 },
914 );
915
916 let false_stream = Stream::new(
917 self.location.clone(),
918 HydroNode::Partition {
919 inner: SharedNode(shared.0),
920 f,
921 is_true: false,
922 metadata: self.location.new_node_metadata(Self::collection_kind()),
923 },
924 );
925
926 (true_stream, false_stream)
927 }
928
929 pub fn filter_map<U, F, C, Idemp, const WAS_MUT: bool>(
949 self,
950 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
951 ) -> Stream<U, L, B, O, R>
952 where
953 F: FnMut(T) -> Option<U> + 'a,
954 C: ValidMutCommutativityFor<F, T, Option<U>, O, WAS_MUT>,
955 Idemp: ValidMutIdempotenceFor<F, T, Option<U>, R, WAS_MUT>,
956 {
957 let f = crate::handoff_ref::with_ref_capture(|| {
958 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
959 proof.register_proof(&expr);
960 expr.into()
961 });
962 Stream::new(
963 self.location.clone(),
964 HydroNode::FilterMap {
965 f,
966 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
967 metadata: self
968 .location
969 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
970 },
971 )
972 }
973
974 pub fn cross_singleton<O2>(
999 self,
1000 other: impl Into<Optional<O2, L, Bounded>>,
1001 ) -> Stream<(T, O2), L, B, O, R>
1002 where
1003 O2: Clone,
1004 {
1005 let other: Optional<O2, L, Bounded> = other.into();
1006 check_matching_location(&self.location, &other.location);
1007
1008 Stream::new(
1009 self.location.clone(),
1010 HydroNode::CrossSingleton {
1011 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1012 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1013 metadata: self
1014 .location
1015 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
1016 },
1017 )
1018 }
1019
1020 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1052 self.cross_singleton(signal.filter(q!(|b| *b)))
1053 .map(q!(|(d, _)| d))
1054 }
1055
1056 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1091 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1092 self.filter_if(signal.is_some())
1093 }
1094
1095 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1130 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1131 self.filter_if(other.is_none())
1132 }
1133
1134 pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1159 self,
1160 other: Stream<T2, L, B2, O2, R2>,
1161 ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1162 where
1163 T: Clone,
1164 T2: Clone,
1165 R: MinRetries<R2>,
1166 {
1167 self.map(q!(|v| ((), v)))
1168 .join(other.map(q!(|v| ((), v))))
1169 .map(q!(|((), (v1, v2))| (v1, v2)))
1170 }
1171
1172 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1191 where
1192 T: Eq + Hash,
1193 {
1194 Stream::new(
1195 self.location.clone(),
1196 HydroNode::Unique {
1197 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1198 metadata: self
1199 .location
1200 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1201 },
1202 )
1203 }
1204
1205 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1231 where
1232 T: Eq + Hash,
1233 B2: IsBounded,
1234 {
1235 check_matching_location(&self.location, &other.location);
1236
1237 Stream::new(
1238 self.location.clone(),
1239 HydroNode::Difference {
1240 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1241 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1242 metadata: self
1243 .location
1244 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1245 },
1246 )
1247 }
1248
1249 pub fn inspect<F, C, Idemp, const WAS_MUT: bool>(
1270 self,
1271 f: impl IntoQuotedMut<'a, F, L::DropConsistency, StreamMapFuncAlgebra<C, Idemp>>,
1272 ) -> Self
1273 where
1274 F: FnMut(&T) + 'a,
1275 C: ValidMutBorrowCommutativityFor<F, T, (), O, WAS_MUT>,
1276 Idemp: ValidMutBorrowIdempotenceFor<F, T, (), R, WAS_MUT>,
1277 {
1278 let f = crate::handoff_ref::with_ref_capture(|| {
1279 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location.drop_consistency());
1280 proof.register_proof(&expr);
1281 expr.into()
1282 });
1283
1284 Stream::new(
1285 self.location.clone(),
1286 HydroNode::Inspect {
1287 f,
1288 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1289 metadata: self.location.new_node_metadata(Self::collection_kind()),
1290 },
1291 )
1292 }
1293
1294 pub fn for_each<F: FnMut(T) + 'a, C, I>(
1310 self,
1311 f: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, I>>,
1312 ) where
1313 C: ValidCommutativityFor<O>,
1314 I: ValidIdempotenceFor<R>,
1315 {
1316 let f = crate::handoff_ref::with_ref_capture(|| {
1317 let (f, proof) = f.splice_fnmut1_ctx_props(&self.location);
1318 proof.register_proof(&f);
1319 f.into()
1320 });
1321 self.location
1322 .flow_state()
1323 .borrow_mut()
1324 .push_root(HydroRoot::ForEach {
1325 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1326 f,
1327 op_metadata: HydroIrOpMetadata::new(),
1328 });
1329 }
1330
1331 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1337 where
1338 O: IsOrdered,
1339 R: IsExactlyOnce,
1340 S: 'a + futures::Sink<T> + Unpin,
1341 {
1342 self.location
1343 .flow_state()
1344 .borrow_mut()
1345 .push_root(HydroRoot::DestSink {
1346 sink: sink.splice_typed_ctx(&self.location).into(),
1347 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1348 op_metadata: HydroIrOpMetadata::new(),
1349 });
1350 }
1351
1352 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1372 where
1373 O: IsOrdered,
1374 R: IsExactlyOnce,
1375 {
1376 Stream::new(
1377 self.location.clone(),
1378 HydroNode::Enumerate {
1379 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1380 metadata: self.location.new_node_metadata(Stream::<
1381 (usize, T),
1382 L,
1383 B,
1384 TotalOrder,
1385 ExactlyOnce,
1386 >::collection_kind()),
1387 },
1388 )
1389 }
1390
1391 pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1415 self,
1416 init: impl IntoQuotedMut<'a, I, L>,
1417 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1418 ) -> Singleton<A, L, B2>
1419 where
1420 I: Fn() -> A + 'a,
1421 F: 'a + Fn(&mut A, T),
1422 C: ValidCommutativityFor<O>,
1423 Idemp: ValidIdempotenceFor<R>,
1424 B: ApplyMonotoneStream<M, B2>,
1425 {
1426 let init = init.splice_fn0_ctx(&self.location).into();
1427 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1428 proof.register_proof(&comb);
1429
1430 let nondet = nondet!();
1433 let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1434
1435 let core = HydroNode::Fold {
1436 init,
1437 acc: comb.into(),
1438 input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1439 metadata: retried
1440 .location
1441 .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1442 };
1447
1448 Singleton::new(retried.location.clone(), core)
1449 .assert_has_consistency_of(manual_proof!())
1450 }
1451
1452 pub fn reduce<F, C, Idemp>(
1475 self,
1476 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1477 ) -> Optional<T, L, B>
1478 where
1479 F: Fn(&mut T, T) + 'a,
1480 C: ValidCommutativityFor<O>,
1481 Idemp: ValidIdempotenceFor<R>,
1482 {
1483 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1484 proof.register_proof(&f);
1485
1486 let nondet = nondet!();
1487 let ordered_etc: Stream<T, L::DropConsistency, B> =
1488 self.assume_retries(nondet).assume_ordering(nondet);
1489
1490 let core = HydroNode::Reduce {
1491 f: f.into(),
1492 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1493 metadata: ordered_etc
1494 .location
1495 .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1496 };
1497
1498 Optional::new(ordered_etc.location.clone(), core)
1499 .assert_has_consistency_of(manual_proof!())
1500 }
1501
1502 pub fn max(self) -> Optional<T, L, B>
1522 where
1523 T: Ord,
1524 {
1525 self.assume_retries_trusted::<ExactlyOnce>(nondet!())
1526 .assume_ordering_trusted_bounded::<TotalOrder>(
1527 nondet!(),
1528 )
1529 .reduce(q!(|curr, new| {
1530 if new > *curr {
1531 *curr = new;
1532 }
1533 }))
1534 }
1535
1536 pub fn min(self) -> Optional<T, L, B>
1556 where
1557 T: Ord,
1558 {
1559 self.assume_retries_trusted::<ExactlyOnce>(nondet!())
1560 .assume_ordering_trusted_bounded::<TotalOrder>(
1561 nondet!(),
1562 )
1563 .reduce(q!(|curr, new| {
1564 if new < *curr {
1565 *curr = new;
1566 }
1567 }))
1568 }
1569
1570 pub fn first(self) -> Optional<T, L, B>
1593 where
1594 O: IsOrdered,
1595 {
1596 self.make_totally_ordered()
1597 .assume_retries_trusted::<ExactlyOnce>(nondet!())
1598 .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1599 .reduce(q!(|_, _| {}))
1600 }
1601
1602 pub fn last(self) -> Optional<T, L, B>
1625 where
1626 O: IsOrdered,
1627 {
1628 self.make_totally_ordered()
1629 .assume_retries_trusted::<ExactlyOnce>(nondet!())
1630 .reduce(q!(|curr, new| *curr = new))
1631 }
1632
1633 pub fn limit(
1656 self,
1657 n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1658 ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1659 where
1660 O: IsOrdered,
1661 R: IsExactlyOnce,
1662 {
1663 self.generator(
1664 q!(|| 0usize),
1665 q!(move |count, item| {
1666 if *count == n {
1667 Generate::Break
1668 } else {
1669 *count += 1;
1670 if *count == n {
1671 Generate::Return(item)
1672 } else {
1673 Generate::Yield(item)
1674 }
1675 }
1676 }),
1677 )
1678 }
1679
1680 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1706 where
1707 O: IsOrdered,
1708 R: IsExactlyOnce,
1709 {
1710 self.make_totally_ordered().make_exactly_once().fold(
1711 q!(|| vec![]),
1712 q!(|acc, v| {
1713 acc.push(v);
1714 }),
1715 )
1716 }
1717
1718 pub fn scan<A, U, I, F>(
1779 self,
1780 init: impl IntoQuotedMut<'a, I, L>,
1781 f: impl IntoQuotedMut<'a, F, L>,
1782 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1783 where
1784 O: IsOrdered,
1785 R: IsExactlyOnce,
1786 I: Fn() -> A + 'a,
1787 F: Fn(&mut A, T) -> Option<U> + 'a,
1788 {
1789 let init = init.splice_fn0_ctx(&self.location).into();
1790 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1791
1792 Stream::new(
1793 self.location.clone(),
1794 HydroNode::Scan {
1795 init,
1796 acc: f,
1797 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1798 metadata: self.location.new_node_metadata(
1799 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1800 ),
1801 },
1802 )
1803 }
1804
1805 pub fn scan_async_blocking<A, U, I, F, Fut>(
1839 self,
1840 init: impl IntoQuotedMut<'a, I, L>,
1841 f: impl IntoQuotedMut<'a, F, L>,
1842 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1843 where
1844 O: IsOrdered,
1845 R: IsExactlyOnce,
1846 I: Fn() -> A + 'a,
1847 F: Fn(&mut A, T) -> Fut + 'a,
1848 Fut: Future<Output = Option<U>> + 'a,
1849 {
1850 let init = init.splice_fn0_ctx(&self.location).into();
1851 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1852
1853 Stream::new(
1854 self.location.clone(),
1855 HydroNode::ScanAsyncBlocking {
1856 init,
1857 acc: f,
1858 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1859 metadata: self.location.new_node_metadata(
1860 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1861 ),
1862 },
1863 )
1864 }
1865
1866 pub fn generator<A, U, I, F>(
1906 self,
1907 init: impl IntoQuotedMut<'a, I, L> + Copy,
1908 f: impl IntoQuotedMut<'a, F, L> + Copy,
1909 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1910 where
1911 O: IsOrdered,
1912 R: IsExactlyOnce,
1913 I: Fn() -> A + 'a,
1914 F: Fn(&mut A, T) -> Generate<U> + 'a,
1915 {
1916 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1917 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1918
1919 let this = self.make_totally_ordered().make_exactly_once();
1920
1921 let scan_init = q!(|| None)
1926 .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1927 .into();
1928 let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1929 if state.is_none() {
1930 *state = Some(Some(init()));
1931 }
1932 match state {
1933 Some(Some(state_value)) => match f(state_value, v) {
1934 Generate::Yield(out) => Some(Some(out)),
1935 Generate::Return(out) => {
1936 *state = Some(None);
1937 Some(Some(out))
1938 }
1939 Generate::Break => None,
1943 Generate::Continue => Some(None),
1944 },
1945 _ => None,
1947 }
1948 })
1949 .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1950 .into();
1951
1952 let scan_node = HydroNode::Scan {
1953 init: scan_init,
1954 acc: scan_f,
1955 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1956 metadata: this.location.new_node_metadata(Stream::<
1957 Option<U>,
1958 L,
1959 B,
1960 TotalOrder,
1961 ExactlyOnce,
1962 >::collection_kind()),
1963 };
1964
1965 let flatten_f = q!(|d| d)
1966 .splice_fn1_ctx::<Option<U>, _>(&this.location)
1967 .into();
1968 let flatten_node = HydroNode::FlatMap {
1969 f: flatten_f,
1970 input: Box::new(scan_node),
1971 metadata: this
1972 .location
1973 .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1974 };
1975
1976 Stream::new(this.location.clone(), flatten_node)
1977 }
1978
1979 pub fn sample_every(
1988 self,
1989 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1990 nondet: NonDet,
1991 ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1992 where
1993 L: TopLevel<'a>,
1994 {
1995 let samples = self.location.source_interval(interval);
1996
1997 let tick = self.location.tick();
1998 self.batch(&tick, nondet)
1999 .filter_if(samples.batch(&tick, nondet).first().is_some())
2000 .all_ticks()
2001 .weaken_retries()
2002 }
2003
2004 pub fn timeout(
2014 self,
2015 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
2016 nondet: NonDet,
2017 ) -> Optional<(), L::DropConsistency, Unbounded>
2018 where
2019 L: TopLevel<'a>,
2020 {
2021 let tick = self.location.tick();
2022
2023 let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
2024 q!(|| None),
2025 q!(
2026 |latest, _| {
2027 *latest = Some(Instant::now());
2028 },
2029 commutative = manual_proof!()
2030 ),
2031 );
2032
2033 latest_received
2034 .snapshot(&tick, nondet)
2035 .filter_map(q!(move |latest_received| {
2036 if let Some(latest_received) = latest_received {
2037 if Instant::now().duration_since(latest_received) > duration {
2038 Some(())
2039 } else {
2040 None
2041 }
2042 } else {
2043 Some(())
2044 }
2045 }))
2046 .latest()
2047 }
2048
2049 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
2055 let id = self.location.flow_state().borrow_mut().next_clock_id();
2056 let out_location = Atomic {
2057 tick: Tick {
2058 id,
2059 l: self.location.clone(),
2060 },
2061 };
2062 Stream::new(
2063 out_location.clone(),
2064 HydroNode::BeginAtomic {
2065 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2066 metadata: out_location
2067 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2068 },
2069 )
2070 }
2071
2072 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2080 self,
2081 tick: &Tick<L2>,
2082 _nondet: NonDet,
2083 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2084 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2085 Stream::new(
2086 tick.drop_consistency(),
2087 HydroNode::Batch {
2088 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2089 metadata: tick
2090 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2091 },
2092 )
2093 }
2094
2095 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2098 {
2099 let mut node = self.ir_node.borrow_mut();
2100 let metadata = node.metadata_mut();
2101 metadata.tag = Some(name.to_owned());
2102 }
2103 self
2104 }
2105
2106 pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2110 where
2111 B: IsBounded,
2112 {
2113 Optional::new(
2114 self.location.clone(),
2115 HydroNode::Cast {
2116 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2117 metadata: self
2118 .location
2119 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2120 },
2121 )
2122 }
2123
2124 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2125 if O::ORDERING_KIND == O2::ORDERING_KIND {
2126 Stream::new(
2127 self.location.clone(),
2128 self.ir_node.replace(HydroNode::Placeholder),
2129 )
2130 } else {
2131 panic!(
2132 "Runtime ordering {:?} did not match requested cast {:?}.",
2133 O::ORDERING_KIND,
2134 O2::ORDERING_KIND
2135 )
2136 }
2137 }
2138
2139 pub fn assume_ordering<O2: Ordering>(
2148 self,
2149 _nondet: NonDet,
2150 ) -> Stream<T, L::DropConsistency, B, O2, R> {
2151 if O::ORDERING_KIND == O2::ORDERING_KIND {
2152 self.use_ordering_type().weaken_consistency()
2153 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2154 let target_location = self.location().drop_consistency();
2156 Stream::new(
2157 target_location.clone(),
2158 HydroNode::Cast {
2159 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2160 metadata: target_location
2161 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2162 },
2163 )
2164 } else {
2165 let target_location = self.location().drop_consistency();
2166 Stream::new(
2167 target_location.clone(),
2168 HydroNode::ObserveNonDet {
2169 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2170 trusted: false,
2171 metadata: target_location
2172 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2173 },
2174 )
2175 }
2176 }
2177
2178 fn assume_ordering_trusted_bounded<O2: Ordering>(
2181 self,
2182 nondet: NonDet,
2183 ) -> Stream<T, L, B, O2, R> {
2184 if B::BOUNDED {
2185 self.assume_ordering_trusted(nondet)
2186 } else {
2187 let self_location = self.location.clone();
2188 let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2189 Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2190 }
2191 }
2192
2193 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2196 self,
2197 _nondet: NonDet,
2198 ) -> Stream<T, L, B, O2, R> {
2199 if O::ORDERING_KIND == O2::ORDERING_KIND {
2200 self.use_ordering_type()
2201 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2202 Stream::new(
2204 self.location.clone(),
2205 HydroNode::Cast {
2206 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2207 metadata: self
2208 .location
2209 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2210 },
2211 )
2212 } else {
2213 Stream::new(
2214 self.location.clone(),
2215 HydroNode::ObserveNonDet {
2216 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2217 trusted: true,
2218 metadata: self
2219 .location
2220 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2221 },
2222 )
2223 }
2224 }
2225
2226 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2227 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2230 self.weaken_ordering::<NoOrder>()
2231 }
2232
2233 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2236 let nondet = nondet!();
2237 self.assume_ordering_trusted::<O2>(nondet)
2238 }
2239
2240 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2243 where
2244 O: IsOrdered,
2245 {
2246 self.assume_ordering_trusted(nondet!())
2247 }
2248
2249 pub fn assume_retries<R2: Retries>(
2258 self,
2259 _nondet: NonDet,
2260 ) -> Stream<T, L::DropConsistency, B, O, R2> {
2261 if R::RETRIES_KIND == R2::RETRIES_KIND {
2262 Stream::new(
2263 self.location.drop_consistency(),
2264 self.ir_node.replace(HydroNode::Placeholder),
2265 )
2266 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2267 let target_location = self.location.drop_consistency();
2269 Stream::new(
2270 target_location.clone(),
2271 HydroNode::Cast {
2272 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2273 metadata: target_location
2274 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2275 },
2276 )
2277 } else {
2278 let target_location = self.location.drop_consistency();
2279 Stream::new(
2280 target_location.clone(),
2281 HydroNode::ObserveNonDet {
2282 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2283 trusted: false,
2284 metadata: target_location
2285 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2286 },
2287 )
2288 }
2289 }
2290
2291 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2294 if R::RETRIES_KIND == R2::RETRIES_KIND {
2295 Stream::new(
2296 self.location.clone(),
2297 self.ir_node.replace(HydroNode::Placeholder),
2298 )
2299 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2300 Stream::new(
2302 self.location.clone(),
2303 HydroNode::Cast {
2304 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2305 metadata: self
2306 .location
2307 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2308 },
2309 )
2310 } else {
2311 Stream::new(
2312 self.location.clone(),
2313 HydroNode::ObserveNonDet {
2314 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2315 trusted: true,
2316 metadata: self
2317 .location
2318 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2319 },
2320 )
2321 }
2322 }
2323
2324 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2325 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2328 self.weaken_retries::<AtLeastOnce>()
2329 }
2330
2331 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2334 let nondet = nondet!();
2335 self.assume_retries_trusted::<R2>(nondet)
2336 }
2337
2338 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2341 where
2342 R: IsExactlyOnce,
2343 {
2344 self.assume_retries_trusted(nondet!())
2345 }
2346
2347 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2350 where
2351 B: IsBounded,
2352 {
2353 self.weaken_boundedness()
2354 }
2355
2356 pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2359 if B::BOUNDED == B2::BOUNDED {
2360 Stream::new(
2361 self.location.clone(),
2362 self.ir_node.replace(HydroNode::Placeholder),
2363 )
2364 } else {
2365 Stream::new(
2367 self.location.clone(),
2368 HydroNode::Cast {
2369 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2370 metadata: self
2371 .location
2372 .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2373 },
2374 )
2375 }
2376 }
2377}
2378
2379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2380where
2381 L: Location<'a>,
2382{
2383 pub fn cloned(self) -> Stream<T, L, B, O, R>
2401 where
2402 T: Clone,
2403 {
2404 self.map(q!(|d| d.clone()))
2405 }
2406}
2407
2408impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2409where
2410 L: Location<'a>,
2411{
2412 pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2431 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2432 ))
2434 .fold(
2435 q!(|| 0usize),
2436 q!(
2437 |count, _| *count += 1,
2438 monotone = manual_proof!()
2439 ),
2440 )
2441 }
2442}
2443
2444impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2445 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2469 self,
2470 other: Stream<T, L, Unbounded, O2, R2>,
2471 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2472 where
2473 R: MinRetries<R2>,
2474 {
2475 Stream::new(
2476 self.location.clone(),
2477 HydroNode::Chain {
2478 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2479 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2480 metadata: self.location.new_node_metadata(Stream::<
2481 T,
2482 L,
2483 Unbounded,
2484 NoOrder,
2485 <R as MinRetries<R2>>::Min,
2486 >::collection_kind()),
2487 },
2488 )
2489 }
2490
2491 #[deprecated(note = "use `merge_unordered` instead")]
2493 pub fn interleave<O2: Ordering, R2: Retries>(
2494 self,
2495 other: Stream<T, L, Unbounded, O2, R2>,
2496 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2497 where
2498 R: MinRetries<R2>,
2499 {
2500 self.merge_unordered(other)
2501 }
2502}
2503
2504impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2505 pub fn merge_ordered<R2: Retries>(
2533 self,
2534 other: Stream<T, L, B, TotalOrder, R2>,
2535 _nondet: NonDet,
2536 ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2537 where
2538 R: MinRetries<R2>,
2539 {
2540 let target_location = self.location().drop_consistency();
2541 Stream::new(
2542 target_location.clone(),
2543 HydroNode::MergeOrdered {
2544 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2545 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2546 metadata: target_location.new_node_metadata(Stream::<
2547 T,
2548 L::DropConsistency,
2549 B,
2550 TotalOrder,
2551 <R as MinRetries<R2>>::Min,
2552 >::collection_kind()),
2553 },
2554 )
2555 }
2556}
2557
2558impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2559where
2560 L: Location<'a>,
2561{
2562 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2588 where
2589 B: IsBounded,
2590 T: Ord,
2591 {
2592 let this = self.make_bounded();
2593 Stream::new(
2594 this.location.clone(),
2595 HydroNode::Sort {
2596 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2597 metadata: this
2598 .location
2599 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2600 },
2601 )
2602 }
2603
2604 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2632 self,
2633 other: Stream<T, L, B2, O2, R2>,
2634 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2635 where
2636 B: IsBounded,
2637 O: MinOrder<O2>,
2638 R: MinRetries<R2>,
2639 {
2640 check_matching_location(&self.location, &other.location);
2641
2642 Stream::new(
2643 self.location.clone(),
2644 HydroNode::Chain {
2645 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2646 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2647 metadata: self.location.new_node_metadata(Stream::<
2648 T,
2649 L,
2650 B2,
2651 <O as MinOrder<O2>>::Min,
2652 <R as MinRetries<R2>>::Min,
2653 >::collection_kind()),
2654 },
2655 )
2656 }
2657
2658 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2662 self,
2663 other: Stream<T2, L, Bounded, O2, R2>,
2664 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2665 where
2666 B: IsBounded,
2667 T: Clone,
2668 T2: Clone,
2669 R: MinRetries<R2>,
2670 {
2671 let this = self.make_bounded();
2672 check_matching_location(&this.location, &other.location);
2673
2674 Stream::new(
2675 this.location.clone(),
2676 HydroNode::CrossProduct {
2677 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2678 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2679 metadata: this.location.new_node_metadata(Stream::<
2680 (T, T2),
2681 L,
2682 Bounded,
2683 <O2 as MinOrder<O>>::Min,
2684 <R as MinRetries<R2>>::Min,
2685 >::collection_kind()),
2686 },
2687 )
2688 }
2689
2690 pub fn repeat_with_keys<K, V2>(
2728 self,
2729 keys: KeyedSingleton<K, V2, L, Bounded>,
2730 ) -> KeyedStream<K, T, L, Bounded, O, R>
2731 where
2732 B: IsBounded,
2733 K: Clone,
2734 T: Clone,
2735 {
2736 keys.keys()
2737 .assume_ordering_trusted::<TotalOrder>(
2738 nondet!(),
2739 )
2740 .cross_product_nested_loop(self.make_bounded())
2741 .into_keyed()
2742 }
2743
2744 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2781 where
2782 T: Future,
2783 {
2784 Stream::new(
2785 self.location.clone(),
2786 HydroNode::ResolveFuturesBlocking {
2787 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2788 metadata: self
2789 .location
2790 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2791 },
2792 )
2793 }
2794
2795 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2815 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2816 where
2817 B: IsBounded,
2818 {
2819 self.make_bounded()
2820 .assume_ordering_trusted::<TotalOrder>(
2821 nondet!(),
2822 )
2823 .first()
2824 .is_none()
2825 }
2826}
2827
2828impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2829where
2830 L: Location<'a>,
2831{
2832 pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2857 self,
2858 n: Stream<(K, V2), L, B2, O2, R2>,
2859 ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2860 where
2861 K: Eq + Hash + Clone,
2862 R: MinRetries<R2>,
2863 V1: Clone,
2864 V2: Clone,
2865 {
2866 check_matching_location(&self.location, &n.location);
2867
2868 let ir_node = if B2::BOUNDED {
2869 HydroNode::JoinHalf {
2870 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2871 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2872 metadata: self.location.new_node_metadata(Stream::<
2873 (K, (V1, V2)),
2874 L,
2875 B,
2876 B2::PreserveOrderIfBounded<O>,
2877 <R as MinRetries<R2>>::Min,
2878 >::collection_kind()),
2879 }
2880 } else {
2881 HydroNode::Join {
2882 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2883 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2884 metadata: self.location.new_node_metadata(Stream::<
2885 (K, (V1, V2)),
2886 L,
2887 B,
2888 B2::PreserveOrderIfBounded<O>,
2889 <R as MinRetries<R2>>::Min,
2890 >::collection_kind()),
2891 }
2892 };
2893
2894 Stream::new(self.location.clone(), ir_node)
2895 }
2896
2897 pub fn anti_join<O2: Ordering, R2: Retries>(
2923 self,
2924 n: Stream<K, L, Bounded, O2, R2>,
2925 ) -> Stream<(K, V1), L, B, O, R>
2926 where
2927 K: Eq + Hash,
2928 {
2929 check_matching_location(&self.location, &n.location);
2930
2931 Stream::new(
2932 self.location.clone(),
2933 HydroNode::AntiJoin {
2934 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2935 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2936 metadata: self
2937 .location
2938 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2939 },
2940 )
2941 }
2942}
2943
2944impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2945 Stream<(K, V), L, B, O, R>
2946{
2947 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2974 KeyedStream::new(
2975 self.location.clone(),
2976 HydroNode::Cast {
2977 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2978 metadata: self
2979 .location
2980 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2981 },
2982 )
2983 }
2984}
2985
2986impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2987where
2988 K: Eq + Hash,
2989 L: Location<'a>,
2990{
2991 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
3010 self.into_keyed()
3011 .fold(
3012 q!(|| ()),
3013 q!(
3014 |_, _| {},
3015 commutative = manual_proof!(),
3016 idempotent = manual_proof!()
3017 ),
3018 )
3019 .keys()
3020 }
3021}
3022
3023impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
3024where
3025 L: Location<'a>,
3026{
3027 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
3034 self,
3035 tick: &Tick<L2>,
3036 _nondet: NonDet,
3037 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
3038 Stream::new(
3039 tick.drop_consistency(),
3040 HydroNode::Batch {
3041 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3042 metadata: tick
3043 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3044 },
3045 )
3046 }
3047
3048 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
3051 Stream::new(
3052 self.location.tick.l.clone(),
3053 HydroNode::EndAtomic {
3054 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3055 metadata: self
3056 .location
3057 .tick
3058 .l
3059 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
3060 },
3061 )
3062 }
3063}
3064
3065impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3066where
3067 L: TopLevel<'a>,
3068 F: Future<Output = T>,
3069{
3070 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3101 Stream::new(
3102 self.location.clone(),
3103 HydroNode::ResolveFutures {
3104 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3105 metadata: self
3106 .location
3107 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3108 },
3109 )
3110 }
3111
3112 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3143 Stream::new(
3144 self.location.clone(),
3145 HydroNode::ResolveFuturesOrdered {
3146 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3147 metadata: self
3148 .location
3149 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3150 },
3151 )
3152 }
3153}
3154
3155impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3156where
3157 L: Location<'a>,
3158{
3159 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3162 Stream::new(
3163 self.location.outer().clone(),
3164 HydroNode::YieldConcat {
3165 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3166 metadata: self
3167 .location
3168 .outer()
3169 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3170 },
3171 )
3172 }
3173
3174 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3181 let out_location = Atomic {
3182 tick: self.location.clone(),
3183 };
3184
3185 Stream::new(
3186 out_location.clone(),
3187 HydroNode::YieldConcat {
3188 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3189 metadata: out_location
3190 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3191 },
3192 )
3193 }
3194
3195 pub fn across_ticks<Out: BatchAtomic<'a>>(
3231 self,
3232 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3233 ) -> Out::Batched {
3234 thunk(self.all_ticks_atomic()).batched_atomic()
3235 }
3236
3237 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3276 Stream::new(
3277 self.location.clone(),
3278 HydroNode::DeferTick {
3279 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3280 metadata: self
3281 .location
3282 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3283 },
3284 )
3285 }
3286}
3287
3288#[cfg(test)]
3289mod tests {
3290 #[cfg(feature = "deploy")]
3291 use futures::{SinkExt, StreamExt};
3292 #[cfg(feature = "deploy")]
3293 use hydro_deploy::Deployment;
3294 #[cfg(feature = "deploy")]
3295 use serde::{Deserialize, Serialize};
3296 #[cfg(any(feature = "deploy", feature = "sim"))]
3297 use stageleft::q;
3298
3299 #[cfg(any(feature = "deploy", feature = "sim"))]
3300 use crate::compile::builder::FlowBuilder;
3301 #[cfg(feature = "deploy")]
3302 use crate::live_collections::sliced::sliced;
3303 #[cfg(feature = "deploy")]
3304 use crate::live_collections::stream::ExactlyOnce;
3305 #[cfg(feature = "sim")]
3306 use crate::live_collections::stream::NoOrder;
3307 #[cfg(any(feature = "deploy", feature = "sim"))]
3308 use crate::live_collections::stream::TotalOrder;
3309 #[cfg(any(feature = "deploy", feature = "sim"))]
3310 use crate::location::Location;
3311 #[cfg(feature = "sim")]
3312 use crate::networking::TCP;
3313 #[cfg(any(feature = "deploy", feature = "sim"))]
3314 use crate::nondet::nondet;
3315
3316 mod backtrace_chained_ops;
3317
3318 #[cfg(feature = "deploy")]
3319 struct P1 {}
3320 #[cfg(feature = "deploy")]
3321 struct P2 {}
3322
3323 #[cfg(feature = "deploy")]
3324 #[derive(Serialize, Deserialize, Debug)]
3325 struct SendOverNetwork {
3326 n: u32,
3327 }
3328
3329 #[cfg(feature = "deploy")]
3330 #[tokio::test]
3331 async fn first_ten_distributed() {
3332 use crate::networking::TCP;
3333
3334 let mut deployment = Deployment::new();
3335
3336 let mut flow = FlowBuilder::new();
3337 let first_node = flow.process::<P1>();
3338 let second_node = flow.process::<P2>();
3339 let external = flow.external::<P2>();
3340
3341 let numbers = first_node.source_iter(q!(0..10));
3342 let out_port = numbers
3343 .map(q!(|n| SendOverNetwork { n }))
3344 .send(&second_node, TCP.fail_stop().bincode())
3345 .send_bincode_external(&external);
3346
3347 let nodes = flow
3348 .with_process(&first_node, deployment.Localhost())
3349 .with_process(&second_node, deployment.Localhost())
3350 .with_external(&external, deployment.Localhost())
3351 .deploy(&mut deployment);
3352
3353 deployment.deploy().await.unwrap();
3354
3355 let mut external_out = nodes.connect(out_port).await;
3356
3357 deployment.start().await.unwrap();
3358
3359 for i in 0..10 {
3360 assert_eq!(external_out.next().await.unwrap().n, i);
3361 }
3362 }
3363
3364 #[cfg(feature = "deploy")]
3365 #[tokio::test]
3366 async fn first_cardinality() {
3367 let mut deployment = Deployment::new();
3368
3369 let mut flow = FlowBuilder::new();
3370 let node = flow.process::<()>();
3371 let external = flow.external::<()>();
3372
3373 let node_tick = node.tick();
3374 let count = node_tick
3375 .singleton(q!([1, 2, 3]))
3376 .into_stream()
3377 .flatten_ordered()
3378 .first()
3379 .into_stream()
3380 .count()
3381 .all_ticks()
3382 .send_bincode_external(&external);
3383
3384 let nodes = flow
3385 .with_process(&node, deployment.Localhost())
3386 .with_external(&external, deployment.Localhost())
3387 .deploy(&mut deployment);
3388
3389 deployment.deploy().await.unwrap();
3390
3391 let mut external_out = nodes.connect(count).await;
3392
3393 deployment.start().await.unwrap();
3394
3395 assert_eq!(external_out.next().await.unwrap(), 1);
3396 }
3397
3398 #[cfg(feature = "deploy")]
3399 #[tokio::test]
3400 async fn unbounded_reduce_remembers_state() {
3401 let mut deployment = Deployment::new();
3402
3403 let mut flow = FlowBuilder::new();
3404 let node = flow.process::<()>();
3405 let external = flow.external::<()>();
3406
3407 let (input_port, input) = node.source_external_bincode(&external);
3408 let out = input
3409 .reduce(q!(|acc, v| *acc += v))
3410 .sample_eager(nondet!())
3411 .send_bincode_external(&external);
3412
3413 let nodes = flow
3414 .with_process(&node, deployment.Localhost())
3415 .with_external(&external, deployment.Localhost())
3416 .deploy(&mut deployment);
3417
3418 deployment.deploy().await.unwrap();
3419
3420 let mut external_in = nodes.connect(input_port).await;
3421 let mut external_out = nodes.connect(out).await;
3422
3423 deployment.start().await.unwrap();
3424
3425 external_in.send(1).await.unwrap();
3426 assert_eq!(external_out.next().await.unwrap(), 1);
3427
3428 external_in.send(2).await.unwrap();
3429 assert_eq!(external_out.next().await.unwrap(), 3);
3430 }
3431
3432 #[cfg(feature = "deploy")]
3433 #[tokio::test]
3434 async fn top_level_bounded_cross_singleton() {
3435 let mut deployment = Deployment::new();
3436
3437 let mut flow = FlowBuilder::new();
3438 let node = flow.process::<()>();
3439 let external = flow.external::<()>();
3440
3441 let (input_port, input) =
3442 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3443
3444 let out = input
3445 .cross_singleton(
3446 node.source_iter(q!(vec![1, 2, 3]))
3447 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3448 )
3449 .send_bincode_external(&external);
3450
3451 let nodes = flow
3452 .with_process(&node, deployment.Localhost())
3453 .with_external(&external, deployment.Localhost())
3454 .deploy(&mut deployment);
3455
3456 deployment.deploy().await.unwrap();
3457
3458 let mut external_in = nodes.connect(input_port).await;
3459 let mut external_out = nodes.connect(out).await;
3460
3461 deployment.start().await.unwrap();
3462
3463 external_in.send(1).await.unwrap();
3464 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3465
3466 external_in.send(2).await.unwrap();
3467 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3468 }
3469
3470 #[cfg(feature = "deploy")]
3471 #[tokio::test]
3472 async fn top_level_bounded_reduce_cardinality() {
3473 let mut deployment = Deployment::new();
3474
3475 let mut flow = FlowBuilder::new();
3476 let node = flow.process::<()>();
3477 let external = flow.external::<()>();
3478
3479 let (input_port, input) =
3480 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3481
3482 let out = sliced! {
3483 let input = use(input, nondet!());
3484 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!());
3485 input.cross_singleton(v.into_stream().count())
3486 }
3487 .send_bincode_external(&external);
3488
3489 let nodes = flow
3490 .with_process(&node, deployment.Localhost())
3491 .with_external(&external, deployment.Localhost())
3492 .deploy(&mut deployment);
3493
3494 deployment.deploy().await.unwrap();
3495
3496 let mut external_in = nodes.connect(input_port).await;
3497 let mut external_out = nodes.connect(out).await;
3498
3499 deployment.start().await.unwrap();
3500
3501 external_in.send(1).await.unwrap();
3502 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3503
3504 external_in.send(2).await.unwrap();
3505 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3506 }
3507
3508 #[cfg(feature = "deploy")]
3509 #[tokio::test]
3510 async fn top_level_bounded_into_singleton_cardinality() {
3511 let mut deployment = Deployment::new();
3512
3513 let mut flow = FlowBuilder::new();
3514 let node = flow.process::<()>();
3515 let external = flow.external::<()>();
3516
3517 let (input_port, input) =
3518 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3519
3520 let out = sliced! {
3521 let input = use(input, nondet!());
3522 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!());
3523 input.cross_singleton(v.into_stream().count())
3524 }
3525 .send_bincode_external(&external);
3526
3527 let nodes = flow
3528 .with_process(&node, deployment.Localhost())
3529 .with_external(&external, deployment.Localhost())
3530 .deploy(&mut deployment);
3531
3532 deployment.deploy().await.unwrap();
3533
3534 let mut external_in = nodes.connect(input_port).await;
3535 let mut external_out = nodes.connect(out).await;
3536
3537 deployment.start().await.unwrap();
3538
3539 external_in.send(1).await.unwrap();
3540 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3541
3542 external_in.send(2).await.unwrap();
3543 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3544 }
3545
3546 #[cfg(feature = "deploy")]
3547 #[tokio::test]
3548 async fn atomic_fold_replays_each_tick() {
3549 let mut deployment = Deployment::new();
3550
3551 let mut flow = FlowBuilder::new();
3552 let node = flow.process::<()>();
3553 let external = flow.external::<()>();
3554
3555 let (input_port, input) =
3556 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3557 let tick = node.tick();
3558
3559 let out = input
3560 .batch(&tick, nondet!())
3561 .cross_singleton(
3562 node.source_iter(q!(vec![1, 2, 3]))
3563 .atomic()
3564 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3565 .snapshot_atomic(&tick, nondet!()),
3566 )
3567 .all_ticks()
3568 .send_bincode_external(&external);
3569
3570 let nodes = flow
3571 .with_process(&node, deployment.Localhost())
3572 .with_external(&external, deployment.Localhost())
3573 .deploy(&mut deployment);
3574
3575 deployment.deploy().await.unwrap();
3576
3577 let mut external_in = nodes.connect(input_port).await;
3578 let mut external_out = nodes.connect(out).await;
3579
3580 deployment.start().await.unwrap();
3581
3582 external_in.send(1).await.unwrap();
3583 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3584
3585 external_in.send(2).await.unwrap();
3586 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3587 }
3588
3589 #[cfg(feature = "deploy")]
3590 #[tokio::test]
3591 async fn unbounded_scan_remembers_state() {
3592 let mut deployment = Deployment::new();
3593
3594 let mut flow = FlowBuilder::new();
3595 let node = flow.process::<()>();
3596 let external = flow.external::<()>();
3597
3598 let (input_port, input) = node.source_external_bincode(&external);
3599 let out = input
3600 .scan(
3601 q!(|| 0),
3602 q!(|acc, v| {
3603 *acc += v;
3604 Some(*acc)
3605 }),
3606 )
3607 .send_bincode_external(&external);
3608
3609 let nodes = flow
3610 .with_process(&node, deployment.Localhost())
3611 .with_external(&external, deployment.Localhost())
3612 .deploy(&mut deployment);
3613
3614 deployment.deploy().await.unwrap();
3615
3616 let mut external_in = nodes.connect(input_port).await;
3617 let mut external_out = nodes.connect(out).await;
3618
3619 deployment.start().await.unwrap();
3620
3621 external_in.send(1).await.unwrap();
3622 assert_eq!(external_out.next().await.unwrap(), 1);
3623
3624 external_in.send(2).await.unwrap();
3625 assert_eq!(external_out.next().await.unwrap(), 3);
3626 }
3627
3628 #[cfg(feature = "deploy")]
3629 #[tokio::test]
3630 async fn unbounded_enumerate_remembers_state() {
3631 let mut deployment = Deployment::new();
3632
3633 let mut flow = FlowBuilder::new();
3634 let node = flow.process::<()>();
3635 let external = flow.external::<()>();
3636
3637 let (input_port, input) = node.source_external_bincode(&external);
3638 let out = input.enumerate().send_bincode_external(&external);
3639
3640 let nodes = flow
3641 .with_process(&node, deployment.Localhost())
3642 .with_external(&external, deployment.Localhost())
3643 .deploy(&mut deployment);
3644
3645 deployment.deploy().await.unwrap();
3646
3647 let mut external_in = nodes.connect(input_port).await;
3648 let mut external_out = nodes.connect(out).await;
3649
3650 deployment.start().await.unwrap();
3651
3652 external_in.send(1).await.unwrap();
3653 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3654
3655 external_in.send(2).await.unwrap();
3656 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3657 }
3658
3659 #[cfg(feature = "deploy")]
3660 #[tokio::test]
3661 async fn unbounded_unique_remembers_state() {
3662 let mut deployment = Deployment::new();
3663
3664 let mut flow = FlowBuilder::new();
3665 let node = flow.process::<()>();
3666 let external = flow.external::<()>();
3667
3668 let (input_port, input) =
3669 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3670 let out = input.unique().send_bincode_external(&external);
3671
3672 let nodes = flow
3673 .with_process(&node, deployment.Localhost())
3674 .with_external(&external, deployment.Localhost())
3675 .deploy(&mut deployment);
3676
3677 deployment.deploy().await.unwrap();
3678
3679 let mut external_in = nodes.connect(input_port).await;
3680 let mut external_out = nodes.connect(out).await;
3681
3682 deployment.start().await.unwrap();
3683
3684 external_in.send(1).await.unwrap();
3685 assert_eq!(external_out.next().await.unwrap(), 1);
3686
3687 external_in.send(2).await.unwrap();
3688 assert_eq!(external_out.next().await.unwrap(), 2);
3689
3690 external_in.send(1).await.unwrap();
3691 external_in.send(3).await.unwrap();
3692 assert_eq!(external_out.next().await.unwrap(), 3);
3693 }
3694
3695 #[cfg(feature = "sim")]
3696 #[test]
3697 #[should_panic]
3698 fn sim_batch_nondet_size() {
3699 let mut flow = FlowBuilder::new();
3700 let node = flow.process::<()>();
3701
3702 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3703
3704 let tick = node.tick();
3705 let out_recv = input
3706 .batch(&tick, nondet!())
3707 .count()
3708 .all_ticks()
3709 .sim_output();
3710
3711 flow.sim().exhaustive(async || {
3712 in_send.send(());
3713 in_send.send(());
3714 in_send.send(());
3715
3716 assert_eq!(out_recv.next().await.unwrap(), 3); });
3718 }
3719
3720 #[cfg(feature = "sim")]
3721 #[test]
3722 fn sim_batch_preserves_order() {
3723 let mut flow = FlowBuilder::new();
3724 let node = flow.process::<()>();
3725
3726 let (in_send, input) = node.sim_input();
3727
3728 let tick = node.tick();
3729 let out_recv = input
3730 .batch(&tick, nondet!())
3731 .all_ticks()
3732 .sim_output();
3733
3734 flow.sim().exhaustive(async || {
3735 in_send.send(1);
3736 in_send.send(2);
3737 in_send.send(3);
3738
3739 out_recv.assert_yields_only([1, 2, 3]).await;
3740 });
3741 }
3742
3743 #[cfg(feature = "sim")]
3744 #[test]
3745 #[should_panic]
3746 fn sim_batch_unordered_shuffles() {
3747 let mut flow = FlowBuilder::new();
3748 let node = flow.process::<()>();
3749
3750 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3751
3752 let tick = node.tick();
3753 let batch = input.batch(&tick, nondet!());
3754 let out_recv = batch
3755 .clone()
3756 .min()
3757 .zip(batch.max())
3758 .all_ticks()
3759 .sim_output();
3760
3761 flow.sim().exhaustive(async || {
3762 in_send.send_many_unordered([1, 2, 3]);
3763
3764 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3765 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3766 }
3767 });
3768 }
3769
3770 #[cfg(feature = "sim")]
3771 #[test]
3772 fn sim_batch_unordered_shuffles_count() {
3773 let mut flow = FlowBuilder::new();
3774 let node = flow.process::<()>();
3775
3776 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3777
3778 let tick = node.tick();
3779 let batch = input.batch(&tick, nondet!());
3780 let out_recv = batch.all_ticks().sim_output();
3781
3782 let instance_count = flow.sim().exhaustive(async || {
3783 in_send.send_many_unordered([1, 2, 3, 4]);
3784 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3785 });
3786
3787 assert_eq!(
3788 instance_count,
3789 75 )
3791 }
3792
3793 #[cfg(feature = "sim")]
3794 #[test]
3795 #[should_panic]
3796 fn sim_observe_order_batched() {
3797 let mut flow = FlowBuilder::new();
3798 let node = flow.process::<()>();
3799
3800 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3801
3802 let tick = node.tick();
3803 let batch = input.batch(&tick, nondet!());
3804 let out_recv = batch
3805 .assume_ordering::<TotalOrder>(nondet!())
3806 .all_ticks()
3807 .sim_output();
3808
3809 flow.sim().exhaustive(async || {
3810 in_send.send_many_unordered([1, 2, 3, 4]);
3811 out_recv.assert_yields_only([1, 2, 3, 4]).await; });
3813 }
3814
3815 #[cfg(feature = "sim")]
3816 #[test]
3817 fn sim_observe_order_batched_count() {
3818 let mut flow = FlowBuilder::new();
3819 let node = flow.process::<()>();
3820
3821 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3822
3823 let tick = node.tick();
3824 let batch = input.batch(&tick, nondet!());
3825 let out_recv = batch
3826 .assume_ordering::<TotalOrder>(nondet!())
3827 .all_ticks()
3828 .sim_output();
3829
3830 let instance_count = flow.sim().exhaustive(async || {
3831 in_send.send_many_unordered([1, 2, 3, 4]);
3832 let _ = out_recv.collect::<Vec<_>>().await;
3833 });
3834
3835 assert_eq!(
3836 instance_count,
3837 192 )
3839 }
3840
3841 #[cfg(feature = "sim")]
3842 #[test]
3843 fn sim_unordered_count_instance_count() {
3844 let mut flow = FlowBuilder::new();
3845 let node = flow.process::<()>();
3846
3847 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3848
3849 let tick = node.tick();
3850 let out_recv = input
3851 .count()
3852 .snapshot(&tick, nondet!())
3853 .all_ticks()
3854 .sim_output();
3855
3856 let instance_count = flow.sim().exhaustive(async || {
3857 in_send.send_many_unordered([1, 2, 3, 4]);
3858 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3859 });
3860
3861 assert_eq!(
3862 instance_count,
3863 16 )
3865 }
3866
3867 #[cfg(feature = "sim")]
3868 #[test]
3869 fn sim_top_level_assume_ordering() {
3870 let mut flow = FlowBuilder::new();
3871 let node = flow.process::<()>();
3872
3873 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3874
3875 let out_recv = input
3876 .assume_ordering::<TotalOrder>(nondet!())
3877 .sim_output();
3878
3879 let instance_count = flow.sim().exhaustive(async || {
3880 in_send.send_many_unordered([1, 2, 3]);
3881 let mut out = out_recv.collect::<Vec<_>>().await;
3882 out.sort();
3883 assert_eq!(out, vec![1, 2, 3]);
3884 });
3885
3886 assert_eq!(instance_count, 6)
3887 }
3888
3889 #[cfg(feature = "sim")]
3890 #[test]
3891 fn sim_top_level_assume_ordering_cycle_back() {
3892 let mut flow = FlowBuilder::new();
3893 let node = flow.process::<()>();
3894 let node2 = flow.process::<()>();
3895
3896 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3897
3898 let (complete_cycle_back, cycle_back) =
3899 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3900 let ordered = input
3901 .merge_unordered(cycle_back)
3902 .assume_ordering::<TotalOrder>(nondet!());
3903 complete_cycle_back.complete(
3904 ordered
3905 .clone()
3906 .map(q!(|v| v + 1))
3907 .filter(q!(|v| v % 2 == 1))
3908 .send(&node2, TCP.fail_stop().bincode())
3909 .send(&node, TCP.fail_stop().bincode()),
3910 );
3911
3912 let out_recv = ordered.sim_output();
3913
3914 let mut saw = false;
3915 let instance_count = flow.sim().exhaustive(async || {
3916 in_send.send_many_unordered([0, 2]);
3917 let out = out_recv.collect::<Vec<_>>().await;
3918
3919 if out.starts_with(&[0, 1, 2]) {
3920 saw = true;
3921 }
3922 });
3923
3924 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3925 assert_eq!(instance_count, 6);
3926 }
3927
3928 #[cfg(feature = "sim")]
3929 #[test]
3930 fn sim_top_level_assume_ordering_cycle_back_tick() {
3931 let mut flow = FlowBuilder::new();
3932 let node = flow.process::<()>();
3933 let node2 = flow.process::<()>();
3934
3935 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3936
3937 let (complete_cycle_back, cycle_back) =
3938 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3939 let ordered = input
3940 .merge_unordered(cycle_back)
3941 .assume_ordering::<TotalOrder>(nondet!());
3942 complete_cycle_back.complete(
3943 ordered
3944 .clone()
3945 .batch(&node.tick(), nondet!())
3946 .all_ticks()
3947 .map(q!(|v| v + 1))
3948 .filter(q!(|v| v % 2 == 1))
3949 .send(&node2, TCP.fail_stop().bincode())
3950 .send(&node, TCP.fail_stop().bincode()),
3951 );
3952
3953 let out_recv = ordered.sim_output();
3954
3955 let mut saw = false;
3956 let instance_count = flow.sim().exhaustive(async || {
3957 in_send.send_many_unordered([0, 2]);
3958 let out = out_recv.collect::<Vec<_>>().await;
3959
3960 if out.starts_with(&[0, 1, 2]) {
3961 saw = true;
3962 }
3963 });
3964
3965 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3966 assert_eq!(instance_count, 58);
3967 }
3968
3969 #[cfg(feature = "sim")]
3970 #[test]
3971 fn sim_top_level_assume_ordering_multiple() {
3972 let mut flow = FlowBuilder::new();
3973 let node = flow.process::<()>();
3974 let node2 = flow.process::<()>();
3975
3976 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3977 let (_, input2) = node.sim_input::<_, NoOrder, _>();
3978
3979 let (complete_cycle_back, cycle_back) =
3980 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3981 let input1_ordered = input
3982 .clone()
3983 .merge_unordered(cycle_back)
3984 .assume_ordering::<TotalOrder>(nondet!());
3985 let foo = input1_ordered
3986 .clone()
3987 .map(q!(|v| v + 3))
3988 .weaken_ordering::<NoOrder>()
3989 .merge_unordered(input2)
3990 .assume_ordering::<TotalOrder>(nondet!());
3991
3992 complete_cycle_back.complete(
3993 foo.filter(q!(|v| *v == 3))
3994 .send(&node2, TCP.fail_stop().bincode())
3995 .send(&node, TCP.fail_stop().bincode()),
3996 );
3997
3998 let out_recv = input1_ordered.sim_output();
3999
4000 let mut saw = false;
4001 let instance_count = flow.sim().exhaustive(async || {
4002 in_send.send_many_unordered([0, 1]);
4003 let out = out_recv.collect::<Vec<_>>().await;
4004
4005 if out.starts_with(&[0, 3, 1]) {
4006 saw = true;
4007 }
4008 });
4009
4010 assert!(saw, "did not see an instance with 0, 3, 1 in order");
4011 assert_eq!(instance_count, 24);
4012 }
4013
4014 #[cfg(feature = "sim")]
4015 #[test]
4016 fn sim_atomic_assume_ordering_cycle_back() {
4017 let mut flow = FlowBuilder::new();
4018 let node = flow.process::<()>();
4019 let node2 = flow.process::<()>();
4020
4021 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
4022
4023 let (complete_cycle_back, cycle_back) =
4024 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
4025 let ordered = input
4026 .merge_unordered(cycle_back)
4027 .atomic()
4028 .assume_ordering::<TotalOrder>(nondet!())
4029 .end_atomic();
4030 complete_cycle_back.complete(
4031 ordered
4032 .clone()
4033 .map(q!(|v| v + 1))
4034 .filter(q!(|v| v % 2 == 1))
4035 .send(&node2, TCP.fail_stop().bincode())
4036 .send(&node, TCP.fail_stop().bincode()),
4037 );
4038
4039 let out_recv = ordered.sim_output();
4040
4041 let instance_count = flow.sim().exhaustive(async || {
4042 in_send.send_many_unordered([0, 2]);
4043 let out = out_recv.collect::<Vec<_>>().await;
4044 assert_eq!(out.len(), 4);
4045 });
4046 assert_eq!(instance_count, 22);
4047 }
4048
4049 #[cfg(feature = "deploy")]
4050 #[tokio::test]
4051 async fn partition_evens_odds() {
4052 let mut deployment = Deployment::new();
4053
4054 let mut flow = FlowBuilder::new();
4055 let node = flow.process::<()>();
4056 let external = flow.external::<()>();
4057
4058 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
4059 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
4060 let evens_port = evens.send_bincode_external(&external);
4061 let odds_port = odds.send_bincode_external(&external);
4062
4063 let nodes = flow
4064 .with_process(&node, deployment.Localhost())
4065 .with_external(&external, deployment.Localhost())
4066 .deploy(&mut deployment);
4067
4068 deployment.deploy().await.unwrap();
4069
4070 let mut evens_out = nodes.connect(evens_port).await;
4071 let mut odds_out = nodes.connect(odds_port).await;
4072
4073 deployment.start().await.unwrap();
4074
4075 let mut even_results = Vec::new();
4076 for _ in 0..3 {
4077 even_results.push(evens_out.next().await.unwrap());
4078 }
4079 even_results.sort();
4080 assert_eq!(even_results, vec![2, 4, 6]);
4081
4082 let mut odd_results = Vec::new();
4083 for _ in 0..3 {
4084 odd_results.push(odds_out.next().await.unwrap());
4085 }
4086 odd_results.sort();
4087 assert_eq!(odd_results, vec![1, 3, 5]);
4088 }
4089
4090 #[cfg(feature = "deploy")]
4091 #[tokio::test]
4092 async fn unconsumed_inspect_still_runs() {
4093 use crate::deploy::DeployCrateWrapper;
4094
4095 let mut deployment = Deployment::new();
4096
4097 let mut flow = FlowBuilder::new();
4098 let node = flow.process::<()>();
4099
4100 node.source_iter(q!(0..5))
4103 .inspect(q!(|x| println!("inspect: {}", x)));
4104
4105 let nodes = flow
4106 .with_process(&node, deployment.Localhost())
4107 .deploy(&mut deployment);
4108
4109 deployment.deploy().await.unwrap();
4110
4111 let mut stdout = nodes.get_process(&node).stdout();
4112
4113 deployment.start().await.unwrap();
4114
4115 let mut lines = Vec::new();
4116 for _ in 0..5 {
4117 lines.push(stdout.recv().await.unwrap());
4118 }
4119 lines.sort();
4120 assert_eq!(
4121 lines,
4122 vec![
4123 "inspect: 0",
4124 "inspect: 1",
4125 "inspect: 2",
4126 "inspect: 3",
4127 "inspect: 4",
4128 ]
4129 );
4130 }
4131
4132 #[cfg(feature = "sim")]
4133 #[test]
4134 fn sim_limit() {
4135 let mut flow = FlowBuilder::new();
4136 let node = flow.process::<()>();
4137
4138 let (in_send, input) = node.sim_input();
4139
4140 let out_recv = input.limit(q!(3)).sim_output();
4141
4142 flow.sim().exhaustive(async || {
4143 in_send.send(1);
4144 in_send.send(2);
4145 in_send.send(3);
4146 in_send.send(4);
4147 in_send.send(5);
4148
4149 out_recv.assert_yields_only([1, 2, 3]).await;
4150 });
4151 }
4152
4153 #[cfg(feature = "sim")]
4154 #[test]
4155 fn sim_limit_zero() {
4156 let mut flow = FlowBuilder::new();
4157 let node = flow.process::<()>();
4158
4159 let (in_send, input) = node.sim_input();
4160
4161 let out_recv = input.limit(q!(0)).sim_output();
4162
4163 flow.sim().exhaustive(async || {
4164 in_send.send(1);
4165 in_send.send(2);
4166
4167 out_recv.assert_yields_only::<i32, _>([]).await;
4168 });
4169 }
4170
4171 #[cfg(feature = "sim")]
4172 #[test]
4173 fn sim_merge_ordered() {
4174 let mut flow = FlowBuilder::new();
4175 let node = flow.process::<()>();
4176
4177 let (in_send, input) = node.sim_input();
4178 let (in_send2, input2) = node.sim_input();
4179
4180 let out_recv = input
4181 .merge_ordered(input2, nondet!())
4182 .sim_output();
4183
4184 let mut saw_out_of_order = false;
4185 let instances = flow.sim().exhaustive(async || {
4186 in_send.send(1);
4187 in_send.send(2);
4188 in_send2.send(3);
4189 in_send2.send(4);
4190
4191 let out = out_recv.collect::<Vec<_>>().await;
4192
4193 if out == [1, 3, 2, 4] {
4194 saw_out_of_order = true;
4195 }
4196
4197 let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4200 let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4201 assert_eq!(
4202 first_elements,
4203 vec![1, 2],
4204 "first input order violated: {:?}",
4205 out
4206 );
4207 assert_eq!(
4208 second_elements,
4209 vec![3, 4],
4210 "second input order violated: {:?}",
4211 out
4212 );
4213
4214 first_elements.append(&mut second_elements);
4215 first_elements.sort();
4216 assert_eq!(first_elements, vec![1, 2, 3, 4]);
4217 });
4218
4219 assert!(saw_out_of_order);
4220 assert_eq!(instances, 6);
4221 }
4222
4223 #[cfg(feature = "sim")]
4226 #[test]
4227 fn sim_merge_ordered_one_empty() {
4228 let mut flow = FlowBuilder::new();
4229 let node = flow.process::<()>();
4230
4231 let (in_send, input) = node.sim_input();
4232 let (_in_send2, input2) = node.sim_input();
4233
4234 let out_recv = input
4235 .merge_ordered(input2, nondet!())
4236 .sim_output();
4237
4238 let instances = flow.sim().exhaustive(async || {
4239 in_send.send(1);
4240 in_send.send(2);
4241
4242 let out = out_recv.collect::<Vec<_>>().await;
4243 assert_eq!(out, vec![1, 2]);
4244 });
4245
4246 assert_eq!(instances, 1);
4248 }
4249
4250 #[cfg(feature = "sim")]
4256 #[test]
4257 fn sim_merge_ordered_cycle_back() {
4258 let mut flow = FlowBuilder::new();
4259 let node = flow.process::<()>();
4260
4261 let (in_send, input) = node.sim_input();
4262
4263 let (complete_cycle_back, cycle_back) =
4265 node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4266
4267 let merged = input.merge_ordered(cycle_back, nondet!());
4269
4270 complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4272
4273 let out_recv = merged.sim_output();
4274
4275 let mut saw_cycle_before_second = false;
4278 flow.sim().exhaustive(async || {
4279 in_send.send(1);
4280 in_send.send(2);
4281
4282 let out = out_recv.collect::<Vec<_>>().await;
4283
4284 let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4286 let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4287 assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4288
4289 if out == [1, 10, 2] {
4291 saw_cycle_before_second = true;
4292 }
4293
4294 let mut sorted = out;
4295 sorted.sort();
4296 assert_eq!(sorted, vec![1, 2, 10]);
4297 });
4298
4299 assert!(
4300 saw_cycle_before_second,
4301 "never saw the cycled element arrive before the second input element"
4302 );
4303 }
4304
4305 #[cfg(feature = "sim")]
4309 #[test]
4310 fn sim_merge_ordered_delayed() {
4311 let mut flow = FlowBuilder::new();
4312 let node = flow.process::<()>();
4313
4314 let (in_send, input) = node.sim_input();
4315 let (in_send2, input2) = node.sim_input();
4316
4317 let out_recv = input
4318 .merge_ordered(input2, nondet!())
4319 .sim_output();
4320
4321 let mut saw_delayed_interleaving = false;
4322 flow.sim().exhaustive(async || {
4323 in_send.send(1);
4325 in_send2.send(3);
4326 in_send2.send(4);
4327
4328 let first_batch = out_recv.collect::<Vec<_>>().await;
4330
4331 in_send.send(2);
4333 let second_batch = out_recv.collect::<Vec<_>>().await;
4334
4335 let mut all: Vec<_> = first_batch
4336 .iter()
4337 .chain(second_batch.iter())
4338 .copied()
4339 .collect();
4340
4341 if all == [1, 3, 4, 2] {
4343 saw_delayed_interleaving = true;
4344 }
4345
4346 all.sort();
4347 assert_eq!(all, vec![1, 2, 3, 4]);
4348 });
4349
4350 assert!(saw_delayed_interleaving);
4351 }
4352
4353 #[cfg(feature = "deploy")]
4358 #[tokio::test]
4359 async fn deploy_merge_ordered_delayed() {
4360 let mut deployment = Deployment::new();
4361
4362 let mut flow = FlowBuilder::new();
4363 let node = flow.process::<()>();
4364 let external = flow.external::<()>();
4365
4366 let (input_a_port, input_a) = node.source_external_bincode(&external);
4367 let (input_b_port, input_b) = node.source_external_bincode(&external);
4368
4369 let out = input_a
4370 .assume_ordering(nondet!())
4371 .merge_ordered(
4372 input_b.assume_ordering(nondet!()),
4373 nondet!(),
4374 )
4375 .send_bincode_external(&external);
4376
4377 let nodes = flow
4378 .with_process(&node, deployment.Localhost())
4379 .with_external(&external, deployment.Localhost())
4380 .deploy(&mut deployment);
4381
4382 deployment.deploy().await.unwrap();
4383
4384 let mut ext_a = nodes.connect(input_a_port).await;
4385 let mut ext_b = nodes.connect(input_b_port).await;
4386 let mut ext_out = nodes.connect(out).await;
4387
4388 deployment.start().await.unwrap();
4389
4390 ext_a.send(1).await.unwrap();
4392 ext_b.send(3).await.unwrap();
4393 ext_b.send(4).await.unwrap();
4394
4395 let mut received = Vec::new();
4397 for _ in 0..3 {
4398 received.push(ext_out.next().await.unwrap());
4399 }
4400
4401 ext_a.send(2).await.unwrap();
4403 received.push(ext_out.next().await.unwrap());
4404
4405 received.sort();
4407 assert_eq!(received, vec![1, 2, 3, 4]);
4408 }
4409
4410 #[cfg(feature = "deploy")]
4411 #[tokio::test]
4412 async fn monotone_fold_threshold() {
4413 use crate::properties::manual_proof;
4414
4415 let mut deployment = Deployment::new();
4416
4417 let mut flow = FlowBuilder::new();
4418 let node = flow.process::<()>();
4419 let external = flow.external::<()>();
4420
4421 let in_unbounded: super::Stream<_, _> =
4422 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4423 let sum = in_unbounded.fold(
4424 q!(|| 0),
4425 q!(
4426 |sum, v| {
4427 *sum += v;
4428 },
4429 monotone = manual_proof!()
4430 ),
4431 );
4432
4433 let threshold_out = sum
4434 .threshold_greater_or_equal(node.singleton(q!(7)))
4435 .send_bincode_external(&external);
4436
4437 let nodes = flow
4438 .with_process(&node, deployment.Localhost())
4439 .with_external(&external, deployment.Localhost())
4440 .deploy(&mut deployment);
4441
4442 deployment.deploy().await.unwrap();
4443
4444 let mut threshold_out = nodes.connect(threshold_out).await;
4445
4446 deployment.start().await.unwrap();
4447
4448 assert_eq!(threshold_out.next().await.unwrap(), 7);
4449 }
4450
4451 #[cfg(feature = "deploy")]
4452 #[tokio::test]
4453 async fn monotone_count_threshold() {
4454 let mut deployment = Deployment::new();
4455
4456 let mut flow = FlowBuilder::new();
4457 let node = flow.process::<()>();
4458 let external = flow.external::<()>();
4459
4460 let in_unbounded: super::Stream<_, _> =
4461 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4462 let sum = in_unbounded.count();
4463
4464 let threshold_out = sum
4465 .threshold_greater_or_equal(node.singleton(q!(3)))
4466 .send_bincode_external(&external);
4467
4468 let nodes = flow
4469 .with_process(&node, deployment.Localhost())
4470 .with_external(&external, deployment.Localhost())
4471 .deploy(&mut deployment);
4472
4473 deployment.deploy().await.unwrap();
4474
4475 let mut threshold_out = nodes.connect(threshold_out).await;
4476
4477 deployment.start().await.unwrap();
4478
4479 assert_eq!(threshold_out.next().await.unwrap(), 3);
4480 }
4481
4482 #[cfg(feature = "deploy")]
4483 #[tokio::test]
4484 async fn monotone_map_order_preserving_threshold() {
4485 use crate::properties::manual_proof;
4486
4487 let mut deployment = Deployment::new();
4488
4489 let mut flow = FlowBuilder::new();
4490 let node = flow.process::<()>();
4491 let external = flow.external::<()>();
4492
4493 let in_unbounded: super::Stream<_, _> =
4494 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4495 let sum = in_unbounded.fold(
4496 q!(|| 0),
4497 q!(
4498 |sum, v| {
4499 *sum += v;
4500 },
4501 monotone = manual_proof!()
4502 ),
4503 );
4504
4505 let doubled = sum.map(q!(
4507 |v| v * 2,
4508 order_preserving = manual_proof!()
4509 ));
4510
4511 let threshold_out = doubled
4512 .threshold_greater_or_equal(node.singleton(q!(14)))
4513 .send_bincode_external(&external);
4514
4515 let nodes = flow
4516 .with_process(&node, deployment.Localhost())
4517 .with_external(&external, deployment.Localhost())
4518 .deploy(&mut deployment);
4519
4520 deployment.deploy().await.unwrap();
4521
4522 let mut threshold_out = nodes.connect(threshold_out).await;
4523
4524 deployment.start().await.unwrap();
4525
4526 assert_eq!(threshold_out.next().await.unwrap(), 14);
4527 }
4528
4529 #[cfg(any(feature = "deploy", feature = "sim"))]
4532 mod join_ordering_type_tests {
4533 use crate::live_collections::boundedness::{Bounded, Unbounded};
4534 use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4535 use crate::location::{Location, Process};
4536
4537 #[expect(dead_code, reason = "compile-time type test")]
4538 fn join_unbounded_with_bounded_preserves_order<'a>(
4539 left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4540 right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4541 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4542 left.join(right)
4543 }
4544
4545 #[expect(dead_code, reason = "compile-time type test")]
4546 fn join_unbounded_with_unbounded_is_no_order<'a>(
4547 left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4548 right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4549 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4550 left.join(right)
4551 }
4552
4553 #[expect(dead_code, reason = "compile-time type test")]
4554 fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4555 left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4556 right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4557 ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4558 left.join(right)
4559 }
4560
4561 #[expect(dead_code, reason = "compile-time type test")]
4562 fn join_unbounded_noorder_with_bounded<'a>(
4563 left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4564 right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4565 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4566 left.join(right)
4567 }
4568
4569 #[expect(dead_code, reason = "compile-time type test")]
4572 fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4573 left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4574 right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4575 ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4576 left.cross_product(right)
4577 }
4578
4579 #[expect(dead_code, reason = "compile-time type test")]
4580 fn cross_product_bounded_with_bounded_preserves_order<'a>(
4581 left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4582 right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4583 ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4584 left.cross_product(right)
4585 }
4586
4587 #[expect(dead_code, reason = "compile-time type test")]
4588 fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4589 left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4590 right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4591 ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4592 left.cross_product(right)
4593 }
4594 } #[cfg(feature = "sim")]
4599 #[test]
4600 fn cross_product_mixed_boundedness_correctness() {
4601 use stageleft::q;
4602
4603 use crate::compile::builder::FlowBuilder;
4604 use crate::nondet::nondet;
4605
4606 let mut flow = FlowBuilder::new();
4607 let process = flow.process::<()>();
4608 let tick = process.tick();
4609
4610 let left = process.source_iter(q!(vec![1, 2]));
4611 let right = process
4612 .source_iter(q!(vec!['a', 'b']))
4613 .batch(&tick, nondet!())
4614 .all_ticks();
4615
4616 let out = left.cross_product(right).sim_output();
4617
4618 flow.sim().exhaustive(async || {
4619 out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4620 .await;
4621 });
4622 }
4623
4624 #[cfg(feature = "sim")]
4625 #[test]
4626 fn join_mixed_boundedness_correctness() {
4627 use stageleft::q;
4628
4629 use crate::compile::builder::FlowBuilder;
4630 use crate::nondet::nondet;
4631
4632 let mut flow = FlowBuilder::new();
4633 let process = flow.process::<()>();
4634 let tick = process.tick();
4635
4636 let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4637 let right = process
4638 .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4639 .batch(&tick, nondet!())
4640 .all_ticks();
4641
4642 let out = left.join(right).sim_output();
4643
4644 flow.sim().exhaustive(async || {
4645 out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4646 .await;
4647 });
4648 }
4649
4650 #[cfg(feature = "sim")]
4651 #[test]
4652 fn sim_merge_unordered_independent_atomics() {
4653 let mut flow = FlowBuilder::new();
4654 let node = flow.process::<()>();
4655
4656 let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4657 let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4658
4659 let out = input1
4660 .atomic()
4661 .merge_unordered(input2.atomic())
4662 .end_atomic()
4663 .sim_output();
4664
4665 flow.sim().exhaustive(async || {
4666 in1_send.send(1);
4667 in2_send.send(2);
4668
4669 out.assert_yields_only_unordered(vec![1, 2]).await;
4670 });
4671 }
4672
4673 #[cfg(feature = "deploy")]
4674 #[tokio::test]
4675 async fn test_stream_ref() {
4676 let mut deployment = Deployment::new();
4677
4678 let mut flow = FlowBuilder::new();
4679 let external = flow.external::<()>();
4680 let p1 = flow.process::<()>();
4681
4682 let my_stream = p1.source_iter(q!(1..=5i32));
4684
4685 let stream_ref = my_stream.by_ref();
4686
4687 let out_port = p1
4689 .source_iter(q!([()]))
4690 .map(q!(|_| stream_ref.len() as i32))
4691 .send_bincode_external(&external);
4692
4693 my_stream.for_each(q!(|_| {}));
4695
4696 let nodes = flow
4697 .with_default_optimize()
4698 .with_process(&p1, deployment.Localhost())
4699 .with_external(&external, deployment.Localhost())
4700 .deploy(&mut deployment);
4701
4702 deployment.deploy().await.unwrap();
4703
4704 let mut out_recv = nodes.connect(out_port).await;
4705
4706 deployment.start().await.unwrap();
4707
4708 let result = out_recv.next().await.unwrap();
4709 assert_eq!(result, 5);
4711 }
4712
4713 #[cfg(feature = "deploy")]
4714 #[tokio::test]
4715 async fn test_stream_ref_contents() {
4716 let mut deployment = Deployment::new();
4717
4718 let mut flow = FlowBuilder::new();
4719 let external = flow.external::<()>();
4720 let p1 = flow.process::<()>();
4721
4722 let my_stream = p1.source_iter(q!(1..=3i32));
4724
4725 let stream_ref = my_stream.by_ref();
4726
4727 let out_port = p1
4729 .source_iter(q!([()]))
4730 .map(q!(|_| stream_ref.iter().sum::<i32>()))
4731 .send_bincode_external(&external);
4732
4733 my_stream.for_each(q!(|_| {}));
4734
4735 let nodes = flow
4736 .with_default_optimize()
4737 .with_process(&p1, deployment.Localhost())
4738 .with_external(&external, deployment.Localhost())
4739 .deploy(&mut deployment);
4740
4741 deployment.deploy().await.unwrap();
4742
4743 let mut out_recv = nodes.connect(out_port).await;
4744
4745 deployment.start().await.unwrap();
4746
4747 let result = out_recv.next().await.unwrap();
4748 assert_eq!(result, 6);
4750 }
4751
4752 #[cfg(feature = "deploy")]
4753 #[tokio::test]
4754 async fn test_stream_ref_no_consumer() {
4755 let mut deployment = Deployment::new();
4756
4757 let mut flow = FlowBuilder::new();
4758 let external = flow.external::<()>();
4759 let p1 = flow.process::<()>();
4760
4761 let my_stream = p1.source_iter(q!(1..=4i32));
4763
4764 let stream_ref = my_stream.by_ref();
4765
4766 let out_port = p1
4767 .source_iter(q!([()]))
4768 .map(q!(|_| stream_ref.len() as i32))
4769 .send_bincode_external(&external);
4770
4771 let nodes = flow
4772 .with_default_optimize()
4773 .with_process(&p1, deployment.Localhost())
4774 .with_external(&external, deployment.Localhost())
4775 .deploy(&mut deployment);
4776
4777 deployment.deploy().await.unwrap();
4778
4779 let mut out_recv = nodes.connect(out_port).await;
4780
4781 deployment.start().await.unwrap();
4782
4783 let result = out_recv.next().await.unwrap();
4784 assert_eq!(result, 4);
4785 }
4786
4787 #[cfg(feature = "deploy")]
4788 #[tokio::test]
4789 async fn test_stream_mut() {
4790 let mut deployment = Deployment::new();
4791
4792 let mut flow = FlowBuilder::new();
4793 let external = flow.external::<()>();
4794 let p1 = flow.process::<()>();
4795
4796 let my_stream = p1.source_iter(q!(1..=5i32));
4798
4799 let stream_mut = my_stream.by_mut();
4800
4801 let out_port = p1
4803 .source_iter(q!([()]))
4804 .map(q!(|_| {
4805 stream_mut.retain(|x| *x > 3);
4806 stream_mut.len() as i32
4807 }))
4808 .send_bincode_external(&external);
4809
4810 my_stream.for_each(q!(|_| {}));
4811
4812 let nodes = flow
4813 .with_default_optimize()
4814 .with_process(&p1, deployment.Localhost())
4815 .with_external(&external, deployment.Localhost())
4816 .deploy(&mut deployment);
4817
4818 deployment.deploy().await.unwrap();
4819
4820 let mut out_recv = nodes.connect(out_port).await;
4821
4822 deployment.start().await.unwrap();
4823
4824 let result = out_recv.next().await.unwrap();
4825 assert_eq!(result, 2);
4827 }
4828
4829 #[cfg(feature = "sim")]
4833 #[test]
4834 fn sim_map_with_mut_on_unordered_explores_multiple_states() {
4835 use crate::live_collections::sliced::sliced;
4836 use crate::live_collections::stream::ExactlyOnce;
4837 use crate::properties::manual_proof;
4838
4839 let mut flow = FlowBuilder::new();
4840 let node = flow.process::<()>();
4841
4842 let (trigger_send, trigger) = node.sim_input::<i32, TotalOrder, ExactlyOnce>();
4843
4844 let out_recv = sliced! {
4845 let batch = use(trigger, nondet!());
4846 let counter = batch.location().source_iter(q!(vec![0i32]))
4847 .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4848 let counter_mut = counter.by_mut();
4849 let items = batch.location().source_iter(q!(vec![1i32, 2])).weaken_ordering::<NoOrder>();
4850 items.map(q!(
4851 |x| {
4852 *counter_mut += x;
4853 *counter_mut
4854 },
4855 commutative = manual_proof!()
4856 ))
4857 }
4858 .sim_output();
4859
4860 let count = flow.sim().exhaustive(async || {
4861 trigger_send.send(1);
4862 let _all: Vec<i32> = out_recv.collect_sorted().await;
4863 });
4864
4865 assert_eq!(
4866 count, 2,
4867 "Expected 2 simulation instances due to mut on unordered input, got {}",
4868 count
4869 );
4870 }
4871
4872 #[cfg(feature = "sim")]
4876 #[test]
4877 #[ignore = "observe_nondet not yet supported for top-level bounded inputs (https://github.com/hydro-project/hydro/issues/2950)"]
4878 fn sim_map_with_mut_on_unordered_top_level() {
4879 use crate::properties::manual_proof;
4880
4881 let mut flow = FlowBuilder::new();
4882 let node = flow.process::<()>();
4883
4884 let counter = node
4885 .source_iter(q!(vec![0i32]))
4886 .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4887 let counter_mut = counter.by_mut();
4888
4889 let out_recv = node
4890 .source_iter(q!(vec![1i32, 2]))
4891 .weaken_ordering::<NoOrder>()
4892 .map(q!(
4893 |x| {
4894 *counter_mut += x;
4895 *counter_mut
4896 },
4897 commutative = manual_proof!()
4898 ))
4899 .assume_ordering::<TotalOrder>(nondet!())
4900 .sim_output();
4901
4902 counter.into_stream().for_each(q!(|_| {}));
4903
4904 let count = flow.sim().exhaustive(async || {
4905 let _all: Vec<i32> = out_recv.collect().await;
4906 });
4907
4908 assert_eq!(
4909 count, 2,
4910 "Expected 2 simulation instances due to mut on unordered input, got {}",
4911 count
4912 );
4913 }
4914}