1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39pub struct ClosureExpr {
45 pub(crate) expr: DebugExpr,
46 pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
50}
51
52impl Clone for ClosureExpr {
53 fn clone(&self) -> Self {
54 Self {
55 expr: self.expr.clone(),
56 singleton_refs: self
57 .singleton_refs
58 .iter()
59 .map(|(node, is_mut)| {
60 let HydroNode::Reference {
61 inner,
62 kind,
63 access_counter,
64 metadata,
65 } = node
66 else {
67 panic!("singleton_refs should only contain HydroNode::Reference");
68 };
69 (
70 HydroNode::Reference {
71 inner: SharedNode(Rc::clone(&inner.0)),
72 kind: *kind,
73 access_counter: access_counter.freeze(),
74 metadata: metadata.clone(),
75 },
76 *is_mut,
77 )
78 })
79 .collect(),
80 }
81 }
82}
83
84impl Hash for ClosureExpr {
85 fn hash<H: Hasher>(&self, state: &mut H) {
86 self.expr.hash(state);
87 }
91}
92
93impl serde::Serialize for ClosureExpr {
94 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95 use serde::ser::SerializeStruct;
96 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97 s.serialize_field("expr", &self.expr)?;
98 s.serialize_field(
99 "singleton_refs",
100 &SerializableSingletonRefs(&self.singleton_refs),
101 )?;
102 s.end()
103 }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110 use serde::ser::SerializeSeq;
111 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112 for (node, is_mut) in self.0.iter() {
113 seq.serialize_element(&(node, is_mut))?;
114 }
115 seq.end()
116 }
117}
118
119impl Debug for ClosureExpr {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 Debug::fmt(&self.expr, f)
122 }
123}
124
125impl Display for ClosureExpr {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 Display::fmt(&self.expr, f)
128 }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132 fn from(expr: syn::Expr) -> Self {
133 Self {
134 expr: DebugExpr(Box::new(expr)),
135 singleton_refs: Vec::new(),
136 }
137 }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141 fn from(expr: DebugExpr) -> Self {
142 Self {
143 expr,
144 singleton_refs: Vec::new(),
145 }
146 }
147}
148
149impl ClosureExpr {
150 pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151 Self {
152 expr,
153 singleton_refs,
154 }
155 }
156
157 pub fn has_mut_ref(&self) -> bool {
158 self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159 }
160
161 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162 Self {
163 expr: self.expr.clone(),
164 singleton_refs: self
165 .singleton_refs
166 .iter()
167 .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168 .collect(),
169 }
170 }
171
172 pub fn transform_children(
173 &mut self,
174 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175 seen_tees: &mut SeenSharedNodes,
176 ) {
177 for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178 transform(ref_node, seen_tees);
179 }
180 }
181
182 #[cfg(feature = "build")]
185 pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186 if self.singleton_refs.is_empty() {
187 self.expr.0.to_token_stream()
188 } else {
189 assert!(
190 ident_stack.len() >= self.singleton_refs.len(),
191 "ident_stack has {} entries but expected at least {} for singleton_refs",
192 ident_stack.len(),
193 self.singleton_refs.len()
194 );
195 let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197 let mut let_bindings = Vec::new();
198 for ((i, (ref_node, is_mut)), ref_ident) in
199 self.singleton_refs.iter().enumerate().zip(ref_idents)
200 {
201 let HydroNode::Reference { access_counter, .. } = ref_node else {
202 panic!("ClosureExpression expected references to `HydroNode::Reference`");
203 };
204 let group = access_counter.frozen_group();
205 let local_ident = handoff_ref_ident(i);
207 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208 let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209 let mut_token = is_mut.then(|| quote!(mut));
210 let binding = quote! {
211 let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212 };
213 let_bindings.push(binding);
214 }
215
216 let expr = &self.expr.0;
217 quote! {
218 {
219 #( #let_bindings )*
220 #expr
221 }
222 }
223 }
224 }
225}
226
227#[derive(Clone, Hash)]
231pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235 serializer.serialize_str(&self.to_string())
236 }
237}
238
239impl From<syn::Expr> for DebugExpr {
240 fn from(expr: syn::Expr) -> Self {
241 Self(Box::new(expr))
242 }
243}
244
245impl Deref for DebugExpr {
246 type Target = syn::Expr;
247
248 fn deref(&self) -> &Self::Target {
249 &self.0
250 }
251}
252
253impl ToTokens for DebugExpr {
254 fn to_tokens(&self, tokens: &mut TokenStream) {
255 self.0.to_tokens(tokens);
256 }
257}
258
259impl Debug for DebugExpr {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 write!(f, "{}", self.0.to_token_stream())
262 }
263}
264
265impl Display for DebugExpr {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 let original = self.0.as_ref().clone();
268 let simplified = simplify_q_macro(original);
269
270 write!(f, "q!({})", quote::quote!(#simplified))
273 }
274}
275
276fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278 if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279 && is_stageleft_runtime_support_call(&path_expr.path)
281 && let syn::Expr::Block(b) = &call.args[0]
282 && b.block.stmts.len() == 3
283 && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284 {
286 let mut e = e.clone();
287 while let syn::Expr::Block(ref mut block) = e
288 && block.block.stmts.len() == 1
289 && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290 {
291 e = inner_e;
292 }
293
294 e
295 } else {
296 expr
297 }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301 if let Some(last_segment) = path.segments.last() {
303 let fn_name = last_segment.ident.to_string();
304 path.segments.len() > 2
305 && path.segments[0].ident == "stageleft"
306 && path.segments[1].ident == "runtime_support"
307 && fn_name.contains("_type_hint")
308 } else {
309 false
310 }
311}
312
313#[derive(Clone, PartialEq, Eq, Hash)]
317pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320 fn from(t: syn::Type) -> Self {
321 Self(Box::new(t))
322 }
323}
324
325impl Deref for DebugType {
326 type Target = syn::Type;
327
328 fn deref(&self) -> &Self::Target {
329 &self.0
330 }
331}
332
333impl ToTokens for DebugType {
334 fn to_tokens(&self, tokens: &mut TokenStream) {
335 self.0.to_tokens(tokens);
336 }
337}
338
339impl Debug for DebugType {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 write!(f, "{}", self.0.to_token_stream())
342 }
343}
344
345impl serde::Serialize for DebugType {
346 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348 }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352 backtrace: &Backtrace,
353 serializer: S,
354) -> Result<S::Ok, S::Error> {
355 match backtrace.format_span() {
356 Some(span) => serializer.serialize_some(&span),
357 None => serializer.serialize_none(),
358 }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362 ident: &syn::Ident,
363 serializer: S,
364) -> Result<S::Ok, S::Error> {
365 serializer.serialize_str(&ident.to_string())
366}
367
368pub enum DebugInstantiate {
369 Building,
370 Finalized(Box<DebugInstantiateFinalized>),
371}
372
373impl serde::Serialize for DebugInstantiate {
374 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375 match self {
376 DebugInstantiate::Building => {
377 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378 }
379 DebugInstantiate::Finalized(_) => {
380 panic!(
381 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382 )
383 }
384 }
385 }
386}
387
388#[cfg_attr(
389 not(feature = "build"),
390 expect(
391 dead_code,
392 reason = "sink, source unused without `feature = \"build\"`."
393 )
394)]
395pub struct DebugInstantiateFinalized {
396 sink: syn::Expr,
397 source: syn::Expr,
398 connect_fn: Option<Box<dyn FnOnce()>>,
399}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402 fn from(f: DebugInstantiateFinalized) -> Self {
403 Self::Finalized(Box::new(f))
404 }
405}
406
407impl Debug for DebugInstantiate {
408 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409 write!(f, "<network instantiate>")
410 }
411}
412
413impl Hash for DebugInstantiate {
414 fn hash<H: Hasher>(&self, _state: &mut H) {
415 }
417}
418
419impl Clone for DebugInstantiate {
420 fn clone(&self) -> Self {
421 match self {
422 DebugInstantiate::Building => DebugInstantiate::Building,
423 DebugInstantiate::Finalized(_) => {
424 panic!("DebugInstantiate::Finalized should not be cloned")
425 }
426 }
427 }
428}
429
430#[derive(Debug, Hash, Clone, serde::Serialize)]
439pub enum ClusterMembersState {
440 Uninit,
442 Stream(DebugExpr),
445 Tee(LocationId, LocationId),
449}
450
451#[derive(Debug, Hash, Clone, serde::Serialize)]
453pub enum HydroSource {
454 Stream(DebugExpr),
455 ExternalNetwork(),
456 Iter(DebugExpr),
457 Spin(),
458 ClusterMembers(LocationId, ClusterMembersState),
459 Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
460 EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
461}
462
463#[cfg(feature = "build")]
464pub trait DfirBuilder {
470 fn singleton_intermediates(&self) -> bool;
472
473 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
475
476 #[expect(clippy::too_many_arguments, reason = "TODO")]
477 fn batch(
478 &mut self,
479 in_ident: syn::Ident,
480 in_location: &LocationId,
481 in_kind: &CollectionKind,
482 out_ident: &syn::Ident,
483 out_location: &LocationId,
484 op_meta: &HydroIrOpMetadata,
485 fold_hooked_idents: &HashSet<String>,
486 );
487 fn yield_from_tick(
488 &mut self,
489 in_ident: syn::Ident,
490 in_location: &LocationId,
491 in_kind: &CollectionKind,
492 out_ident: &syn::Ident,
493 out_location: &LocationId,
494 );
495
496 fn begin_atomic(
497 &mut self,
498 in_ident: syn::Ident,
499 in_location: &LocationId,
500 in_kind: &CollectionKind,
501 out_ident: &syn::Ident,
502 out_location: &LocationId,
503 op_meta: &HydroIrOpMetadata,
504 );
505 fn end_atomic(
506 &mut self,
507 in_ident: syn::Ident,
508 in_location: &LocationId,
509 in_kind: &CollectionKind,
510 out_ident: &syn::Ident,
511 );
512
513 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
514 fn observe_nondet(
515 &mut self,
516 trusted: bool,
517 location: &LocationId,
518 in_ident: syn::Ident,
519 in_kind: &CollectionKind,
520 out_ident: &syn::Ident,
521 out_kind: &CollectionKind,
522 op_meta: &HydroIrOpMetadata,
523 );
524
525 #[expect(clippy::too_many_arguments, reason = "TODO")]
526 fn merge_ordered(
527 &mut self,
528 location: &LocationId,
529 first_ident: syn::Ident,
530 second_ident: syn::Ident,
531 out_ident: &syn::Ident,
532 in_kind: &CollectionKind,
533 op_meta: &HydroIrOpMetadata,
534 operator_tag: Option<&str>,
535 );
536
537 #[expect(clippy::too_many_arguments, reason = "TODO")]
538 fn create_network(
539 &mut self,
540 from: &LocationId,
541 to: &LocationId,
542 input_ident: syn::Ident,
543 out_ident: &syn::Ident,
544 serialize: Option<&DebugExpr>,
545 sink: syn::Expr,
546 source: syn::Expr,
547 deserialize: Option<&DebugExpr>,
548 tag_id: StmtId,
549 networking_info: &crate::networking::NetworkingInfo,
550 );
551
552 fn create_external_source(
553 &mut self,
554 on: &LocationId,
555 source_expr: syn::Expr,
556 out_ident: &syn::Ident,
557 deserialize: Option<&DebugExpr>,
558 tag_id: StmtId,
559 );
560
561 fn create_external_output(
562 &mut self,
563 on: &LocationId,
564 sink_expr: syn::Expr,
565 input_ident: &syn::Ident,
566 serialize: Option<&DebugExpr>,
567 tag_id: StmtId,
568 );
569
570 fn emit_fold_hook(
573 &mut self,
574 location: &LocationId,
575 in_ident: &syn::Ident,
576 in_kind: &CollectionKind,
577 op_meta: &HydroIrOpMetadata,
578 ) -> Option<syn::Ident>;
579
580 fn assert_is_consistent(
584 &mut self,
585 trusted: bool,
586 location: &LocationId,
587 in_ident: syn::Ident,
588 out_ident: &syn::Ident,
589 );
590
591 fn observe_for_mut(
595 &mut self,
596 location: &LocationId,
597 in_ident: syn::Ident,
598 in_kind: &CollectionKind,
599 out_ident: &syn::Ident,
600 op_meta: &HydroIrOpMetadata,
601 );
602}
603
604#[cfg(feature = "build")]
605impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
606 fn singleton_intermediates(&self) -> bool {
607 false
608 }
609
610 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
611 self.entry(location.root().key())
612 .expect("location was removed")
613 .or_default()
614 }
615
616 fn batch(
617 &mut self,
618 in_ident: syn::Ident,
619 in_location: &LocationId,
620 in_kind: &CollectionKind,
621 out_ident: &syn::Ident,
622 _out_location: &LocationId,
623 _op_meta: &HydroIrOpMetadata,
624 _fold_hooked_idents: &HashSet<String>,
625 ) {
626 let builder = self.get_dfir_mut(in_location.root());
627 if in_kind.is_bounded()
628 && matches!(
629 in_kind,
630 CollectionKind::Singleton { .. }
631 | CollectionKind::Optional { .. }
632 | CollectionKind::KeyedSingleton { .. }
633 )
634 {
635 assert!(in_location.is_top_level());
636 builder.add_dfir(
637 parse_quote! {
638 #out_ident = #in_ident -> persist::<'static>();
639 },
640 None,
641 None,
642 );
643 } else {
644 builder.add_dfir(
645 parse_quote! {
646 #out_ident = #in_ident;
647 },
648 None,
649 None,
650 );
651 }
652 }
653
654 fn yield_from_tick(
655 &mut self,
656 in_ident: syn::Ident,
657 in_location: &LocationId,
658 _in_kind: &CollectionKind,
659 out_ident: &syn::Ident,
660 _out_location: &LocationId,
661 ) {
662 let builder = self.get_dfir_mut(in_location.root());
663 builder.add_dfir(
664 parse_quote! {
665 #out_ident = #in_ident;
666 },
667 None,
668 None,
669 );
670 }
671
672 fn begin_atomic(
673 &mut self,
674 in_ident: syn::Ident,
675 in_location: &LocationId,
676 _in_kind: &CollectionKind,
677 out_ident: &syn::Ident,
678 _out_location: &LocationId,
679 _op_meta: &HydroIrOpMetadata,
680 ) {
681 let builder = self.get_dfir_mut(in_location.root());
682 builder.add_dfir(
683 parse_quote! {
684 #out_ident = #in_ident;
685 },
686 None,
687 None,
688 );
689 }
690
691 fn end_atomic(
692 &mut self,
693 in_ident: syn::Ident,
694 in_location: &LocationId,
695 _in_kind: &CollectionKind,
696 out_ident: &syn::Ident,
697 ) {
698 let builder = self.get_dfir_mut(in_location.root());
699 builder.add_dfir(
700 parse_quote! {
701 #out_ident = #in_ident;
702 },
703 None,
704 None,
705 );
706 }
707
708 fn observe_nondet(
709 &mut self,
710 _trusted: bool,
711 location: &LocationId,
712 in_ident: syn::Ident,
713 _in_kind: &CollectionKind,
714 out_ident: &syn::Ident,
715 _out_kind: &CollectionKind,
716 _op_meta: &HydroIrOpMetadata,
717 ) {
718 let builder = self.get_dfir_mut(location);
719 builder.add_dfir(
720 parse_quote! {
721 #out_ident = #in_ident;
722 },
723 None,
724 None,
725 );
726 }
727
728 fn merge_ordered(
729 &mut self,
730 location: &LocationId,
731 first_ident: syn::Ident,
732 second_ident: syn::Ident,
733 out_ident: &syn::Ident,
734 _in_kind: &CollectionKind,
735 _op_meta: &HydroIrOpMetadata,
736 operator_tag: Option<&str>,
737 ) {
738 let builder = self.get_dfir_mut(location);
739 builder.add_dfir(
740 parse_quote! {
741 #out_ident = union();
742 #first_ident -> [0]#out_ident;
743 #second_ident -> [1]#out_ident;
744 },
745 None,
746 operator_tag,
747 );
748 }
749
750 fn create_network(
751 &mut self,
752 from: &LocationId,
753 to: &LocationId,
754 input_ident: syn::Ident,
755 out_ident: &syn::Ident,
756 serialize: Option<&DebugExpr>,
757 sink: syn::Expr,
758 source: syn::Expr,
759 deserialize: Option<&DebugExpr>,
760 tag_id: StmtId,
761 _networking_info: &crate::networking::NetworkingInfo,
762 ) {
763 let sender_builder = self.get_dfir_mut(from);
764 if let Some(serialize_pipeline) = serialize {
765 sender_builder.add_dfir(
766 parse_quote! {
767 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
768 },
769 None,
770 Some(&format!("send{}", tag_id)),
772 );
773 } else {
774 sender_builder.add_dfir(
775 parse_quote! {
776 #input_ident -> dest_sink(#sink);
777 },
778 None,
779 Some(&format!("send{}", tag_id)),
780 );
781 }
782
783 let receiver_builder = self.get_dfir_mut(to);
784 if let Some(deserialize_pipeline) = deserialize {
785 receiver_builder.add_dfir(
786 parse_quote! {
787 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
788 },
789 None,
790 Some(&format!("recv{}", tag_id)),
791 );
792 } else {
793 receiver_builder.add_dfir(
794 parse_quote! {
795 #out_ident = source_stream(#source);
796 },
797 None,
798 Some(&format!("recv{}", tag_id)),
799 );
800 }
801 }
802
803 fn create_external_source(
804 &mut self,
805 on: &LocationId,
806 source_expr: syn::Expr,
807 out_ident: &syn::Ident,
808 deserialize: Option<&DebugExpr>,
809 tag_id: StmtId,
810 ) {
811 let receiver_builder = self.get_dfir_mut(on);
812 if let Some(deserialize_pipeline) = deserialize {
813 receiver_builder.add_dfir(
814 parse_quote! {
815 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
816 },
817 None,
818 Some(&format!("recv{}", tag_id)),
819 );
820 } else {
821 receiver_builder.add_dfir(
822 parse_quote! {
823 #out_ident = source_stream(#source_expr);
824 },
825 None,
826 Some(&format!("recv{}", tag_id)),
827 );
828 }
829 }
830
831 fn create_external_output(
832 &mut self,
833 on: &LocationId,
834 sink_expr: syn::Expr,
835 input_ident: &syn::Ident,
836 serialize: Option<&DebugExpr>,
837 tag_id: StmtId,
838 ) {
839 let sender_builder = self.get_dfir_mut(on);
840 if let Some(serialize_fn) = serialize {
841 sender_builder.add_dfir(
842 parse_quote! {
843 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
844 },
845 None,
846 Some(&format!("send{}", tag_id)),
848 );
849 } else {
850 sender_builder.add_dfir(
851 parse_quote! {
852 #input_ident -> dest_sink(#sink_expr);
853 },
854 None,
855 Some(&format!("send{}", tag_id)),
856 );
857 }
858 }
859
860 fn emit_fold_hook(
861 &mut self,
862 _location: &LocationId,
863 _in_ident: &syn::Ident,
864 _in_kind: &CollectionKind,
865 _op_meta: &HydroIrOpMetadata,
866 ) -> Option<syn::Ident> {
867 None
868 }
869
870 fn assert_is_consistent(
871 &mut self,
872 _trusted: bool,
873 location: &LocationId,
874 in_ident: syn::Ident,
875 out_ident: &syn::Ident,
876 ) {
877 let builder = self.get_dfir_mut(location);
878 builder.add_dfir(
879 parse_quote! {
880 #out_ident = #in_ident;
881 },
882 None,
883 None,
884 );
885 }
886
887 fn observe_for_mut(
888 &mut self,
889 location: &LocationId,
890 in_ident: syn::Ident,
891 _in_kind: &CollectionKind,
892 out_ident: &syn::Ident,
893 _op_meta: &HydroIrOpMetadata,
894 ) {
895 let builder = self.get_dfir_mut(location);
896 builder.add_dfir(
897 parse_quote! {
898 #out_ident = #in_ident;
899 },
900 None,
901 None,
902 );
903 }
904}
905
906#[cfg(feature = "build")]
907pub enum BuildersOrCallback<'a, L, N>
908where
909 L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
910 N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
911{
912 Builders(&'a mut dyn DfirBuilder),
913 Callback(L, N),
914}
915
916#[derive(Debug, Hash, serde::Serialize)]
920pub enum HydroRoot {
921 ForEach {
922 f: ClosureExpr,
923 input: Box<HydroNode>,
924 op_metadata: HydroIrOpMetadata,
925 },
926 SendExternal {
927 to_external_key: LocationKey,
928 to_port_id: ExternalPortId,
929 to_many: bool,
930 unpaired: bool,
931 serialize_fn: Option<DebugExpr>,
932 instantiate_fn: DebugInstantiate,
933 input: Box<HydroNode>,
934 op_metadata: HydroIrOpMetadata,
935 },
936 DestSink {
937 sink: DebugExpr,
938 input: Box<HydroNode>,
939 op_metadata: HydroIrOpMetadata,
940 },
941 CycleSink {
942 cycle_id: CycleId,
943 input: Box<HydroNode>,
944 op_metadata: HydroIrOpMetadata,
945 },
946 EmbeddedOutput {
947 #[serde(serialize_with = "serialize_ident")]
948 ident: syn::Ident,
949 input: Box<HydroNode>,
950 op_metadata: HydroIrOpMetadata,
951 },
952 Null {
953 input: Box<HydroNode>,
954 op_metadata: HydroIrOpMetadata,
955 },
956}
957
958impl HydroRoot {
959 #[cfg(feature = "build")]
960 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
961 pub fn compile_network<'a, D>(
962 &mut self,
963 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
964 seen_tees: &mut SeenSharedNodes,
965 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
966 processes: &SparseSecondaryMap<LocationKey, D::Process>,
967 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
968 externals: &SparseSecondaryMap<LocationKey, D::External>,
969 env: &mut D::InstantiateEnv,
970 ) where
971 D: Deploy<'a>,
972 {
973 let refcell_extra_stmts = RefCell::new(extra_stmts);
974 let refcell_env = RefCell::new(env);
975 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
976 self.transform_bottom_up(
977 &mut |l| {
978 if let HydroRoot::SendExternal {
979 input,
980 to_external_key,
981 to_port_id,
982 to_many,
983 unpaired,
984 instantiate_fn,
985 ..
986 } = l
987 {
988 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
989 DebugInstantiate::Building => {
990 let to_node = externals
991 .get(*to_external_key)
992 .unwrap_or_else(|| {
993 panic!("A external used in the graph was not instantiated: {}", to_external_key)
994 })
995 .clone();
996
997 match input.metadata().location_id.root() {
998 &LocationId::Process(process_key) => {
999 if *to_many {
1000 (
1001 (
1002 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1003 parse_quote!(DUMMY),
1004 ),
1005 Box::new(|| {}) as Box<dyn FnOnce()>,
1006 )
1007 } else {
1008 let from_node = processes
1009 .get(process_key)
1010 .unwrap_or_else(|| {
1011 panic!("A process used in the graph was not instantiated: {}", process_key)
1012 })
1013 .clone();
1014
1015 let sink_port = from_node.next_port();
1016 let source_port = to_node.next_port();
1017
1018 if *unpaired {
1019 use stageleft::quote_type;
1020 use tokio_util::codec::LengthDelimitedCodec;
1021
1022 to_node.register(*to_port_id, source_port.clone());
1023
1024 let _ = D::e2o_source(
1025 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1026 &to_node, &source_port,
1027 &from_node, &sink_port,
1028 "e_type::<LengthDelimitedCodec>(),
1029 format!("{}_{}", *to_external_key, *to_port_id)
1030 );
1031 }
1032
1033 (
1034 (
1035 D::o2e_sink(
1036 &from_node,
1037 &sink_port,
1038 &to_node,
1039 &source_port,
1040 format!("{}_{}", *to_external_key, *to_port_id)
1041 ),
1042 parse_quote!(DUMMY),
1043 ),
1044 if *unpaired {
1045 D::e2o_connect(
1046 &to_node,
1047 &source_port,
1048 &from_node,
1049 &sink_port,
1050 *to_many,
1051 NetworkHint::Auto,
1052 )
1053 } else {
1054 Box::new(|| {}) as Box<dyn FnOnce()>
1055 },
1056 )
1057 }
1058 }
1059 LocationId::Cluster(cluster_key) => {
1060 let from_node = clusters
1061 .get(*cluster_key)
1062 .unwrap_or_else(|| {
1063 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1064 })
1065 .clone();
1066
1067 let sink_port = from_node.next_port();
1068 let source_port = to_node.next_port();
1069
1070 if *unpaired {
1071 to_node.register(*to_port_id, source_port.clone());
1072 }
1073
1074 (
1075 (
1076 D::m2e_sink(
1077 &from_node,
1078 &sink_port,
1079 &to_node,
1080 &source_port,
1081 format!("{}_{}", *to_external_key, *to_port_id)
1082 ),
1083 parse_quote!(DUMMY),
1084 ),
1085 Box::new(|| {}) as Box<dyn FnOnce()>,
1086 )
1087 }
1088 _ => panic!()
1089 }
1090 },
1091
1092 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1093 };
1094
1095 *instantiate_fn = DebugInstantiateFinalized {
1096 sink: sink_expr,
1097 source: source_expr,
1098 connect_fn: Some(connect_fn),
1099 }
1100 .into();
1101 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1102 let element_type = match &input.metadata().collection_kind {
1103 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1104 _ => panic!("Embedded output must have Stream collection kind"),
1105 };
1106 let location_key = match input.metadata().location_id.root() {
1107 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1108 _ => panic!("Embedded output must be on a process or cluster"),
1109 };
1110 D::register_embedded_output(
1111 &mut refcell_env.borrow_mut(),
1112 location_key,
1113 ident,
1114 &element_type,
1115 );
1116 }
1117 },
1118 &mut |n| {
1119 if let HydroNode::Network {
1120 name,
1121 networking_info,
1122 input,
1123 instantiate_fn,
1124 metadata,
1125 ..
1126 } = n
1127 {
1128 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1129 DebugInstantiate::Building => instantiate_network::<D>(
1130 &mut refcell_env.borrow_mut(),
1131 input.metadata().location_id.root(),
1132 metadata.location_id.root(),
1133 processes,
1134 clusters,
1135 name.as_deref(),
1136 networking_info,
1137 ),
1138
1139 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1140 };
1141
1142 *instantiate_fn = DebugInstantiateFinalized {
1143 sink: sink_expr,
1144 source: source_expr,
1145 connect_fn: Some(connect_fn),
1146 }
1147 .into();
1148 } else if let HydroNode::ExternalInput {
1149 from_external_key,
1150 from_port_id,
1151 from_many,
1152 codec_type,
1153 port_hint,
1154 instantiate_fn,
1155 metadata,
1156 ..
1157 } = n
1158 {
1159 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1160 DebugInstantiate::Building => {
1161 let from_node = externals
1162 .get(*from_external_key)
1163 .unwrap_or_else(|| {
1164 panic!(
1165 "A external used in the graph was not instantiated: {}",
1166 from_external_key,
1167 )
1168 })
1169 .clone();
1170
1171 match metadata.location_id.root() {
1172 &LocationId::Process(process_key) => {
1173 let to_node = processes
1174 .get(process_key)
1175 .unwrap_or_else(|| {
1176 panic!("A process used in the graph was not instantiated: {}", process_key)
1177 })
1178 .clone();
1179
1180 let sink_port = from_node.next_port();
1181 let source_port = to_node.next_port();
1182
1183 from_node.register(*from_port_id, sink_port.clone());
1184
1185 (
1186 (
1187 parse_quote!(DUMMY),
1188 if *from_many {
1189 D::e2o_many_source(
1190 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1191 &to_node, &source_port,
1192 codec_type.0.as_ref(),
1193 format!("{}_{}", *from_external_key, *from_port_id)
1194 )
1195 } else {
1196 D::e2o_source(
1197 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1198 &from_node, &sink_port,
1199 &to_node, &source_port,
1200 codec_type.0.as_ref(),
1201 format!("{}_{}", *from_external_key, *from_port_id)
1202 )
1203 },
1204 ),
1205 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1206 )
1207 }
1208 LocationId::Cluster(cluster_key) => {
1209 let to_node = clusters
1210 .get(*cluster_key)
1211 .unwrap_or_else(|| {
1212 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1213 })
1214 .clone();
1215
1216 let sink_port = from_node.next_port();
1217 let source_port = to_node.next_port();
1218
1219 from_node.register(*from_port_id, sink_port.clone());
1220
1221 (
1222 (
1223 parse_quote!(DUMMY),
1224 D::e2m_source(
1225 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1226 &from_node, &sink_port,
1227 &to_node, &source_port,
1228 codec_type.0.as_ref(),
1229 format!("{}_{}", *from_external_key, *from_port_id)
1230 ),
1231 ),
1232 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1233 )
1234 }
1235 _ => panic!()
1236 }
1237 },
1238
1239 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1240 };
1241
1242 *instantiate_fn = DebugInstantiateFinalized {
1243 sink: sink_expr,
1244 source: source_expr,
1245 connect_fn: Some(connect_fn),
1246 }
1247 .into();
1248 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1249 let element_type = match &metadata.collection_kind {
1250 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1251 _ => panic!("Embedded source must have Stream collection kind"),
1252 };
1253 let location_key = match metadata.location_id.root() {
1254 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1255 _ => panic!("Embedded source must be on a process or cluster"),
1256 };
1257 D::register_embedded_stream_input(
1258 &mut refcell_env.borrow_mut(),
1259 location_key,
1260 ident,
1261 &element_type,
1262 );
1263 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1264 let element_type = match &metadata.collection_kind {
1265 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1266 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1267 };
1268 let location_key = match metadata.location_id.root() {
1269 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1270 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1271 };
1272 D::register_embedded_singleton_input(
1273 &mut refcell_env.borrow_mut(),
1274 location_key,
1275 ident,
1276 &element_type,
1277 );
1278 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1279 match state {
1280 ClusterMembersState::Uninit => {
1281 let at_location = metadata.location_id.root().clone();
1282 let key = (at_location.clone(), location_id.key());
1283 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1284 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1286 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1287 &(),
1288 );
1289 *state = ClusterMembersState::Stream(expr.into());
1290 } else {
1291 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1293 }
1294 }
1295 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1296 panic!("cluster members already finalized");
1297 }
1298 }
1299 }
1300 },
1301 seen_tees,
1302 false,
1303 );
1304 }
1305
1306 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1307 self.transform_bottom_up(
1308 &mut |l| {
1309 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1310 match instantiate_fn {
1311 DebugInstantiate::Building => panic!("network not built"),
1312
1313 DebugInstantiate::Finalized(finalized) => {
1314 (finalized.connect_fn.take().unwrap())();
1315 }
1316 }
1317 }
1318 },
1319 &mut |n| {
1320 if let HydroNode::Network { instantiate_fn, .. }
1321 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1322 {
1323 match instantiate_fn {
1324 DebugInstantiate::Building => panic!("network not built"),
1325
1326 DebugInstantiate::Finalized(finalized) => {
1327 (finalized.connect_fn.take().unwrap())();
1328 }
1329 }
1330 }
1331 },
1332 seen_tees,
1333 false,
1334 );
1335 }
1336
1337 pub fn transform_bottom_up(
1338 &mut self,
1339 transform_root: &mut impl FnMut(&mut HydroRoot),
1340 transform_node: &mut impl FnMut(&mut HydroNode),
1341 seen_tees: &mut SeenSharedNodes,
1342 check_well_formed: bool,
1343 ) {
1344 self.transform_children(
1345 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1346 seen_tees,
1347 );
1348
1349 transform_root(self);
1350 }
1351
1352 pub fn transform_children(
1353 &mut self,
1354 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1355 seen_tees: &mut SeenSharedNodes,
1356 ) {
1357 match self {
1358 HydroRoot::ForEach { f, input, .. } => {
1359 f.transform_children(&mut transform, seen_tees);
1360 transform(input, seen_tees);
1361 }
1362 HydroRoot::SendExternal { input, .. }
1363 | HydroRoot::DestSink { input, .. }
1364 | HydroRoot::CycleSink { input, .. }
1365 | HydroRoot::EmbeddedOutput { input, .. }
1366 | HydroRoot::Null { input, .. } => {
1367 transform(input, seen_tees);
1368 }
1369 }
1370 }
1371
1372 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1373 match self {
1374 HydroRoot::ForEach {
1375 f,
1376 input,
1377 op_metadata,
1378 } => HydroRoot::ForEach {
1379 f: f.deep_clone(seen_tees),
1380 input: Box::new(input.deep_clone(seen_tees)),
1381 op_metadata: op_metadata.clone(),
1382 },
1383 HydroRoot::SendExternal {
1384 to_external_key,
1385 to_port_id,
1386 to_many,
1387 unpaired,
1388 serialize_fn,
1389 instantiate_fn,
1390 input,
1391 op_metadata,
1392 } => HydroRoot::SendExternal {
1393 to_external_key: *to_external_key,
1394 to_port_id: *to_port_id,
1395 to_many: *to_many,
1396 unpaired: *unpaired,
1397 serialize_fn: serialize_fn.clone(),
1398 instantiate_fn: instantiate_fn.clone(),
1399 input: Box::new(input.deep_clone(seen_tees)),
1400 op_metadata: op_metadata.clone(),
1401 },
1402 HydroRoot::DestSink {
1403 sink,
1404 input,
1405 op_metadata,
1406 } => HydroRoot::DestSink {
1407 sink: sink.clone(),
1408 input: Box::new(input.deep_clone(seen_tees)),
1409 op_metadata: op_metadata.clone(),
1410 },
1411 HydroRoot::CycleSink {
1412 cycle_id,
1413 input,
1414 op_metadata,
1415 } => HydroRoot::CycleSink {
1416 cycle_id: *cycle_id,
1417 input: Box::new(input.deep_clone(seen_tees)),
1418 op_metadata: op_metadata.clone(),
1419 },
1420 HydroRoot::EmbeddedOutput {
1421 ident,
1422 input,
1423 op_metadata,
1424 } => HydroRoot::EmbeddedOutput {
1425 ident: ident.clone(),
1426 input: Box::new(input.deep_clone(seen_tees)),
1427 op_metadata: op_metadata.clone(),
1428 },
1429 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1430 input: Box::new(input.deep_clone(seen_tees)),
1431 op_metadata: op_metadata.clone(),
1432 },
1433 }
1434 }
1435
1436 #[cfg(feature = "build")]
1437 pub fn emit(
1438 &mut self,
1439 graph_builders: &mut dyn DfirBuilder,
1440 seen_tees: &mut SeenSharedNodes,
1441 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1442 next_stmt_id: &mut crate::Counter<StmtId>,
1443 fold_hooked_idents: &mut HashSet<String>,
1444 ) {
1445 self.emit_core(
1446 &mut BuildersOrCallback::<
1447 fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1448 fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1449 >::Builders(graph_builders),
1450 seen_tees,
1451 built_tees,
1452 next_stmt_id,
1453 fold_hooked_idents,
1454 );
1455 }
1456
1457 #[cfg(feature = "build")]
1458 pub fn emit_core(
1459 &mut self,
1460 builders_or_callback: &mut BuildersOrCallback<
1461 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1462 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1463 >,
1464 seen_tees: &mut SeenSharedNodes,
1465 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1466 next_stmt_id: &mut crate::Counter<StmtId>,
1467 fold_hooked_idents: &mut HashSet<String>,
1468 ) {
1469 match self {
1470 HydroRoot::ForEach { f, input, .. } => {
1471 let input_ident = input.emit_core(
1472 builders_or_callback,
1473 seen_tees,
1474 built_tees,
1475 next_stmt_id,
1476 fold_hooked_idents,
1477 );
1478
1479 let input_ident = maybe_observe_for_mut(
1480 f,
1481 input_ident,
1482 &input.metadata().location_id,
1483 &input.metadata().collection_kind,
1484 &input.metadata().op,
1485 builders_or_callback,
1486 next_stmt_id,
1487 );
1488
1489 let stmt_id = next_stmt_id.get_and_increment();
1490
1491 match builders_or_callback {
1492 BuildersOrCallback::Builders(graph_builders) => {
1493 let mut ident_stack: Vec<syn::Ident> = Vec::new();
1494
1495 for (ref_node, _is_mut) in f.singleton_refs.iter() {
1497 let HydroNode::Reference { inner, .. } = ref_node else {
1498 panic!("singleton_refs should only contain HydroNode::Reference");
1499 };
1500 let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1501 let idents = built_tees.get(&ptr).expect(
1502 "ForEach singleton ref not found in built_tees — ref node was not emitted",
1503 );
1504 ident_stack.push(idents[0].clone());
1505 }
1506
1507 let f_tokens = f.emit_tokens(&mut ident_stack);
1508
1509 graph_builders
1510 .get_dfir_mut(&input.metadata().location_id)
1511 .add_dfir(
1512 parse_quote! {
1513 #input_ident -> for_each(#f_tokens);
1514 },
1515 None,
1516 Some(&stmt_id.to_string()),
1517 );
1518 }
1519 BuildersOrCallback::Callback(leaf_callback, _) => {
1520 leaf_callback(self, next_stmt_id);
1521 }
1522 }
1523 }
1524
1525 HydroRoot::SendExternal {
1526 serialize_fn,
1527 instantiate_fn,
1528 input,
1529 ..
1530 } => {
1531 let input_ident = input.emit_core(
1532 builders_or_callback,
1533 seen_tees,
1534 built_tees,
1535 next_stmt_id,
1536 fold_hooked_idents,
1537 );
1538
1539 let stmt_id = next_stmt_id.get_and_increment();
1540
1541 match builders_or_callback {
1542 BuildersOrCallback::Builders(graph_builders) => {
1543 let (sink_expr, _) = match instantiate_fn {
1544 DebugInstantiate::Building => (
1545 syn::parse_quote!(DUMMY_SINK),
1546 syn::parse_quote!(DUMMY_SOURCE),
1547 ),
1548
1549 DebugInstantiate::Finalized(finalized) => {
1550 (finalized.sink.clone(), finalized.source.clone())
1551 }
1552 };
1553
1554 graph_builders.create_external_output(
1555 &input.metadata().location_id,
1556 sink_expr,
1557 &input_ident,
1558 serialize_fn.as_ref(),
1559 stmt_id,
1560 );
1561 }
1562 BuildersOrCallback::Callback(leaf_callback, _) => {
1563 leaf_callback(self, next_stmt_id);
1564 }
1565 }
1566 }
1567
1568 HydroRoot::DestSink { sink, input, .. } => {
1569 let input_ident = input.emit_core(
1570 builders_or_callback,
1571 seen_tees,
1572 built_tees,
1573 next_stmt_id,
1574 fold_hooked_idents,
1575 );
1576
1577 let stmt_id = next_stmt_id.get_and_increment();
1578
1579 match builders_or_callback {
1580 BuildersOrCallback::Builders(graph_builders) => {
1581 graph_builders
1582 .get_dfir_mut(&input.metadata().location_id)
1583 .add_dfir(
1584 parse_quote! {
1585 #input_ident -> dest_sink(#sink);
1586 },
1587 None,
1588 Some(&stmt_id.to_string()),
1589 );
1590 }
1591 BuildersOrCallback::Callback(leaf_callback, _) => {
1592 leaf_callback(self, next_stmt_id);
1593 }
1594 }
1595 }
1596
1597 HydroRoot::CycleSink {
1598 cycle_id, input, ..
1599 } => {
1600 let input_ident = input.emit_core(
1601 builders_or_callback,
1602 seen_tees,
1603 built_tees,
1604 next_stmt_id,
1605 fold_hooked_idents,
1606 );
1607
1608 match builders_or_callback {
1609 BuildersOrCallback::Builders(graph_builders) => {
1610 let elem_type: syn::Type = match &input.metadata().collection_kind {
1611 CollectionKind::KeyedSingleton {
1612 key_type,
1613 value_type,
1614 ..
1615 }
1616 | CollectionKind::KeyedStream {
1617 key_type,
1618 value_type,
1619 ..
1620 } => {
1621 parse_quote!((#key_type, #value_type))
1622 }
1623 CollectionKind::Stream { element_type, .. }
1624 | CollectionKind::Singleton { element_type, .. }
1625 | CollectionKind::Optional { element_type, .. } => {
1626 parse_quote!(#element_type)
1627 }
1628 };
1629
1630 let cycle_id_ident = cycle_id.as_ident();
1631 graph_builders
1632 .get_dfir_mut(&input.metadata().location_id)
1633 .add_dfir(
1634 parse_quote! {
1635 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1636 },
1637 None,
1638 None,
1639 );
1640 }
1641 BuildersOrCallback::Callback(_, _) => {}
1643 }
1644 }
1645
1646 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1647 let input_ident = input.emit_core(
1648 builders_or_callback,
1649 seen_tees,
1650 built_tees,
1651 next_stmt_id,
1652 fold_hooked_idents,
1653 );
1654
1655 let stmt_id = next_stmt_id.get_and_increment();
1656
1657 match builders_or_callback {
1658 BuildersOrCallback::Builders(graph_builders) => {
1659 graph_builders
1660 .get_dfir_mut(&input.metadata().location_id)
1661 .add_dfir(
1662 parse_quote! {
1663 #input_ident -> for_each(&mut #ident);
1664 },
1665 None,
1666 Some(&stmt_id.to_string()),
1667 );
1668 }
1669 BuildersOrCallback::Callback(leaf_callback, _) => {
1670 leaf_callback(self, next_stmt_id);
1671 }
1672 }
1673 }
1674
1675 HydroRoot::Null { input, .. } => {
1676 let input_ident = input.emit_core(
1677 builders_or_callback,
1678 seen_tees,
1679 built_tees,
1680 next_stmt_id,
1681 fold_hooked_idents,
1682 );
1683
1684 let stmt_id = next_stmt_id.get_and_increment();
1685
1686 match builders_or_callback {
1687 BuildersOrCallback::Builders(graph_builders) => {
1688 graph_builders
1689 .get_dfir_mut(&input.metadata().location_id)
1690 .add_dfir(
1691 parse_quote! {
1692 #input_ident -> for_each(|_| {});
1693 },
1694 None,
1695 Some(&stmt_id.to_string()),
1696 );
1697 }
1698 BuildersOrCallback::Callback(leaf_callback, _) => {
1699 leaf_callback(self, next_stmt_id);
1700 }
1701 }
1702 }
1703 }
1704 }
1705
1706 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1707 match self {
1708 HydroRoot::ForEach { op_metadata, .. }
1709 | HydroRoot::SendExternal { op_metadata, .. }
1710 | HydroRoot::DestSink { op_metadata, .. }
1711 | HydroRoot::CycleSink { op_metadata, .. }
1712 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1713 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1714 }
1715 }
1716
1717 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1718 match self {
1719 HydroRoot::ForEach { op_metadata, .. }
1720 | HydroRoot::SendExternal { op_metadata, .. }
1721 | HydroRoot::DestSink { op_metadata, .. }
1722 | HydroRoot::CycleSink { op_metadata, .. }
1723 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1724 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1725 }
1726 }
1727
1728 pub fn input(&self) -> &HydroNode {
1729 match self {
1730 HydroRoot::ForEach { input, .. }
1731 | HydroRoot::SendExternal { input, .. }
1732 | HydroRoot::DestSink { input, .. }
1733 | HydroRoot::CycleSink { input, .. }
1734 | HydroRoot::EmbeddedOutput { input, .. }
1735 | HydroRoot::Null { input, .. } => input,
1736 }
1737 }
1738
1739 pub fn input_metadata(&self) -> &HydroIrMetadata {
1740 self.input().metadata()
1741 }
1742
1743 pub fn print_root(&self) -> String {
1744 match self {
1745 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1746 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1747 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1748 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1749 HydroRoot::EmbeddedOutput { ident, .. } => {
1750 format!("EmbeddedOutput({})", ident)
1751 }
1752 HydroRoot::Null { .. } => "Null".to_owned(),
1753 }
1754 }
1755
1756 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1757 match self {
1758 HydroRoot::ForEach { f, .. } => {
1759 transform(&mut f.expr);
1760 }
1761 HydroRoot::DestSink { sink, .. } => {
1762 transform(sink);
1763 }
1764 HydroRoot::SendExternal { .. }
1765 | HydroRoot::CycleSink { .. }
1766 | HydroRoot::EmbeddedOutput { .. }
1767 | HydroRoot::Null { .. } => {}
1768 }
1769 }
1770}
1771
1772#[cfg(feature = "build")]
1773fn tick_of(loc: &LocationId) -> Option<ClockId> {
1774 match loc {
1775 LocationId::Tick(id, _) => Some(*id),
1776 LocationId::Atomic(inner) => tick_of(inner),
1777 _ => None,
1778 }
1779}
1780
1781#[cfg(feature = "build")]
1782fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1783 match loc {
1784 LocationId::Tick(id, inner) => {
1785 *id = uf_find(uf, *id);
1786 remap_location(inner, uf);
1787 }
1788 LocationId::Atomic(inner) => {
1789 remap_location(inner, uf);
1790 }
1791 LocationId::Process(_) | LocationId::Cluster(_) => {}
1792 }
1793}
1794
1795#[cfg(feature = "build")]
1796fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1797 let p = *parent.get(&x).unwrap_or(&x);
1798 if p == x {
1799 return x;
1800 }
1801 let root = uf_find(parent, p);
1802 parent.insert(x, root);
1803 root
1804}
1805
1806#[cfg(feature = "build")]
1807fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1808 let ra = uf_find(parent, a);
1809 let rb = uf_find(parent, b);
1810 if ra != rb {
1811 parent.insert(ra, rb);
1812 }
1813}
1814
1815#[cfg(feature = "build")]
1819pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1820 let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1821
1822 transform_bottom_up(
1824 ir,
1825 &mut |_| {},
1826 &mut |node: &mut HydroNode| match node {
1827 HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
1828 if let (Some(a), Some(b)) = (
1829 tick_of(&inner.metadata().location_id),
1830 tick_of(&metadata.location_id),
1831 ) {
1832 uf_union(&mut uf, a, b);
1833 }
1834 }
1835 HydroNode::Chain {
1836 first,
1837 second,
1838 metadata,
1839 }
1840 | HydroNode::ChainFirst {
1841 first,
1842 second,
1843 metadata,
1844 }
1845 | HydroNode::MergeOrdered {
1846 first,
1847 second,
1848 metadata,
1849 } => {
1850 if let (Some(a), Some(b)) = (
1851 tick_of(&first.metadata().location_id),
1852 tick_of(&metadata.location_id),
1853 ) {
1854 uf_union(&mut uf, a, b);
1855 }
1856 if let (Some(a), Some(b)) = (
1857 tick_of(&second.metadata().location_id),
1858 tick_of(&metadata.location_id),
1859 ) {
1860 uf_union(&mut uf, a, b);
1861 }
1862 }
1863 _ => {}
1864 },
1865 false,
1866 );
1867
1868 transform_bottom_up(
1870 ir,
1871 &mut |_| {},
1872 &mut |node: &mut HydroNode| {
1873 remap_location(&mut node.metadata_mut().location_id, &mut uf);
1874 },
1875 false,
1876 );
1877}
1878
1879#[cfg(feature = "build")]
1880pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1881 let mut builders = SecondaryMap::new();
1882 let mut seen_tees = HashMap::new();
1883 let mut built_tees = HashMap::new();
1884 let mut next_stmt_id = crate::Counter::<StmtId>::default();
1885 let mut fold_hooked_idents = HashSet::new();
1886 for leaf in ir {
1887 leaf.emit(
1888 &mut builders,
1889 &mut seen_tees,
1890 &mut built_tees,
1891 &mut next_stmt_id,
1892 &mut fold_hooked_idents,
1893 );
1894 }
1895 builders
1896}
1897
1898#[cfg(feature = "build")]
1899pub fn traverse_dfir(
1900 ir: &mut [HydroRoot],
1901 transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1902 transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1903) {
1904 let mut seen_tees = HashMap::new();
1905 let mut built_tees = HashMap::new();
1906 let mut next_stmt_id = crate::Counter::<StmtId>::default();
1907 let mut fold_hooked_idents = HashSet::new();
1908 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1909 ir.iter_mut().for_each(|leaf| {
1910 leaf.emit_core(
1911 &mut callback,
1912 &mut seen_tees,
1913 &mut built_tees,
1914 &mut next_stmt_id,
1915 &mut fold_hooked_idents,
1916 );
1917 });
1918}
1919
1920pub fn transform_bottom_up(
1921 ir: &mut [HydroRoot],
1922 transform_root: &mut impl FnMut(&mut HydroRoot),
1923 transform_node: &mut impl FnMut(&mut HydroNode),
1924 check_well_formed: bool,
1925) {
1926 let mut seen_tees = HashMap::new();
1927 ir.iter_mut().for_each(|leaf| {
1928 leaf.transform_bottom_up(
1929 transform_root,
1930 transform_node,
1931 &mut seen_tees,
1932 check_well_formed,
1933 );
1934 });
1935}
1936
1937pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1938 let mut seen_tees = HashMap::new();
1939 ir.iter()
1940 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1941 .collect()
1942}
1943
1944type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1945thread_local! {
1946 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1947 static SERIALIZED_SHARED: PrintedTees
1951 = const { RefCell::new(None) };
1952}
1953
1954pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1955 PRINTED_TEES.with(|printed_tees| {
1956 let mut printed_tees_mut = printed_tees.borrow_mut();
1957 *printed_tees_mut = Some((0, HashMap::new()));
1958 drop(printed_tees_mut);
1959
1960 let ret = f();
1961
1962 let mut printed_tees_mut = printed_tees.borrow_mut();
1963 *printed_tees_mut = None;
1964
1965 ret
1966 })
1967}
1968
1969pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1974 let _guard = SerializedSharedGuard::enter();
1975 f()
1976}
1977
1978struct SerializedSharedGuard {
1981 previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1982}
1983
1984impl SerializedSharedGuard {
1985 fn enter() -> Self {
1986 let previous = SERIALIZED_SHARED.with(|cell| {
1987 let mut guard = cell.borrow_mut();
1988 guard.replace((0, HashMap::new()))
1989 });
1990 Self { previous }
1991 }
1992}
1993
1994impl Drop for SerializedSharedGuard {
1995 fn drop(&mut self) {
1996 SERIALIZED_SHARED.with(|cell| {
1997 *cell.borrow_mut() = self.previous.take();
1998 });
1999 }
2000}
2001
2002pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2003
2004impl serde::Serialize for SharedNode {
2005 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2016 SERIALIZED_SHARED.with(|cell| {
2017 let mut guard = cell.borrow_mut();
2018 let state = guard.as_mut().ok_or_else(|| {
2020 serde::ser::Error::custom(
2021 "SharedNode serialization requires an active serialize_dedup_shared scope",
2022 )
2023 })?;
2024 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2025
2026 if let Some(&id) = state.1.get(&ptr) {
2027 drop(guard);
2028 use serde::ser::SerializeMap;
2029 let mut map = serializer.serialize_map(Some(1))?;
2030 map.serialize_entry("$shared_ref", &id)?;
2031 map.end()
2032 } else {
2033 let id = state.0;
2034 state.0 += 1;
2035 state.1.insert(ptr, id);
2036 drop(guard);
2037
2038 use serde::ser::SerializeMap;
2039 let mut map = serializer.serialize_map(Some(2))?;
2040 map.serialize_entry("$shared", &id)?;
2041 map.serialize_entry("node", &*self.0.borrow())?;
2042 map.end()
2043 }
2044 })
2045 }
2046}
2047
2048impl SharedNode {
2049 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2050 Rc::as_ptr(&self.0)
2051 }
2052}
2053
2054impl Debug for SharedNode {
2055 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2056 PRINTED_TEES.with(|printed_tees| {
2057 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2058 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2059
2060 if let Some(printed_tees_mut) = printed_tees_mut {
2061 if let Some(existing) = printed_tees_mut
2062 .1
2063 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2064 {
2065 write!(f, "<shared {}>", existing)
2066 } else {
2067 let next_id = printed_tees_mut.0;
2068 printed_tees_mut.0 += 1;
2069 printed_tees_mut
2070 .1
2071 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2072 drop(printed_tees_mut_borrow);
2073 write!(f, "<shared {}>: ", next_id)?;
2074 Debug::fmt(&self.0.borrow(), f)
2075 }
2076 } else {
2077 drop(printed_tees_mut_borrow);
2078 write!(f, "<shared>: ")?;
2079 Debug::fmt(&self.0.borrow(), f)
2080 }
2081 })
2082 }
2083}
2084
2085impl Hash for SharedNode {
2086 fn hash<H: Hasher>(&self, state: &mut H) {
2087 self.0.borrow_mut().hash(state);
2088 }
2089}
2090
2091#[derive(Debug)]
2096pub enum AccessCounter {
2097 Counting(Cell<u32>),
2098 Frozen(u32),
2099}
2100
2101impl AccessCounter {
2102 pub fn new() -> Self {
2103 Self::Counting(Cell::new(0))
2104 }
2105
2106 pub fn next_group(&self, is_mut: bool) -> Self {
2110 let AccessCounter::Counting(count) = self else {
2111 panic!("Cannot count on `AccessCounter::Frozen`");
2112 };
2113 let c = if is_mut {
2114 let c = count.get() + 1;
2115 count.set(c + 1);
2116 c
2117 } else {
2118 count.get()
2119 };
2120 Self::Frozen(c)
2121 }
2122
2123 pub fn freeze(&self) -> Self {
2125 Self::Frozen(match self {
2126 Self::Counting(count) => count.get(),
2127 Self::Frozen(count) => *count,
2128 })
2129 }
2130
2131 pub fn frozen_group(&self) -> u32 {
2132 let Self::Frozen(count) = self else {
2133 panic!("`AccessCounter` not frozen");
2134 };
2135 *count
2136 }
2137}
2138
2139impl Default for AccessCounter {
2140 fn default() -> Self {
2141 Self::new()
2142 }
2143}
2144
2145impl Hash for AccessCounter {
2146 fn hash<H: Hasher>(&self, _state: &mut H) {
2147 }
2149}
2150
2151impl serde::Serialize for AccessCounter {
2152 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2153 let count = match self {
2154 AccessCounter::Counting(count) => count.get(),
2155 AccessCounter::Frozen(count) => *count,
2156 };
2157 count.serialize(serializer)
2158 }
2159}
2160
2161#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2162pub enum BoundKind {
2163 Unbounded,
2164 Bounded,
2165}
2166
2167#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2168pub enum StreamOrder {
2169 NoOrder,
2170 TotalOrder,
2171}
2172
2173#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2174pub enum StreamRetry {
2175 AtLeastOnce,
2176 ExactlyOnce,
2177}
2178
2179#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2180pub enum KeyedSingletonBoundKind {
2181 Unbounded,
2182 MonotonicKeys,
2183 MonotonicValue,
2184 BoundedValue,
2185 Bounded,
2186}
2187
2188#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2189pub enum SingletonBoundKind {
2190 Unbounded,
2191 Monotonic,
2192 Bounded,
2193}
2194
2195#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2196pub enum CollectionKind {
2197 Stream {
2198 bound: BoundKind,
2199 order: StreamOrder,
2200 retry: StreamRetry,
2201 element_type: DebugType,
2202 },
2203 Singleton {
2204 bound: SingletonBoundKind,
2205 element_type: DebugType,
2206 },
2207 Optional {
2208 bound: BoundKind,
2209 element_type: DebugType,
2210 },
2211 KeyedStream {
2212 bound: BoundKind,
2213 value_order: StreamOrder,
2214 value_retry: StreamRetry,
2215 key_type: DebugType,
2216 value_type: DebugType,
2217 },
2218 KeyedSingleton {
2219 bound: KeyedSingletonBoundKind,
2220 key_type: DebugType,
2221 value_type: DebugType,
2222 },
2223}
2224
2225impl CollectionKind {
2226 pub fn is_bounded(&self) -> bool {
2227 matches!(
2228 self,
2229 CollectionKind::Stream {
2230 bound: BoundKind::Bounded,
2231 ..
2232 } | CollectionKind::Singleton {
2233 bound: SingletonBoundKind::Bounded,
2234 ..
2235 } | CollectionKind::Optional {
2236 bound: BoundKind::Bounded,
2237 ..
2238 } | CollectionKind::KeyedStream {
2239 bound: BoundKind::Bounded,
2240 ..
2241 } | CollectionKind::KeyedSingleton {
2242 bound: KeyedSingletonBoundKind::Bounded,
2243 ..
2244 }
2245 )
2246 }
2247
2248 pub fn is_strict(&self) -> bool {
2251 match self {
2252 CollectionKind::Stream { order, retry, .. } => {
2253 *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2254 }
2255 CollectionKind::KeyedStream {
2256 value_order,
2257 value_retry,
2258 ..
2259 } => {
2260 *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2261 }
2262 CollectionKind::Singleton { .. }
2265 | CollectionKind::Optional { .. }
2266 | CollectionKind::KeyedSingleton { .. } => true,
2267 }
2268 }
2269
2270 pub fn strict_kind(&self) -> CollectionKind {
2272 match self {
2273 CollectionKind::Stream {
2274 bound,
2275 element_type,
2276 ..
2277 } => CollectionKind::Stream {
2278 bound: bound.clone(),
2279 order: StreamOrder::TotalOrder,
2280 retry: StreamRetry::ExactlyOnce,
2281 element_type: element_type.clone(),
2282 },
2283 CollectionKind::KeyedStream {
2284 bound,
2285 key_type,
2286 value_type,
2287 ..
2288 } => CollectionKind::KeyedStream {
2289 bound: bound.clone(),
2290 value_order: StreamOrder::TotalOrder,
2291 value_retry: StreamRetry::ExactlyOnce,
2292 key_type: key_type.clone(),
2293 value_type: value_type.clone(),
2294 },
2295 other => other.clone(),
2296 }
2297 }
2298}
2299
2300#[derive(Clone, serde::Serialize)]
2301pub struct HydroIrMetadata {
2302 pub location_id: LocationId,
2303 pub collection_kind: CollectionKind,
2304 pub consistency: Option<ClusterConsistency>,
2305 pub cardinality: Option<usize>,
2306 pub tag: Option<String>,
2307 pub op: HydroIrOpMetadata,
2308}
2309
2310impl Hash for HydroIrMetadata {
2312 fn hash<H: Hasher>(&self, _: &mut H) {}
2313}
2314
2315impl PartialEq for HydroIrMetadata {
2316 fn eq(&self, _: &Self) -> bool {
2317 true
2318 }
2319}
2320
2321impl Eq for HydroIrMetadata {}
2322
2323impl Debug for HydroIrMetadata {
2324 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2325 f.debug_struct("HydroIrMetadata")
2326 .field("location_id", &self.location_id)
2327 .field("collection_kind", &self.collection_kind)
2328 .finish()
2329 }
2330}
2331
2332#[derive(Clone, serde::Serialize)]
2335pub struct HydroIrOpMetadata {
2336 #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2337 pub backtrace: Backtrace,
2338 pub cpu_usage: Option<f64>,
2339 pub network_recv_cpu_usage: Option<f64>,
2340 pub id: Option<usize>,
2341}
2342
2343impl HydroIrOpMetadata {
2344 #[expect(
2345 clippy::new_without_default,
2346 reason = "explicit calls to new ensure correct backtrace bounds"
2347 )]
2348 pub fn new() -> HydroIrOpMetadata {
2349 Self::new_with_skip(1)
2350 }
2351
2352 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2353 HydroIrOpMetadata {
2354 backtrace: Backtrace::get_backtrace(2 + skip_count),
2355 cpu_usage: None,
2356 network_recv_cpu_usage: None,
2357 id: None,
2358 }
2359 }
2360}
2361
2362impl Debug for HydroIrOpMetadata {
2363 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2364 f.debug_struct("HydroIrOpMetadata").finish()
2365 }
2366}
2367
2368impl Hash for HydroIrOpMetadata {
2369 fn hash<H: Hasher>(&self, _: &mut H) {}
2370}
2371
2372#[derive(Debug, Hash, serde::Serialize)]
2375pub enum HydroNode {
2376 Placeholder,
2377
2378 Cast {
2386 inner: Box<HydroNode>,
2387 metadata: HydroIrMetadata,
2388 },
2389
2390 ObserveNonDet {
2396 inner: Box<HydroNode>,
2397 trusted: bool, metadata: HydroIrMetadata,
2399 },
2400
2401 Source {
2402 source: HydroSource,
2403 metadata: HydroIrMetadata,
2404 },
2405
2406 SingletonSource {
2407 value: DebugExpr,
2408 first_tick_only: bool,
2409 metadata: HydroIrMetadata,
2410 },
2411
2412 CycleSource {
2413 cycle_id: CycleId,
2414 metadata: HydroIrMetadata,
2415 },
2416
2417 Tee {
2418 inner: SharedNode,
2419 metadata: HydroIrMetadata,
2420 },
2421
2422 Reference {
2431 inner: SharedNode,
2432 kind: crate::handoff_ref::HandoffRefKind,
2433 access_counter: AccessCounter,
2434 metadata: HydroIrMetadata,
2435 },
2436
2437 Partition {
2438 inner: SharedNode,
2439 f: ClosureExpr,
2440 is_true: bool,
2441 metadata: HydroIrMetadata,
2442 },
2443
2444 BeginAtomic {
2445 inner: Box<HydroNode>,
2446 metadata: HydroIrMetadata,
2447 },
2448
2449 EndAtomic {
2450 inner: Box<HydroNode>,
2451 metadata: HydroIrMetadata,
2452 },
2453
2454 Batch {
2455 inner: Box<HydroNode>,
2456 metadata: HydroIrMetadata,
2457 },
2458
2459 YieldConcat {
2460 inner: Box<HydroNode>,
2461 metadata: HydroIrMetadata,
2462 },
2463
2464 Chain {
2465 first: Box<HydroNode>,
2466 second: Box<HydroNode>,
2467 metadata: HydroIrMetadata,
2468 },
2469
2470 MergeOrdered {
2471 first: Box<HydroNode>,
2472 second: Box<HydroNode>,
2473 metadata: HydroIrMetadata,
2474 },
2475
2476 ChainFirst {
2477 first: Box<HydroNode>,
2478 second: Box<HydroNode>,
2479 metadata: HydroIrMetadata,
2480 },
2481
2482 CrossProduct {
2483 left: Box<HydroNode>,
2484 right: Box<HydroNode>,
2485 metadata: HydroIrMetadata,
2486 },
2487
2488 CrossSingleton {
2489 left: Box<HydroNode>,
2490 right: Box<HydroNode>,
2491 metadata: HydroIrMetadata,
2492 },
2493
2494 Join {
2495 left: Box<HydroNode>,
2496 right: Box<HydroNode>,
2497 metadata: HydroIrMetadata,
2498 },
2499
2500 JoinHalf {
2504 left: Box<HydroNode>,
2505 right: Box<HydroNode>,
2506 metadata: HydroIrMetadata,
2507 },
2508
2509 Difference {
2510 pos: Box<HydroNode>,
2511 neg: Box<HydroNode>,
2512 metadata: HydroIrMetadata,
2513 },
2514
2515 AntiJoin {
2516 pos: Box<HydroNode>,
2517 neg: Box<HydroNode>,
2518 metadata: HydroIrMetadata,
2519 },
2520
2521 ResolveFutures {
2522 input: Box<HydroNode>,
2523 metadata: HydroIrMetadata,
2524 },
2525 ResolveFuturesBlocking {
2526 input: Box<HydroNode>,
2527 metadata: HydroIrMetadata,
2528 },
2529 ResolveFuturesOrdered {
2530 input: Box<HydroNode>,
2531 metadata: HydroIrMetadata,
2532 },
2533
2534 Map {
2535 f: ClosureExpr,
2536 input: Box<HydroNode>,
2537 metadata: HydroIrMetadata,
2538 },
2539 FlatMap {
2540 f: ClosureExpr,
2541 input: Box<HydroNode>,
2542 metadata: HydroIrMetadata,
2543 },
2544 FlatMapStreamBlocking {
2545 f: ClosureExpr,
2546 input: Box<HydroNode>,
2547 metadata: HydroIrMetadata,
2548 },
2549 Filter {
2550 f: ClosureExpr,
2551 input: Box<HydroNode>,
2552 metadata: HydroIrMetadata,
2553 },
2554 FilterMap {
2555 f: ClosureExpr,
2556 input: Box<HydroNode>,
2557 metadata: HydroIrMetadata,
2558 },
2559
2560 DeferTick {
2561 input: Box<HydroNode>,
2562 metadata: HydroIrMetadata,
2563 },
2564 Enumerate {
2565 input: Box<HydroNode>,
2566 metadata: HydroIrMetadata,
2567 },
2568 Inspect {
2569 f: ClosureExpr,
2570 input: Box<HydroNode>,
2571 metadata: HydroIrMetadata,
2572 },
2573
2574 Unique {
2575 input: Box<HydroNode>,
2576 metadata: HydroIrMetadata,
2577 },
2578
2579 Sort {
2580 input: Box<HydroNode>,
2581 metadata: HydroIrMetadata,
2582 },
2583 Fold {
2584 init: ClosureExpr,
2585 acc: ClosureExpr,
2586 input: Box<HydroNode>,
2587 metadata: HydroIrMetadata,
2588 },
2589
2590 Scan {
2591 init: ClosureExpr,
2592 acc: ClosureExpr,
2593 input: Box<HydroNode>,
2594 metadata: HydroIrMetadata,
2595 },
2596 ScanAsyncBlocking {
2597 init: ClosureExpr,
2598 acc: ClosureExpr,
2599 input: Box<HydroNode>,
2600 metadata: HydroIrMetadata,
2601 },
2602 FoldKeyed {
2603 init: ClosureExpr,
2604 acc: ClosureExpr,
2605 input: Box<HydroNode>,
2606 metadata: HydroIrMetadata,
2607 },
2608
2609 Reduce {
2610 f: ClosureExpr,
2611 input: Box<HydroNode>,
2612 metadata: HydroIrMetadata,
2613 },
2614 ReduceKeyed {
2615 f: ClosureExpr,
2616 input: Box<HydroNode>,
2617 metadata: HydroIrMetadata,
2618 },
2619 ReduceKeyedWatermark {
2620 f: ClosureExpr,
2621 input: Box<HydroNode>,
2622 watermark: Box<HydroNode>,
2623 metadata: HydroIrMetadata,
2624 },
2625
2626 Network {
2627 name: Option<String>,
2628 networking_info: crate::networking::NetworkingInfo,
2629 serialize_fn: Option<DebugExpr>,
2630 instantiate_fn: DebugInstantiate,
2631 deserialize_fn: Option<DebugExpr>,
2632 input: Box<HydroNode>,
2633 metadata: HydroIrMetadata,
2634 },
2635
2636 ExternalInput {
2637 from_external_key: LocationKey,
2638 from_port_id: ExternalPortId,
2639 from_many: bool,
2640 codec_type: DebugType,
2641 #[serde(skip)]
2642 port_hint: NetworkHint,
2643 instantiate_fn: DebugInstantiate,
2644 deserialize_fn: Option<DebugExpr>,
2645 metadata: HydroIrMetadata,
2646 },
2647
2648 Counter {
2649 tag: String,
2650 duration: DebugExpr,
2651 prefix: String,
2652 input: Box<HydroNode>,
2653 metadata: HydroIrMetadata,
2654 },
2655
2656 AssertIsConsistent {
2657 inner: Box<HydroNode>,
2658 trusted: bool,
2659 metadata: HydroIrMetadata,
2660 },
2661
2662 UnboundSingleton {
2663 inner: Box<HydroNode>,
2664 metadata: HydroIrMetadata,
2665 },
2666}
2667
2668pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2669pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2670
2671#[cfg(feature = "build")]
2675fn maybe_observe_for_mut(
2676 f: &ClosureExpr,
2677 in_ident: syn::Ident,
2678 in_location: &LocationId,
2679 in_kind: &CollectionKind,
2680 op_meta: &HydroIrOpMetadata,
2681 builders_or_callback: &mut BuildersOrCallback<
2682 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
2683 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
2684 >,
2685 next_stmt_id: &mut crate::Counter<StmtId>,
2686) -> syn::Ident {
2687 if f.has_mut_ref() && !in_kind.is_strict() {
2688 let observe_stmt_id = next_stmt_id.get_and_increment();
2689 let observe_ident =
2690 syn::Ident::new(&format!("stream_{}", observe_stmt_id), Span::call_site());
2691 if let BuildersOrCallback::Builders(graph_builders) = builders_or_callback {
2692 graph_builders.observe_for_mut(in_location, in_ident, in_kind, &observe_ident, op_meta);
2693 }
2694 observe_ident
2695 } else {
2696 in_ident
2697 }
2698}
2699
2700impl HydroNode {
2701 pub fn transform_bottom_up(
2702 &mut self,
2703 transform: &mut impl FnMut(&mut HydroNode),
2704 seen_tees: &mut SeenSharedNodes,
2705 check_well_formed: bool,
2706 ) {
2707 self.transform_children(
2708 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2709 seen_tees,
2710 );
2711
2712 transform(self);
2713
2714 let self_location = self.metadata().location_id.root();
2715
2716 if check_well_formed {
2717 match &*self {
2718 HydroNode::Network { .. } => {}
2719 _ => {
2720 self.input_metadata().iter().for_each(|i| {
2721 if i.location_id.root() != self_location {
2722 panic!(
2723 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2724 i,
2725 i.location_id.root(),
2726 self,
2727 self_location
2728 )
2729 }
2730 });
2731 }
2732 }
2733 }
2734 }
2735
2736 #[inline(always)]
2737 pub fn transform_children(
2738 &mut self,
2739 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2740 seen_tees: &mut SeenSharedNodes,
2741 ) {
2742 match self {
2743 HydroNode::Placeholder => {
2744 panic!();
2745 }
2746
2747 HydroNode::Source { .. }
2748 | HydroNode::SingletonSource { .. }
2749 | HydroNode::CycleSource { .. }
2750 | HydroNode::ExternalInput { .. } => {}
2751
2752 HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2753 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2754 *inner = SharedNode(transformed.clone());
2755 } else {
2756 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2757 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2758 let mut orig = inner.0.replace(HydroNode::Placeholder);
2759 transform(&mut orig, seen_tees);
2760 *transformed_cell.borrow_mut() = orig;
2761 *inner = SharedNode(transformed_cell);
2762 }
2763 }
2764
2765 HydroNode::Partition { inner, f, .. } => {
2766 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2767 *inner = SharedNode(transformed.clone());
2768 } else {
2769 f.transform_children(&mut transform, seen_tees);
2770 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2771 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2772 let mut orig = inner.0.replace(HydroNode::Placeholder);
2773 transform(&mut orig, seen_tees);
2774 *transformed_cell.borrow_mut() = orig;
2775 *inner = SharedNode(transformed_cell);
2776 }
2777 }
2778
2779 HydroNode::Cast { inner, .. }
2780 | HydroNode::ObserveNonDet { inner, .. }
2781 | HydroNode::BeginAtomic { inner, .. }
2782 | HydroNode::EndAtomic { inner, .. }
2783 | HydroNode::Batch { inner, .. }
2784 | HydroNode::YieldConcat { inner, .. }
2785 | HydroNode::UnboundSingleton { inner, .. }
2786 | HydroNode::AssertIsConsistent { inner, .. } => {
2787 transform(inner.as_mut(), seen_tees);
2788 }
2789
2790 HydroNode::Chain { first, second, .. } => {
2791 transform(first.as_mut(), seen_tees);
2792 transform(second.as_mut(), seen_tees);
2793 }
2794
2795 HydroNode::MergeOrdered { first, second, .. } => {
2796 transform(first.as_mut(), seen_tees);
2797 transform(second.as_mut(), seen_tees);
2798 }
2799
2800 HydroNode::ChainFirst { first, second, .. } => {
2801 transform(first.as_mut(), seen_tees);
2802 transform(second.as_mut(), seen_tees);
2803 }
2804
2805 HydroNode::CrossSingleton { left, right, .. }
2806 | HydroNode::CrossProduct { left, right, .. }
2807 | HydroNode::Join { left, right, .. }
2808 | HydroNode::JoinHalf { left, right, .. } => {
2809 transform(left.as_mut(), seen_tees);
2810 transform(right.as_mut(), seen_tees);
2811 }
2812
2813 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2814 transform(pos.as_mut(), seen_tees);
2815 transform(neg.as_mut(), seen_tees);
2816 }
2817
2818 HydroNode::Map { f, input, .. } => {
2819 f.transform_children(&mut transform, seen_tees);
2820 transform(input.as_mut(), seen_tees);
2821 }
2822 HydroNode::FlatMap { f, input, .. }
2823 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2824 | HydroNode::Filter { f, input, .. }
2825 | HydroNode::FilterMap { f, input, .. }
2826 | HydroNode::Inspect { f, input, .. }
2827 | HydroNode::Reduce { f, input, .. }
2828 | HydroNode::ReduceKeyed { f, input, .. } => {
2829 f.transform_children(&mut transform, seen_tees);
2830 transform(input.as_mut(), seen_tees);
2831 }
2832 HydroNode::ReduceKeyedWatermark {
2833 f,
2834 input,
2835 watermark,
2836 ..
2837 } => {
2838 f.transform_children(&mut transform, seen_tees);
2839 transform(input.as_mut(), seen_tees);
2840 transform(watermark.as_mut(), seen_tees);
2841 }
2842 HydroNode::Fold {
2843 init, acc, input, ..
2844 }
2845 | HydroNode::Scan {
2846 init, acc, input, ..
2847 }
2848 | HydroNode::ScanAsyncBlocking {
2849 init, acc, input, ..
2850 }
2851 | HydroNode::FoldKeyed {
2852 init, acc, input, ..
2853 } => {
2854 init.transform_children(&mut transform, seen_tees);
2855 acc.transform_children(&mut transform, seen_tees);
2856 transform(input.as_mut(), seen_tees);
2857 }
2858 HydroNode::ResolveFutures { input, .. }
2859 | HydroNode::ResolveFuturesBlocking { input, .. }
2860 | HydroNode::ResolveFuturesOrdered { input, .. }
2861 | HydroNode::Sort { input, .. }
2862 | HydroNode::DeferTick { input, .. }
2863 | HydroNode::Enumerate { input, .. }
2864 | HydroNode::Unique { input, .. }
2865 | HydroNode::Network { input, .. }
2866 | HydroNode::Counter { input, .. } => {
2867 transform(input.as_mut(), seen_tees);
2868 }
2869 }
2870 }
2871
2872 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2873 match self {
2874 HydroNode::Placeholder => HydroNode::Placeholder,
2875 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2876 inner: Box::new(inner.deep_clone(seen_tees)),
2877 metadata: metadata.clone(),
2878 },
2879 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2880 inner: Box::new(inner.deep_clone(seen_tees)),
2881 metadata: metadata.clone(),
2882 },
2883 HydroNode::ObserveNonDet {
2884 inner,
2885 trusted,
2886 metadata,
2887 } => HydroNode::ObserveNonDet {
2888 inner: Box::new(inner.deep_clone(seen_tees)),
2889 trusted: *trusted,
2890 metadata: metadata.clone(),
2891 },
2892 HydroNode::AssertIsConsistent {
2893 inner,
2894 trusted,
2895 metadata,
2896 } => HydroNode::AssertIsConsistent {
2897 inner: Box::new(inner.deep_clone(seen_tees)),
2898 trusted: *trusted,
2899 metadata: metadata.clone(),
2900 },
2901 HydroNode::Source { source, metadata } => HydroNode::Source {
2902 source: source.clone(),
2903 metadata: metadata.clone(),
2904 },
2905 HydroNode::SingletonSource {
2906 value,
2907 first_tick_only,
2908 metadata,
2909 } => HydroNode::SingletonSource {
2910 value: value.clone(),
2911 first_tick_only: *first_tick_only,
2912 metadata: metadata.clone(),
2913 },
2914 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2915 cycle_id: *cycle_id,
2916 metadata: metadata.clone(),
2917 },
2918 HydroNode::Tee { inner, metadata }
2919 | HydroNode::Reference {
2920 inner, metadata, ..
2921 } => {
2922 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2923 SharedNode(transformed.clone())
2924 } else {
2925 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2926 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2927 let cloned = inner.0.borrow().deep_clone(seen_tees);
2928 *new_rc.borrow_mut() = cloned;
2929 SharedNode(new_rc)
2930 };
2931 if let HydroNode::Reference {
2932 kind,
2933 access_counter,
2934 ..
2935 } = self
2936 {
2937 HydroNode::Reference {
2938 inner: cloned_inner,
2939 kind: *kind,
2940 access_counter: access_counter.freeze(),
2941 metadata: metadata.clone(),
2942 }
2943 } else {
2944 HydroNode::Tee {
2945 inner: cloned_inner,
2946 metadata: metadata.clone(),
2947 }
2948 }
2949 }
2950 HydroNode::Partition {
2951 inner,
2952 f,
2953 is_true,
2954 metadata,
2955 } => {
2956 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2957 HydroNode::Partition {
2958 inner: SharedNode(transformed.clone()),
2959 f: f.deep_clone(seen_tees),
2960 is_true: *is_true,
2961 metadata: metadata.clone(),
2962 }
2963 } else {
2964 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2965 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2966 let cloned = inner.0.borrow().deep_clone(seen_tees);
2967 *new_rc.borrow_mut() = cloned;
2968 HydroNode::Partition {
2969 inner: SharedNode(new_rc),
2970 f: f.deep_clone(seen_tees),
2971 is_true: *is_true,
2972 metadata: metadata.clone(),
2973 }
2974 }
2975 }
2976 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2977 inner: Box::new(inner.deep_clone(seen_tees)),
2978 metadata: metadata.clone(),
2979 },
2980 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2981 inner: Box::new(inner.deep_clone(seen_tees)),
2982 metadata: metadata.clone(),
2983 },
2984 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2985 inner: Box::new(inner.deep_clone(seen_tees)),
2986 metadata: metadata.clone(),
2987 },
2988 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2989 inner: Box::new(inner.deep_clone(seen_tees)),
2990 metadata: metadata.clone(),
2991 },
2992 HydroNode::Chain {
2993 first,
2994 second,
2995 metadata,
2996 } => HydroNode::Chain {
2997 first: Box::new(first.deep_clone(seen_tees)),
2998 second: Box::new(second.deep_clone(seen_tees)),
2999 metadata: metadata.clone(),
3000 },
3001 HydroNode::MergeOrdered {
3002 first,
3003 second,
3004 metadata,
3005 } => HydroNode::MergeOrdered {
3006 first: Box::new(first.deep_clone(seen_tees)),
3007 second: Box::new(second.deep_clone(seen_tees)),
3008 metadata: metadata.clone(),
3009 },
3010 HydroNode::ChainFirst {
3011 first,
3012 second,
3013 metadata,
3014 } => HydroNode::ChainFirst {
3015 first: Box::new(first.deep_clone(seen_tees)),
3016 second: Box::new(second.deep_clone(seen_tees)),
3017 metadata: metadata.clone(),
3018 },
3019 HydroNode::CrossProduct {
3020 left,
3021 right,
3022 metadata,
3023 } => HydroNode::CrossProduct {
3024 left: Box::new(left.deep_clone(seen_tees)),
3025 right: Box::new(right.deep_clone(seen_tees)),
3026 metadata: metadata.clone(),
3027 },
3028 HydroNode::CrossSingleton {
3029 left,
3030 right,
3031 metadata,
3032 } => HydroNode::CrossSingleton {
3033 left: Box::new(left.deep_clone(seen_tees)),
3034 right: Box::new(right.deep_clone(seen_tees)),
3035 metadata: metadata.clone(),
3036 },
3037 HydroNode::Join {
3038 left,
3039 right,
3040 metadata,
3041 } => HydroNode::Join {
3042 left: Box::new(left.deep_clone(seen_tees)),
3043 right: Box::new(right.deep_clone(seen_tees)),
3044 metadata: metadata.clone(),
3045 },
3046 HydroNode::JoinHalf {
3047 left,
3048 right,
3049 metadata,
3050 } => HydroNode::JoinHalf {
3051 left: Box::new(left.deep_clone(seen_tees)),
3052 right: Box::new(right.deep_clone(seen_tees)),
3053 metadata: metadata.clone(),
3054 },
3055 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3056 pos: Box::new(pos.deep_clone(seen_tees)),
3057 neg: Box::new(neg.deep_clone(seen_tees)),
3058 metadata: metadata.clone(),
3059 },
3060 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3061 pos: Box::new(pos.deep_clone(seen_tees)),
3062 neg: Box::new(neg.deep_clone(seen_tees)),
3063 metadata: metadata.clone(),
3064 },
3065 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3066 input: Box::new(input.deep_clone(seen_tees)),
3067 metadata: metadata.clone(),
3068 },
3069 HydroNode::ResolveFuturesBlocking { input, metadata } => {
3070 HydroNode::ResolveFuturesBlocking {
3071 input: Box::new(input.deep_clone(seen_tees)),
3072 metadata: metadata.clone(),
3073 }
3074 }
3075 HydroNode::ResolveFuturesOrdered { input, metadata } => {
3076 HydroNode::ResolveFuturesOrdered {
3077 input: Box::new(input.deep_clone(seen_tees)),
3078 metadata: metadata.clone(),
3079 }
3080 }
3081 HydroNode::Map { f, input, metadata } => HydroNode::Map {
3082 f: f.deep_clone(seen_tees),
3083 input: Box::new(input.deep_clone(seen_tees)),
3084 metadata: metadata.clone(),
3085 },
3086 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3087 f: f.deep_clone(seen_tees),
3088 input: Box::new(input.deep_clone(seen_tees)),
3089 metadata: metadata.clone(),
3090 },
3091 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3092 HydroNode::FlatMapStreamBlocking {
3093 f: f.deep_clone(seen_tees),
3094 input: Box::new(input.deep_clone(seen_tees)),
3095 metadata: metadata.clone(),
3096 }
3097 }
3098 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3099 f: f.deep_clone(seen_tees),
3100 input: Box::new(input.deep_clone(seen_tees)),
3101 metadata: metadata.clone(),
3102 },
3103 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3104 f: f.deep_clone(seen_tees),
3105 input: Box::new(input.deep_clone(seen_tees)),
3106 metadata: metadata.clone(),
3107 },
3108 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3109 input: Box::new(input.deep_clone(seen_tees)),
3110 metadata: metadata.clone(),
3111 },
3112 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3113 input: Box::new(input.deep_clone(seen_tees)),
3114 metadata: metadata.clone(),
3115 },
3116 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3117 f: f.deep_clone(seen_tees),
3118 input: Box::new(input.deep_clone(seen_tees)),
3119 metadata: metadata.clone(),
3120 },
3121 HydroNode::Unique { input, metadata } => HydroNode::Unique {
3122 input: Box::new(input.deep_clone(seen_tees)),
3123 metadata: metadata.clone(),
3124 },
3125 HydroNode::Sort { input, metadata } => HydroNode::Sort {
3126 input: Box::new(input.deep_clone(seen_tees)),
3127 metadata: metadata.clone(),
3128 },
3129 HydroNode::Fold {
3130 init,
3131 acc,
3132 input,
3133 metadata,
3134 } => HydroNode::Fold {
3135 init: init.deep_clone(seen_tees),
3136 acc: acc.deep_clone(seen_tees),
3137 input: Box::new(input.deep_clone(seen_tees)),
3138 metadata: metadata.clone(),
3139 },
3140 HydroNode::Scan {
3141 init,
3142 acc,
3143 input,
3144 metadata,
3145 } => HydroNode::Scan {
3146 init: init.deep_clone(seen_tees),
3147 acc: acc.deep_clone(seen_tees),
3148 input: Box::new(input.deep_clone(seen_tees)),
3149 metadata: metadata.clone(),
3150 },
3151 HydroNode::ScanAsyncBlocking {
3152 init,
3153 acc,
3154 input,
3155 metadata,
3156 } => HydroNode::ScanAsyncBlocking {
3157 init: init.deep_clone(seen_tees),
3158 acc: acc.deep_clone(seen_tees),
3159 input: Box::new(input.deep_clone(seen_tees)),
3160 metadata: metadata.clone(),
3161 },
3162 HydroNode::FoldKeyed {
3163 init,
3164 acc,
3165 input,
3166 metadata,
3167 } => HydroNode::FoldKeyed {
3168 init: init.deep_clone(seen_tees),
3169 acc: acc.deep_clone(seen_tees),
3170 input: Box::new(input.deep_clone(seen_tees)),
3171 metadata: metadata.clone(),
3172 },
3173 HydroNode::ReduceKeyedWatermark {
3174 f,
3175 input,
3176 watermark,
3177 metadata,
3178 } => HydroNode::ReduceKeyedWatermark {
3179 f: f.deep_clone(seen_tees),
3180 input: Box::new(input.deep_clone(seen_tees)),
3181 watermark: Box::new(watermark.deep_clone(seen_tees)),
3182 metadata: metadata.clone(),
3183 },
3184 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3185 f: f.deep_clone(seen_tees),
3186 input: Box::new(input.deep_clone(seen_tees)),
3187 metadata: metadata.clone(),
3188 },
3189 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3190 f: f.deep_clone(seen_tees),
3191 input: Box::new(input.deep_clone(seen_tees)),
3192 metadata: metadata.clone(),
3193 },
3194 HydroNode::Network {
3195 name,
3196 networking_info,
3197 serialize_fn,
3198 instantiate_fn,
3199 deserialize_fn,
3200 input,
3201 metadata,
3202 } => HydroNode::Network {
3203 name: name.clone(),
3204 networking_info: networking_info.clone(),
3205 serialize_fn: serialize_fn.clone(),
3206 instantiate_fn: instantiate_fn.clone(),
3207 deserialize_fn: deserialize_fn.clone(),
3208 input: Box::new(input.deep_clone(seen_tees)),
3209 metadata: metadata.clone(),
3210 },
3211 HydroNode::ExternalInput {
3212 from_external_key,
3213 from_port_id,
3214 from_many,
3215 codec_type,
3216 port_hint,
3217 instantiate_fn,
3218 deserialize_fn,
3219 metadata,
3220 } => HydroNode::ExternalInput {
3221 from_external_key: *from_external_key,
3222 from_port_id: *from_port_id,
3223 from_many: *from_many,
3224 codec_type: codec_type.clone(),
3225 port_hint: *port_hint,
3226 instantiate_fn: instantiate_fn.clone(),
3227 deserialize_fn: deserialize_fn.clone(),
3228 metadata: metadata.clone(),
3229 },
3230 HydroNode::Counter {
3231 tag,
3232 duration,
3233 prefix,
3234 input,
3235 metadata,
3236 } => HydroNode::Counter {
3237 tag: tag.clone(),
3238 duration: duration.clone(),
3239 prefix: prefix.clone(),
3240 input: Box::new(input.deep_clone(seen_tees)),
3241 metadata: metadata.clone(),
3242 },
3243 }
3244 }
3245
3246 #[cfg(feature = "build")]
3247 pub fn emit_core(
3248 &mut self,
3249 builders_or_callback: &mut BuildersOrCallback<
3250 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3251 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3252 >,
3253 seen_tees: &mut SeenSharedNodes,
3254 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3255 next_stmt_id: &mut crate::Counter<StmtId>,
3256 fold_hooked_idents: &mut HashSet<String>,
3257 ) -> syn::Ident {
3258 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3259
3260 self.transform_bottom_up(
3261 &mut |node: &mut HydroNode| {
3262 let out_location = node.metadata().location_id.clone();
3263 match node {
3264 HydroNode::Placeholder => {
3265 panic!()
3266 }
3267
3268 HydroNode::Cast { .. } => {
3269 let _ = next_stmt_id.get_and_increment();
3272 match builders_or_callback {
3273 BuildersOrCallback::Builders(_) => {}
3274 BuildersOrCallback::Callback(_, node_callback) => {
3275 node_callback(node, next_stmt_id);
3276 }
3277 }
3278 }
3280
3281 HydroNode::UnboundSingleton { .. } => {
3282 let inner_ident = ident_stack.pop().unwrap();
3283
3284 let stmt_id = next_stmt_id.get_and_increment();
3285 let out_ident =
3286 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3287
3288 match builders_or_callback {
3289 BuildersOrCallback::Builders(graph_builders) => {
3290 if graph_builders.singleton_intermediates() {
3291 let builder = graph_builders.get_dfir_mut(&out_location);
3292 builder.add_dfir(
3293 parse_quote! {
3294 #out_ident = #inner_ident;
3295 },
3296 None,
3297 None,
3298 );
3299 } else {
3300 let builder = graph_builders.get_dfir_mut(&out_location);
3301 builder.add_dfir(
3302 parse_quote! {
3303 #out_ident = #inner_ident -> persist::<'static>();
3304 },
3305 None,
3306 None,
3307 );
3308 }
3309 }
3310 BuildersOrCallback::Callback(_, node_callback) => {
3311 node_callback(node, next_stmt_id);
3312 }
3313 }
3314
3315 ident_stack.push(out_ident);
3316 }
3317
3318 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3319 let inner_ident = ident_stack.pop().unwrap();
3320
3321 let stmt_id = next_stmt_id.get_and_increment();
3322 let out_ident =
3323 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3324
3325 match builders_or_callback {
3326 BuildersOrCallback::Builders(graph_builders) => {
3327 graph_builders.assert_is_consistent(
3328 *trusted,
3329 &inner.metadata().location_id,
3330 inner_ident,
3331 &out_ident,
3332 );
3333 }
3334 BuildersOrCallback::Callback(_, node_callback) => {
3335 node_callback(node, next_stmt_id);
3336 }
3337 }
3338
3339 ident_stack.push(out_ident);
3340 }
3341
3342 HydroNode::ObserveNonDet {
3343 inner,
3344 trusted,
3345 metadata,
3346 ..
3347 } => {
3348 let inner_ident = ident_stack.pop().unwrap();
3349
3350 let stmt_id = next_stmt_id.get_and_increment();
3351 let observe_ident =
3352 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3353
3354 match builders_or_callback {
3355 BuildersOrCallback::Builders(graph_builders) => {
3356 graph_builders.observe_nondet(
3357 *trusted,
3358 &inner.metadata().location_id,
3359 inner_ident,
3360 &inner.metadata().collection_kind,
3361 &observe_ident,
3362 &metadata.collection_kind,
3363 &metadata.op,
3364 );
3365 }
3366 BuildersOrCallback::Callback(_, node_callback) => {
3367 node_callback(node, next_stmt_id);
3368 }
3369 }
3370
3371 ident_stack.push(observe_ident);
3372 }
3373
3374 HydroNode::Batch {
3375 inner, metadata, ..
3376 } => {
3377 let inner_ident = ident_stack.pop().unwrap();
3378
3379 let stmt_id = next_stmt_id.get_and_increment();
3380 let batch_ident =
3381 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3382
3383 match builders_or_callback {
3384 BuildersOrCallback::Builders(graph_builders) => {
3385 graph_builders.batch(
3386 inner_ident,
3387 &inner.metadata().location_id,
3388 &inner.metadata().collection_kind,
3389 &batch_ident,
3390 &out_location,
3391 &metadata.op,
3392 fold_hooked_idents,
3393 );
3394 }
3395 BuildersOrCallback::Callback(_, node_callback) => {
3396 node_callback(node, next_stmt_id);
3397 }
3398 }
3399
3400 ident_stack.push(batch_ident);
3401 }
3402
3403 HydroNode::YieldConcat { inner, .. } => {
3404 let inner_ident = ident_stack.pop().unwrap();
3405
3406 let stmt_id = next_stmt_id.get_and_increment();
3407 let yield_ident =
3408 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3409
3410 match builders_or_callback {
3411 BuildersOrCallback::Builders(graph_builders) => {
3412 graph_builders.yield_from_tick(
3413 inner_ident,
3414 &inner.metadata().location_id,
3415 &inner.metadata().collection_kind,
3416 &yield_ident,
3417 &out_location,
3418 );
3419 }
3420 BuildersOrCallback::Callback(_, node_callback) => {
3421 node_callback(node, next_stmt_id);
3422 }
3423 }
3424
3425 ident_stack.push(yield_ident);
3426 }
3427
3428 HydroNode::BeginAtomic { inner, metadata } => {
3429 let inner_ident = ident_stack.pop().unwrap();
3430
3431 let stmt_id = next_stmt_id.get_and_increment();
3432 let begin_ident =
3433 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3434
3435 match builders_or_callback {
3436 BuildersOrCallback::Builders(graph_builders) => {
3437 graph_builders.begin_atomic(
3438 inner_ident,
3439 &inner.metadata().location_id,
3440 &inner.metadata().collection_kind,
3441 &begin_ident,
3442 &out_location,
3443 &metadata.op,
3444 );
3445 }
3446 BuildersOrCallback::Callback(_, node_callback) => {
3447 node_callback(node, next_stmt_id);
3448 }
3449 }
3450
3451 ident_stack.push(begin_ident);
3452 }
3453
3454 HydroNode::EndAtomic { inner, .. } => {
3455 let inner_ident = ident_stack.pop().unwrap();
3456
3457 let stmt_id = next_stmt_id.get_and_increment();
3458 let end_ident =
3459 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3460
3461 match builders_or_callback {
3462 BuildersOrCallback::Builders(graph_builders) => {
3463 graph_builders.end_atomic(
3464 inner_ident,
3465 &inner.metadata().location_id,
3466 &inner.metadata().collection_kind,
3467 &end_ident,
3468 );
3469 }
3470 BuildersOrCallback::Callback(_, node_callback) => {
3471 node_callback(node, next_stmt_id);
3472 }
3473 }
3474
3475 ident_stack.push(end_ident);
3476 }
3477
3478 HydroNode::Source {
3479 source, metadata, ..
3480 } => {
3481 if let HydroSource::ExternalNetwork() = source {
3482 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3483 } else {
3484 let stmt_id = next_stmt_id.get_and_increment();
3485 let source_ident =
3486 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3487
3488 let source_stmt = match source {
3489 HydroSource::Stream(expr) => {
3490 debug_assert!(metadata.location_id.is_top_level());
3491 parse_quote! {
3492 #source_ident = source_stream(#expr);
3493 }
3494 }
3495
3496 HydroSource::ExternalNetwork() => {
3497 unreachable!()
3498 }
3499
3500 HydroSource::Iter(expr) => {
3501 if metadata.location_id.is_top_level() {
3502 parse_quote! {
3503 #source_ident = source_iter(#expr);
3504 }
3505 } else {
3506 parse_quote! {
3508 #source_ident = source_iter(#expr) -> persist::<'static>();
3509 }
3510 }
3511 }
3512
3513 HydroSource::Spin() => {
3514 debug_assert!(metadata.location_id.is_top_level());
3515 parse_quote! {
3516 #source_ident = spin();
3517 }
3518 }
3519
3520 HydroSource::ClusterMembers(target_loc, state) => {
3521 debug_assert!(metadata.location_id.is_top_level());
3522
3523 let members_tee_ident = syn::Ident::new(
3524 &format!(
3525 "__cluster_members_tee_{}_{}",
3526 metadata.location_id.root().key(),
3527 target_loc.key(),
3528 ),
3529 Span::call_site(),
3530 );
3531
3532 match state {
3533 ClusterMembersState::Stream(d) => {
3534 parse_quote! {
3535 #members_tee_ident = source_stream(#d) -> tee();
3536 #source_ident = #members_tee_ident;
3537 }
3538 },
3539 ClusterMembersState::Uninit => syn::parse_quote! {
3540 #source_ident = source_stream(DUMMY);
3541 },
3542 ClusterMembersState::Tee(..) => parse_quote! {
3543 #source_ident = #members_tee_ident;
3544 },
3545 }
3546 }
3547
3548 HydroSource::Embedded(ident) => {
3549 parse_quote! {
3550 #source_ident = source_stream(#ident);
3551 }
3552 }
3553
3554 HydroSource::EmbeddedSingleton(ident) => {
3555 parse_quote! {
3556 #source_ident = source_iter([#ident]);
3557 }
3558 }
3559 };
3560
3561 match builders_or_callback {
3562 BuildersOrCallback::Builders(graph_builders) => {
3563 let builder = graph_builders.get_dfir_mut(&out_location);
3564 builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3565 }
3566 BuildersOrCallback::Callback(_, node_callback) => {
3567 node_callback(node, next_stmt_id);
3568 }
3569 }
3570
3571 ident_stack.push(source_ident);
3572 }
3573 }
3574
3575 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3576 let stmt_id = next_stmt_id.get_and_increment();
3577 let source_ident =
3578 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3579
3580 match builders_or_callback {
3581 BuildersOrCallback::Builders(graph_builders) => {
3582 let builder = graph_builders.get_dfir_mut(&out_location);
3583
3584 if *first_tick_only {
3585 assert!(
3586 !metadata.location_id.is_top_level(),
3587 "first_tick_only SingletonSource must be inside a tick"
3588 );
3589 }
3590
3591 if *first_tick_only
3592 || (metadata.location_id.is_top_level()
3593 && metadata.collection_kind.is_bounded())
3594 {
3595 builder.add_dfir(
3596 parse_quote! {
3597 #source_ident = source_iter([#value]);
3598 },
3599 None,
3600 Some(&stmt_id.to_string()),
3601 );
3602 } else {
3603 builder.add_dfir(
3604 parse_quote! {
3605 #source_ident = source_iter([#value]) -> persist::<'static>();
3606 },
3607 None,
3608 Some(&stmt_id.to_string()),
3609 );
3610 }
3611 }
3612 BuildersOrCallback::Callback(_, node_callback) => {
3613 node_callback(node, next_stmt_id);
3614 }
3615 }
3616
3617 ident_stack.push(source_ident);
3618 }
3619
3620 HydroNode::CycleSource { cycle_id, .. } => {
3621 let ident = cycle_id.as_ident();
3622
3623 let _ = next_stmt_id.get_and_increment();
3625
3626 match builders_or_callback {
3627 BuildersOrCallback::Builders(_) => {}
3628 BuildersOrCallback::Callback(_, node_callback) => {
3629 node_callback(node, next_stmt_id);
3630 }
3631 }
3632
3633 ident_stack.push(ident);
3634 }
3635
3636 HydroNode::Tee { inner, .. } => {
3637 let stmt_id = next_stmt_id.get_and_increment();
3640
3641 let ret_ident = if let Some(built_idents) =
3642 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3643 {
3644 match builders_or_callback {
3645 BuildersOrCallback::Builders(_) => {}
3646 BuildersOrCallback::Callback(_, node_callback) => {
3647 node_callback(node, next_stmt_id);
3648 }
3649 }
3650
3651 built_idents[0].clone()
3652 } else {
3653 let inner_ident = ident_stack.pop().unwrap();
3656
3657 let tee_ident =
3658 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3659
3660 built_tees.insert(
3661 inner.0.as_ref() as *const RefCell<HydroNode>,
3662 vec![tee_ident.clone()],
3663 );
3664
3665 match builders_or_callback {
3666 BuildersOrCallback::Builders(graph_builders) => {
3667 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3679 fold_hooked_idents.insert(tee_ident.to_string());
3680 }
3681 let builder = graph_builders.get_dfir_mut(&out_location);
3682 builder.add_dfir(
3683 parse_quote! {
3684 #tee_ident = #inner_ident -> tee();
3685 },
3686 None,
3687 Some(&stmt_id.to_string()),
3688 );
3689 }
3690 BuildersOrCallback::Callback(_, node_callback) => {
3691 node_callback(node, next_stmt_id);
3692 }
3693 }
3694
3695 tee_ident
3696 };
3697
3698 ident_stack.push(ret_ident);
3699 }
3700
3701 HydroNode::Reference { inner, kind, .. } => {
3702 let stmt_id = next_stmt_id.get_and_increment();
3705
3706 let ret_ident = if let Some(built_idents) =
3707 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3708 {
3709 built_idents[0].clone()
3710 } else {
3711 let inner_ident = ident_stack.pop().unwrap();
3712
3713 let ref_ident =
3714 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3715
3716 built_tees.insert(
3717 inner.0.as_ref() as *const RefCell<HydroNode>,
3718 vec![ref_ident.clone()],
3719 );
3720
3721 match builders_or_callback {
3722 BuildersOrCallback::Builders(graph_builders) => {
3723 let builder = graph_builders.get_dfir_mut(&out_location);
3724 let op_ident = syn::Ident::new(
3725 match kind {
3726 crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3727 crate::handoff_ref::HandoffRefKind::Optional => "optional",
3728 crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3729 },
3730 Span::call_site(),
3731 );
3732 builder.add_dfir(
3733 parse_quote! {
3734 #ref_ident = #inner_ident -> #op_ident();
3735 },
3736 None,
3737 Some(&stmt_id.to_string()),
3738 );
3739 }
3740 BuildersOrCallback::Callback(_, node_callback) => {
3741 node_callback(node, next_stmt_id);
3742 }
3743 }
3744
3745 ref_ident
3746 };
3747
3748 ident_stack.push(ret_ident);
3749 }
3750
3751 HydroNode::Partition {
3752 inner, f, is_true, metadata,
3753 } => {
3754 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3756 let stmt_id = next_stmt_id.get_and_increment();
3757
3758 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3759 match builders_or_callback {
3760 BuildersOrCallback::Builders(_) => {}
3761 BuildersOrCallback::Callback(_, node_callback) => {
3762 node_callback(node, next_stmt_id);
3763 }
3764 }
3765
3766 let idx = if is_true { 0 } else { 1 };
3767 built_idents[idx].clone()
3768 } else {
3769 let inner_ident = ident_stack.pop().unwrap();
3772 let f_tokens = f.emit_tokens(&mut ident_stack);
3773
3774 let inner_ident = {
3775 let inner_borrow = inner.0.borrow();
3776 maybe_observe_for_mut(
3777 f, inner_ident,
3778 &inner_borrow.metadata().location_id,
3779 &inner_borrow.metadata().collection_kind,
3780 &metadata.op,
3781 builders_or_callback, next_stmt_id,
3782 )
3783 };
3784
3785 let partition_ident = syn::Ident::new(
3786 &format!("stream_{}_partition", stmt_id),
3787 Span::call_site(),
3788 );
3789 let true_ident = syn::Ident::new(
3790 &format!("stream_{}_true", stmt_id),
3791 Span::call_site(),
3792 );
3793 let false_ident = syn::Ident::new(
3794 &format!("stream_{}_false", stmt_id),
3795 Span::call_site(),
3796 );
3797
3798 built_tees.insert(
3799 ptr,
3800 vec![true_ident.clone(), false_ident.clone()],
3801 );
3802
3803 let stmt_id = next_stmt_id.get_and_increment();
3804 match builders_or_callback {
3805 BuildersOrCallback::Builders(graph_builders) => {
3806 let builder = graph_builders.get_dfir_mut(&out_location);
3807 builder.add_dfir(
3808 parse_quote! {
3809 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3810 #true_ident = #partition_ident[0];
3811 #false_ident = #partition_ident[1];
3812 },
3813 None,
3814 Some(&stmt_id.to_string()),
3815 );
3816 }
3817 BuildersOrCallback::Callback(_, node_callback) => {
3818 node_callback(node, next_stmt_id);
3819 }
3820 }
3821
3822 if is_true { true_ident } else { false_ident }
3823 };
3824
3825 ident_stack.push(ret_ident);
3826 }
3827
3828 HydroNode::Chain { .. } => {
3829 let second_ident = ident_stack.pop().unwrap();
3831 let first_ident = ident_stack.pop().unwrap();
3832
3833 let stmt_id = next_stmt_id.get_and_increment();
3834 let chain_ident =
3835 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3836
3837 match builders_or_callback {
3838 BuildersOrCallback::Builders(graph_builders) => {
3839 let builder = graph_builders.get_dfir_mut(&out_location);
3840 builder.add_dfir(
3841 parse_quote! {
3842 #chain_ident = chain();
3843 #first_ident -> [0]#chain_ident;
3844 #second_ident -> [1]#chain_ident;
3845 },
3846 None,
3847 Some(&stmt_id.to_string()),
3848 );
3849 }
3850 BuildersOrCallback::Callback(_, node_callback) => {
3851 node_callback(node, next_stmt_id);
3852 }
3853 }
3854
3855 ident_stack.push(chain_ident);
3856 }
3857
3858 HydroNode::MergeOrdered { first, metadata, .. } => {
3859 let second_ident = ident_stack.pop().unwrap();
3860 let first_ident = ident_stack.pop().unwrap();
3861
3862 let stmt_id = next_stmt_id.get_and_increment();
3863 let merge_ident =
3864 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3865
3866 match builders_or_callback {
3867 BuildersOrCallback::Builders(graph_builders) => {
3868 graph_builders.merge_ordered(
3869 &first.metadata().location_id,
3870 first_ident,
3871 second_ident,
3872 &merge_ident,
3873 &first.metadata().collection_kind,
3874 &metadata.op,
3875 Some(&stmt_id.to_string()),
3876 );
3877 }
3878 BuildersOrCallback::Callback(_, node_callback) => {
3879 node_callback(node, next_stmt_id);
3880 }
3881 }
3882
3883 ident_stack.push(merge_ident);
3884 }
3885
3886 HydroNode::ChainFirst { .. } => {
3887 let second_ident = ident_stack.pop().unwrap();
3888 let first_ident = ident_stack.pop().unwrap();
3889
3890 let stmt_id = next_stmt_id.get_and_increment();
3891 let chain_ident =
3892 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3893
3894 match builders_or_callback {
3895 BuildersOrCallback::Builders(graph_builders) => {
3896 let builder = graph_builders.get_dfir_mut(&out_location);
3897 builder.add_dfir(
3898 parse_quote! {
3899 #chain_ident = chain_first_n(1);
3900 #first_ident -> [0]#chain_ident;
3901 #second_ident -> [1]#chain_ident;
3902 },
3903 None,
3904 Some(&stmt_id.to_string()),
3905 );
3906 }
3907 BuildersOrCallback::Callback(_, node_callback) => {
3908 node_callback(node, next_stmt_id);
3909 }
3910 }
3911
3912 ident_stack.push(chain_ident);
3913 }
3914
3915 HydroNode::CrossSingleton { right, .. } => {
3916 let right_ident = ident_stack.pop().unwrap();
3917 let left_ident = ident_stack.pop().unwrap();
3918
3919 let stmt_id = next_stmt_id.get_and_increment();
3920 let cross_ident =
3921 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3922
3923 match builders_or_callback {
3924 BuildersOrCallback::Builders(graph_builders) => {
3925 let builder = graph_builders.get_dfir_mut(&out_location);
3926
3927 if right.metadata().location_id.is_top_level()
3928 && right.metadata().collection_kind.is_bounded()
3929 {
3930 builder.add_dfir(
3931 parse_quote! {
3932 #cross_ident = cross_singleton::<'static>();
3933 #left_ident -> [input]#cross_ident;
3934 #right_ident -> [single]#cross_ident;
3935 },
3936 None,
3937 Some(&stmt_id.to_string()),
3938 );
3939 } else {
3940 builder.add_dfir(
3941 parse_quote! {
3942 #cross_ident = cross_singleton();
3943 #left_ident -> [input]#cross_ident;
3944 #right_ident -> [single]#cross_ident;
3945 },
3946 None,
3947 Some(&stmt_id.to_string()),
3948 );
3949 }
3950 }
3951 BuildersOrCallback::Callback(_, node_callback) => {
3952 node_callback(node, next_stmt_id);
3953 }
3954 }
3955
3956 ident_stack.push(cross_ident);
3957 }
3958
3959 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3960 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3961 parse_quote!(cross_join_multiset)
3962 } else {
3963 parse_quote!(join_multiset)
3964 };
3965
3966 let (HydroNode::CrossProduct { left, right, .. }
3967 | HydroNode::Join { left, right, .. }) = node
3968 else {
3969 unreachable!()
3970 };
3971
3972 let is_top_level = left.metadata().location_id.is_top_level()
3973 && right.metadata().location_id.is_top_level();
3974 let left_lifetime = if left.metadata().location_id.is_top_level() {
3975 quote!('static)
3976 } else {
3977 quote!('tick)
3978 };
3979
3980 let right_lifetime = if right.metadata().location_id.is_top_level() {
3981 quote!('static)
3982 } else {
3983 quote!('tick)
3984 };
3985
3986 let right_ident = ident_stack.pop().unwrap();
3987 let left_ident = ident_stack.pop().unwrap();
3988
3989 let stmt_id = next_stmt_id.get_and_increment();
3990 let stream_ident =
3991 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3992
3993 match builders_or_callback {
3994 BuildersOrCallback::Builders(graph_builders) => {
3995 let builder = graph_builders.get_dfir_mut(&out_location);
3996 builder.add_dfir(
3997 if is_top_level {
3998 parse_quote! {
4001 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
4002 #left_ident -> [0]#stream_ident;
4003 #right_ident -> [1]#stream_ident;
4004 }
4005 } else {
4006 parse_quote! {
4007 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
4008 #left_ident -> [0]#stream_ident;
4009 #right_ident -> [1]#stream_ident;
4010 }
4011 }
4012 ,
4013 None,
4014 Some(&stmt_id.to_string()),
4015 );
4016 }
4017 BuildersOrCallback::Callback(_, node_callback) => {
4018 node_callback(node, next_stmt_id);
4019 }
4020 }
4021
4022 ident_stack.push(stream_ident);
4023 }
4024
4025 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4026 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4027 parse_quote!(difference)
4028 } else {
4029 parse_quote!(anti_join)
4030 };
4031
4032 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4033 node
4034 else {
4035 unreachable!()
4036 };
4037
4038 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4039 quote!('static)
4040 } else {
4041 quote!('tick)
4042 };
4043
4044 let neg_ident = ident_stack.pop().unwrap();
4045 let pos_ident = ident_stack.pop().unwrap();
4046
4047 let stmt_id = next_stmt_id.get_and_increment();
4048 let stream_ident =
4049 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4050
4051 match builders_or_callback {
4052 BuildersOrCallback::Builders(graph_builders) => {
4053 let builder = graph_builders.get_dfir_mut(&out_location);
4054 builder.add_dfir(
4055 parse_quote! {
4056 #stream_ident = #operator::<'tick, #neg_lifetime>();
4057 #pos_ident -> [pos]#stream_ident;
4058 #neg_ident -> [neg]#stream_ident;
4059 },
4060 None,
4061 Some(&stmt_id.to_string()),
4062 );
4063 }
4064 BuildersOrCallback::Callback(_, node_callback) => {
4065 node_callback(node, next_stmt_id);
4066 }
4067 }
4068
4069 ident_stack.push(stream_ident);
4070 }
4071
4072 HydroNode::JoinHalf { .. } => {
4073 let HydroNode::JoinHalf { right, .. } = node else {
4074 unreachable!()
4075 };
4076
4077 assert!(
4078 right.metadata().collection_kind.is_bounded(),
4079 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4080 right.metadata().collection_kind
4081 );
4082
4083 let build_lifetime = if right.metadata().location_id.is_top_level() {
4084 quote!('static)
4085 } else {
4086 quote!('tick)
4087 };
4088
4089 let build_ident = ident_stack.pop().unwrap();
4090 let probe_ident = ident_stack.pop().unwrap();
4091
4092 let stmt_id = next_stmt_id.get_and_increment();
4093 let stream_ident =
4094 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4095
4096 match builders_or_callback {
4097 BuildersOrCallback::Builders(graph_builders) => {
4098 let builder = graph_builders.get_dfir_mut(&out_location);
4099 builder.add_dfir(
4100 parse_quote! {
4101 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4102 #probe_ident -> [probe]#stream_ident;
4103 #build_ident -> [build]#stream_ident;
4104 },
4105 None,
4106 Some(&stmt_id.to_string()),
4107 );
4108 }
4109 BuildersOrCallback::Callback(_, node_callback) => {
4110 node_callback(node, next_stmt_id);
4111 }
4112 }
4113
4114 ident_stack.push(stream_ident);
4115 }
4116
4117 HydroNode::ResolveFutures { .. } => {
4118 let input_ident = ident_stack.pop().unwrap();
4119
4120 let stmt_id = next_stmt_id.get_and_increment();
4121 let futures_ident =
4122 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4123
4124 match builders_or_callback {
4125 BuildersOrCallback::Builders(graph_builders) => {
4126 let builder = graph_builders.get_dfir_mut(&out_location);
4127 builder.add_dfir(
4128 parse_quote! {
4129 #futures_ident = #input_ident -> resolve_futures();
4130 },
4131 None,
4132 Some(&stmt_id.to_string()),
4133 );
4134 }
4135 BuildersOrCallback::Callback(_, node_callback) => {
4136 node_callback(node, next_stmt_id);
4137 }
4138 }
4139
4140 ident_stack.push(futures_ident);
4141 }
4142
4143 HydroNode::ResolveFuturesBlocking { .. } => {
4144 let input_ident = ident_stack.pop().unwrap();
4145
4146 let stmt_id = next_stmt_id.get_and_increment();
4147 let futures_ident =
4148 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4149
4150 match builders_or_callback {
4151 BuildersOrCallback::Builders(graph_builders) => {
4152 let builder = graph_builders.get_dfir_mut(&out_location);
4153 builder.add_dfir(
4154 parse_quote! {
4155 #futures_ident = #input_ident -> resolve_futures_blocking();
4156 },
4157 None,
4158 Some(&stmt_id.to_string()),
4159 );
4160 }
4161 BuildersOrCallback::Callback(_, node_callback) => {
4162 node_callback(node, next_stmt_id);
4163 }
4164 }
4165
4166 ident_stack.push(futures_ident);
4167 }
4168
4169 HydroNode::ResolveFuturesOrdered { .. } => {
4170 let input_ident = ident_stack.pop().unwrap();
4171
4172 let stmt_id = next_stmt_id.get_and_increment();
4173 let futures_ident =
4174 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4175
4176 match builders_or_callback {
4177 BuildersOrCallback::Builders(graph_builders) => {
4178 let builder = graph_builders.get_dfir_mut(&out_location);
4179 builder.add_dfir(
4180 parse_quote! {
4181 #futures_ident = #input_ident -> resolve_futures_ordered();
4182 },
4183 None,
4184 Some(&stmt_id.to_string()),
4185 );
4186 }
4187 BuildersOrCallback::Callback(_, node_callback) => {
4188 node_callback(node, next_stmt_id);
4189 }
4190 }
4191
4192 ident_stack.push(futures_ident);
4193 }
4194
4195 HydroNode::Map {
4196 f,
4197 input,
4198 metadata,
4199 } => {
4200 let input_ident = ident_stack.pop().unwrap();
4202 let f_tokens = f.emit_tokens(&mut ident_stack);
4203
4204 let input_ident = maybe_observe_for_mut(
4205 f,
4206 input_ident,
4207 &input.metadata().location_id,
4208 &input.metadata().collection_kind,
4209 &metadata.op,
4210 builders_or_callback,
4211 next_stmt_id,
4212 );
4213
4214 let stmt_id = next_stmt_id.get_and_increment();
4215 let map_ident =
4216 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4217
4218 match builders_or_callback {
4219 BuildersOrCallback::Builders(graph_builders) => {
4220 let builder = graph_builders.get_dfir_mut(&out_location);
4221 builder.add_dfir(
4222 parse_quote! {
4223 #map_ident = #input_ident -> map(#f_tokens);
4224 },
4225 None,
4226 Some(&stmt_id.to_string()),
4227 );
4228 }
4229 BuildersOrCallback::Callback(_, node_callback) => {
4230 node_callback(node, next_stmt_id);
4231 }
4232 }
4233
4234 ident_stack.push(map_ident);
4235 }
4236
4237 HydroNode::FlatMap { f, input, metadata } => {
4238 let input_ident = ident_stack.pop().unwrap();
4239 let f_tokens = f.emit_tokens(&mut ident_stack);
4240
4241 let input_ident = maybe_observe_for_mut(
4242 f, input_ident,
4243 &input.metadata().location_id,
4244 &input.metadata().collection_kind,
4245 &metadata.op,
4246 builders_or_callback, next_stmt_id,
4247 );
4248
4249 let stmt_id = next_stmt_id.get_and_increment();
4250 let flat_map_ident =
4251 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4252
4253 match builders_or_callback {
4254 BuildersOrCallback::Builders(graph_builders) => {
4255 let builder = graph_builders.get_dfir_mut(&out_location);
4256 builder.add_dfir(
4257 parse_quote! {
4258 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4259 },
4260 None,
4261 Some(&stmt_id.to_string()),
4262 );
4263 }
4264 BuildersOrCallback::Callback(_, node_callback) => {
4265 node_callback(node, next_stmt_id);
4266 }
4267 }
4268
4269 ident_stack.push(flat_map_ident);
4270 }
4271
4272 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4273 let input_ident = ident_stack.pop().unwrap();
4274 let f_tokens = f.emit_tokens(&mut ident_stack);
4275
4276 let input_ident = maybe_observe_for_mut(
4277 f, input_ident,
4278 &input.metadata().location_id,
4279 &input.metadata().collection_kind,
4280 &metadata.op,
4281 builders_or_callback, next_stmt_id,
4282 );
4283
4284 let stmt_id = next_stmt_id.get_and_increment();
4285 let flat_map_stream_blocking_ident =
4286 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4287
4288 match builders_or_callback {
4289 BuildersOrCallback::Builders(graph_builders) => {
4290 let builder = graph_builders.get_dfir_mut(&out_location);
4291 builder.add_dfir(
4292 parse_quote! {
4293 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4294 },
4295 None,
4296 Some(&stmt_id.to_string()),
4297 );
4298 }
4299 BuildersOrCallback::Callback(_, node_callback) => {
4300 node_callback(node, next_stmt_id);
4301 }
4302 }
4303
4304 ident_stack.push(flat_map_stream_blocking_ident);
4305 }
4306
4307 HydroNode::Filter { f, input, metadata } => {
4308 let input_ident = ident_stack.pop().unwrap();
4309 let f_tokens = f.emit_tokens(&mut ident_stack);
4310
4311 let input_ident = maybe_observe_for_mut(
4312 f, input_ident,
4313 &input.metadata().location_id,
4314 &input.metadata().collection_kind,
4315 &metadata.op,
4316 builders_or_callback, next_stmt_id,
4317 );
4318
4319 let stmt_id = next_stmt_id.get_and_increment();
4320 let filter_ident =
4321 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4322
4323 match builders_or_callback {
4324 BuildersOrCallback::Builders(graph_builders) => {
4325 let builder = graph_builders.get_dfir_mut(&out_location);
4326 builder.add_dfir(
4327 parse_quote! {
4328 #filter_ident = #input_ident -> filter(#f_tokens);
4329 },
4330 None,
4331 Some(&stmt_id.to_string()),
4332 );
4333 }
4334 BuildersOrCallback::Callback(_, node_callback) => {
4335 node_callback(node, next_stmt_id);
4336 }
4337 }
4338
4339 ident_stack.push(filter_ident);
4340 }
4341
4342 HydroNode::FilterMap { f, input, metadata } => {
4343 let input_ident = ident_stack.pop().unwrap();
4344 let f_tokens = f.emit_tokens(&mut ident_stack);
4345
4346 let input_ident = maybe_observe_for_mut(
4347 f, input_ident,
4348 &input.metadata().location_id,
4349 &input.metadata().collection_kind,
4350 &metadata.op,
4351 builders_or_callback, next_stmt_id,
4352 );
4353
4354 let stmt_id = next_stmt_id.get_and_increment();
4355 let filter_map_ident =
4356 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4357
4358 match builders_or_callback {
4359 BuildersOrCallback::Builders(graph_builders) => {
4360 let builder = graph_builders.get_dfir_mut(&out_location);
4361 builder.add_dfir(
4362 parse_quote! {
4363 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4364 },
4365 None,
4366 Some(&stmt_id.to_string()),
4367 );
4368 }
4369 BuildersOrCallback::Callback(_, node_callback) => {
4370 node_callback(node, next_stmt_id);
4371 }
4372 }
4373
4374 ident_stack.push(filter_map_ident);
4375 }
4376
4377 HydroNode::Sort { .. } => {
4378 let input_ident = ident_stack.pop().unwrap();
4379
4380 let stmt_id = next_stmt_id.get_and_increment();
4381 let sort_ident =
4382 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4383
4384 match builders_or_callback {
4385 BuildersOrCallback::Builders(graph_builders) => {
4386 let builder = graph_builders.get_dfir_mut(&out_location);
4387 builder.add_dfir(
4388 parse_quote! {
4389 #sort_ident = #input_ident -> sort();
4390 },
4391 None,
4392 Some(&stmt_id.to_string()),
4393 );
4394 }
4395 BuildersOrCallback::Callback(_, node_callback) => {
4396 node_callback(node, next_stmt_id);
4397 }
4398 }
4399
4400 ident_stack.push(sort_ident);
4401 }
4402
4403 HydroNode::DeferTick { .. } => {
4404 let input_ident = ident_stack.pop().unwrap();
4405
4406 let stmt_id = next_stmt_id.get_and_increment();
4407 let defer_tick_ident =
4408 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4409
4410 match builders_or_callback {
4411 BuildersOrCallback::Builders(graph_builders) => {
4412 let builder = graph_builders.get_dfir_mut(&out_location);
4413 builder.add_dfir(
4414 parse_quote! {
4415 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4416 },
4417 None,
4418 Some(&stmt_id.to_string()),
4419 );
4420 }
4421 BuildersOrCallback::Callback(_, node_callback) => {
4422 node_callback(node, next_stmt_id);
4423 }
4424 }
4425
4426 ident_stack.push(defer_tick_ident);
4427 }
4428
4429 HydroNode::Enumerate { input, .. } => {
4430 let input_ident = ident_stack.pop().unwrap();
4431
4432 let stmt_id = next_stmt_id.get_and_increment();
4433 let enumerate_ident =
4434 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4435
4436 match builders_or_callback {
4437 BuildersOrCallback::Builders(graph_builders) => {
4438 let builder = graph_builders.get_dfir_mut(&out_location);
4439 let lifetime = if input.metadata().location_id.is_top_level() {
4440 quote!('static)
4441 } else {
4442 quote!('tick)
4443 };
4444 builder.add_dfir(
4445 parse_quote! {
4446 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4447 },
4448 None,
4449 Some(&stmt_id.to_string()),
4450 );
4451 }
4452 BuildersOrCallback::Callback(_, node_callback) => {
4453 node_callback(node, next_stmt_id);
4454 }
4455 }
4456
4457 ident_stack.push(enumerate_ident);
4458 }
4459
4460 HydroNode::Inspect { f, input, metadata } => {
4461 let input_ident = ident_stack.pop().unwrap();
4462 let f_tokens = f.emit_tokens(&mut ident_stack);
4463
4464 let input_ident = maybe_observe_for_mut(
4465 f, input_ident,
4466 &input.metadata().location_id,
4467 &input.metadata().collection_kind,
4468 &metadata.op,
4469 builders_or_callback, next_stmt_id,
4470 );
4471
4472 let stmt_id = next_stmt_id.get_and_increment();
4473 let inspect_ident =
4474 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4475
4476 match builders_or_callback {
4477 BuildersOrCallback::Builders(graph_builders) => {
4478 let builder = graph_builders.get_dfir_mut(&out_location);
4479 builder.add_dfir(
4480 parse_quote! {
4481 #inspect_ident = #input_ident -> inspect(#f_tokens);
4482 },
4483 None,
4484 Some(&stmt_id.to_string()),
4485 );
4486 }
4487 BuildersOrCallback::Callback(_, node_callback) => {
4488 node_callback(node, next_stmt_id);
4489 }
4490 }
4491
4492 ident_stack.push(inspect_ident);
4493 }
4494
4495 HydroNode::Unique { input, .. } => {
4496 let input_ident = ident_stack.pop().unwrap();
4497
4498 let stmt_id = next_stmt_id.get_and_increment();
4499 let unique_ident =
4500 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4501
4502 match builders_or_callback {
4503 BuildersOrCallback::Builders(graph_builders) => {
4504 let builder = graph_builders.get_dfir_mut(&out_location);
4505 let lifetime = if input.metadata().location_id.is_top_level() {
4506 quote!('static)
4507 } else {
4508 quote!('tick)
4509 };
4510
4511 builder.add_dfir(
4512 parse_quote! {
4513 #unique_ident = #input_ident -> unique::<#lifetime>();
4514 },
4515 None,
4516 Some(&stmt_id.to_string()),
4517 );
4518 }
4519 BuildersOrCallback::Callback(_, node_callback) => {
4520 node_callback(node, next_stmt_id);
4521 }
4522 }
4523
4524 ident_stack.push(unique_ident);
4525 }
4526
4527 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4528 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4529 if input.metadata().location_id.is_top_level()
4530 && input.metadata().collection_kind.is_bounded()
4531 {
4532 parse_quote!(fold_no_replay)
4533 } else {
4534 parse_quote!(fold)
4535 }
4536 } else if matches!(node, HydroNode::Scan { .. }) {
4537 parse_quote!(scan)
4538 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4539 parse_quote!(scan_async_blocking)
4540 } else if let HydroNode::FoldKeyed { input, .. } = node {
4541 if input.metadata().location_id.is_top_level()
4542 && input.metadata().collection_kind.is_bounded()
4543 {
4544 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4545 } else {
4546 parse_quote!(fold_keyed)
4547 }
4548 } else {
4549 unreachable!()
4550 };
4551
4552 let (HydroNode::Fold { input, .. }
4553 | HydroNode::FoldKeyed { input, .. }
4554 | HydroNode::Scan { input, .. }
4555 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4556 else {
4557 unreachable!()
4558 };
4559
4560 let lifetime = if input.metadata().location_id.is_top_level() {
4561 quote!('static)
4562 } else {
4563 quote!('tick)
4564 };
4565
4566 let input_ident = ident_stack.pop().unwrap();
4567
4568 let (HydroNode::Fold { init, acc, .. }
4569 | HydroNode::FoldKeyed { init, acc, .. }
4570 | HydroNode::Scan { init, acc, .. }
4571 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4572 else {
4573 unreachable!()
4574 };
4575
4576 let acc_tokens = acc.emit_tokens(&mut ident_stack);
4577 let init_tokens = init.emit_tokens(&mut ident_stack);
4578
4579 let stmt_id = next_stmt_id.get_and_increment();
4580 let fold_ident =
4581 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4582
4583 match builders_or_callback {
4584 BuildersOrCallback::Builders(graph_builders) => {
4585 if matches!(node, HydroNode::Fold { .. })
4586 && node.metadata().location_id.is_top_level()
4587 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4588 && graph_builders.singleton_intermediates()
4589 && !node.metadata().collection_kind.is_bounded()
4590 {
4591 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4592 let hooked_input_ident = graph_builders.emit_fold_hook(
4593 &input.metadata().location_id,
4594 &input_ident,
4595 &input.metadata().collection_kind,
4596 &node.metadata().op,
4597 );
4598
4599 let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4600 let acc: syn::Expr = parse_quote!({
4601 let mut __inner = #acc_tokens;
4602 move |__state, __batch: Vec<_>| {
4603 if __batch.is_empty() {
4604 return None;
4605 }
4606 for __value in __batch {
4607 __inner(__state, __value);
4608 }
4609 Some(__state.clone())
4610 }
4611 });
4612 (hooked, acc)
4613 } else {
4614 let acc: syn::Expr = parse_quote!({
4615 let mut __inner = #acc_tokens;
4616 move |__state, __value| {
4617 __inner(__state, __value);
4618 Some(__state.clone())
4619 }
4620 });
4621 (&input_ident, acc)
4622 };
4623
4624 let builder = graph_builders.get_dfir_mut(&out_location);
4625 builder.add_dfir(
4626 parse_quote! {
4627 source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4628 #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4629 #fold_ident = chain();
4630 },
4631 None,
4632 Some(&stmt_id.to_string()),
4633 );
4634
4635 if hooked_input_ident.is_some() {
4636 fold_hooked_idents.insert(fold_ident.to_string());
4637 }
4638 } else if matches!(node, HydroNode::FoldKeyed { .. })
4639 && node.metadata().location_id.is_top_level()
4640 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4641 && graph_builders.singleton_intermediates()
4642 && !node.metadata().collection_kind.is_bounded()
4643 {
4644 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4645 let hooked_input_ident = graph_builders.emit_fold_hook(
4646 &input.metadata().location_id,
4647 &input_ident,
4648 &input.metadata().collection_kind,
4649 &node.metadata().op,
4650 );
4651 let builder = graph_builders.get_dfir_mut(&out_location);
4652
4653 let wrapped_acc: syn::Expr = parse_quote!({
4654 let mut __init = #init_tokens;
4655 let mut __inner = #acc_tokens;
4656 move |__state, __kv: (_, _)| {
4657 let __state = __state
4659 .entry(::std::clone::Clone::clone(&__kv.0))
4660 .or_insert_with(|| (__init)());
4661 __inner(__state, __kv.1);
4662 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4663 }
4664 });
4665
4666 if let Some(hooked_input_ident) = hooked_input_ident {
4667 builder.add_dfir(
4668 parse_quote! {
4669 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4670 },
4671 None,
4672 Some(&stmt_id.to_string()),
4673 );
4674
4675 fold_hooked_idents.insert(fold_ident.to_string());
4676 } else {
4677 builder.add_dfir(
4678 parse_quote! {
4679 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4680 },
4681 None,
4682 Some(&stmt_id.to_string()),
4683 );
4684 }
4685 } else if (matches!(node, HydroNode::Fold { .. })
4686 || matches!(node, HydroNode::FoldKeyed { .. }))
4687 && !node.metadata().location_id.is_top_level()
4688 && graph_builders.singleton_intermediates()
4689 {
4690 let input_ref = match &*node {
4691 HydroNode::Fold { input, .. } => input,
4692 HydroNode::FoldKeyed { input, .. } => input,
4693 _ => unreachable!(),
4694 };
4695 let hooked_input_ident = graph_builders.emit_fold_hook(
4696 &input_ref.metadata().location_id,
4697 &input_ident,
4698 &input_ref.metadata().collection_kind,
4699 &node.metadata().op,
4700 );
4701
4702 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4703 let builder = graph_builders.get_dfir_mut(&out_location);
4704 builder.add_dfir(
4705 parse_quote! {
4706 #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4707 },
4708 None,
4709 Some(&stmt_id.to_string()),
4710 );
4711 } else {
4712 let builder = graph_builders.get_dfir_mut(&out_location);
4713 builder.add_dfir(
4714 parse_quote! {
4715 #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4716 },
4717 None,
4718 Some(&stmt_id.to_string()),
4719 );
4720 }
4721 }
4722 BuildersOrCallback::Callback(_, node_callback) => {
4723 node_callback(node, next_stmt_id);
4724 }
4725 }
4726
4727 ident_stack.push(fold_ident);
4728 }
4729
4730 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4731 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4732 if input.metadata().location_id.is_top_level()
4733 && input.metadata().collection_kind.is_bounded()
4734 {
4735 parse_quote!(reduce_no_replay)
4736 } else {
4737 parse_quote!(reduce)
4738 }
4739 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4740 if input.metadata().location_id.is_top_level()
4741 && input.metadata().collection_kind.is_bounded()
4742 {
4743 todo!(
4744 "Calling keyed reduce on a top-level bounded collection is not supported"
4745 )
4746 } else {
4747 parse_quote!(reduce_keyed)
4748 }
4749 } else {
4750 unreachable!()
4751 };
4752
4753 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4754 else {
4755 unreachable!()
4756 };
4757
4758 let lifetime = if input.metadata().location_id.is_top_level() {
4759 quote!('static)
4760 } else {
4761 quote!('tick)
4762 };
4763
4764 let input_ident = ident_stack.pop().unwrap();
4765
4766 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4767 else {
4768 unreachable!()
4769 };
4770
4771 let f_tokens = f.emit_tokens(&mut ident_stack);
4772
4773 let stmt_id = next_stmt_id.get_and_increment();
4774 let reduce_ident =
4775 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4776
4777 match builders_or_callback {
4778 BuildersOrCallback::Builders(graph_builders) => {
4779 if matches!(node, HydroNode::Reduce { .. })
4780 && node.metadata().location_id.is_top_level()
4781 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4782 && graph_builders.singleton_intermediates()
4783 && !node.metadata().collection_kind.is_bounded()
4784 {
4785 todo!(
4786 "Reduce with optional intermediates is not yet supported in simulator"
4787 );
4788 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4789 && node.metadata().location_id.is_top_level()
4790 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4791 && graph_builders.singleton_intermediates()
4792 && !node.metadata().collection_kind.is_bounded()
4793 {
4794 todo!(
4795 "Reduce keyed with optional intermediates is not yet supported in simulator"
4796 );
4797 } else {
4798 let builder = graph_builders.get_dfir_mut(&out_location);
4799 builder.add_dfir(
4800 parse_quote! {
4801 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4802 },
4803 None,
4804 Some(&stmt_id.to_string()),
4805 );
4806 }
4807 }
4808 BuildersOrCallback::Callback(_, node_callback) => {
4809 node_callback(node, next_stmt_id);
4810 }
4811 }
4812
4813 ident_stack.push(reduce_ident);
4814 }
4815
4816 HydroNode::ReduceKeyedWatermark {
4817 f,
4818 input,
4819 metadata,
4820 ..
4821 } => {
4822 let lifetime = if input.metadata().location_id.is_top_level() {
4823 quote!('static)
4824 } else {
4825 quote!('tick)
4826 };
4827
4828 let watermark_ident = ident_stack.pop().unwrap();
4830 let input_ident = ident_stack.pop().unwrap();
4831 let f_tokens = f.emit_tokens(&mut ident_stack);
4832
4833 let stmt_id = next_stmt_id.get_and_increment();
4834 let chain_ident = syn::Ident::new(
4835 &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4836 Span::call_site(),
4837 );
4838
4839 let fold_ident =
4840 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4841
4842 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4843 && input.metadata().collection_kind.is_bounded()
4844 {
4845 parse_quote!(fold_no_replay)
4846 } else {
4847 parse_quote!(fold)
4848 };
4849
4850 match builders_or_callback {
4851 BuildersOrCallback::Builders(graph_builders) => {
4852 if metadata.location_id.is_top_level()
4853 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4854 && graph_builders.singleton_intermediates()
4855 && !metadata.collection_kind.is_bounded()
4856 {
4857 todo!(
4858 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4859 )
4860 } else {
4861 let builder = graph_builders.get_dfir_mut(&out_location);
4862 builder.add_dfir(
4863 parse_quote! {
4864 #chain_ident = chain();
4865 #input_ident
4866 -> map(|x| (Some(x), None))
4867 -> [0]#chain_ident;
4868 #watermark_ident
4869 -> map(|watermark| (None, Some(watermark)))
4870 -> [1]#chain_ident;
4871
4872 #fold_ident = #chain_ident
4873 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4874 let __reduce_keyed_fn = #f_tokens;
4875 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4876 if let Some((k, v)) = opt_payload {
4877 if let Some(curr_watermark) = *opt_curr_watermark {
4878 if k < curr_watermark {
4879 return;
4880 }
4881 }
4882 match map.entry(k) {
4883 ::std::collections::hash_map::Entry::Vacant(e) => {
4884 e.insert(v);
4885 }
4886 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4887 __reduce_keyed_fn(e.get_mut(), v);
4888 }
4889 }
4890 } else {
4891 let watermark = opt_watermark.unwrap();
4892 if let Some(curr_watermark) = *opt_curr_watermark {
4893 if watermark <= curr_watermark {
4894 return;
4895 }
4896 }
4897 map.retain(|k, _| *k >= watermark);
4898 *opt_curr_watermark = Some(watermark);
4899 }
4900 }
4901 })
4902 -> flat_map(|(map, _curr_watermark)| map);
4903 },
4904 None,
4905 Some(&stmt_id.to_string()),
4906 );
4907 }
4908 }
4909 BuildersOrCallback::Callback(_, node_callback) => {
4910 node_callback(node, next_stmt_id);
4911 }
4912 }
4913
4914 ident_stack.push(fold_ident);
4915 }
4916
4917 HydroNode::Network {
4918 networking_info,
4919 serialize_fn: serialize_pipeline,
4920 instantiate_fn,
4921 deserialize_fn: deserialize_pipeline,
4922 input,
4923 ..
4924 } => {
4925 let input_ident = ident_stack.pop().unwrap();
4926
4927 let stmt_id = next_stmt_id.get_and_increment();
4928 let receiver_stream_ident =
4929 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4930
4931 match builders_or_callback {
4932 BuildersOrCallback::Builders(graph_builders) => {
4933 let (sink_expr, source_expr) = match instantiate_fn {
4934 DebugInstantiate::Building => (
4935 syn::parse_quote!(DUMMY_SINK),
4936 syn::parse_quote!(DUMMY_SOURCE),
4937 ),
4938
4939 DebugInstantiate::Finalized(finalized) => {
4940 (finalized.sink.clone(), finalized.source.clone())
4941 }
4942 };
4943
4944 graph_builders.create_network(
4945 &input.metadata().location_id,
4946 &out_location,
4947 input_ident,
4948 &receiver_stream_ident,
4949 serialize_pipeline.as_ref(),
4950 sink_expr,
4951 source_expr,
4952 deserialize_pipeline.as_ref(),
4953 stmt_id,
4954 networking_info,
4955 );
4956 }
4957 BuildersOrCallback::Callback(_, node_callback) => {
4958 node_callback(node, next_stmt_id);
4959 }
4960 }
4961
4962 ident_stack.push(receiver_stream_ident);
4963 }
4964
4965 HydroNode::ExternalInput {
4966 instantiate_fn,
4967 deserialize_fn: deserialize_pipeline,
4968 ..
4969 } => {
4970 let stmt_id = next_stmt_id.get_and_increment();
4971 let receiver_stream_ident =
4972 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4973
4974 match builders_or_callback {
4975 BuildersOrCallback::Builders(graph_builders) => {
4976 let (_, source_expr) = match instantiate_fn {
4977 DebugInstantiate::Building => (
4978 syn::parse_quote!(DUMMY_SINK),
4979 syn::parse_quote!(DUMMY_SOURCE),
4980 ),
4981
4982 DebugInstantiate::Finalized(finalized) => {
4983 (finalized.sink.clone(), finalized.source.clone())
4984 }
4985 };
4986
4987 graph_builders.create_external_source(
4988 &out_location,
4989 source_expr,
4990 &receiver_stream_ident,
4991 deserialize_pipeline.as_ref(),
4992 stmt_id,
4993 );
4994 }
4995 BuildersOrCallback::Callback(_, node_callback) => {
4996 node_callback(node, next_stmt_id);
4997 }
4998 }
4999
5000 ident_stack.push(receiver_stream_ident);
5001 }
5002
5003 HydroNode::Counter {
5004 tag,
5005 duration,
5006 prefix,
5007 ..
5008 } => {
5009 let input_ident = ident_stack.pop().unwrap();
5010
5011 let stmt_id = next_stmt_id.get_and_increment();
5012 let counter_ident =
5013 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5014
5015 match builders_or_callback {
5016 BuildersOrCallback::Builders(graph_builders) => {
5017 let arg = format!("{}({})", prefix, tag);
5018 let builder = graph_builders.get_dfir_mut(&out_location);
5019 builder.add_dfir(
5020 parse_quote! {
5021 #counter_ident = #input_ident -> _counter(#arg, #duration);
5022 },
5023 None,
5024 Some(&stmt_id.to_string()),
5025 );
5026 }
5027 BuildersOrCallback::Callback(_, node_callback) => {
5028 node_callback(node, next_stmt_id);
5029 }
5030 }
5031
5032 ident_stack.push(counter_ident);
5033 }
5034 }
5035 },
5036 seen_tees,
5037 false,
5038 );
5039
5040 let ret = ident_stack
5041 .pop()
5042 .expect("ident_stack should have exactly one element after traversal");
5043 assert!(
5044 ident_stack.is_empty(),
5045 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5046 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5047 ident_stack.len()
5048 );
5049 ret
5050 }
5051
5052 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5053 match self {
5054 HydroNode::Placeholder => {
5055 panic!()
5056 }
5057 HydroNode::Cast { .. }
5058 | HydroNode::ObserveNonDet { .. }
5059 | HydroNode::UnboundSingleton { .. }
5060 | HydroNode::AssertIsConsistent { .. } => {}
5061 HydroNode::Source { source, .. } => match source {
5062 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5063 HydroSource::ExternalNetwork()
5064 | HydroSource::Spin()
5065 | HydroSource::ClusterMembers(_, _)
5066 | HydroSource::Embedded(_)
5067 | HydroSource::EmbeddedSingleton(_) => {} },
5069 HydroNode::SingletonSource { value, .. } => {
5070 transform(value);
5071 }
5072 HydroNode::CycleSource { .. }
5073 | HydroNode::Tee { .. }
5074 | HydroNode::Reference { .. }
5075 | HydroNode::YieldConcat { .. }
5076 | HydroNode::BeginAtomic { .. }
5077 | HydroNode::EndAtomic { .. }
5078 | HydroNode::Batch { .. }
5079 | HydroNode::Chain { .. }
5080 | HydroNode::MergeOrdered { .. }
5081 | HydroNode::ChainFirst { .. }
5082 | HydroNode::CrossProduct { .. }
5083 | HydroNode::CrossSingleton { .. }
5084 | HydroNode::ResolveFutures { .. }
5085 | HydroNode::ResolveFuturesBlocking { .. }
5086 | HydroNode::ResolveFuturesOrdered { .. }
5087 | HydroNode::Join { .. }
5088 | HydroNode::JoinHalf { .. }
5089 | HydroNode::Difference { .. }
5090 | HydroNode::AntiJoin { .. }
5091 | HydroNode::DeferTick { .. }
5092 | HydroNode::Enumerate { .. }
5093 | HydroNode::Unique { .. }
5094 | HydroNode::Sort { .. } => {}
5095 HydroNode::Map { f, .. }
5096 | HydroNode::FlatMap { f, .. }
5097 | HydroNode::FlatMapStreamBlocking { f, .. }
5098 | HydroNode::Filter { f, .. }
5099 | HydroNode::FilterMap { f, .. }
5100 | HydroNode::Inspect { f, .. }
5101 | HydroNode::Partition { f, .. }
5102 | HydroNode::Reduce { f, .. }
5103 | HydroNode::ReduceKeyed { f, .. }
5104 | HydroNode::ReduceKeyedWatermark { f, .. } => {
5105 transform(&mut f.expr);
5106 }
5107 HydroNode::Fold { init, acc, .. }
5108 | HydroNode::Scan { init, acc, .. }
5109 | HydroNode::ScanAsyncBlocking { init, acc, .. }
5110 | HydroNode::FoldKeyed { init, acc, .. } => {
5111 transform(&mut init.expr);
5112 transform(&mut acc.expr);
5113 }
5114 HydroNode::Network {
5115 serialize_fn,
5116 deserialize_fn,
5117 ..
5118 } => {
5119 if let Some(serialize_fn) = serialize_fn {
5120 transform(serialize_fn);
5121 }
5122 if let Some(deserialize_fn) = deserialize_fn {
5123 transform(deserialize_fn);
5124 }
5125 }
5126 HydroNode::ExternalInput { deserialize_fn, .. } => {
5127 if let Some(deserialize_fn) = deserialize_fn {
5128 transform(deserialize_fn);
5129 }
5130 }
5131 HydroNode::Counter { duration, .. } => {
5132 transform(duration);
5133 }
5134 }
5135 }
5136
5137 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5138 &self.metadata().op
5139 }
5140
5141 pub fn metadata(&self) -> &HydroIrMetadata {
5142 match self {
5143 HydroNode::Placeholder => {
5144 panic!()
5145 }
5146 HydroNode::Cast { metadata, .. }
5147 | HydroNode::ObserveNonDet { metadata, .. }
5148 | HydroNode::AssertIsConsistent { metadata, .. }
5149 | HydroNode::UnboundSingleton { metadata, .. }
5150 | HydroNode::Source { metadata, .. }
5151 | HydroNode::SingletonSource { metadata, .. }
5152 | HydroNode::CycleSource { metadata, .. }
5153 | HydroNode::Tee { metadata, .. }
5154 | HydroNode::Reference { metadata, .. }
5155 | HydroNode::Partition { metadata, .. }
5156 | HydroNode::YieldConcat { metadata, .. }
5157 | HydroNode::BeginAtomic { metadata, .. }
5158 | HydroNode::EndAtomic { metadata, .. }
5159 | HydroNode::Batch { metadata, .. }
5160 | HydroNode::Chain { metadata, .. }
5161 | HydroNode::MergeOrdered { metadata, .. }
5162 | HydroNode::ChainFirst { metadata, .. }
5163 | HydroNode::CrossProduct { metadata, .. }
5164 | HydroNode::CrossSingleton { metadata, .. }
5165 | HydroNode::Join { metadata, .. }
5166 | HydroNode::JoinHalf { metadata, .. }
5167 | HydroNode::Difference { metadata, .. }
5168 | HydroNode::AntiJoin { metadata, .. }
5169 | HydroNode::ResolveFutures { metadata, .. }
5170 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5171 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5172 | HydroNode::Map { metadata, .. }
5173 | HydroNode::FlatMap { metadata, .. }
5174 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5175 | HydroNode::Filter { metadata, .. }
5176 | HydroNode::FilterMap { metadata, .. }
5177 | HydroNode::DeferTick { metadata, .. }
5178 | HydroNode::Enumerate { metadata, .. }
5179 | HydroNode::Inspect { metadata, .. }
5180 | HydroNode::Unique { metadata, .. }
5181 | HydroNode::Sort { metadata, .. }
5182 | HydroNode::Scan { metadata, .. }
5183 | HydroNode::ScanAsyncBlocking { metadata, .. }
5184 | HydroNode::Fold { metadata, .. }
5185 | HydroNode::FoldKeyed { metadata, .. }
5186 | HydroNode::Reduce { metadata, .. }
5187 | HydroNode::ReduceKeyed { metadata, .. }
5188 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5189 | HydroNode::ExternalInput { metadata, .. }
5190 | HydroNode::Network { metadata, .. }
5191 | HydroNode::Counter { metadata, .. } => metadata,
5192 }
5193 }
5194
5195 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5196 &mut self.metadata_mut().op
5197 }
5198
5199 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5200 match self {
5201 HydroNode::Placeholder => {
5202 panic!()
5203 }
5204 HydroNode::Cast { metadata, .. }
5205 | HydroNode::ObserveNonDet { metadata, .. }
5206 | HydroNode::AssertIsConsistent { metadata, .. }
5207 | HydroNode::UnboundSingleton { metadata, .. }
5208 | HydroNode::Source { metadata, .. }
5209 | HydroNode::SingletonSource { metadata, .. }
5210 | HydroNode::CycleSource { metadata, .. }
5211 | HydroNode::Tee { metadata, .. }
5212 | HydroNode::Reference { metadata, .. }
5213 | HydroNode::Partition { metadata, .. }
5214 | HydroNode::YieldConcat { metadata, .. }
5215 | HydroNode::BeginAtomic { metadata, .. }
5216 | HydroNode::EndAtomic { metadata, .. }
5217 | HydroNode::Batch { metadata, .. }
5218 | HydroNode::Chain { metadata, .. }
5219 | HydroNode::MergeOrdered { metadata, .. }
5220 | HydroNode::ChainFirst { metadata, .. }
5221 | HydroNode::CrossProduct { metadata, .. }
5222 | HydroNode::CrossSingleton { metadata, .. }
5223 | HydroNode::Join { metadata, .. }
5224 | HydroNode::JoinHalf { metadata, .. }
5225 | HydroNode::Difference { metadata, .. }
5226 | HydroNode::AntiJoin { metadata, .. }
5227 | HydroNode::ResolveFutures { metadata, .. }
5228 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5229 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5230 | HydroNode::Map { metadata, .. }
5231 | HydroNode::FlatMap { metadata, .. }
5232 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5233 | HydroNode::Filter { metadata, .. }
5234 | HydroNode::FilterMap { metadata, .. }
5235 | HydroNode::DeferTick { metadata, .. }
5236 | HydroNode::Enumerate { metadata, .. }
5237 | HydroNode::Inspect { metadata, .. }
5238 | HydroNode::Unique { metadata, .. }
5239 | HydroNode::Sort { metadata, .. }
5240 | HydroNode::Scan { metadata, .. }
5241 | HydroNode::ScanAsyncBlocking { metadata, .. }
5242 | HydroNode::Fold { metadata, .. }
5243 | HydroNode::FoldKeyed { metadata, .. }
5244 | HydroNode::Reduce { metadata, .. }
5245 | HydroNode::ReduceKeyed { metadata, .. }
5246 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5247 | HydroNode::ExternalInput { metadata, .. }
5248 | HydroNode::Network { metadata, .. }
5249 | HydroNode::Counter { metadata, .. } => metadata,
5250 }
5251 }
5252
5253 pub fn input(&self) -> Vec<&HydroNode> {
5254 match self {
5255 HydroNode::Placeholder => {
5256 panic!()
5257 }
5258 HydroNode::Source { .. }
5259 | HydroNode::SingletonSource { .. }
5260 | HydroNode::ExternalInput { .. }
5261 | HydroNode::CycleSource { .. }
5262 | HydroNode::Tee { .. }
5263 | HydroNode::Reference { .. }
5264 | HydroNode::Partition { .. } => {
5265 vec![]
5267 }
5268 HydroNode::Cast { inner, .. }
5269 | HydroNode::ObserveNonDet { inner, .. }
5270 | HydroNode::YieldConcat { inner, .. }
5271 | HydroNode::BeginAtomic { inner, .. }
5272 | HydroNode::EndAtomic { inner, .. }
5273 | HydroNode::Batch { inner, .. }
5274 | HydroNode::UnboundSingleton { inner, .. }
5275 | HydroNode::AssertIsConsistent { inner, .. } => {
5276 vec![inner]
5277 }
5278 HydroNode::Chain { first, second, .. } => {
5279 vec![first, second]
5280 }
5281 HydroNode::MergeOrdered { first, second, .. } => {
5282 vec![first, second]
5283 }
5284 HydroNode::ChainFirst { first, second, .. } => {
5285 vec![first, second]
5286 }
5287 HydroNode::CrossProduct { left, right, .. }
5288 | HydroNode::CrossSingleton { left, right, .. }
5289 | HydroNode::Join { left, right, .. }
5290 | HydroNode::JoinHalf { left, right, .. } => {
5291 vec![left, right]
5292 }
5293 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5294 vec![pos, neg]
5295 }
5296 HydroNode::Map { input, .. }
5297 | HydroNode::FlatMap { input, .. }
5298 | HydroNode::FlatMapStreamBlocking { input, .. }
5299 | HydroNode::Filter { input, .. }
5300 | HydroNode::FilterMap { input, .. }
5301 | HydroNode::Sort { input, .. }
5302 | HydroNode::DeferTick { input, .. }
5303 | HydroNode::Enumerate { input, .. }
5304 | HydroNode::Inspect { input, .. }
5305 | HydroNode::Unique { input, .. }
5306 | HydroNode::Network { input, .. }
5307 | HydroNode::Counter { input, .. }
5308 | HydroNode::ResolveFutures { input, .. }
5309 | HydroNode::ResolveFuturesBlocking { input, .. }
5310 | HydroNode::ResolveFuturesOrdered { input, .. }
5311 | HydroNode::Fold { input, .. }
5312 | HydroNode::FoldKeyed { input, .. }
5313 | HydroNode::Reduce { input, .. }
5314 | HydroNode::ReduceKeyed { input, .. }
5315 | HydroNode::Scan { input, .. }
5316 | HydroNode::ScanAsyncBlocking { input, .. } => {
5317 vec![input]
5318 }
5319 HydroNode::ReduceKeyedWatermark {
5320 input, watermark, ..
5321 } => {
5322 vec![input, watermark]
5323 }
5324 }
5325 }
5326
5327 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5328 self.input()
5329 .iter()
5330 .map(|input_node| input_node.metadata())
5331 .collect()
5332 }
5333
5334 pub fn is_shared_with_others(&self) -> bool {
5338 match self {
5339 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5340 Rc::strong_count(&inner.0) > 1
5341 }
5342 HydroNode::Reference { .. } => false,
5345 _ => false,
5346 }
5347 }
5348
5349 pub fn print_root(&self) -> String {
5350 match self {
5351 HydroNode::Placeholder => {
5352 panic!()
5353 }
5354 HydroNode::Cast { .. } => "Cast()".to_owned(),
5355 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5356 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5357 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5358 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5359 HydroNode::SingletonSource {
5360 value,
5361 first_tick_only,
5362 ..
5363 } => format!(
5364 "SingletonSource({:?}, first_tick_only={})",
5365 value, first_tick_only
5366 ),
5367 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5368 HydroNode::Tee { inner, .. } => {
5369 format!("Tee({})", inner.0.borrow().print_root())
5370 }
5371 HydroNode::Reference { inner, kind, .. } => {
5372 format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5373 }
5374 HydroNode::Partition { f, is_true, .. } => {
5375 format!("Partition({:?}, is_true={})", f, is_true)
5376 }
5377 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5378 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5379 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5380 HydroNode::Batch { .. } => "Batch()".to_owned(),
5381 HydroNode::Chain { first, second, .. } => {
5382 format!("Chain({}, {})", first.print_root(), second.print_root())
5383 }
5384 HydroNode::MergeOrdered { first, second, .. } => {
5385 format!(
5386 "MergeOrdered({}, {})",
5387 first.print_root(),
5388 second.print_root()
5389 )
5390 }
5391 HydroNode::ChainFirst { first, second, .. } => {
5392 format!(
5393 "ChainFirst({}, {})",
5394 first.print_root(),
5395 second.print_root()
5396 )
5397 }
5398 HydroNode::CrossProduct { left, right, .. } => {
5399 format!(
5400 "CrossProduct({}, {})",
5401 left.print_root(),
5402 right.print_root()
5403 )
5404 }
5405 HydroNode::CrossSingleton { left, right, .. } => {
5406 format!(
5407 "CrossSingleton({}, {})",
5408 left.print_root(),
5409 right.print_root()
5410 )
5411 }
5412 HydroNode::Join { left, right, .. } => {
5413 format!("Join({}, {})", left.print_root(), right.print_root())
5414 }
5415 HydroNode::JoinHalf { left, right, .. } => {
5416 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5417 }
5418 HydroNode::Difference { pos, neg, .. } => {
5419 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5420 }
5421 HydroNode::AntiJoin { pos, neg, .. } => {
5422 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5423 }
5424 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5425 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5426 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5427 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5428 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5429 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5430 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5431 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5432 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5433 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5434 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5435 HydroNode::Unique { .. } => "Unique()".to_owned(),
5436 HydroNode::Sort { .. } => "Sort()".to_owned(),
5437 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5438 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5439 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5440 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5441 }
5442 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5443 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5444 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5445 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5446 HydroNode::Network { .. } => "Network()".to_owned(),
5447 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5448 HydroNode::Counter { tag, duration, .. } => {
5449 format!("Counter({:?}, {:?})", tag, duration)
5450 }
5451 }
5452 }
5453}
5454
5455#[cfg(feature = "build")]
5456fn instantiate_network<'a, D>(
5457 env: &mut D::InstantiateEnv,
5458 from_location: &LocationId,
5459 to_location: &LocationId,
5460 processes: &SparseSecondaryMap<LocationKey, D::Process>,
5461 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5462 name: Option<&str>,
5463 networking_info: &crate::networking::NetworkingInfo,
5464) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5465where
5466 D: Deploy<'a>,
5467{
5468 let ((sink, source), connect_fn) = match (from_location, to_location) {
5469 (&LocationId::Process(from), &LocationId::Process(to)) => {
5470 let from_node = processes
5471 .get(from)
5472 .unwrap_or_else(|| {
5473 panic!("A process used in the graph was not instantiated: {}", from)
5474 })
5475 .clone();
5476 let to_node = processes
5477 .get(to)
5478 .unwrap_or_else(|| {
5479 panic!("A process used in the graph was not instantiated: {}", to)
5480 })
5481 .clone();
5482
5483 let sink_port = from_node.next_port();
5484 let source_port = to_node.next_port();
5485
5486 (
5487 D::o2o_sink_source(
5488 env,
5489 &from_node,
5490 &sink_port,
5491 &to_node,
5492 &source_port,
5493 name,
5494 networking_info,
5495 ),
5496 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5497 )
5498 }
5499 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5500 let from_node = processes
5501 .get(from)
5502 .unwrap_or_else(|| {
5503 panic!("A process used in the graph was not instantiated: {}", from)
5504 })
5505 .clone();
5506 let to_node = clusters
5507 .get(to)
5508 .unwrap_or_else(|| {
5509 panic!("A cluster used in the graph was not instantiated: {}", to)
5510 })
5511 .clone();
5512
5513 let sink_port = from_node.next_port();
5514 let source_port = to_node.next_port();
5515
5516 (
5517 D::o2m_sink_source(
5518 env,
5519 &from_node,
5520 &sink_port,
5521 &to_node,
5522 &source_port,
5523 name,
5524 networking_info,
5525 ),
5526 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5527 )
5528 }
5529 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5530 let from_node = clusters
5531 .get(from)
5532 .unwrap_or_else(|| {
5533 panic!("A cluster used in the graph was not instantiated: {}", from)
5534 })
5535 .clone();
5536 let to_node = processes
5537 .get(to)
5538 .unwrap_or_else(|| {
5539 panic!("A process used in the graph was not instantiated: {}", to)
5540 })
5541 .clone();
5542
5543 let sink_port = from_node.next_port();
5544 let source_port = to_node.next_port();
5545
5546 (
5547 D::m2o_sink_source(
5548 env,
5549 &from_node,
5550 &sink_port,
5551 &to_node,
5552 &source_port,
5553 name,
5554 networking_info,
5555 ),
5556 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5557 )
5558 }
5559 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5560 let from_node = clusters
5561 .get(from)
5562 .unwrap_or_else(|| {
5563 panic!("A cluster used in the graph was not instantiated: {}", from)
5564 })
5565 .clone();
5566 let to_node = clusters
5567 .get(to)
5568 .unwrap_or_else(|| {
5569 panic!("A cluster used in the graph was not instantiated: {}", to)
5570 })
5571 .clone();
5572
5573 let sink_port = from_node.next_port();
5574 let source_port = to_node.next_port();
5575
5576 (
5577 D::m2m_sink_source(
5578 env,
5579 &from_node,
5580 &sink_port,
5581 &to_node,
5582 &source_port,
5583 name,
5584 networking_info,
5585 ),
5586 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5587 )
5588 }
5589 (LocationId::Tick(_, _), _) => panic!(),
5590 (_, LocationId::Tick(_, _)) => panic!(),
5591 (LocationId::Atomic(_), _) => panic!(),
5592 (_, LocationId::Atomic(_)) => panic!(),
5593 };
5594 (sink, source, connect_fn)
5595}
5596
5597#[cfg(test)]
5598mod serde_test;
5599
5600#[cfg(test)]
5601mod test {
5602 use std::mem::size_of;
5603
5604 use stageleft::{QuotedWithContext, q};
5605
5606 use super::*;
5607
5608 #[test]
5609 #[cfg_attr(
5610 not(feature = "build"),
5611 ignore = "expects inclusion of feature-gated fields"
5612 )]
5613 fn hydro_node_size() {
5614 assert_eq!(size_of::<HydroNode>(), 264);
5615 }
5616
5617 #[test]
5618 #[cfg_attr(
5619 not(feature = "build"),
5620 ignore = "expects inclusion of feature-gated fields"
5621 )]
5622 fn hydro_root_size() {
5623 assert_eq!(size_of::<HydroRoot>(), 136);
5624 }
5625
5626 #[test]
5627 fn test_simplify_q_macro_basic() {
5628 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5630 let result = simplify_q_macro(simple_expr.clone());
5631 assert_eq!(result, simple_expr);
5632 }
5633
5634 #[test]
5635 fn test_simplify_q_macro_actual_stageleft_call() {
5636 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5638 let result = simplify_q_macro(stageleft_call);
5639 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5642 }
5643
5644 #[test]
5645 fn test_closure_no_pipe_at_start() {
5646 let stageleft_call = q!({
5648 let foo = 123;
5649 move |b: usize| b + foo
5650 })
5651 .splice_fn1_ctx(&());
5652 let result = simplify_q_macro(stageleft_call);
5653 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5654 }
5655}