1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
/// An expression together with the IR nodes it refers to.
///
/// `Clone`, `Hash`, `Serialize`, `Debug` and `Display` are implemented by hand for this
/// type rather than derived.
pub struct ClosureExpr {
    /// The expression itself.
    pub(crate) expr: DebugExpr,
    /// Nodes referenced by `expr`, each paired with a flag telling whether the reference
    /// is mutable. Every node is expected to be a `HydroNode::Reference`; the `Clone`
    /// impl and `emit_tokens` panic on any other variant.
    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
}
51
52impl Clone for ClosureExpr {
53 fn clone(&self) -> Self {
54 Self {
55 expr: self.expr.clone(),
56 singleton_refs: self
57 .singleton_refs
58 .iter()
59 .map(|(node, is_mut)| {
60 let HydroNode::Reference {
61 inner,
62 kind,
63 access_counter,
64 metadata,
65 } = node
66 else {
67 panic!("singleton_refs should only contain HydroNode::Reference");
68 };
69 (
70 HydroNode::Reference {
71 inner: SharedNode(Rc::clone(&inner.0)),
72 kind: *kind,
73 access_counter: access_counter.freeze(),
74 metadata: metadata.clone(),
75 },
76 *is_mut,
77 )
78 })
79 .collect(),
80 }
81 }
82}
83
84impl Hash for ClosureExpr {
85 fn hash<H: Hasher>(&self, state: &mut H) {
86 self.expr.hash(state);
87 }
91}
92
93impl serde::Serialize for ClosureExpr {
94 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95 use serde::ser::SerializeStruct;
96 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97 s.serialize_field("expr", &self.expr)?;
98 s.serialize_field(
99 "singleton_refs",
100 &SerializableSingletonRefs(&self.singleton_refs),
101 )?;
102 s.end()
103 }
104}
105
/// Borrowed view over a slice of `(node, is_mut)` pairs that serializes as a sequence.
struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110 use serde::ser::SerializeSeq;
111 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112 for (node, is_mut) in self.0.iter() {
113 seq.serialize_element(&(node, is_mut))?;
114 }
115 seq.end()
116 }
117}
118
119impl Debug for ClosureExpr {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 Debug::fmt(&self.expr, f)
122 }
123}
124
125impl Display for ClosureExpr {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 Display::fmt(&self.expr, f)
128 }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132 fn from(expr: syn::Expr) -> Self {
133 Self {
134 expr: DebugExpr(Box::new(expr)),
135 singleton_refs: Vec::new(),
136 }
137 }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141 fn from(expr: DebugExpr) -> Self {
142 Self {
143 expr,
144 singleton_refs: Vec::new(),
145 }
146 }
147}
148
149impl ClosureExpr {
150 pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151 Self {
152 expr,
153 singleton_refs,
154 }
155 }
156
157 pub fn has_mut_ref(&self) -> bool {
158 self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159 }
160
161 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162 Self {
163 expr: self.expr.clone(),
164 singleton_refs: self
165 .singleton_refs
166 .iter()
167 .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168 .collect(),
169 }
170 }
171
172 pub fn transform_children(
173 &mut self,
174 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175 seen_tees: &mut SeenSharedNodes,
176 ) {
177 for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178 transform(ref_node, seen_tees);
179 }
180 }
181
182 #[cfg(feature = "build")]
185 pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186 if self.singleton_refs.is_empty() {
187 self.expr.0.to_token_stream()
188 } else {
189 assert!(
190 ident_stack.len() >= self.singleton_refs.len(),
191 "ident_stack has {} entries but expected at least {} for singleton_refs",
192 ident_stack.len(),
193 self.singleton_refs.len()
194 );
195 let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197 let mut let_bindings = Vec::new();
198 for ((i, (ref_node, is_mut)), ref_ident) in
199 self.singleton_refs.iter().enumerate().zip(ref_idents)
200 {
201 let HydroNode::Reference { access_counter, .. } = ref_node else {
202 panic!("ClosureExpression expected references to `HydroNode::Reference`");
203 };
204 let group = access_counter.frozen_group();
205 let local_ident = handoff_ref_ident(i);
207 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208 let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209 let mut_token = is_mut.then(|| quote!(mut));
210 let binding = quote! {
211 let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212 };
213 let_bindings.push(binding);
214 }
215
216 let expr = &self.expr.0;
217 quote! {
218 {
219 #( #let_bindings )*
220 #expr
221 }
222 }
223 }
224 }
225}
226
/// Newtype around a boxed `syn::Expr` with hand-written `Debug`, `Display` and
/// `Serialize` impls that render the expression as tokens.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235 serializer.serialize_str(&self.to_string())
236 }
237}
238
239impl From<syn::Expr> for DebugExpr {
240 fn from(expr: syn::Expr) -> Self {
241 Self(Box::new(expr))
242 }
243}
244
245impl Deref for DebugExpr {
246 type Target = syn::Expr;
247
248 fn deref(&self) -> &Self::Target {
249 &self.0
250 }
251}
252
253impl ToTokens for DebugExpr {
254 fn to_tokens(&self, tokens: &mut TokenStream) {
255 self.0.to_tokens(tokens);
256 }
257}
258
259impl Debug for DebugExpr {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 write!(f, "{}", self.0.to_token_stream())
262 }
263}
264
265impl Display for DebugExpr {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 let original = self.0.as_ref().clone();
268 let simplified = simplify_q_macro(original);
269
270 write!(f, "q!({})", quote::quote!(#simplified))
273 }
274}
275
276fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278 if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279 && is_stageleft_runtime_support_call(&path_expr.path)
281 && let syn::Expr::Block(b) = &call.args[0]
282 && b.block.stmts.len() == 3
283 && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284 {
286 let mut e = e.clone();
287 while let syn::Expr::Block(ref mut block) = e
288 && block.block.stmts.len() == 1
289 && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290 {
291 e = inner_e;
292 }
293
294 e
295 } else {
296 expr
297 }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301 if let Some(last_segment) = path.segments.last() {
303 let fn_name = last_segment.ident.to_string();
304 path.segments.len() > 2
305 && path.segments[0].ident == "stageleft"
306 && path.segments[1].ident == "runtime_support"
307 && fn_name.contains("_type_hint")
308 } else {
309 false
310 }
311}
312
/// Newtype around a boxed `syn::Type` whose `Debug` and `Serialize` impls render the
/// type as its token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320 fn from(t: syn::Type) -> Self {
321 Self(Box::new(t))
322 }
323}
324
325impl Deref for DebugType {
326 type Target = syn::Type;
327
328 fn deref(&self) -> &Self::Target {
329 &self.0
330 }
331}
332
333impl ToTokens for DebugType {
334 fn to_tokens(&self, tokens: &mut TokenStream) {
335 self.0.to_tokens(tokens);
336 }
337}
338
339impl Debug for DebugType {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 write!(f, "{}", self.0.to_token_stream())
342 }
343}
344
345impl serde::Serialize for DebugType {
346 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348 }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352 backtrace: &Backtrace,
353 serializer: S,
354) -> Result<S::Ok, S::Error> {
355 match backtrace.format_span() {
356 Some(span) => serializer.serialize_some(&span),
357 None => serializer.serialize_none(),
358 }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362 ident: &syn::Ident,
363 serializer: S,
364) -> Result<S::Ok, S::Error> {
365 serializer.serialize_str(&ident.to_string())
366}
367
/// Instantiation state of a network-like IR node.
pub enum DebugInstantiate {
    /// Not yet instantiated. `HydroRoot::compile_network` expects this state and
    /// replaces it with `Finalized`.
    Building,
    /// Instantiated: holds the sink/source expressions and the connect closure.
    Finalized(Box<DebugInstantiateFinalized>),
}
372
373impl serde::Serialize for DebugInstantiate {
374 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375 match self {
376 DebugInstantiate::Building => {
377 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378 }
379 DebugInstantiate::Finalized(_) => {
380 panic!(
381 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382 )
383 }
384 }
385 }
386}
387
/// Payload of `DebugInstantiate::Finalized`.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the connection.
    sink: syn::Expr,
    /// Expression for the receiving side of the connection.
    source: syn::Expr,
    /// Closure that establishes the connection. It is stored as `Some` on construction
    /// and taken (leaving `None`) when `HydroRoot::connect_network` invokes it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402 fn from(f: DebugInstantiateFinalized) -> Self {
403 Self::Finalized(Box::new(f))
404 }
405}
406
407impl Debug for DebugInstantiate {
408 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409 write!(f, "<network instantiate>")
410 }
411}
412
impl Hash for DebugInstantiate {
    /// Intentionally writes nothing to the hasher, so every value hashes identically and
    /// this field never influences the hash of a containing type.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
418
419impl Clone for DebugInstantiate {
420 fn clone(&self) -> Self {
421 match self {
422 DebugInstantiate::Building => DebugInstantiate::Building,
423 DebugInstantiate::Finalized(_) => {
424 panic!("DebugInstantiate::Finalized should not be cloned")
425 }
426 }
427 }
428}
429
/// Resolution state of a `HydroSource::ClusterMembers` source.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not resolved yet. `HydroRoot::compile_network` replaces this with one of the
    /// other variants and panics if it finds the state already resolved.
    Uninit,
    /// Holds the membership-stream expression. Set by `compile_network` the first time
    /// a given (root location, cluster key) pair is encountered.
    Stream(DebugExpr),
    /// Set by `compile_network` when the same (root location, cluster key) pair was
    /// already encountered. Stores that root location and the cluster's `LocationId`.
    Tee(LocationId, LocationId),
}
450
/// The kinds of source a `HydroNode::Source` can have.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression.
    /// NOTE(review): presumably an expression producing a stream — confirm at the use site.
    Stream(DebugExpr),
    /// NOTE(review): presumably input arriving from an external network — confirm.
    ExternalNetwork(),
    /// Source described by an expression.
    /// NOTE(review): presumably an expression producing an iterator — confirm at the use site.
    Iter(DebugExpr),
    /// NOTE(review): semantics not visible in this part of the file — confirm.
    Spin(),
    /// Membership of the cluster identified by the `LocationId`, together with its
    /// resolution state (filled in by `HydroRoot::compile_network`).
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded input identified by `ident`. `compile_network` requires the node to have
    /// a `Stream` collection kind and registers it as an embedded stream input.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Embedded input identified by `ident`. `compile_network` requires the node to have
    /// a `Singleton` collection kind and registers it as an embedded singleton input.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
462
/// Abstraction over the target that DFIR statements are emitted into.
///
/// The implementation for `SecondaryMap<LocationKey, FlatGraphBuilder>` in this file
/// adds each statement to the `FlatGraphBuilder` of the relevant location. The per-method
/// notes below describe what that implementation does; other implementations may differ.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// NOTE(review): semantics are not visible here; the `SecondaryMap` implementation
    /// returns `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder for `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Connects `in_ident` to `out_ident`. The `SecondaryMap` implementation inserts
    /// `persist::<'static>()` when the input is a bounded `Singleton`, `Optional` or
    /// `KeyedSingleton`, and emits a plain pass-through otherwise.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Connects `in_ident` to `out_ident`; a pass-through in the `SecondaryMap`
    /// implementation.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Connects `in_ident` to `out_ident`; a pass-through in the `SecondaryMap`
    /// implementation.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident`; a pass-through in the `SecondaryMap`
    /// implementation.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Connects `in_ident` to `out_ident`; a pass-through in the `SecondaryMap`
    /// implementation, which ignores `trusted`, both kinds and `op_meta`.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Combines `first_ident` and `second_ident` into `out_ident`. The `SecondaryMap`
    /// implementation emits a `union()` with the first input on port `0` and the second
    /// on port `1`, tagged with `operator_tag`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network edge: `input_ident` into `sink` at `from`
    /// (optionally mapped through `serialize`), and `source` into `out_ident` at `to`
    /// (optionally mapped through `deserialize`).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits `source_expr` into `out_ident` at location `on`, optionally mapped through
    /// `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits `input_ident` into `sink_expr` at location `on`, optionally mapped through
    /// `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// NOTE(review): semantics are not visible here; the `SecondaryMap` implementation
    /// emits nothing and returns `None`.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Connects `in_ident` to `out_ident`; a pass-through in the `SecondaryMap`
    /// implementation, which ignores `trusted`.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );

    /// Connects `in_ident` to `out_ident`; a pass-through in the `SecondaryMap`
    /// implementation.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        op_meta: &HydroIrOpMetadata,
    );

    /// Not supported by the `SecondaryMap` implementation, which treats a call as
    /// unreachable.
    fn create_versioned_network_fork(
        &mut self,
        channel_id: u32,
        dest: &LocationId,
        senders: Vec<(LocationId, syn::Ident, Option<DebugExpr>)>,
        tag_id: StmtId,
    );

    /// Not supported by the `SecondaryMap` implementation, which treats a call as
    /// unreachable.
    fn create_versioned_network(
        &mut self,
        channel_id: u32,
        source: &LocationId,
        dest: &LocationId,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );
}
621
/// Adds the pass-through statement `#out_ident = #in_ident;` to `builder`, with no loop
/// and no operator tag.
#[cfg(feature = "build")]
fn emit_dfir_passthrough(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// Emits DFIR directly into one `FlatGraphBuilder` per root location.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the key of `location`'s root, creating a default
    /// one if none exists yet.
    ///
    /// # Panics
    /// Panics with "location was removed" when the map rejects the key.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let key = location.root().key();
        self.entry(key).expect("location was removed").or_default()
    }

    /// Emits `persist::<'static>()` for bounded singleton-like inputs (asserting that
    /// the input location is top-level) and a plain pass-through for everything else.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if !needs_persist {
            emit_dfir_passthrough(builder, &in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        emit_dfir_passthrough(builder, &in_ident, out_ident);
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        emit_dfir_passthrough(builder, &in_ident, out_ident);
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        emit_dfir_passthrough(builder, &in_ident, out_ident);
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(location);
        emit_dfir_passthrough(builder, &in_ident, out_ident);
    }

    /// Emits a `union()` fed by `first_ident` on port `0` and `second_ident` on port `1`.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        let builder = self.get_dfir_mut(location);
        builder.add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits the sending half at `from` (tagged `send<tag_id>`) and the receiving half
    /// at `to` (tagged `recv<tag_id>`).
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let sender_builder = self.get_dfir_mut(from);
        let send_tag = format!("send{}", tag_id);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        let recv_tag = format!("recv{}", tag_id);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits `source_stream(#source_expr)` into `out_ident`, tagged `recv<tag_id>`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let receiver_builder = self.get_dfir_mut(on);
        let recv_tag = format!("recv{}", tag_id);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits `input_ident` into `dest_sink(#sink_expr)`, tagged `send<tag_id>`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let sender_builder = self.get_dfir_mut(on);
        let send_tag = format!("send{}", tag_id);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }

    /// Emits nothing and returns `None`.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(location);
        emit_dfir_passthrough(builder, &in_ident, out_ident);
    }

    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(location);
        emit_dfir_passthrough(builder, &in_ident, out_ident);
    }

    /// Never valid for this builder; always panics via `unreachable!`.
    fn create_versioned_network_fork(
        &mut self,
        _channel_id: u32,
        _dest: &LocationId,
        _senders: Vec<(LocationId, syn::Ident, Option<DebugExpr>)>,
        _tag_id: StmtId,
    ) {
        unreachable!(
            "HydroNode::VersionedNetworkFork is only produced by the multi-version simulator merge \
             pass and cannot be emitted by the non-simulation builder"
        );
    }

    /// Never valid for this builder; always panics via `unreachable!`.
    fn create_versioned_network(
        &mut self,
        _channel_id: u32,
        _source: &LocationId,
        _dest: &LocationId,
        _out_ident: &syn::Ident,
        _deserialize: Option<&DebugExpr>,
        _tag_id: StmtId,
    ) {
        unreachable!(
            "HydroNode::VersionedNetwork is only produced by the multi-version simulator merge \
             pass and cannot be emitted by the non-simulation builder"
        );
    }
}
951
/// Either a `DfirBuilder` to emit into, or a pair of callbacks.
///
/// `L` receives a root and a `StmtId` counter; `N` receives a node and a `StmtId` counter.
/// NOTE(review): how the two modes are driven is not visible in this part of the file.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
{
    /// Emit through the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback (`L`) and the node callback (`N`).
    Callback(L, N),
}
961
/// A root of the Hydro IR. Every variant consumes an `input` node and carries the
/// operator metadata in `op_metadata`.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Applies the closure `f` to `input`.
    /// NOTE(review): presumably once per element — confirm at the emission site.
    ForEach {
        f: ClosureExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to port `to_port_id` of the external identified by `to_external_key`.
    /// `instantiate_fn` starts as `Building` and is finalized by `compile_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` into the sink described by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Embedded output identified by `ident`. `compile_network` requires `input` to have
    /// a `Stream` collection kind and registers it as an embedded output.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Carries only `input` and metadata.
    /// NOTE(review): presumably discards its input — confirm at the emission site.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
1003
1004impl HydroRoot {
1005 #[cfg(feature = "build")]
1006 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
1007 pub fn compile_network<'a, D>(
1008 &mut self,
1009 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
1010 seen_tees: &mut SeenSharedNodes,
1011 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
1012 processes: &SparseSecondaryMap<LocationKey, D::Process>,
1013 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
1014 externals: &SparseSecondaryMap<LocationKey, D::External>,
1015 env: &mut D::InstantiateEnv,
1016 ) where
1017 D: Deploy<'a>,
1018 {
1019 let refcell_extra_stmts = RefCell::new(extra_stmts);
1020 let refcell_env = RefCell::new(env);
1021 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
1022 self.transform_bottom_up(
1023 &mut |l| {
1024 if let HydroRoot::SendExternal {
1025 #[cfg(feature = "tokio")]
1026 input,
1027 #[cfg(feature = "tokio")]
1028 to_external_key,
1029 #[cfg(feature = "tokio")]
1030 to_port_id,
1031 #[cfg(feature = "tokio")]
1032 to_many,
1033 #[cfg(feature = "tokio")]
1034 unpaired,
1035 #[cfg(feature = "tokio")]
1036 instantiate_fn,
1037 ..
1038 } = l
1039 {
1040 #[cfg(feature = "tokio")]
1041 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1042 DebugInstantiate::Building => {
1043 let to_node = externals
1044 .get(*to_external_key)
1045 .unwrap_or_else(|| {
1046 panic!("A external used in the graph was not instantiated: {}", to_external_key)
1047 })
1048 .clone();
1049
1050 match input.metadata().location_id.root() {
1051 &LocationId::Process(process_key) => {
1052 if *to_many {
1053 (
1054 (
1055 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1056 parse_quote!(DUMMY),
1057 ),
1058 Box::new(|| {}) as Box<dyn FnOnce()>,
1059 )
1060 } else {
1061 let from_node = processes
1062 .get(process_key)
1063 .unwrap_or_else(|| {
1064 panic!("A process used in the graph was not instantiated: {}", process_key)
1065 })
1066 .clone();
1067
1068 let sink_port = from_node.next_port();
1069 let source_port = to_node.next_port();
1070
1071 if *unpaired {
1072 use stageleft::quote_type;
1073 use tokio_util::codec::LengthDelimitedCodec;
1074
1075 to_node.register(*to_port_id, source_port.clone());
1076
1077 let _ = D::e2o_source(
1078 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1079 &to_node, &source_port,
1080 &from_node, &sink_port,
1081 "e_type::<LengthDelimitedCodec>(),
1082 format!("{}_{}", *to_external_key, *to_port_id)
1083 );
1084 }
1085
1086 (
1087 (
1088 D::o2e_sink(
1089 &from_node,
1090 &sink_port,
1091 &to_node,
1092 &source_port,
1093 format!("{}_{}", *to_external_key, *to_port_id)
1094 ),
1095 parse_quote!(DUMMY),
1096 ),
1097 if *unpaired {
1098 D::e2o_connect(
1099 &to_node,
1100 &source_port,
1101 &from_node,
1102 &sink_port,
1103 *to_many,
1104 NetworkHint::Auto,
1105 )
1106 } else {
1107 Box::new(|| {}) as Box<dyn FnOnce()>
1108 },
1109 )
1110 }
1111 }
1112 LocationId::Cluster(cluster_key) => {
1113 let from_node = clusters
1114 .get(*cluster_key)
1115 .unwrap_or_else(|| {
1116 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1117 })
1118 .clone();
1119
1120 let sink_port = from_node.next_port();
1121 let source_port = to_node.next_port();
1122
1123 if *unpaired {
1124 to_node.register(*to_port_id, source_port.clone());
1125 }
1126
1127 (
1128 (
1129 D::m2e_sink(
1130 &from_node,
1131 &sink_port,
1132 &to_node,
1133 &source_port,
1134 format!("{}_{}", *to_external_key, *to_port_id)
1135 ),
1136 parse_quote!(DUMMY),
1137 ),
1138 Box::new(|| {}) as Box<dyn FnOnce()>,
1139 )
1140 }
1141 _ => panic!()
1142 }
1143 },
1144
1145 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1146 };
1147
1148 #[cfg(not(feature = "tokio"))]
1149 {
1150 panic!("Cannot instantiate external inputs without tokio");
1151 };
1152
1153 #[cfg(feature = "tokio")]
1154 {
1155 *instantiate_fn = DebugInstantiateFinalized {
1156 sink: sink_expr,
1157 source: source_expr,
1158 connect_fn: Some(connect_fn),
1159 }
1160 .into();
1161 };
1162 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1163 let element_type = match &input.metadata().collection_kind {
1164 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1165 _ => panic!("Embedded output must have Stream collection kind"),
1166 };
1167 let location_key = match input.metadata().location_id.root() {
1168 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1169 _ => panic!("Embedded output must be on a process or cluster"),
1170 };
1171 D::register_embedded_output(
1172 &mut refcell_env.borrow_mut(),
1173 location_key,
1174 ident,
1175 &element_type,
1176 );
1177 }
1178 },
1179 &mut |n| {
1180 if let HydroNode::Network {
1181 name,
1182 networking_info,
1183 input,
1184 instantiate_fn,
1185 metadata,
1186 ..
1187 } = n
1188 {
1189 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1190 DebugInstantiate::Building => instantiate_network::<D>(
1191 &mut refcell_env.borrow_mut(),
1192 input.metadata().location_id.root(),
1193 metadata.location_id.root(),
1194 processes,
1195 clusters,
1196 name.as_deref(),
1197 networking_info,
1198 ),
1199
1200 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1201 };
1202
1203 *instantiate_fn = DebugInstantiateFinalized {
1204 sink: sink_expr,
1205 source: source_expr,
1206 connect_fn: Some(connect_fn),
1207 }
1208 .into();
1209 } else if let HydroNode::ExternalInput {
1210 from_external_key,
1211 from_port_id,
1212 from_many,
1213 codec_type,
1214 port_hint,
1215 instantiate_fn,
1216 metadata,
1217 ..
1218 } = n
1219 {
1220 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1221 DebugInstantiate::Building => {
1222 let from_node = externals
1223 .get(*from_external_key)
1224 .unwrap_or_else(|| {
1225 panic!(
1226 "A external used in the graph was not instantiated: {}",
1227 from_external_key,
1228 )
1229 })
1230 .clone();
1231
1232 match metadata.location_id.root() {
1233 &LocationId::Process(process_key) => {
1234 let to_node = processes
1235 .get(process_key)
1236 .unwrap_or_else(|| {
1237 panic!("A process used in the graph was not instantiated: {}", process_key)
1238 })
1239 .clone();
1240
1241 let sink_port = from_node.next_port();
1242 let source_port = to_node.next_port();
1243
1244 from_node.register(*from_port_id, sink_port.clone());
1245
1246 (
1247 (
1248 parse_quote!(DUMMY),
1249 if *from_many {
1250 D::e2o_many_source(
1251 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1252 &to_node, &source_port,
1253 codec_type.0.as_ref(),
1254 format!("{}_{}", *from_external_key, *from_port_id)
1255 )
1256 } else {
1257 D::e2o_source(
1258 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1259 &from_node, &sink_port,
1260 &to_node, &source_port,
1261 codec_type.0.as_ref(),
1262 format!("{}_{}", *from_external_key, *from_port_id)
1263 )
1264 },
1265 ),
1266 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1267 )
1268 }
1269 LocationId::Cluster(cluster_key) => {
1270 let to_node = clusters
1271 .get(*cluster_key)
1272 .unwrap_or_else(|| {
1273 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1274 })
1275 .clone();
1276
1277 let sink_port = from_node.next_port();
1278 let source_port = to_node.next_port();
1279
1280 from_node.register(*from_port_id, sink_port.clone());
1281
1282 (
1283 (
1284 parse_quote!(DUMMY),
1285 D::e2m_source(
1286 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1287 &from_node, &sink_port,
1288 &to_node, &source_port,
1289 codec_type.0.as_ref(),
1290 format!("{}_{}", *from_external_key, *from_port_id)
1291 ),
1292 ),
1293 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1294 )
1295 }
1296 _ => panic!()
1297 }
1298 },
1299
1300 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1301 };
1302
1303 *instantiate_fn = DebugInstantiateFinalized {
1304 sink: sink_expr,
1305 source: source_expr,
1306 connect_fn: Some(connect_fn),
1307 }
1308 .into();
1309 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1310 let element_type = match &metadata.collection_kind {
1311 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1312 _ => panic!("Embedded source must have Stream collection kind"),
1313 };
1314 let location_key = match metadata.location_id.root() {
1315 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1316 _ => panic!("Embedded source must be on a process or cluster"),
1317 };
1318 D::register_embedded_stream_input(
1319 &mut refcell_env.borrow_mut(),
1320 location_key,
1321 ident,
1322 &element_type,
1323 );
1324 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1325 let element_type = match &metadata.collection_kind {
1326 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1327 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1328 };
1329 let location_key = match metadata.location_id.root() {
1330 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1331 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1332 };
1333 D::register_embedded_singleton_input(
1334 &mut refcell_env.borrow_mut(),
1335 location_key,
1336 ident,
1337 &element_type,
1338 );
1339 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1340 match state {
1341 ClusterMembersState::Uninit => {
1342 let at_location = metadata.location_id.root().clone();
1343 let key = (at_location.clone(), location_id.key());
1344 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1345 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1347 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1348 &(),
1349 );
1350 *state = ClusterMembersState::Stream(expr.into());
1351 } else {
1352 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1354 }
1355 }
1356 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1357 panic!("cluster members already finalized");
1358 }
1359 }
1360 }
1361 },
1362 seen_tees,
1363 false,
1364 );
1365 }
1366
1367 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1368 self.transform_bottom_up(
1369 &mut |l| {
1370 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1371 match instantiate_fn {
1372 DebugInstantiate::Building => panic!("network not built"),
1373
1374 DebugInstantiate::Finalized(finalized) => {
1375 (finalized.connect_fn.take().unwrap())();
1376 }
1377 }
1378 }
1379 },
1380 &mut |n| {
1381 if let HydroNode::Network { instantiate_fn, .. }
1382 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1383 {
1384 match instantiate_fn {
1385 DebugInstantiate::Building => panic!("network not built"),
1386
1387 DebugInstantiate::Finalized(finalized) => {
1388 (finalized.connect_fn.take().unwrap())();
1389 }
1390 }
1391 }
1392 },
1393 seen_tees,
1394 false,
1395 );
1396 }
1397
1398 pub fn transform_bottom_up(
1399 &mut self,
1400 transform_root: &mut impl FnMut(&mut HydroRoot),
1401 transform_node: &mut impl FnMut(&mut HydroNode),
1402 seen_tees: &mut SeenSharedNodes,
1403 check_well_formed: bool,
1404 ) {
1405 self.transform_children(
1406 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1407 seen_tees,
1408 );
1409
1410 transform_root(self);
1411 }
1412
1413 pub fn transform_children(
1414 &mut self,
1415 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1416 seen_tees: &mut SeenSharedNodes,
1417 ) {
1418 match self {
1419 HydroRoot::ForEach { f, input, .. } => {
1420 f.transform_children(&mut transform, seen_tees);
1421 transform(input, seen_tees);
1422 }
1423 HydroRoot::SendExternal { input, .. }
1424 | HydroRoot::DestSink { input, .. }
1425 | HydroRoot::CycleSink { input, .. }
1426 | HydroRoot::EmbeddedOutput { input, .. }
1427 | HydroRoot::Null { input, .. } => {
1428 transform(input, seen_tees);
1429 }
1430 }
1431 }
1432
1433 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1434 match self {
1435 HydroRoot::ForEach {
1436 f,
1437 input,
1438 op_metadata,
1439 } => HydroRoot::ForEach {
1440 f: f.deep_clone(seen_tees),
1441 input: Box::new(input.deep_clone(seen_tees)),
1442 op_metadata: op_metadata.clone(),
1443 },
1444 HydroRoot::SendExternal {
1445 to_external_key,
1446 to_port_id,
1447 to_many,
1448 unpaired,
1449 serialize_fn,
1450 instantiate_fn,
1451 input,
1452 op_metadata,
1453 } => HydroRoot::SendExternal {
1454 to_external_key: *to_external_key,
1455 to_port_id: *to_port_id,
1456 to_many: *to_many,
1457 unpaired: *unpaired,
1458 serialize_fn: serialize_fn.clone(),
1459 instantiate_fn: instantiate_fn.clone(),
1460 input: Box::new(input.deep_clone(seen_tees)),
1461 op_metadata: op_metadata.clone(),
1462 },
1463 HydroRoot::DestSink {
1464 sink,
1465 input,
1466 op_metadata,
1467 } => HydroRoot::DestSink {
1468 sink: sink.clone(),
1469 input: Box::new(input.deep_clone(seen_tees)),
1470 op_metadata: op_metadata.clone(),
1471 },
1472 HydroRoot::CycleSink {
1473 cycle_id,
1474 input,
1475 op_metadata,
1476 } => HydroRoot::CycleSink {
1477 cycle_id: *cycle_id,
1478 input: Box::new(input.deep_clone(seen_tees)),
1479 op_metadata: op_metadata.clone(),
1480 },
1481 HydroRoot::EmbeddedOutput {
1482 ident,
1483 input,
1484 op_metadata,
1485 } => HydroRoot::EmbeddedOutput {
1486 ident: ident.clone(),
1487 input: Box::new(input.deep_clone(seen_tees)),
1488 op_metadata: op_metadata.clone(),
1489 },
1490 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1491 input: Box::new(input.deep_clone(seen_tees)),
1492 op_metadata: op_metadata.clone(),
1493 },
1494 }
1495 }
1496
1497 #[cfg(feature = "build")]
1498 pub fn emit(
1499 &mut self,
1500 graph_builders: &mut dyn DfirBuilder,
1501 seen_tees: &mut SeenSharedNodes,
1502 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1503 next_stmt_id: &mut crate::Counter<StmtId>,
1504 fold_hooked_idents: &mut HashSet<String>,
1505 ) {
1506 self.emit_core(
1507 &mut BuildersOrCallback::<
1508 fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1509 fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1510 >::Builders(graph_builders),
1511 seen_tees,
1512 built_tees,
1513 next_stmt_id,
1514 fold_hooked_idents,
1515 );
1516 }
1517
    /// Emits this root and its input subtree, either into DFIR builders or through
    /// traversal callbacks, depending on the mode of `builders_or_callback`.
    ///
    /// Every arm first recurses into `input.emit_core(..)`, which yields the identifier
    /// of the emitted input. Then:
    ///
    /// * In `Builders` mode a DFIR statement consuming that identifier is added to the
    ///   builder for the input's location.
    /// * In `Callback` mode the root callback is invoked with `self` and `next_stmt_id`.
    ///
    /// All variants except `CycleSink` draw a fresh statement id from `next_stmt_id`
    /// (in both modes). `CycleSink` draws none and does not invoke the callback.
    ///
    /// # Panics
    ///
    /// In `Builders` mode, a `ForEach` panics if one of its `singleton_refs` is not a
    /// `HydroNode::Reference` or has no entry in `built_tees`.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
        >,
        seen_tees: &mut SeenSharedNodes,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
        next_stmt_id: &mut crate::Counter<StmtId>,
        fold_hooked_idents: &mut HashSet<String>,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Identifiers of the singleton references the closure reads, in the
                        // order they appear in `singleton_refs`.
                        let mut ident_stack: Vec<syn::Ident> = Vec::new();

                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
                            let HydroNode::Reference { inner, .. } = ref_node else {
                                panic!("singleton_refs should only contain HydroNode::Reference");
                            };
                            // `built_tees` is keyed by the address of the shared cell.
                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
                            let idents = built_tees.get(&ptr).expect(
                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
                            );
                            // Only the first identifier recorded for the shared node is used.
                            ident_stack.push(idents[0].clone());
                        }

                        let f_tokens = f.emit_tokens(&mut ident_stack);

                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f_tokens);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink side is needed here; while the network is still
                        // `Building`, placeholder expressions are substituted.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                // Unlike the other variants, no statement id is drawn for a cycle sink.
                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Keyed collections flow as `(key, value)` tuples; everything
                        // else flows as its element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        // Bind the cycle's identifier to the input through a typed `identity`.
                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // NOTE(review): the root callback is deliberately not invoked for
                    // cycle sinks, presumably because they are not counted as statements;
                    // confirm against the callers of `traverse_dfir`.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            HydroRoot::Null { input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Drain the input with a no-op consumer.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(|_| {});
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }
        }
    }
1756
1757 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1758 match self {
1759 HydroRoot::ForEach { op_metadata, .. }
1760 | HydroRoot::SendExternal { op_metadata, .. }
1761 | HydroRoot::DestSink { op_metadata, .. }
1762 | HydroRoot::CycleSink { op_metadata, .. }
1763 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1764 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1765 }
1766 }
1767
1768 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1769 match self {
1770 HydroRoot::ForEach { op_metadata, .. }
1771 | HydroRoot::SendExternal { op_metadata, .. }
1772 | HydroRoot::DestSink { op_metadata, .. }
1773 | HydroRoot::CycleSink { op_metadata, .. }
1774 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1775 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1776 }
1777 }
1778
1779 pub fn input(&self) -> &HydroNode {
1780 match self {
1781 HydroRoot::ForEach { input, .. }
1782 | HydroRoot::SendExternal { input, .. }
1783 | HydroRoot::DestSink { input, .. }
1784 | HydroRoot::CycleSink { input, .. }
1785 | HydroRoot::EmbeddedOutput { input, .. }
1786 | HydroRoot::Null { input, .. } => input,
1787 }
1788 }
1789
1790 pub fn input_metadata(&self) -> &HydroIrMetadata {
1791 self.input().metadata()
1792 }
1793
1794 pub fn print_root(&self) -> String {
1795 match self {
1796 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1797 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1798 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1799 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1800 HydroRoot::EmbeddedOutput { ident, .. } => {
1801 format!("EmbeddedOutput({})", ident)
1802 }
1803 HydroRoot::Null { .. } => "Null".to_owned(),
1804 }
1805 }
1806
1807 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1808 match self {
1809 HydroRoot::ForEach { f, .. } => {
1810 transform(&mut f.expr);
1811 }
1812 HydroRoot::DestSink { sink, .. } => {
1813 transform(sink);
1814 }
1815 HydroRoot::SendExternal { .. }
1816 | HydroRoot::CycleSink { .. }
1817 | HydroRoot::EmbeddedOutput { .. }
1818 | HydroRoot::Null { .. } => {}
1819 }
1820 }
1821}
1822
/// Returns the clock id of the tick that `loc` sits in, looking through any number of
/// `Atomic` wrappers; `None` when the location is a bare process or cluster.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            // Peel the atomic wrapper and keep looking.
            LocationId::Atomic(inner) => current = &**inner,
            LocationId::Process(_) | LocationId::Cluster(_) => return None,
        }
    }
}
1831
/// Rewrites every tick clock id inside `loc` to its union-find representative.
///
/// Walks through nested `Tick` and `Atomic` layers; stops at a process or cluster.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let nested = match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => return,
        LocationId::Atomic(inner) => inner,
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            inner
        }
    };
    remap_location(nested, uf);
}
1845
/// Union-find lookup: returns the representative of `x`'s set.
///
/// An id with no entry in `parent` (or one mapped to itself) is its own representative.
/// Every non-root id visited on the way is re-pointed directly at the representative
/// (path compression).
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // Pass 1: climb to the representative.
    let mut root = x;
    loop {
        let up = parent.get(&root).copied().unwrap_or(root);
        if up == root {
            break;
        }
        root = up;
    }

    // Pass 2: point every id on the path straight at the representative.
    let mut node = x;
    while node != root {
        let up = parent.get(&node).copied().unwrap_or(node);
        parent.insert(node, root);
        node = up;
    }

    root
}
1856
/// Union-find merge: joins the sets containing `a` and `b`.
///
/// When the two sets differ, `a`'s representative is attached beneath `b`'s.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        // Already in the same set.
        return;
    }
    parent.insert(root_a, root_b);
}
1865
/// Merges tick clocks that are connected through certain operators and rewrites every
/// node's location to use the merged clock ids.
///
/// Pass 1 unions the clock of each operand with the clock of the operator itself for
/// `Batch`, `YieldConcat`, `Chain`, `ChainFirst` and `MergeOrdered`, whenever both sides
/// are inside a tick. Pass 2 remaps the `location_id` of every node to the resulting
/// representatives.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    // Unions the clocks of `operand` and `consumer` when both locations are ticked.
    fn link(uf: &mut HashMap<ClockId, ClockId>, operand: &LocationId, consumer: &LocationId) {
        if let (Some(a), Some(b)) = (tick_of(operand), tick_of(consumer)) {
            uf_union(uf, a, b);
        }
    }

    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect the clock equivalences.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&mut uf, &inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                link(&mut uf, &first.metadata().location_id, &metadata.location_id);
                link(&mut uf, &second.metadata().location_id, &metadata.location_id);
            }
            _ => {}
        },
        false,
    );

    // Pass 2: rewrite every node's location with the representatives.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1929
/// Emits every root of `ir` into per-location DFIR graph builders and returns them.
///
/// The shared-node bookkeeping, statement-id counter and fold-hook set are created
/// fresh here and shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();

    for root in ir.iter_mut() {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }

    builders
}
1948
/// Walks `ir` in emission order without building any DFIR, invoking `transform_root`
/// and `transform_node` through the `Callback` mode of `emit_core`.
///
/// Both callbacks also receive the statement-id counter used by the traversal.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
) {
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();
    let mut hooks = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir.iter_mut() {
        root.emit_core(
            &mut hooks,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1970
1971pub fn transform_bottom_up(
1972 ir: &mut [HydroRoot],
1973 transform_root: &mut impl FnMut(&mut HydroRoot),
1974 transform_node: &mut impl FnMut(&mut HydroNode),
1975 check_well_formed: bool,
1976) {
1977 let mut seen_tees = HashMap::new();
1978 ir.iter_mut().for_each(|leaf| {
1979 leaf.transform_bottom_up(
1980 transform_root,
1981 transform_node,
1982 &mut seen_tees,
1983 check_well_formed,
1984 );
1985 });
1986}
1987
1988pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1989 let mut seen_tees = HashMap::new();
1990 ir.iter()
1991 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1992 .collect()
1993}
1994
// Deduplication state for shared nodes: `None` while no scope is active, otherwise
// `(next id to hand out, shared-cell address -> id already assigned)`.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // State used by `Debug for SharedNode`; activated by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    // State used by `Serialize for SharedNode`; activated by `serialize_dedup_shared`.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
2004
2005pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
2006 PRINTED_TEES.with(|printed_tees| {
2007 let mut printed_tees_mut = printed_tees.borrow_mut();
2008 *printed_tees_mut = Some((0, HashMap::new()));
2009 drop(printed_tees_mut);
2010
2011 let ret = f();
2012
2013 let mut printed_tees_mut = printed_tees.borrow_mut();
2014 *printed_tees_mut = None;
2015
2016 ret
2017 })
2018}
2019
2020pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
2025 let _guard = SerializedSharedGuard::enter();
2026 f()
2027}
2028
2029struct SerializedSharedGuard {
2032 previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
2033}
2034
2035impl SerializedSharedGuard {
2036 fn enter() -> Self {
2037 let previous = SERIALIZED_SHARED.with(|cell| {
2038 let mut guard = cell.borrow_mut();
2039 guard.replace((0, HashMap::new()))
2040 });
2041 Self { previous }
2042 }
2043}
2044
2045impl Drop for SerializedSharedGuard {
2046 fn drop(&mut self) {
2047 SERIALIZED_SHARED.with(|cell| {
2048 *cell.borrow_mut() = self.previous.take();
2049 });
2050 }
2051}
2052
/// A reference-counted, interior-mutable handle to a [`HydroNode`] that several places
/// in the IR may point at. Identity is the address of the shared cell.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2054
2055impl serde::Serialize for SharedNode {
2056 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2067 SERIALIZED_SHARED.with(|cell| {
2068 let mut guard = cell.borrow_mut();
2069 let state = guard.as_mut().ok_or_else(|| {
2071 serde::ser::Error::custom(
2072 "SharedNode serialization requires an active serialize_dedup_shared scope",
2073 )
2074 })?;
2075 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2076
2077 if let Some(&id) = state.1.get(&ptr) {
2078 drop(guard);
2079 use serde::ser::SerializeMap;
2080 let mut map = serializer.serialize_map(Some(1))?;
2081 map.serialize_entry("$shared_ref", &id)?;
2082 map.end()
2083 } else {
2084 let id = state.0;
2085 state.0 += 1;
2086 state.1.insert(ptr, id);
2087 drop(guard);
2088
2089 use serde::ser::SerializeMap;
2090 let mut map = serializer.serialize_map(Some(2))?;
2091 map.serialize_entry("$shared", &id)?;
2092 map.serialize_entry("node", &*self.0.borrow())?;
2093 map.end()
2094 }
2095 })
2096 }
2097}
2098
2099impl SharedNode {
2100 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2101 Rc::as_ptr(&self.0)
2102 }
2103}
2104
2105impl Debug for SharedNode {
2106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2107 PRINTED_TEES.with(|printed_tees| {
2108 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2109 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2110
2111 if let Some(printed_tees_mut) = printed_tees_mut {
2112 if let Some(existing) = printed_tees_mut
2113 .1
2114 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2115 {
2116 write!(f, "<shared {}>", existing)
2117 } else {
2118 let next_id = printed_tees_mut.0;
2119 printed_tees_mut.0 += 1;
2120 printed_tees_mut
2121 .1
2122 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2123 drop(printed_tees_mut_borrow);
2124 write!(f, "<shared {}>: ", next_id)?;
2125 Debug::fmt(&self.0.borrow(), f)
2126 }
2127 } else {
2128 drop(printed_tees_mut_borrow);
2129 write!(f, "<shared>: ")?;
2130 Debug::fmt(&self.0.borrow(), f)
2131 }
2132 })
2133 }
2134}
2135
2136impl Hash for SharedNode {
2137 fn hash<H: Hasher>(&self, state: &mut H) {
2138 self.0.borrow_mut().hash(state);
2139 }
2140}
2141
/// Hands out access-group numbers.
///
/// A `Counting` counter is live and can produce groups through
/// [`next_group`](Self::next_group); a `Frozen` value is a fixed group number that can
/// only be read back.
#[derive(Debug)]
pub enum AccessCounter {
    /// Live counter holding the current group number.
    Counting(Cell<u32>),
    /// Immutable snapshot of a group number.
    Frozen(u32),
}

impl AccessCounter {
    /// Creates a live counter starting at group `0`.
    pub fn new() -> Self {
        Self::Counting(Cell::new(0))
    }

    /// Returns the group for the next access as a `Frozen` value.
    ///
    /// A non-mutable access reports the current group and leaves the counter untouched.
    /// A mutable access reports `current + 1` and leaves the counter at `current + 2`,
    /// so that group is shared with no other access.
    ///
    /// # Panics
    ///
    /// Panics when called on a `Frozen` counter.
    pub fn next_group(&self, is_mut: bool) -> Self {
        match self {
            Self::Frozen(_) => panic!("Cannot count on `AccessCounter::Frozen`"),
            Self::Counting(counter) if is_mut => {
                let group = counter.get() + 1;
                counter.set(group + 1);
                Self::Frozen(group)
            }
            Self::Counting(counter) => Self::Frozen(counter.get()),
        }
    }

    /// Returns a `Frozen` snapshot of the current value without advancing anything.
    pub fn freeze(&self) -> Self {
        let snapshot = match self {
            Self::Counting(counter) => counter.get(),
            Self::Frozen(group) => *group,
        };
        Self::Frozen(snapshot)
    }

    /// Reads the group number out of a `Frozen` value.
    ///
    /// # Panics
    ///
    /// Panics when the counter is still `Counting`.
    pub fn frozen_group(&self) -> u32 {
        match self {
            Self::Frozen(group) => *group,
            Self::Counting(_) => panic!("`AccessCounter` not frozen"),
        }
    }
}

impl Default for AccessCounter {
    /// Same as [`AccessCounter::new`].
    fn default() -> Self {
        Self::new()
    }
}
2195
impl Hash for AccessCounter {
    /// Contributes nothing to the hash: the counter value never influences the hash of
    /// a value that contains an `AccessCounter`.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
2201
2202impl serde::Serialize for AccessCounter {
2203 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2204 let count = match self {
2205 AccessCounter::Counting(count) => count.get(),
2206 AccessCounter::Frozen(count) => *count,
2207 };
2208 count.serialize(serializer)
2209 }
2210}
2211
/// Boundedness marker used by the `Stream`, `Optional` and `KeyedStream` variants of
/// [`CollectionKind`]. `CollectionKind::is_bounded` reports `true` only for `Bounded`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
2217
/// Ordering marker for stream-like collections. `CollectionKind::is_strict` requires
/// `TotalOrder`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
2223
/// Delivery marker for stream-like collections. `CollectionKind::is_strict` requires
/// `ExactlyOnce`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
2229
/// Boundedness marker for `CollectionKind::KeyedSingleton`. Of these variants, only
/// `Bounded` makes `CollectionKind::is_bounded` return `true`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicKeys,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
2238
/// Boundedness marker for `CollectionKind::Singleton`. Of these variants, only
/// `Bounded` makes `CollectionKind::is_bounded` return `true`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
2245
/// Describes what kind of collection a node produces, together with its guarantees
/// (boundedness, and for streams ordering and retry) and its element types.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of `element_type`.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; ordering and retry are tracked for the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton with separate key and value types.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2275
2276impl CollectionKind {
2277 pub fn is_bounded(&self) -> bool {
2278 matches!(
2279 self,
2280 CollectionKind::Stream {
2281 bound: BoundKind::Bounded,
2282 ..
2283 } | CollectionKind::Singleton {
2284 bound: SingletonBoundKind::Bounded,
2285 ..
2286 } | CollectionKind::Optional {
2287 bound: BoundKind::Bounded,
2288 ..
2289 } | CollectionKind::KeyedStream {
2290 bound: BoundKind::Bounded,
2291 ..
2292 } | CollectionKind::KeyedSingleton {
2293 bound: KeyedSingletonBoundKind::Bounded,
2294 ..
2295 }
2296 )
2297 }
2298
2299 pub fn is_strict(&self) -> bool {
2302 match self {
2303 CollectionKind::Stream { order, retry, .. } => {
2304 *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2305 }
2306 CollectionKind::KeyedStream {
2307 value_order,
2308 value_retry,
2309 ..
2310 } => {
2311 *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2312 }
2313 CollectionKind::Singleton { .. }
2316 | CollectionKind::Optional { .. }
2317 | CollectionKind::KeyedSingleton { .. } => true,
2318 }
2319 }
2320
2321 pub fn strict_kind(&self) -> CollectionKind {
2323 match self {
2324 CollectionKind::Stream {
2325 bound,
2326 element_type,
2327 ..
2328 } => CollectionKind::Stream {
2329 bound: bound.clone(),
2330 order: StreamOrder::TotalOrder,
2331 retry: StreamRetry::ExactlyOnce,
2332 element_type: element_type.clone(),
2333 },
2334 CollectionKind::KeyedStream {
2335 bound,
2336 key_type,
2337 value_type,
2338 ..
2339 } => CollectionKind::KeyedStream {
2340 bound: bound.clone(),
2341 value_order: StreamOrder::TotalOrder,
2342 value_retry: StreamRetry::ExactlyOnce,
2343 key_type: key_type.clone(),
2344 value_type: value_type.clone(),
2345 },
2346 other => other.clone(),
2347 }
2348 }
2349}
2350
/// Metadata attached to every IR node.
///
/// Note that this type's `Hash` and `PartialEq` impls ignore all fields, and its `Debug`
/// impl prints only `location_id` and `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Where the node's output lives.
    pub location_id: LocationId,
    /// Kind and guarantees of the collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably the cluster consistency level, when known; confirm.
    pub consistency: Option<ClusterConsistency>,
    // NOTE(review): presumably an estimated or measured output cardinality; confirm.
    pub cardinality: Option<usize>,
    /// Optional free-form tag.
    pub tag: Option<String>,
    /// Operator-level metadata (backtrace, profiling figures, id).
    pub op: HydroIrOpMetadata,
}
2360
impl Hash for HydroIrMetadata {
    /// Contributes nothing: metadata never affects the hash of the node that holds it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2365
impl PartialEq for HydroIrMetadata {
    /// Always `true`: any two metadata values compare equal, so metadata never affects
    /// the equality of values that contain it. This is consistent with the no-op `Hash`.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

// Sound because `eq` is trivially reflexive, symmetric and transitive.
impl Eq for HydroIrMetadata {}
2373
2374impl Debug for HydroIrMetadata {
2375 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2376 f.debug_struct("HydroIrMetadata")
2377 .field("location_id", &self.location_id)
2378 .field("collection_kind", &self.collection_kind)
2379 .finish()
2380 }
2381}
2382
/// Operator-level metadata.
///
/// When serialized, `backtrace` is emitted under the key `span` through
/// `serialize_backtrace_as_span`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    // NOTE(review): presumably filled in by a profiling pass; `new` leaves it `None`.
    pub cpu_usage: Option<f64>,
    // NOTE(review): presumably filled in by a profiling pass; `new` leaves it `None`.
    pub network_recv_cpu_usage: Option<f64>,
    /// Optional operator id; `new` leaves it `None`.
    pub id: Option<usize>,
}
2393
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and all optional fields unset.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        // One extra frame is skipped to account for this wrapper.
        Self::new_with_skip(1)
    }

    /// Creates metadata whose backtrace capture is asked to skip `2 + skip_count` frames.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            // NOTE(review): the constant 2 presumably covers this function and
            // `get_backtrace` itself; confirm against `Backtrace::get_backtrace`.
            // The count depends on the exact call structure, so do not inline or
            // wrap these calls without adjusting it.
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2412
2413impl Debug for HydroIrOpMetadata {
2414 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2415 f.debug_struct("HydroIrOpMetadata").finish()
2416 }
2417}
2418
impl Hash for HydroIrOpMetadata {
    /// Contributes nothing: operator metadata never affects the hash of its container.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2422
2423#[derive(Debug, Hash, serde::Serialize)]
2426pub enum HydroNode {
2427 Placeholder,
2428
2429 Cast {
2437 inner: Box<HydroNode>,
2438 metadata: HydroIrMetadata,
2439 },
2440
2441 ObserveNonDet {
2447 inner: Box<HydroNode>,
2448 trusted: bool, metadata: HydroIrMetadata,
2450 },
2451
2452 Source {
2453 source: HydroSource,
2454 metadata: HydroIrMetadata,
2455 },
2456
2457 SingletonSource {
2458 value: DebugExpr,
2459 first_tick_only: bool,
2460 metadata: HydroIrMetadata,
2461 },
2462
2463 CycleSource {
2464 cycle_id: CycleId,
2465 metadata: HydroIrMetadata,
2466 },
2467
2468 Tee {
2469 inner: SharedNode,
2470 metadata: HydroIrMetadata,
2471 },
2472
2473 Reference {
2482 inner: SharedNode,
2483 kind: crate::handoff_ref::HandoffRefKind,
2484 access_counter: AccessCounter,
2485 metadata: HydroIrMetadata,
2486 },
2487
2488 Partition {
2489 inner: SharedNode,
2490 f: ClosureExpr,
2491 is_true: bool,
2492 metadata: HydroIrMetadata,
2493 },
2494
2495 BeginAtomic {
2496 inner: Box<HydroNode>,
2497 metadata: HydroIrMetadata,
2498 },
2499
2500 EndAtomic {
2501 inner: Box<HydroNode>,
2502 metadata: HydroIrMetadata,
2503 },
2504
2505 Batch {
2506 inner: Box<HydroNode>,
2507 metadata: HydroIrMetadata,
2508 },
2509
2510 YieldConcat {
2511 inner: Box<HydroNode>,
2512 metadata: HydroIrMetadata,
2513 },
2514
2515 Chain {
2516 first: Box<HydroNode>,
2517 second: Box<HydroNode>,
2518 metadata: HydroIrMetadata,
2519 },
2520
2521 MergeOrdered {
2522 first: Box<HydroNode>,
2523 second: Box<HydroNode>,
2524 metadata: HydroIrMetadata,
2525 },
2526
2527 ChainFirst {
2528 first: Box<HydroNode>,
2529 second: Box<HydroNode>,
2530 metadata: HydroIrMetadata,
2531 },
2532
2533 CrossProduct {
2534 left: Box<HydroNode>,
2535 right: Box<HydroNode>,
2536 metadata: HydroIrMetadata,
2537 },
2538
2539 CrossSingleton {
2540 left: Box<HydroNode>,
2541 right: Box<HydroNode>,
2542 metadata: HydroIrMetadata,
2543 },
2544
2545 Join {
2546 left: Box<HydroNode>,
2547 right: Box<HydroNode>,
2548 metadata: HydroIrMetadata,
2549 },
2550
2551 JoinHalf {
2555 left: Box<HydroNode>,
2556 right: Box<HydroNode>,
2557 metadata: HydroIrMetadata,
2558 },
2559
2560 Difference {
2561 pos: Box<HydroNode>,
2562 neg: Box<HydroNode>,
2563 metadata: HydroIrMetadata,
2564 },
2565
2566 AntiJoin {
2567 pos: Box<HydroNode>,
2568 neg: Box<HydroNode>,
2569 metadata: HydroIrMetadata,
2570 },
2571
2572 ResolveFutures {
2573 input: Box<HydroNode>,
2574 metadata: HydroIrMetadata,
2575 },
2576 ResolveFuturesBlocking {
2577 input: Box<HydroNode>,
2578 metadata: HydroIrMetadata,
2579 },
2580 ResolveFuturesOrdered {
2581 input: Box<HydroNode>,
2582 metadata: HydroIrMetadata,
2583 },
2584
2585 Map {
2586 f: ClosureExpr,
2587 input: Box<HydroNode>,
2588 metadata: HydroIrMetadata,
2589 },
2590 FlatMap {
2591 f: ClosureExpr,
2592 input: Box<HydroNode>,
2593 metadata: HydroIrMetadata,
2594 },
2595 FlatMapStreamBlocking {
2596 f: ClosureExpr,
2597 input: Box<HydroNode>,
2598 metadata: HydroIrMetadata,
2599 },
2600 Filter {
2601 f: ClosureExpr,
2602 input: Box<HydroNode>,
2603 metadata: HydroIrMetadata,
2604 },
2605 FilterMap {
2606 f: ClosureExpr,
2607 input: Box<HydroNode>,
2608 metadata: HydroIrMetadata,
2609 },
2610
2611 DeferTick {
2612 input: Box<HydroNode>,
2613 metadata: HydroIrMetadata,
2614 },
2615 Enumerate {
2616 input: Box<HydroNode>,
2617 metadata: HydroIrMetadata,
2618 },
2619 Inspect {
2620 f: ClosureExpr,
2621 input: Box<HydroNode>,
2622 metadata: HydroIrMetadata,
2623 },
2624
2625 Unique {
2626 input: Box<HydroNode>,
2627 metadata: HydroIrMetadata,
2628 },
2629
2630 Sort {
2631 input: Box<HydroNode>,
2632 metadata: HydroIrMetadata,
2633 },
2634 Fold {
2635 init: ClosureExpr,
2636 acc: ClosureExpr,
2637 input: Box<HydroNode>,
2638 metadata: HydroIrMetadata,
2639 },
2640
2641 Scan {
2642 init: ClosureExpr,
2643 acc: ClosureExpr,
2644 input: Box<HydroNode>,
2645 metadata: HydroIrMetadata,
2646 },
2647 ScanAsyncBlocking {
2648 init: ClosureExpr,
2649 acc: ClosureExpr,
2650 input: Box<HydroNode>,
2651 metadata: HydroIrMetadata,
2652 },
2653 FoldKeyed {
2654 init: ClosureExpr,
2655 acc: ClosureExpr,
2656 input: Box<HydroNode>,
2657 metadata: HydroIrMetadata,
2658 },
2659
2660 Reduce {
2661 f: ClosureExpr,
2662 input: Box<HydroNode>,
2663 metadata: HydroIrMetadata,
2664 },
2665 ReduceKeyed {
2666 f: ClosureExpr,
2667 input: Box<HydroNode>,
2668 metadata: HydroIrMetadata,
2669 },
2670 ReduceKeyedWatermark {
2671 f: ClosureExpr,
2672 input: Box<HydroNode>,
2673 watermark: Box<HydroNode>,
2674 metadata: HydroIrMetadata,
2675 },
2676
2677 Network {
2678 name: Option<String>,
2679 networking_info: crate::networking::NetworkingInfo,
2680 serialize_fn: Option<DebugExpr>,
2681 instantiate_fn: DebugInstantiate,
2682 deserialize_fn: Option<DebugExpr>,
2683 input: Box<HydroNode>,
2684 metadata: HydroIrMetadata,
2685 },
2686
2687 VersionedNetworkFork {
2688 channel_id: u32,
2689 channel_name: String,
2690 senders: Vec<(u32, Box<HydroNode>, Option<DebugExpr>)>,
2691 metadata: HydroIrMetadata,
2692 },
2693
2694 VersionedNetwork {
2695 fork: SharedNode,
2696 version: u32,
2697 deserialize_fn: Option<DebugExpr>,
2698 metadata: HydroIrMetadata,
2699 },
2700
2701 ExternalInput {
2702 from_external_key: LocationKey,
2703 from_port_id: ExternalPortId,
2704 from_many: bool,
2705 codec_type: DebugType,
2706 #[serde(skip)]
2707 port_hint: NetworkHint,
2708 instantiate_fn: DebugInstantiate,
2709 deserialize_fn: Option<DebugExpr>,
2710 metadata: HydroIrMetadata,
2711 },
2712
2713 Counter {
2714 tag: String,
2715 duration: DebugExpr,
2716 prefix: String,
2717 input: Box<HydroNode>,
2718 metadata: HydroIrMetadata,
2719 },
2720
2721 AssertIsConsistent {
2722 inner: Box<HydroNode>,
2723 trusted: bool,
2724 metadata: HydroIrMetadata,
2725 },
2726
2727 UnboundSingleton {
2728 inner: Box<HydroNode>,
2729 metadata: HydroIrMetadata,
2730 },
2731}
2732
/// Memo table threaded through IR traversals that must preserve sharing.
///
/// Keys are the raw cell pointer of an *original* shared node (as returned by
/// `SharedNode::as_ptr`); values are the cell that replaces it in the output.
/// `HydroNode::transform_children` and `HydroNode::deep_clone` consult this map
/// so that a shared node reached through several parents is processed once and
/// every parent ends up pointing at the same replacement cell.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the raw cell pointer of a shared node to a [`LocationId`].
///
/// NOTE(review): no use of this alias is visible near its definition;
/// presumably it records the location at which each shared node was
/// encountered — confirm against the call sites.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2735
/// Decides whether the input of an operator must be routed through an extra
/// "observe" statement before the closure `f` consumes it.
///
/// The extra statement is only introduced when `f.has_mut_ref()` is true and
/// `in_kind.is_strict()` is false. In that case a fresh statement id is taken
/// from `next_stmt_id`, an identifier `stream_<id>` is created for it, and —
/// when `builders_or_callback` is in `Builders` mode — `observe_for_mut` is
/// invoked on the graph builders to connect `in_ident` to the new identifier.
/// The statement id is consumed in callback mode as well, so both modes
/// advance the counter identically.
///
/// Returns the identifier the operator should read from: the newly created
/// one, or `in_ident` unchanged when no extra statement is needed.
#[cfg(feature = "build")]
fn maybe_observe_for_mut(
    f: &ClosureExpr,
    in_ident: syn::Ident,
    in_location: &LocationId,
    in_kind: &CollectionKind,
    op_meta: &HydroIrOpMetadata,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
    >,
    next_stmt_id: &mut crate::Counter<StmtId>,
) -> syn::Ident {
    // Guard: without a mutable reference, or with a strict input, the input
    // identifier is passed through untouched and no statement id is consumed.
    if !f.has_mut_ref() || in_kind.is_strict() {
        return in_ident;
    }

    // Reserve the statement id up front so the numbering does not depend on
    // whether we are actually emitting code or only running callbacks.
    let observe_ident = syn::Ident::new(
        &format!("stream_{}", next_stmt_id.get_and_increment()),
        Span::call_site(),
    );

    if let BuildersOrCallback::Builders(graph_builders) = builders_or_callback {
        graph_builders.observe_for_mut(in_location, in_ident, in_kind, &observe_ident, op_meta);
    }

    observe_ident
}
2764
2765impl HydroNode {
2766 pub fn transform_bottom_up(
2767 &mut self,
2768 transform: &mut impl FnMut(&mut HydroNode),
2769 seen_tees: &mut SeenSharedNodes,
2770 check_well_formed: bool,
2771 ) {
2772 self.transform_children(
2773 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2774 seen_tees,
2775 );
2776
2777 transform(self);
2778
2779 let self_location = self.metadata().location_id.root();
2780
2781 if check_well_formed {
2782 match &*self {
2783 HydroNode::Network { .. } => {}
2784 _ => {
2785 self.input_metadata().iter().for_each(|i| {
2786 if i.location_id.root() != self_location {
2787 panic!(
2788 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2789 i,
2790 i.location_id.root(),
2791 self,
2792 self_location
2793 )
2794 }
2795 });
2796 }
2797 }
2798 }
2799 }
2800
2801 #[inline(always)]
2802 pub fn transform_children(
2803 &mut self,
2804 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2805 seen_tees: &mut SeenSharedNodes,
2806 ) {
2807 match self {
2808 HydroNode::Placeholder => {
2809 panic!();
2810 }
2811
2812 HydroNode::Source { .. }
2813 | HydroNode::SingletonSource { .. }
2814 | HydroNode::CycleSource { .. }
2815 | HydroNode::ExternalInput { .. } => {}
2816
2817 HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2818 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2819 *inner = SharedNode(transformed.clone());
2820 } else {
2821 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2822 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2823 let mut orig = inner.0.replace(HydroNode::Placeholder);
2824 transform(&mut orig, seen_tees);
2825 *transformed_cell.borrow_mut() = orig;
2826 *inner = SharedNode(transformed_cell);
2827 }
2828 }
2829
2830 HydroNode::Partition { inner, f, .. } => {
2831 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2832 *inner = SharedNode(transformed.clone());
2833 } else {
2834 f.transform_children(&mut transform, seen_tees);
2835 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2836 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2837 let mut orig = inner.0.replace(HydroNode::Placeholder);
2838 transform(&mut orig, seen_tees);
2839 *transformed_cell.borrow_mut() = orig;
2840 *inner = SharedNode(transformed_cell);
2841 }
2842 }
2843
2844 HydroNode::Cast { inner, .. }
2845 | HydroNode::ObserveNonDet { inner, .. }
2846 | HydroNode::BeginAtomic { inner, .. }
2847 | HydroNode::EndAtomic { inner, .. }
2848 | HydroNode::Batch { inner, .. }
2849 | HydroNode::YieldConcat { inner, .. }
2850 | HydroNode::UnboundSingleton { inner, .. }
2851 | HydroNode::AssertIsConsistent { inner, .. } => {
2852 transform(inner.as_mut(), seen_tees);
2853 }
2854
2855 HydroNode::Chain { first, second, .. } => {
2856 transform(first.as_mut(), seen_tees);
2857 transform(second.as_mut(), seen_tees);
2858 }
2859
2860 HydroNode::MergeOrdered { first, second, .. } => {
2861 transform(first.as_mut(), seen_tees);
2862 transform(second.as_mut(), seen_tees);
2863 }
2864
2865 HydroNode::ChainFirst { first, second, .. } => {
2866 transform(first.as_mut(), seen_tees);
2867 transform(second.as_mut(), seen_tees);
2868 }
2869
2870 HydroNode::CrossSingleton { left, right, .. }
2871 | HydroNode::CrossProduct { left, right, .. }
2872 | HydroNode::Join { left, right, .. }
2873 | HydroNode::JoinHalf { left, right, .. } => {
2874 transform(left.as_mut(), seen_tees);
2875 transform(right.as_mut(), seen_tees);
2876 }
2877
2878 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2879 transform(pos.as_mut(), seen_tees);
2880 transform(neg.as_mut(), seen_tees);
2881 }
2882
2883 HydroNode::Map { f, input, .. } => {
2884 f.transform_children(&mut transform, seen_tees);
2885 transform(input.as_mut(), seen_tees);
2886 }
2887 HydroNode::FlatMap { f, input, .. }
2888 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2889 | HydroNode::Filter { f, input, .. }
2890 | HydroNode::FilterMap { f, input, .. }
2891 | HydroNode::Inspect { f, input, .. }
2892 | HydroNode::Reduce { f, input, .. }
2893 | HydroNode::ReduceKeyed { f, input, .. } => {
2894 f.transform_children(&mut transform, seen_tees);
2895 transform(input.as_mut(), seen_tees);
2896 }
2897 HydroNode::ReduceKeyedWatermark {
2898 f,
2899 input,
2900 watermark,
2901 ..
2902 } => {
2903 f.transform_children(&mut transform, seen_tees);
2904 transform(input.as_mut(), seen_tees);
2905 transform(watermark.as_mut(), seen_tees);
2906 }
2907 HydroNode::Fold {
2908 init, acc, input, ..
2909 }
2910 | HydroNode::Scan {
2911 init, acc, input, ..
2912 }
2913 | HydroNode::ScanAsyncBlocking {
2914 init, acc, input, ..
2915 }
2916 | HydroNode::FoldKeyed {
2917 init, acc, input, ..
2918 } => {
2919 init.transform_children(&mut transform, seen_tees);
2920 acc.transform_children(&mut transform, seen_tees);
2921 transform(input.as_mut(), seen_tees);
2922 }
2923 HydroNode::ResolveFutures { input, .. }
2924 | HydroNode::ResolveFuturesBlocking { input, .. }
2925 | HydroNode::ResolveFuturesOrdered { input, .. }
2926 | HydroNode::Sort { input, .. }
2927 | HydroNode::DeferTick { input, .. }
2928 | HydroNode::Enumerate { input, .. }
2929 | HydroNode::Unique { input, .. }
2930 | HydroNode::Network { input, .. }
2931 | HydroNode::Counter { input, .. } => {
2932 transform(input.as_mut(), seen_tees);
2933 }
2934
2935 HydroNode::VersionedNetworkFork { senders, .. } => {
2936 for (_version, sender, _serialize) in senders.iter_mut() {
2937 transform(sender.as_mut(), seen_tees);
2938 }
2939 }
2940
2941 HydroNode::VersionedNetwork { fork, .. } => {
2942 if let Some(transformed) = seen_tees.get(&fork.as_ptr()) {
2943 *fork = SharedNode(transformed.clone());
2944 } else {
2945 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2946 seen_tees.insert(fork.as_ptr(), transformed_cell.clone());
2947 let mut orig = fork.0.replace(HydroNode::Placeholder);
2948 transform(&mut orig, seen_tees);
2949 *transformed_cell.borrow_mut() = orig;
2950 *fork = SharedNode(transformed_cell);
2951 }
2952 }
2953 }
2954 }
2955
2956 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2957 match self {
2958 HydroNode::Placeholder => HydroNode::Placeholder,
2959 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2960 inner: Box::new(inner.deep_clone(seen_tees)),
2961 metadata: metadata.clone(),
2962 },
2963 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2964 inner: Box::new(inner.deep_clone(seen_tees)),
2965 metadata: metadata.clone(),
2966 },
2967 HydroNode::ObserveNonDet {
2968 inner,
2969 trusted,
2970 metadata,
2971 } => HydroNode::ObserveNonDet {
2972 inner: Box::new(inner.deep_clone(seen_tees)),
2973 trusted: *trusted,
2974 metadata: metadata.clone(),
2975 },
2976 HydroNode::AssertIsConsistent {
2977 inner,
2978 trusted,
2979 metadata,
2980 } => HydroNode::AssertIsConsistent {
2981 inner: Box::new(inner.deep_clone(seen_tees)),
2982 trusted: *trusted,
2983 metadata: metadata.clone(),
2984 },
2985 HydroNode::Source { source, metadata } => HydroNode::Source {
2986 source: source.clone(),
2987 metadata: metadata.clone(),
2988 },
2989 HydroNode::SingletonSource {
2990 value,
2991 first_tick_only,
2992 metadata,
2993 } => HydroNode::SingletonSource {
2994 value: value.clone(),
2995 first_tick_only: *first_tick_only,
2996 metadata: metadata.clone(),
2997 },
2998 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2999 cycle_id: *cycle_id,
3000 metadata: metadata.clone(),
3001 },
3002 HydroNode::Tee { inner, metadata }
3003 | HydroNode::Reference {
3004 inner, metadata, ..
3005 } => {
3006 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
3007 SharedNode(transformed.clone())
3008 } else {
3009 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
3010 seen_tees.insert(inner.as_ptr(), new_rc.clone());
3011 let cloned = inner.0.borrow().deep_clone(seen_tees);
3012 *new_rc.borrow_mut() = cloned;
3013 SharedNode(new_rc)
3014 };
3015 if let HydroNode::Reference {
3016 kind,
3017 access_counter,
3018 ..
3019 } = self
3020 {
3021 HydroNode::Reference {
3022 inner: cloned_inner,
3023 kind: *kind,
3024 access_counter: access_counter.freeze(),
3025 metadata: metadata.clone(),
3026 }
3027 } else {
3028 HydroNode::Tee {
3029 inner: cloned_inner,
3030 metadata: metadata.clone(),
3031 }
3032 }
3033 }
3034 HydroNode::Partition {
3035 inner,
3036 f,
3037 is_true,
3038 metadata,
3039 } => {
3040 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
3041 HydroNode::Partition {
3042 inner: SharedNode(transformed.clone()),
3043 f: f.deep_clone(seen_tees),
3044 is_true: *is_true,
3045 metadata: metadata.clone(),
3046 }
3047 } else {
3048 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
3049 seen_tees.insert(inner.as_ptr(), new_rc.clone());
3050 let cloned = inner.0.borrow().deep_clone(seen_tees);
3051 *new_rc.borrow_mut() = cloned;
3052 HydroNode::Partition {
3053 inner: SharedNode(new_rc),
3054 f: f.deep_clone(seen_tees),
3055 is_true: *is_true,
3056 metadata: metadata.clone(),
3057 }
3058 }
3059 }
3060 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
3061 inner: Box::new(inner.deep_clone(seen_tees)),
3062 metadata: metadata.clone(),
3063 },
3064 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
3065 inner: Box::new(inner.deep_clone(seen_tees)),
3066 metadata: metadata.clone(),
3067 },
3068 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
3069 inner: Box::new(inner.deep_clone(seen_tees)),
3070 metadata: metadata.clone(),
3071 },
3072 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
3073 inner: Box::new(inner.deep_clone(seen_tees)),
3074 metadata: metadata.clone(),
3075 },
3076 HydroNode::Chain {
3077 first,
3078 second,
3079 metadata,
3080 } => HydroNode::Chain {
3081 first: Box::new(first.deep_clone(seen_tees)),
3082 second: Box::new(second.deep_clone(seen_tees)),
3083 metadata: metadata.clone(),
3084 },
3085 HydroNode::MergeOrdered {
3086 first,
3087 second,
3088 metadata,
3089 } => HydroNode::MergeOrdered {
3090 first: Box::new(first.deep_clone(seen_tees)),
3091 second: Box::new(second.deep_clone(seen_tees)),
3092 metadata: metadata.clone(),
3093 },
3094 HydroNode::ChainFirst {
3095 first,
3096 second,
3097 metadata,
3098 } => HydroNode::ChainFirst {
3099 first: Box::new(first.deep_clone(seen_tees)),
3100 second: Box::new(second.deep_clone(seen_tees)),
3101 metadata: metadata.clone(),
3102 },
3103 HydroNode::CrossProduct {
3104 left,
3105 right,
3106 metadata,
3107 } => HydroNode::CrossProduct {
3108 left: Box::new(left.deep_clone(seen_tees)),
3109 right: Box::new(right.deep_clone(seen_tees)),
3110 metadata: metadata.clone(),
3111 },
3112 HydroNode::CrossSingleton {
3113 left,
3114 right,
3115 metadata,
3116 } => HydroNode::CrossSingleton {
3117 left: Box::new(left.deep_clone(seen_tees)),
3118 right: Box::new(right.deep_clone(seen_tees)),
3119 metadata: metadata.clone(),
3120 },
3121 HydroNode::Join {
3122 left,
3123 right,
3124 metadata,
3125 } => HydroNode::Join {
3126 left: Box::new(left.deep_clone(seen_tees)),
3127 right: Box::new(right.deep_clone(seen_tees)),
3128 metadata: metadata.clone(),
3129 },
3130 HydroNode::JoinHalf {
3131 left,
3132 right,
3133 metadata,
3134 } => HydroNode::JoinHalf {
3135 left: Box::new(left.deep_clone(seen_tees)),
3136 right: Box::new(right.deep_clone(seen_tees)),
3137 metadata: metadata.clone(),
3138 },
3139 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3140 pos: Box::new(pos.deep_clone(seen_tees)),
3141 neg: Box::new(neg.deep_clone(seen_tees)),
3142 metadata: metadata.clone(),
3143 },
3144 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3145 pos: Box::new(pos.deep_clone(seen_tees)),
3146 neg: Box::new(neg.deep_clone(seen_tees)),
3147 metadata: metadata.clone(),
3148 },
3149 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3150 input: Box::new(input.deep_clone(seen_tees)),
3151 metadata: metadata.clone(),
3152 },
3153 HydroNode::ResolveFuturesBlocking { input, metadata } => {
3154 HydroNode::ResolveFuturesBlocking {
3155 input: Box::new(input.deep_clone(seen_tees)),
3156 metadata: metadata.clone(),
3157 }
3158 }
3159 HydroNode::ResolveFuturesOrdered { input, metadata } => {
3160 HydroNode::ResolveFuturesOrdered {
3161 input: Box::new(input.deep_clone(seen_tees)),
3162 metadata: metadata.clone(),
3163 }
3164 }
3165 HydroNode::Map { f, input, metadata } => HydroNode::Map {
3166 f: f.deep_clone(seen_tees),
3167 input: Box::new(input.deep_clone(seen_tees)),
3168 metadata: metadata.clone(),
3169 },
3170 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3171 f: f.deep_clone(seen_tees),
3172 input: Box::new(input.deep_clone(seen_tees)),
3173 metadata: metadata.clone(),
3174 },
3175 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3176 HydroNode::FlatMapStreamBlocking {
3177 f: f.deep_clone(seen_tees),
3178 input: Box::new(input.deep_clone(seen_tees)),
3179 metadata: metadata.clone(),
3180 }
3181 }
3182 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3183 f: f.deep_clone(seen_tees),
3184 input: Box::new(input.deep_clone(seen_tees)),
3185 metadata: metadata.clone(),
3186 },
3187 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3188 f: f.deep_clone(seen_tees),
3189 input: Box::new(input.deep_clone(seen_tees)),
3190 metadata: metadata.clone(),
3191 },
3192 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3193 input: Box::new(input.deep_clone(seen_tees)),
3194 metadata: metadata.clone(),
3195 },
3196 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3197 input: Box::new(input.deep_clone(seen_tees)),
3198 metadata: metadata.clone(),
3199 },
3200 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3201 f: f.deep_clone(seen_tees),
3202 input: Box::new(input.deep_clone(seen_tees)),
3203 metadata: metadata.clone(),
3204 },
3205 HydroNode::Unique { input, metadata } => HydroNode::Unique {
3206 input: Box::new(input.deep_clone(seen_tees)),
3207 metadata: metadata.clone(),
3208 },
3209 HydroNode::Sort { input, metadata } => HydroNode::Sort {
3210 input: Box::new(input.deep_clone(seen_tees)),
3211 metadata: metadata.clone(),
3212 },
3213 HydroNode::Fold {
3214 init,
3215 acc,
3216 input,
3217 metadata,
3218 } => HydroNode::Fold {
3219 init: init.deep_clone(seen_tees),
3220 acc: acc.deep_clone(seen_tees),
3221 input: Box::new(input.deep_clone(seen_tees)),
3222 metadata: metadata.clone(),
3223 },
3224 HydroNode::Scan {
3225 init,
3226 acc,
3227 input,
3228 metadata,
3229 } => HydroNode::Scan {
3230 init: init.deep_clone(seen_tees),
3231 acc: acc.deep_clone(seen_tees),
3232 input: Box::new(input.deep_clone(seen_tees)),
3233 metadata: metadata.clone(),
3234 },
3235 HydroNode::ScanAsyncBlocking {
3236 init,
3237 acc,
3238 input,
3239 metadata,
3240 } => HydroNode::ScanAsyncBlocking {
3241 init: init.deep_clone(seen_tees),
3242 acc: acc.deep_clone(seen_tees),
3243 input: Box::new(input.deep_clone(seen_tees)),
3244 metadata: metadata.clone(),
3245 },
3246 HydroNode::FoldKeyed {
3247 init,
3248 acc,
3249 input,
3250 metadata,
3251 } => HydroNode::FoldKeyed {
3252 init: init.deep_clone(seen_tees),
3253 acc: acc.deep_clone(seen_tees),
3254 input: Box::new(input.deep_clone(seen_tees)),
3255 metadata: metadata.clone(),
3256 },
3257 HydroNode::ReduceKeyedWatermark {
3258 f,
3259 input,
3260 watermark,
3261 metadata,
3262 } => HydroNode::ReduceKeyedWatermark {
3263 f: f.deep_clone(seen_tees),
3264 input: Box::new(input.deep_clone(seen_tees)),
3265 watermark: Box::new(watermark.deep_clone(seen_tees)),
3266 metadata: metadata.clone(),
3267 },
3268 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3269 f: f.deep_clone(seen_tees),
3270 input: Box::new(input.deep_clone(seen_tees)),
3271 metadata: metadata.clone(),
3272 },
3273 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3274 f: f.deep_clone(seen_tees),
3275 input: Box::new(input.deep_clone(seen_tees)),
3276 metadata: metadata.clone(),
3277 },
3278 HydroNode::Network {
3279 name,
3280 networking_info,
3281 serialize_fn,
3282 instantiate_fn,
3283 deserialize_fn,
3284 input,
3285 metadata,
3286 } => HydroNode::Network {
3287 name: name.clone(),
3288 networking_info: networking_info.clone(),
3289 serialize_fn: serialize_fn.clone(),
3290 instantiate_fn: instantiate_fn.clone(),
3291 deserialize_fn: deserialize_fn.clone(),
3292 input: Box::new(input.deep_clone(seen_tees)),
3293 metadata: metadata.clone(),
3294 },
3295 HydroNode::ExternalInput {
3296 from_external_key,
3297 from_port_id,
3298 from_many,
3299 codec_type,
3300 port_hint,
3301 instantiate_fn,
3302 deserialize_fn,
3303 metadata,
3304 } => HydroNode::ExternalInput {
3305 from_external_key: *from_external_key,
3306 from_port_id: *from_port_id,
3307 from_many: *from_many,
3308 codec_type: codec_type.clone(),
3309 port_hint: *port_hint,
3310 instantiate_fn: instantiate_fn.clone(),
3311 deserialize_fn: deserialize_fn.clone(),
3312 metadata: metadata.clone(),
3313 },
3314 HydroNode::Counter {
3315 tag,
3316 duration,
3317 prefix,
3318 input,
3319 metadata,
3320 } => HydroNode::Counter {
3321 tag: tag.clone(),
3322 duration: duration.clone(),
3323 prefix: prefix.clone(),
3324 input: Box::new(input.deep_clone(seen_tees)),
3325 metadata: metadata.clone(),
3326 },
3327 HydroNode::VersionedNetworkFork {
3328 channel_id,
3329 channel_name,
3330 senders,
3331 metadata,
3332 } => HydroNode::VersionedNetworkFork {
3333 channel_id: *channel_id,
3334 channel_name: channel_name.clone(),
3335 senders: senders
3336 .iter()
3337 .map(|(version, sender, serialize)| {
3338 (
3339 *version,
3340 Box::new(sender.deep_clone(seen_tees)),
3341 serialize.clone(),
3342 )
3343 })
3344 .collect(),
3345 metadata: metadata.clone(),
3346 },
3347 HydroNode::VersionedNetwork {
3348 fork,
3349 version,
3350 deserialize_fn,
3351 metadata,
3352 } => {
3353 let cloned_fork = if let Some(transformed) = seen_tees.get(&fork.as_ptr()) {
3354 SharedNode(transformed.clone())
3355 } else {
3356 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
3357 seen_tees.insert(fork.as_ptr(), new_rc.clone());
3358 let cloned = fork.0.borrow().deep_clone(seen_tees);
3359 *new_rc.borrow_mut() = cloned;
3360 SharedNode(new_rc)
3361 };
3362 HydroNode::VersionedNetwork {
3363 fork: cloned_fork,
3364 version: *version,
3365 deserialize_fn: deserialize_fn.clone(),
3366 metadata: metadata.clone(),
3367 }
3368 }
3369 }
3370 }
3371
3372 #[cfg(feature = "build")]
3373 pub fn emit_core(
3374 &mut self,
3375 builders_or_callback: &mut BuildersOrCallback<
3376 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3377 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3378 >,
3379 seen_tees: &mut SeenSharedNodes,
3380 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3381 next_stmt_id: &mut crate::Counter<StmtId>,
3382 fold_hooked_idents: &mut HashSet<String>,
3383 ) -> syn::Ident {
3384 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3385
3386 self.transform_bottom_up(
3387 &mut |node: &mut HydroNode| {
3388 let out_location = node.metadata().location_id.clone();
3389 match node {
3390 HydroNode::Placeholder => {
3391 panic!()
3392 }
3393
3394 HydroNode::Cast { .. } => {
3395 let _ = next_stmt_id.get_and_increment();
3398 match builders_or_callback {
3399 BuildersOrCallback::Builders(_) => {}
3400 BuildersOrCallback::Callback(_, node_callback) => {
3401 node_callback(node, next_stmt_id);
3402 }
3403 }
3404 }
3406
3407 HydroNode::UnboundSingleton { .. } => {
3408 let inner_ident = ident_stack.pop().unwrap();
3409
3410 let stmt_id = next_stmt_id.get_and_increment();
3411 let out_ident =
3412 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3413
3414 match builders_or_callback {
3415 BuildersOrCallback::Builders(graph_builders) => {
3416 if graph_builders.singleton_intermediates() {
3417 let builder = graph_builders.get_dfir_mut(&out_location);
3418 builder.add_dfir(
3419 parse_quote! {
3420 #out_ident = #inner_ident;
3421 },
3422 None,
3423 None,
3424 );
3425 } else {
3426 let builder = graph_builders.get_dfir_mut(&out_location);
3427 builder.add_dfir(
3428 parse_quote! {
3429 #out_ident = #inner_ident -> persist::<'static>();
3430 },
3431 None,
3432 None,
3433 );
3434 }
3435 }
3436 BuildersOrCallback::Callback(_, node_callback) => {
3437 node_callback(node, next_stmt_id);
3438 }
3439 }
3440
3441 ident_stack.push(out_ident);
3442 }
3443
3444 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3445 let inner_ident = ident_stack.pop().unwrap();
3446
3447 let stmt_id = next_stmt_id.get_and_increment();
3448 let out_ident =
3449 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3450
3451 match builders_or_callback {
3452 BuildersOrCallback::Builders(graph_builders) => {
3453 graph_builders.assert_is_consistent(
3454 *trusted,
3455 &inner.metadata().location_id,
3456 inner_ident,
3457 &out_ident,
3458 );
3459 }
3460 BuildersOrCallback::Callback(_, node_callback) => {
3461 node_callback(node, next_stmt_id);
3462 }
3463 }
3464
3465 ident_stack.push(out_ident);
3466 }
3467
3468 HydroNode::ObserveNonDet {
3469 inner,
3470 trusted,
3471 metadata,
3472 ..
3473 } => {
3474 let inner_ident = ident_stack.pop().unwrap();
3475
3476 let stmt_id = next_stmt_id.get_and_increment();
3477 let observe_ident =
3478 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3479
3480 match builders_or_callback {
3481 BuildersOrCallback::Builders(graph_builders) => {
3482 graph_builders.observe_nondet(
3483 *trusted,
3484 &inner.metadata().location_id,
3485 inner_ident,
3486 &inner.metadata().collection_kind,
3487 &observe_ident,
3488 &metadata.collection_kind,
3489 &metadata.op,
3490 );
3491 }
3492 BuildersOrCallback::Callback(_, node_callback) => {
3493 node_callback(node, next_stmt_id);
3494 }
3495 }
3496
3497 ident_stack.push(observe_ident);
3498 }
3499
3500 HydroNode::Batch {
3501 inner, metadata, ..
3502 } => {
3503 let inner_ident = ident_stack.pop().unwrap();
3504
3505 let stmt_id = next_stmt_id.get_and_increment();
3506 let batch_ident =
3507 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3508
3509 match builders_or_callback {
3510 BuildersOrCallback::Builders(graph_builders) => {
3511 graph_builders.batch(
3512 inner_ident,
3513 &inner.metadata().location_id,
3514 &inner.metadata().collection_kind,
3515 &batch_ident,
3516 &out_location,
3517 &metadata.op,
3518 fold_hooked_idents,
3519 );
3520 }
3521 BuildersOrCallback::Callback(_, node_callback) => {
3522 node_callback(node, next_stmt_id);
3523 }
3524 }
3525
3526 ident_stack.push(batch_ident);
3527 }
3528
3529 HydroNode::YieldConcat { inner, .. } => {
3530 let inner_ident = ident_stack.pop().unwrap();
3531
3532 let stmt_id = next_stmt_id.get_and_increment();
3533 let yield_ident =
3534 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3535
3536 match builders_or_callback {
3537 BuildersOrCallback::Builders(graph_builders) => {
3538 graph_builders.yield_from_tick(
3539 inner_ident,
3540 &inner.metadata().location_id,
3541 &inner.metadata().collection_kind,
3542 &yield_ident,
3543 &out_location,
3544 );
3545 }
3546 BuildersOrCallback::Callback(_, node_callback) => {
3547 node_callback(node, next_stmt_id);
3548 }
3549 }
3550
3551 ident_stack.push(yield_ident);
3552 }
3553
3554 HydroNode::BeginAtomic { inner, metadata } => {
3555 let inner_ident = ident_stack.pop().unwrap();
3556
3557 let stmt_id = next_stmt_id.get_and_increment();
3558 let begin_ident =
3559 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3560
3561 match builders_or_callback {
3562 BuildersOrCallback::Builders(graph_builders) => {
3563 graph_builders.begin_atomic(
3564 inner_ident,
3565 &inner.metadata().location_id,
3566 &inner.metadata().collection_kind,
3567 &begin_ident,
3568 &out_location,
3569 &metadata.op,
3570 );
3571 }
3572 BuildersOrCallback::Callback(_, node_callback) => {
3573 node_callback(node, next_stmt_id);
3574 }
3575 }
3576
3577 ident_stack.push(begin_ident);
3578 }
3579
3580 HydroNode::EndAtomic { inner, .. } => {
3581 let inner_ident = ident_stack.pop().unwrap();
3582
3583 let stmt_id = next_stmt_id.get_and_increment();
3584 let end_ident =
3585 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3586
3587 match builders_or_callback {
3588 BuildersOrCallback::Builders(graph_builders) => {
3589 graph_builders.end_atomic(
3590 inner_ident,
3591 &inner.metadata().location_id,
3592 &inner.metadata().collection_kind,
3593 &end_ident,
3594 );
3595 }
3596 BuildersOrCallback::Callback(_, node_callback) => {
3597 node_callback(node, next_stmt_id);
3598 }
3599 }
3600
3601 ident_stack.push(end_ident);
3602 }
3603
3604 HydroNode::Source {
3605 source, metadata, ..
3606 } => {
3607 if let HydroSource::ExternalNetwork() = source {
3608 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3609 } else {
3610 let stmt_id = next_stmt_id.get_and_increment();
3611 let source_ident =
3612 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3613
3614 let source_stmt = match source {
3615 HydroSource::Stream(expr) => {
3616 debug_assert!(metadata.location_id.is_top_level());
3617 parse_quote! {
3618 #source_ident = source_stream(#expr);
3619 }
3620 }
3621
3622 HydroSource::ExternalNetwork() => {
3623 unreachable!()
3624 }
3625
3626 HydroSource::Iter(expr) => {
3627 if metadata.location_id.is_top_level() {
3628 parse_quote! {
3629 #source_ident = source_iter(#expr);
3630 }
3631 } else {
3632 parse_quote! {
3634 #source_ident = source_iter(#expr) -> persist::<'static>();
3635 }
3636 }
3637 }
3638
3639 HydroSource::Spin() => {
3640 debug_assert!(metadata.location_id.is_top_level());
3641 parse_quote! {
3642 #source_ident = spin();
3643 }
3644 }
3645
3646 HydroSource::ClusterMembers(target_loc, state) => {
3647 debug_assert!(metadata.location_id.is_top_level());
3648
3649 let members_tee_ident = syn::Ident::new(
3650 &format!(
3651 "__cluster_members_tee_{}_{}",
3652 metadata.location_id.root().key(),
3653 target_loc.key(),
3654 ),
3655 Span::call_site(),
3656 );
3657
3658 match state {
3659 ClusterMembersState::Stream(d) => {
3660 parse_quote! {
3661 #members_tee_ident = source_stream(#d) -> tee();
3662 #source_ident = #members_tee_ident;
3663 }
3664 },
3665 ClusterMembersState::Uninit => syn::parse_quote! {
3666 #source_ident = source_stream(DUMMY);
3667 },
3668 ClusterMembersState::Tee(..) => parse_quote! {
3669 #source_ident = #members_tee_ident;
3670 },
3671 }
3672 }
3673
3674 HydroSource::Embedded(ident) => {
3675 parse_quote! {
3676 #source_ident = source_stream(#ident);
3677 }
3678 }
3679
3680 HydroSource::EmbeddedSingleton(ident) => {
3681 parse_quote! {
3682 #source_ident = source_iter([#ident]);
3683 }
3684 }
3685 };
3686
3687 match builders_or_callback {
3688 BuildersOrCallback::Builders(graph_builders) => {
3689 let builder = graph_builders.get_dfir_mut(&out_location);
3690 builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3691 }
3692 BuildersOrCallback::Callback(_, node_callback) => {
3693 node_callback(node, next_stmt_id);
3694 }
3695 }
3696
3697 ident_stack.push(source_ident);
3698 }
3699 }
3700
3701 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3702 let stmt_id = next_stmt_id.get_and_increment();
3703 let source_ident =
3704 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3705
3706 match builders_or_callback {
3707 BuildersOrCallback::Builders(graph_builders) => {
3708 let builder = graph_builders.get_dfir_mut(&out_location);
3709
3710 if *first_tick_only {
3711 assert!(
3712 !metadata.location_id.is_top_level(),
3713 "first_tick_only SingletonSource must be inside a tick"
3714 );
3715 }
3716
3717 if *first_tick_only
3718 || (metadata.location_id.is_top_level()
3719 && metadata.collection_kind.is_bounded())
3720 {
3721 builder.add_dfir(
3722 parse_quote! {
3723 #source_ident = source_iter([#value]);
3724 },
3725 None,
3726 Some(&stmt_id.to_string()),
3727 );
3728 } else {
3729 builder.add_dfir(
3730 parse_quote! {
3731 #source_ident = source_iter([#value]) -> persist::<'static>();
3732 },
3733 None,
3734 Some(&stmt_id.to_string()),
3735 );
3736 }
3737 }
3738 BuildersOrCallback::Callback(_, node_callback) => {
3739 node_callback(node, next_stmt_id);
3740 }
3741 }
3742
3743 ident_stack.push(source_ident);
3744 }
3745
3746 HydroNode::CycleSource { cycle_id, .. } => {
3747 let ident = cycle_id.as_ident();
3748
3749 let _ = next_stmt_id.get_and_increment();
3751
3752 match builders_or_callback {
3753 BuildersOrCallback::Builders(_) => {}
3754 BuildersOrCallback::Callback(_, node_callback) => {
3755 node_callback(node, next_stmt_id);
3756 }
3757 }
3758
3759 ident_stack.push(ident);
3760 }
3761
3762 HydroNode::Tee { inner, .. } => {
3763 let stmt_id = next_stmt_id.get_and_increment();
3766
3767 let ret_ident = if let Some(built_idents) =
3768 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3769 {
3770 match builders_or_callback {
3771 BuildersOrCallback::Builders(_) => {}
3772 BuildersOrCallback::Callback(_, node_callback) => {
3773 node_callback(node, next_stmt_id);
3774 }
3775 }
3776
3777 built_idents[0].clone()
3778 } else {
3779 let inner_ident = ident_stack.pop().unwrap();
3782
3783 let tee_ident =
3784 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3785
3786 built_tees.insert(
3787 inner.0.as_ref() as *const RefCell<HydroNode>,
3788 vec![tee_ident.clone()],
3789 );
3790
3791 match builders_or_callback {
3792 BuildersOrCallback::Builders(graph_builders) => {
3793 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3805 fold_hooked_idents.insert(tee_ident.to_string());
3806 }
3807 let builder = graph_builders.get_dfir_mut(&out_location);
3808 builder.add_dfir(
3809 parse_quote! {
3810 #tee_ident = #inner_ident -> tee();
3811 },
3812 None,
3813 Some(&stmt_id.to_string()),
3814 );
3815 }
3816 BuildersOrCallback::Callback(_, node_callback) => {
3817 node_callback(node, next_stmt_id);
3818 }
3819 }
3820
3821 tee_ident
3822 };
3823
3824 ident_stack.push(ret_ident);
3825 }
3826
3827 HydroNode::Reference { inner, kind, .. } => {
3828 let stmt_id = next_stmt_id.get_and_increment();
3831
3832 let ret_ident = if let Some(built_idents) =
3833 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3834 {
3835 built_idents[0].clone()
3836 } else {
3837 let inner_ident = ident_stack.pop().unwrap();
3838
3839 let ref_ident =
3840 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3841
3842 built_tees.insert(
3843 inner.0.as_ref() as *const RefCell<HydroNode>,
3844 vec![ref_ident.clone()],
3845 );
3846
3847 match builders_or_callback {
3848 BuildersOrCallback::Builders(graph_builders) => {
3849 let builder = graph_builders.get_dfir_mut(&out_location);
3850 let op_ident = syn::Ident::new(
3851 match kind {
3852 crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3853 crate::handoff_ref::HandoffRefKind::Optional => "optional",
3854 crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3855 },
3856 Span::call_site(),
3857 );
3858 builder.add_dfir(
3859 parse_quote! {
3860 #ref_ident = #inner_ident -> #op_ident();
3861 },
3862 None,
3863 Some(&stmt_id.to_string()),
3864 );
3865 }
3866 BuildersOrCallback::Callback(_, node_callback) => {
3867 node_callback(node, next_stmt_id);
3868 }
3869 }
3870
3871 ref_ident
3872 };
3873
3874 ident_stack.push(ret_ident);
3875 }
3876
3877 HydroNode::Partition {
3878 inner, f, is_true, metadata,
3879 } => {
3880 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3882 let stmt_id = next_stmt_id.get_and_increment();
3883
3884 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3885 match builders_or_callback {
3886 BuildersOrCallback::Builders(_) => {}
3887 BuildersOrCallback::Callback(_, node_callback) => {
3888 node_callback(node, next_stmt_id);
3889 }
3890 }
3891
3892 let idx = if is_true { 0 } else { 1 };
3893 built_idents[idx].clone()
3894 } else {
3895 let inner_ident = ident_stack.pop().unwrap();
3898 let f_tokens = f.emit_tokens(&mut ident_stack);
3899
3900 let inner_ident = {
3901 let inner_borrow = inner.0.borrow();
3902 maybe_observe_for_mut(
3903 f, inner_ident,
3904 &inner_borrow.metadata().location_id,
3905 &inner_borrow.metadata().collection_kind,
3906 &metadata.op,
3907 builders_or_callback, next_stmt_id,
3908 )
3909 };
3910
3911 let partition_ident = syn::Ident::new(
3912 &format!("stream_{}_partition", stmt_id),
3913 Span::call_site(),
3914 );
3915 let true_ident = syn::Ident::new(
3916 &format!("stream_{}_true", stmt_id),
3917 Span::call_site(),
3918 );
3919 let false_ident = syn::Ident::new(
3920 &format!("stream_{}_false", stmt_id),
3921 Span::call_site(),
3922 );
3923
3924 built_tees.insert(
3925 ptr,
3926 vec![true_ident.clone(), false_ident.clone()],
3927 );
3928
3929 let stmt_id = next_stmt_id.get_and_increment();
3930 match builders_or_callback {
3931 BuildersOrCallback::Builders(graph_builders) => {
3932 let builder = graph_builders.get_dfir_mut(&out_location);
3933 builder.add_dfir(
3934 parse_quote! {
3935 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3936 #true_ident = #partition_ident[0];
3937 #false_ident = #partition_ident[1];
3938 },
3939 None,
3940 Some(&stmt_id.to_string()),
3941 );
3942 }
3943 BuildersOrCallback::Callback(_, node_callback) => {
3944 node_callback(node, next_stmt_id);
3945 }
3946 }
3947
3948 if is_true { true_ident } else { false_ident }
3949 };
3950
3951 ident_stack.push(ret_ident);
3952 }
3953
3954 HydroNode::Chain { .. } => {
3955 let second_ident = ident_stack.pop().unwrap();
3957 let first_ident = ident_stack.pop().unwrap();
3958
3959 let stmt_id = next_stmt_id.get_and_increment();
3960 let chain_ident =
3961 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3962
3963 match builders_or_callback {
3964 BuildersOrCallback::Builders(graph_builders) => {
3965 let builder = graph_builders.get_dfir_mut(&out_location);
3966 builder.add_dfir(
3967 parse_quote! {
3968 #chain_ident = chain();
3969 #first_ident -> [0]#chain_ident;
3970 #second_ident -> [1]#chain_ident;
3971 },
3972 None,
3973 Some(&stmt_id.to_string()),
3974 );
3975 }
3976 BuildersOrCallback::Callback(_, node_callback) => {
3977 node_callback(node, next_stmt_id);
3978 }
3979 }
3980
3981 ident_stack.push(chain_ident);
3982 }
3983
3984 HydroNode::MergeOrdered { first, metadata, .. } => {
3985 let second_ident = ident_stack.pop().unwrap();
3986 let first_ident = ident_stack.pop().unwrap();
3987
3988 let stmt_id = next_stmt_id.get_and_increment();
3989 let merge_ident =
3990 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3991
3992 match builders_or_callback {
3993 BuildersOrCallback::Builders(graph_builders) => {
3994 graph_builders.merge_ordered(
3995 &first.metadata().location_id,
3996 first_ident,
3997 second_ident,
3998 &merge_ident,
3999 &first.metadata().collection_kind,
4000 &metadata.op,
4001 Some(&stmt_id.to_string()),
4002 );
4003 }
4004 BuildersOrCallback::Callback(_, node_callback) => {
4005 node_callback(node, next_stmt_id);
4006 }
4007 }
4008
4009 ident_stack.push(merge_ident);
4010 }
4011
4012 HydroNode::ChainFirst { .. } => {
4013 let second_ident = ident_stack.pop().unwrap();
4014 let first_ident = ident_stack.pop().unwrap();
4015
4016 let stmt_id = next_stmt_id.get_and_increment();
4017 let chain_ident =
4018 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4019
4020 match builders_or_callback {
4021 BuildersOrCallback::Builders(graph_builders) => {
4022 let builder = graph_builders.get_dfir_mut(&out_location);
4023 builder.add_dfir(
4024 parse_quote! {
4025 #chain_ident = chain_first_n(1);
4026 #first_ident -> [0]#chain_ident;
4027 #second_ident -> [1]#chain_ident;
4028 },
4029 None,
4030 Some(&stmt_id.to_string()),
4031 );
4032 }
4033 BuildersOrCallback::Callback(_, node_callback) => {
4034 node_callback(node, next_stmt_id);
4035 }
4036 }
4037
4038 ident_stack.push(chain_ident);
4039 }
4040
4041 HydroNode::CrossSingleton { right, .. } => {
4042 let right_ident = ident_stack.pop().unwrap();
4043 let left_ident = ident_stack.pop().unwrap();
4044
4045 let stmt_id = next_stmt_id.get_and_increment();
4046 let cross_ident =
4047 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4048
4049 match builders_or_callback {
4050 BuildersOrCallback::Builders(graph_builders) => {
4051 let builder = graph_builders.get_dfir_mut(&out_location);
4052
4053 if right.metadata().location_id.is_top_level()
4054 && right.metadata().collection_kind.is_bounded()
4055 {
4056 builder.add_dfir(
4057 parse_quote! {
4058 #cross_ident = cross_singleton::<'static>();
4059 #left_ident -> [input]#cross_ident;
4060 #right_ident -> [single]#cross_ident;
4061 },
4062 None,
4063 Some(&stmt_id.to_string()),
4064 );
4065 } else {
4066 builder.add_dfir(
4067 parse_quote! {
4068 #cross_ident = cross_singleton();
4069 #left_ident -> [input]#cross_ident;
4070 #right_ident -> [single]#cross_ident;
4071 },
4072 None,
4073 Some(&stmt_id.to_string()),
4074 );
4075 }
4076 }
4077 BuildersOrCallback::Callback(_, node_callback) => {
4078 node_callback(node, next_stmt_id);
4079 }
4080 }
4081
4082 ident_stack.push(cross_ident);
4083 }
4084
4085 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
4086 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
4087 parse_quote!(cross_join_multiset)
4088 } else {
4089 parse_quote!(join_multiset)
4090 };
4091
4092 let (HydroNode::CrossProduct { left, right, .. }
4093 | HydroNode::Join { left, right, .. }) = node
4094 else {
4095 unreachable!()
4096 };
4097
4098 let is_top_level = left.metadata().location_id.is_top_level()
4099 && right.metadata().location_id.is_top_level();
4100 let left_lifetime = if left.metadata().location_id.is_top_level() {
4101 quote!('static)
4102 } else {
4103 quote!('tick)
4104 };
4105
4106 let right_lifetime = if right.metadata().location_id.is_top_level() {
4107 quote!('static)
4108 } else {
4109 quote!('tick)
4110 };
4111
4112 let right_ident = ident_stack.pop().unwrap();
4113 let left_ident = ident_stack.pop().unwrap();
4114
4115 let stmt_id = next_stmt_id.get_and_increment();
4116 let stream_ident =
4117 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4118
4119 match builders_or_callback {
4120 BuildersOrCallback::Builders(graph_builders) => {
4121 let builder = graph_builders.get_dfir_mut(&out_location);
4122 builder.add_dfir(
4123 if is_top_level {
4124 parse_quote! {
4127 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
4128 #left_ident -> [0]#stream_ident;
4129 #right_ident -> [1]#stream_ident;
4130 }
4131 } else {
4132 parse_quote! {
4133 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
4134 #left_ident -> [0]#stream_ident;
4135 #right_ident -> [1]#stream_ident;
4136 }
4137 }
4138 ,
4139 None,
4140 Some(&stmt_id.to_string()),
4141 );
4142 }
4143 BuildersOrCallback::Callback(_, node_callback) => {
4144 node_callback(node, next_stmt_id);
4145 }
4146 }
4147
4148 ident_stack.push(stream_ident);
4149 }
4150
4151 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4152 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4153 parse_quote!(difference)
4154 } else {
4155 parse_quote!(anti_join)
4156 };
4157
4158 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4159 node
4160 else {
4161 unreachable!()
4162 };
4163
4164 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4165 quote!('static)
4166 } else {
4167 quote!('tick)
4168 };
4169
4170 let neg_ident = ident_stack.pop().unwrap();
4171 let pos_ident = ident_stack.pop().unwrap();
4172
4173 let stmt_id = next_stmt_id.get_and_increment();
4174 let stream_ident =
4175 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4176
4177 match builders_or_callback {
4178 BuildersOrCallback::Builders(graph_builders) => {
4179 let builder = graph_builders.get_dfir_mut(&out_location);
4180 builder.add_dfir(
4181 parse_quote! {
4182 #stream_ident = #operator::<'tick, #neg_lifetime>();
4183 #pos_ident -> [pos]#stream_ident;
4184 #neg_ident -> [neg]#stream_ident;
4185 },
4186 None,
4187 Some(&stmt_id.to_string()),
4188 );
4189 }
4190 BuildersOrCallback::Callback(_, node_callback) => {
4191 node_callback(node, next_stmt_id);
4192 }
4193 }
4194
4195 ident_stack.push(stream_ident);
4196 }
4197
4198 HydroNode::JoinHalf { .. } => {
4199 let HydroNode::JoinHalf { right, .. } = node else {
4200 unreachable!()
4201 };
4202
4203 assert!(
4204 right.metadata().collection_kind.is_bounded(),
4205 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4206 right.metadata().collection_kind
4207 );
4208
4209 let build_lifetime = if right.metadata().location_id.is_top_level() {
4210 quote!('static)
4211 } else {
4212 quote!('tick)
4213 };
4214
4215 let build_ident = ident_stack.pop().unwrap();
4216 let probe_ident = ident_stack.pop().unwrap();
4217
4218 let stmt_id = next_stmt_id.get_and_increment();
4219 let stream_ident =
4220 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4221
4222 match builders_or_callback {
4223 BuildersOrCallback::Builders(graph_builders) => {
4224 let builder = graph_builders.get_dfir_mut(&out_location);
4225 builder.add_dfir(
4226 parse_quote! {
4227 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4228 #probe_ident -> [probe]#stream_ident;
4229 #build_ident -> [build]#stream_ident;
4230 },
4231 None,
4232 Some(&stmt_id.to_string()),
4233 );
4234 }
4235 BuildersOrCallback::Callback(_, node_callback) => {
4236 node_callback(node, next_stmt_id);
4237 }
4238 }
4239
4240 ident_stack.push(stream_ident);
4241 }
4242
4243 HydroNode::ResolveFutures { .. } => {
4244 let input_ident = ident_stack.pop().unwrap();
4245
4246 let stmt_id = next_stmt_id.get_and_increment();
4247 let futures_ident =
4248 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4249
4250 match builders_or_callback {
4251 BuildersOrCallback::Builders(graph_builders) => {
4252 let builder = graph_builders.get_dfir_mut(&out_location);
4253 builder.add_dfir(
4254 parse_quote! {
4255 #futures_ident = #input_ident -> resolve_futures();
4256 },
4257 None,
4258 Some(&stmt_id.to_string()),
4259 );
4260 }
4261 BuildersOrCallback::Callback(_, node_callback) => {
4262 node_callback(node, next_stmt_id);
4263 }
4264 }
4265
4266 ident_stack.push(futures_ident);
4267 }
4268
4269 HydroNode::ResolveFuturesBlocking { .. } => {
4270 let input_ident = ident_stack.pop().unwrap();
4271
4272 let stmt_id = next_stmt_id.get_and_increment();
4273 let futures_ident =
4274 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4275
4276 match builders_or_callback {
4277 BuildersOrCallback::Builders(graph_builders) => {
4278 let builder = graph_builders.get_dfir_mut(&out_location);
4279 builder.add_dfir(
4280 parse_quote! {
4281 #futures_ident = #input_ident -> resolve_futures_blocking();
4282 },
4283 None,
4284 Some(&stmt_id.to_string()),
4285 );
4286 }
4287 BuildersOrCallback::Callback(_, node_callback) => {
4288 node_callback(node, next_stmt_id);
4289 }
4290 }
4291
4292 ident_stack.push(futures_ident);
4293 }
4294
4295 HydroNode::ResolveFuturesOrdered { .. } => {
4296 let input_ident = ident_stack.pop().unwrap();
4297
4298 let stmt_id = next_stmt_id.get_and_increment();
4299 let futures_ident =
4300 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4301
4302 match builders_or_callback {
4303 BuildersOrCallback::Builders(graph_builders) => {
4304 let builder = graph_builders.get_dfir_mut(&out_location);
4305 builder.add_dfir(
4306 parse_quote! {
4307 #futures_ident = #input_ident -> resolve_futures_ordered();
4308 },
4309 None,
4310 Some(&stmt_id.to_string()),
4311 );
4312 }
4313 BuildersOrCallback::Callback(_, node_callback) => {
4314 node_callback(node, next_stmt_id);
4315 }
4316 }
4317
4318 ident_stack.push(futures_ident);
4319 }
4320
4321 HydroNode::Map {
4322 f,
4323 input,
4324 metadata,
4325 } => {
4326 let input_ident = ident_stack.pop().unwrap();
4328 let f_tokens = f.emit_tokens(&mut ident_stack);
4329
4330 let input_ident = maybe_observe_for_mut(
4331 f,
4332 input_ident,
4333 &input.metadata().location_id,
4334 &input.metadata().collection_kind,
4335 &metadata.op,
4336 builders_or_callback,
4337 next_stmt_id,
4338 );
4339
4340 let stmt_id = next_stmt_id.get_and_increment();
4341 let map_ident =
4342 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4343
4344 match builders_or_callback {
4345 BuildersOrCallback::Builders(graph_builders) => {
4346 let builder = graph_builders.get_dfir_mut(&out_location);
4347 builder.add_dfir(
4348 parse_quote! {
4349 #map_ident = #input_ident -> map(#f_tokens);
4350 },
4351 None,
4352 Some(&stmt_id.to_string()),
4353 );
4354 }
4355 BuildersOrCallback::Callback(_, node_callback) => {
4356 node_callback(node, next_stmt_id);
4357 }
4358 }
4359
4360 ident_stack.push(map_ident);
4361 }
4362
4363 HydroNode::FlatMap { f, input, metadata } => {
4364 let input_ident = ident_stack.pop().unwrap();
4365 let f_tokens = f.emit_tokens(&mut ident_stack);
4366
4367 let input_ident = maybe_observe_for_mut(
4368 f, input_ident,
4369 &input.metadata().location_id,
4370 &input.metadata().collection_kind,
4371 &metadata.op,
4372 builders_or_callback, next_stmt_id,
4373 );
4374
4375 let stmt_id = next_stmt_id.get_and_increment();
4376 let flat_map_ident =
4377 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4378
4379 match builders_or_callback {
4380 BuildersOrCallback::Builders(graph_builders) => {
4381 let builder = graph_builders.get_dfir_mut(&out_location);
4382 builder.add_dfir(
4383 parse_quote! {
4384 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4385 },
4386 None,
4387 Some(&stmt_id.to_string()),
4388 );
4389 }
4390 BuildersOrCallback::Callback(_, node_callback) => {
4391 node_callback(node, next_stmt_id);
4392 }
4393 }
4394
4395 ident_stack.push(flat_map_ident);
4396 }
4397
4398 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4399 let input_ident = ident_stack.pop().unwrap();
4400 let f_tokens = f.emit_tokens(&mut ident_stack);
4401
4402 let input_ident = maybe_observe_for_mut(
4403 f, input_ident,
4404 &input.metadata().location_id,
4405 &input.metadata().collection_kind,
4406 &metadata.op,
4407 builders_or_callback, next_stmt_id,
4408 );
4409
4410 let stmt_id = next_stmt_id.get_and_increment();
4411 let flat_map_stream_blocking_ident =
4412 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4413
4414 match builders_or_callback {
4415 BuildersOrCallback::Builders(graph_builders) => {
4416 let builder = graph_builders.get_dfir_mut(&out_location);
4417 builder.add_dfir(
4418 parse_quote! {
4419 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4420 },
4421 None,
4422 Some(&stmt_id.to_string()),
4423 );
4424 }
4425 BuildersOrCallback::Callback(_, node_callback) => {
4426 node_callback(node, next_stmt_id);
4427 }
4428 }
4429
4430 ident_stack.push(flat_map_stream_blocking_ident);
4431 }
4432
4433 HydroNode::Filter { f, input, metadata } => {
4434 let input_ident = ident_stack.pop().unwrap();
4435 let f_tokens = f.emit_tokens(&mut ident_stack);
4436
4437 let input_ident = maybe_observe_for_mut(
4438 f, input_ident,
4439 &input.metadata().location_id,
4440 &input.metadata().collection_kind,
4441 &metadata.op,
4442 builders_or_callback, next_stmt_id,
4443 );
4444
4445 let stmt_id = next_stmt_id.get_and_increment();
4446 let filter_ident =
4447 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4448
4449 match builders_or_callback {
4450 BuildersOrCallback::Builders(graph_builders) => {
4451 let builder = graph_builders.get_dfir_mut(&out_location);
4452 builder.add_dfir(
4453 parse_quote! {
4454 #filter_ident = #input_ident -> filter(#f_tokens);
4455 },
4456 None,
4457 Some(&stmt_id.to_string()),
4458 );
4459 }
4460 BuildersOrCallback::Callback(_, node_callback) => {
4461 node_callback(node, next_stmt_id);
4462 }
4463 }
4464
4465 ident_stack.push(filter_ident);
4466 }
4467
4468 HydroNode::FilterMap { f, input, metadata } => {
4469 let input_ident = ident_stack.pop().unwrap();
4470 let f_tokens = f.emit_tokens(&mut ident_stack);
4471
4472 let input_ident = maybe_observe_for_mut(
4473 f, input_ident,
4474 &input.metadata().location_id,
4475 &input.metadata().collection_kind,
4476 &metadata.op,
4477 builders_or_callback, next_stmt_id,
4478 );
4479
4480 let stmt_id = next_stmt_id.get_and_increment();
4481 let filter_map_ident =
4482 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4483
4484 match builders_or_callback {
4485 BuildersOrCallback::Builders(graph_builders) => {
4486 let builder = graph_builders.get_dfir_mut(&out_location);
4487 builder.add_dfir(
4488 parse_quote! {
4489 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4490 },
4491 None,
4492 Some(&stmt_id.to_string()),
4493 );
4494 }
4495 BuildersOrCallback::Callback(_, node_callback) => {
4496 node_callback(node, next_stmt_id);
4497 }
4498 }
4499
4500 ident_stack.push(filter_map_ident);
4501 }
4502
4503 HydroNode::Sort { .. } => {
4504 let input_ident = ident_stack.pop().unwrap();
4505
4506 let stmt_id = next_stmt_id.get_and_increment();
4507 let sort_ident =
4508 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4509
4510 match builders_or_callback {
4511 BuildersOrCallback::Builders(graph_builders) => {
4512 let builder = graph_builders.get_dfir_mut(&out_location);
4513 builder.add_dfir(
4514 parse_quote! {
4515 #sort_ident = #input_ident -> sort();
4516 },
4517 None,
4518 Some(&stmt_id.to_string()),
4519 );
4520 }
4521 BuildersOrCallback::Callback(_, node_callback) => {
4522 node_callback(node, next_stmt_id);
4523 }
4524 }
4525
4526 ident_stack.push(sort_ident);
4527 }
4528
4529 HydroNode::DeferTick { .. } => {
4530 let input_ident = ident_stack.pop().unwrap();
4531
4532 let stmt_id = next_stmt_id.get_and_increment();
4533 let defer_tick_ident =
4534 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4535
4536 match builders_or_callback {
4537 BuildersOrCallback::Builders(graph_builders) => {
4538 let builder = graph_builders.get_dfir_mut(&out_location);
4539 builder.add_dfir(
4540 parse_quote! {
4541 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4542 },
4543 None,
4544 Some(&stmt_id.to_string()),
4545 );
4546 }
4547 BuildersOrCallback::Callback(_, node_callback) => {
4548 node_callback(node, next_stmt_id);
4549 }
4550 }
4551
4552 ident_stack.push(defer_tick_ident);
4553 }
4554
4555 HydroNode::Enumerate { input, .. } => {
4556 let input_ident = ident_stack.pop().unwrap();
4557
4558 let stmt_id = next_stmt_id.get_and_increment();
4559 let enumerate_ident =
4560 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4561
4562 match builders_or_callback {
4563 BuildersOrCallback::Builders(graph_builders) => {
4564 let builder = graph_builders.get_dfir_mut(&out_location);
4565 let lifetime = if input.metadata().location_id.is_top_level() {
4566 quote!('static)
4567 } else {
4568 quote!('tick)
4569 };
4570 builder.add_dfir(
4571 parse_quote! {
4572 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4573 },
4574 None,
4575 Some(&stmt_id.to_string()),
4576 );
4577 }
4578 BuildersOrCallback::Callback(_, node_callback) => {
4579 node_callback(node, next_stmt_id);
4580 }
4581 }
4582
4583 ident_stack.push(enumerate_ident);
4584 }
4585
4586 HydroNode::Inspect { f, input, metadata } => {
4587 let input_ident = ident_stack.pop().unwrap();
4588 let f_tokens = f.emit_tokens(&mut ident_stack);
4589
4590 let input_ident = maybe_observe_for_mut(
4591 f, input_ident,
4592 &input.metadata().location_id,
4593 &input.metadata().collection_kind,
4594 &metadata.op,
4595 builders_or_callback, next_stmt_id,
4596 );
4597
4598 let stmt_id = next_stmt_id.get_and_increment();
4599 let inspect_ident =
4600 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4601
4602 match builders_or_callback {
4603 BuildersOrCallback::Builders(graph_builders) => {
4604 let builder = graph_builders.get_dfir_mut(&out_location);
4605 builder.add_dfir(
4606 parse_quote! {
4607 #inspect_ident = #input_ident -> inspect(#f_tokens);
4608 },
4609 None,
4610 Some(&stmt_id.to_string()),
4611 );
4612 }
4613 BuildersOrCallback::Callback(_, node_callback) => {
4614 node_callback(node, next_stmt_id);
4615 }
4616 }
4617
4618 ident_stack.push(inspect_ident);
4619 }
4620
4621 HydroNode::Unique { input, .. } => {
4622 let input_ident = ident_stack.pop().unwrap();
4623
4624 let stmt_id = next_stmt_id.get_and_increment();
4625 let unique_ident =
4626 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4627
4628 match builders_or_callback {
4629 BuildersOrCallback::Builders(graph_builders) => {
4630 let builder = graph_builders.get_dfir_mut(&out_location);
4631 let lifetime = if input.metadata().location_id.is_top_level() {
4632 quote!('static)
4633 } else {
4634 quote!('tick)
4635 };
4636
4637 builder.add_dfir(
4638 parse_quote! {
4639 #unique_ident = #input_ident -> unique::<#lifetime>();
4640 },
4641 None,
4642 Some(&stmt_id.to_string()),
4643 );
4644 }
4645 BuildersOrCallback::Callback(_, node_callback) => {
4646 node_callback(node, next_stmt_id);
4647 }
4648 }
4649
4650 ident_stack.push(unique_ident);
4651 }
4652
4653 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4654 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4655 if input.metadata().location_id.is_top_level()
4656 && input.metadata().collection_kind.is_bounded()
4657 {
4658 parse_quote!(fold_no_replay)
4659 } else {
4660 parse_quote!(fold)
4661 }
4662 } else if matches!(node, HydroNode::Scan { .. }) {
4663 parse_quote!(scan)
4664 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4665 parse_quote!(scan_async_blocking)
4666 } else if let HydroNode::FoldKeyed { input, .. } = node {
4667 if input.metadata().location_id.is_top_level()
4668 && input.metadata().collection_kind.is_bounded()
4669 {
4670 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4671 } else {
4672 parse_quote!(fold_keyed)
4673 }
4674 } else {
4675 unreachable!()
4676 };
4677
4678 let (HydroNode::Fold { input, .. }
4679 | HydroNode::FoldKeyed { input, .. }
4680 | HydroNode::Scan { input, .. }
4681 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4682 else {
4683 unreachable!()
4684 };
4685
4686 let lifetime = if input.metadata().location_id.is_top_level() {
4687 quote!('static)
4688 } else {
4689 quote!('tick)
4690 };
4691
4692 let input_ident = ident_stack.pop().unwrap();
4693
4694 let (HydroNode::Fold { init, acc, .. }
4695 | HydroNode::FoldKeyed { init, acc, .. }
4696 | HydroNode::Scan { init, acc, .. }
4697 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4698 else {
4699 unreachable!()
4700 };
4701
4702 let acc_tokens = acc.emit_tokens(&mut ident_stack);
4703 let init_tokens = init.emit_tokens(&mut ident_stack);
4704
4705 let stmt_id = next_stmt_id.get_and_increment();
4706 let fold_ident =
4707 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4708
4709 match builders_or_callback {
4710 BuildersOrCallback::Builders(graph_builders) => {
4711 if matches!(node, HydroNode::Fold { .. })
4712 && node.metadata().location_id.is_top_level()
4713 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4714 && graph_builders.singleton_intermediates()
4715 && !node.metadata().collection_kind.is_bounded()
4716 {
4717 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4718 let hooked_input_ident = graph_builders.emit_fold_hook(
4719 &input.metadata().location_id,
4720 &input_ident,
4721 &input.metadata().collection_kind,
4722 &node.metadata().op,
4723 );
4724
4725 let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4726 let acc: syn::Expr = parse_quote!({
4727 let mut __inner = #acc_tokens;
4728 move |__state, __batch: Vec<_>| {
4729 if __batch.is_empty() {
4730 return None;
4731 }
4732 for __value in __batch {
4733 __inner(__state, __value);
4734 }
4735 Some(__state.clone())
4736 }
4737 });
4738 (hooked, acc)
4739 } else {
4740 let acc: syn::Expr = parse_quote!({
4741 let mut __inner = #acc_tokens;
4742 move |__state, __value| {
4743 __inner(__state, __value);
4744 Some(__state.clone())
4745 }
4746 });
4747 (&input_ident, acc)
4748 };
4749
4750 let builder = graph_builders.get_dfir_mut(&out_location);
4751 builder.add_dfir(
4752 parse_quote! {
4753 source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4754 #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4755 #fold_ident = chain();
4756 },
4757 None,
4758 Some(&stmt_id.to_string()),
4759 );
4760
4761 if hooked_input_ident.is_some() {
4762 fold_hooked_idents.insert(fold_ident.to_string());
4763 }
4764 } else if matches!(node, HydroNode::FoldKeyed { .. })
4765 && node.metadata().location_id.is_top_level()
4766 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4767 && graph_builders.singleton_intermediates()
4768 && !node.metadata().collection_kind.is_bounded()
4769 {
4770 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4771 let hooked_input_ident = graph_builders.emit_fold_hook(
4772 &input.metadata().location_id,
4773 &input_ident,
4774 &input.metadata().collection_kind,
4775 &node.metadata().op,
4776 );
4777 let builder = graph_builders.get_dfir_mut(&out_location);
4778
4779 let wrapped_acc: syn::Expr = parse_quote!({
4780 let mut __init = #init_tokens;
4781 let mut __inner = #acc_tokens;
4782 move |__state, __kv: (_, _)| {
4783 let __state = __state
4785 .entry(::std::clone::Clone::clone(&__kv.0))
4786 .or_insert_with(|| (__init)());
4787 __inner(__state, __kv.1);
4788 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4789 }
4790 });
4791
4792 if let Some(hooked_input_ident) = hooked_input_ident {
4793 builder.add_dfir(
4794 parse_quote! {
4795 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4796 },
4797 None,
4798 Some(&stmt_id.to_string()),
4799 );
4800
4801 fold_hooked_idents.insert(fold_ident.to_string());
4802 } else {
4803 builder.add_dfir(
4804 parse_quote! {
4805 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4806 },
4807 None,
4808 Some(&stmt_id.to_string()),
4809 );
4810 }
4811 } else if (matches!(node, HydroNode::Fold { .. })
4812 || matches!(node, HydroNode::FoldKeyed { .. }))
4813 && !node.metadata().location_id.is_top_level()
4814 && graph_builders.singleton_intermediates()
4815 {
4816 let input_ref = match &*node {
4817 HydroNode::Fold { input, .. } => input,
4818 HydroNode::FoldKeyed { input, .. } => input,
4819 _ => unreachable!(),
4820 };
4821 let hooked_input_ident = graph_builders.emit_fold_hook(
4822 &input_ref.metadata().location_id,
4823 &input_ident,
4824 &input_ref.metadata().collection_kind,
4825 &node.metadata().op,
4826 );
4827
4828 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4829 let builder = graph_builders.get_dfir_mut(&out_location);
4830 builder.add_dfir(
4831 parse_quote! {
4832 #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4833 },
4834 None,
4835 Some(&stmt_id.to_string()),
4836 );
4837 } else {
4838 let builder = graph_builders.get_dfir_mut(&out_location);
4839 builder.add_dfir(
4840 parse_quote! {
4841 #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4842 },
4843 None,
4844 Some(&stmt_id.to_string()),
4845 );
4846 }
4847 }
4848 BuildersOrCallback::Callback(_, node_callback) => {
4849 node_callback(node, next_stmt_id);
4850 }
4851 }
4852
4853 ident_stack.push(fold_ident);
4854 }
4855
4856 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4857 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4858 if input.metadata().location_id.is_top_level()
4859 && input.metadata().collection_kind.is_bounded()
4860 {
4861 parse_quote!(reduce_no_replay)
4862 } else {
4863 parse_quote!(reduce)
4864 }
4865 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4866 if input.metadata().location_id.is_top_level()
4867 && input.metadata().collection_kind.is_bounded()
4868 {
4869 todo!(
4870 "Calling keyed reduce on a top-level bounded collection is not supported"
4871 )
4872 } else {
4873 parse_quote!(reduce_keyed)
4874 }
4875 } else {
4876 unreachable!()
4877 };
4878
4879 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4880 else {
4881 unreachable!()
4882 };
4883
4884 let lifetime = if input.metadata().location_id.is_top_level() {
4885 quote!('static)
4886 } else {
4887 quote!('tick)
4888 };
4889
4890 let input_ident = ident_stack.pop().unwrap();
4891
4892 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4893 else {
4894 unreachable!()
4895 };
4896
4897 let f_tokens = f.emit_tokens(&mut ident_stack);
4898
4899 let stmt_id = next_stmt_id.get_and_increment();
4900 let reduce_ident =
4901 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4902
4903 match builders_or_callback {
4904 BuildersOrCallback::Builders(graph_builders) => {
4905 if matches!(node, HydroNode::Reduce { .. })
4906 && node.metadata().location_id.is_top_level()
4907 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4908 && graph_builders.singleton_intermediates()
4909 && !node.metadata().collection_kind.is_bounded()
4910 {
4911 todo!(
4912 "Reduce with optional intermediates is not yet supported in simulator"
4913 );
4914 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4915 && node.metadata().location_id.is_top_level()
4916 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4917 && graph_builders.singleton_intermediates()
4918 && !node.metadata().collection_kind.is_bounded()
4919 {
4920 todo!(
4921 "Reduce keyed with optional intermediates is not yet supported in simulator"
4922 );
4923 } else {
4924 let builder = graph_builders.get_dfir_mut(&out_location);
4925 builder.add_dfir(
4926 parse_quote! {
4927 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4928 },
4929 None,
4930 Some(&stmt_id.to_string()),
4931 );
4932 }
4933 }
4934 BuildersOrCallback::Callback(_, node_callback) => {
4935 node_callback(node, next_stmt_id);
4936 }
4937 }
4938
4939 ident_stack.push(reduce_ident);
4940 }
4941
4942 HydroNode::ReduceKeyedWatermark {
4943 f,
4944 input,
4945 metadata,
4946 ..
4947 } => {
4948 let lifetime = if input.metadata().location_id.is_top_level() {
4949 quote!('static)
4950 } else {
4951 quote!('tick)
4952 };
4953
4954 let watermark_ident = ident_stack.pop().unwrap();
4956 let input_ident = ident_stack.pop().unwrap();
4957 let f_tokens = f.emit_tokens(&mut ident_stack);
4958
4959 let stmt_id = next_stmt_id.get_and_increment();
4960 let chain_ident = syn::Ident::new(
4961 &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4962 Span::call_site(),
4963 );
4964
4965 let fold_ident =
4966 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4967
4968 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4969 && input.metadata().collection_kind.is_bounded()
4970 {
4971 parse_quote!(fold_no_replay)
4972 } else {
4973 parse_quote!(fold)
4974 };
4975
4976 match builders_or_callback {
4977 BuildersOrCallback::Builders(graph_builders) => {
4978 if metadata.location_id.is_top_level()
4979 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4980 && graph_builders.singleton_intermediates()
4981 && !metadata.collection_kind.is_bounded()
4982 {
4983 todo!(
4984 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4985 )
4986 } else {
4987 let builder = graph_builders.get_dfir_mut(&out_location);
4988 builder.add_dfir(
4989 parse_quote! {
4990 #chain_ident = chain();
4991 #input_ident
4992 -> map(|x| (Some(x), None))
4993 -> [0]#chain_ident;
4994 #watermark_ident
4995 -> map(|watermark| (None, Some(watermark)))
4996 -> [1]#chain_ident;
4997
4998 #fold_ident = #chain_ident
4999 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
5000 let __reduce_keyed_fn = #f_tokens;
5001 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
5002 if let Some((k, v)) = opt_payload {
5003 if let Some(curr_watermark) = *opt_curr_watermark {
5004 if k < curr_watermark {
5005 return;
5006 }
5007 }
5008 match map.entry(k) {
5009 ::std::collections::hash_map::Entry::Vacant(e) => {
5010 e.insert(v);
5011 }
5012 ::std::collections::hash_map::Entry::Occupied(mut e) => {
5013 __reduce_keyed_fn(e.get_mut(), v);
5014 }
5015 }
5016 } else {
5017 let watermark = opt_watermark.unwrap();
5018 if let Some(curr_watermark) = *opt_curr_watermark {
5019 if watermark <= curr_watermark {
5020 return;
5021 }
5022 }
5023 map.retain(|k, _| *k >= watermark);
5024 *opt_curr_watermark = Some(watermark);
5025 }
5026 }
5027 })
5028 -> flat_map(|(map, _curr_watermark)| map);
5029 },
5030 None,
5031 Some(&stmt_id.to_string()),
5032 );
5033 }
5034 }
5035 BuildersOrCallback::Callback(_, node_callback) => {
5036 node_callback(node, next_stmt_id);
5037 }
5038 }
5039
5040 ident_stack.push(fold_ident);
5041 }
5042
5043 HydroNode::Network {
5044 networking_info,
5045 serialize_fn: serialize_pipeline,
5046 instantiate_fn,
5047 deserialize_fn: deserialize_pipeline,
5048 input,
5049 ..
5050 } => {
5051 let input_ident = ident_stack.pop().unwrap();
5052
5053 let stmt_id = next_stmt_id.get_and_increment();
5054 let receiver_stream_ident =
5055 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5056
5057 match builders_or_callback {
5058 BuildersOrCallback::Builders(graph_builders) => {
5059 let (sink_expr, source_expr) = match instantiate_fn {
5060 DebugInstantiate::Building => (
5061 syn::parse_quote!(DUMMY_SINK),
5062 syn::parse_quote!(DUMMY_SOURCE),
5063 ),
5064
5065 DebugInstantiate::Finalized(finalized) => {
5066 (finalized.sink.clone(), finalized.source.clone())
5067 }
5068 };
5069
5070 graph_builders.create_network(
5071 &input.metadata().location_id,
5072 &out_location,
5073 input_ident,
5074 &receiver_stream_ident,
5075 serialize_pipeline.as_ref(),
5076 sink_expr,
5077 source_expr,
5078 deserialize_pipeline.as_ref(),
5079 stmt_id,
5080 networking_info,
5081 );
5082 }
5083 BuildersOrCallback::Callback(_, node_callback) => {
5084 node_callback(node, next_stmt_id);
5085 }
5086 }
5087
5088 ident_stack.push(receiver_stream_ident);
5089 }
5090
5091 HydroNode::ExternalInput {
5092 instantiate_fn,
5093 deserialize_fn: deserialize_pipeline,
5094 ..
5095 } => {
5096 let stmt_id = next_stmt_id.get_and_increment();
5097 let receiver_stream_ident =
5098 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5099
5100 match builders_or_callback {
5101 BuildersOrCallback::Builders(graph_builders) => {
5102 let (_, source_expr) = match instantiate_fn {
5103 DebugInstantiate::Building => (
5104 syn::parse_quote!(DUMMY_SINK),
5105 syn::parse_quote!(DUMMY_SOURCE),
5106 ),
5107
5108 DebugInstantiate::Finalized(finalized) => {
5109 (finalized.sink.clone(), finalized.source.clone())
5110 }
5111 };
5112
5113 graph_builders.create_external_source(
5114 &out_location,
5115 source_expr,
5116 &receiver_stream_ident,
5117 deserialize_pipeline.as_ref(),
5118 stmt_id,
5119 );
5120 }
5121 BuildersOrCallback::Callback(_, node_callback) => {
5122 node_callback(node, next_stmt_id);
5123 }
5124 }
5125
5126 ident_stack.push(receiver_stream_ident);
5127 }
5128
5129 HydroNode::Counter {
5130 tag,
5131 duration,
5132 prefix,
5133 ..
5134 } => {
5135 let input_ident = ident_stack.pop().unwrap();
5136
5137 let stmt_id = next_stmt_id.get_and_increment();
5138 let counter_ident =
5139 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5140
5141 match builders_or_callback {
5142 BuildersOrCallback::Builders(graph_builders) => {
5143 let arg = format!("{}({})", prefix, tag);
5144 let builder = graph_builders.get_dfir_mut(&out_location);
5145 builder.add_dfir(
5146 parse_quote! {
5147 #counter_ident = #input_ident -> _counter(#arg, #duration);
5148 },
5149 None,
5150 Some(&stmt_id.to_string()),
5151 );
5152 }
5153 BuildersOrCallback::Callback(_, node_callback) => {
5154 node_callback(node, next_stmt_id);
5155 }
5156 }
5157
5158 ident_stack.push(counter_ident);
5159 }
5160
5161 HydroNode::VersionedNetworkFork {
5162 channel_id,
5163 senders,
5164 metadata,
5165 ..
5166 } => {
5167 let split_at = ident_stack.len() - senders.len();
5169 let sender_idents = ident_stack.split_off(split_at);
5170
5171 let stmt_id = next_stmt_id.get_and_increment();
5172
5173 match builders_or_callback {
5174 BuildersOrCallback::Builders(graph_builders) => {
5175 let sender_args: Vec<(LocationId, syn::Ident, Option<DebugExpr>)> =
5176 senders
5177 .iter()
5178 .zip(sender_idents)
5179 .map(|((_version, sender, serialize), ident)| {
5180 (
5181 sender.metadata().location_id.clone(),
5182 ident,
5183 serialize.clone(),
5184 )
5185 })
5186 .collect();
5187 graph_builders.create_versioned_network_fork(
5188 *channel_id,
5189 &metadata.location_id,
5190 sender_args,
5191 stmt_id,
5192 );
5193 }
5194 BuildersOrCallback::Callback(_, node_callback) => {
5195 node_callback(node, next_stmt_id);
5196 }
5197 }
5198 }
5199
5200 HydroNode::VersionedNetwork {
5201 fork,
5202 deserialize_fn,
5203 metadata,
5204 ..
5205 } => {
5206 let stmt_id = next_stmt_id.get_and_increment();
5207 let receiver_stream_ident =
5208 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5209
5210 let (channel_id, source_loc) = {
5213 let fork_ref = fork.0.borrow();
5214 let HydroNode::VersionedNetworkFork {
5215 channel_id,
5216 senders,
5217 ..
5218 } = &*fork_ref
5219 else {
5220 unreachable!("VersionedNetwork.fork must be a VersionedNetworkFork");
5221 };
5222 let source_loc = senders
5223 .first()
5224 .map(|(_v, sender, _s)| sender.metadata().location_id.clone())
5225 .expect("a VersionedNetworkFork always has at least one sender");
5226 (*channel_id, source_loc)
5227 };
5228
5229 match builders_or_callback {
5230 BuildersOrCallback::Builders(graph_builders) => {
5231 graph_builders.create_versioned_network(
5232 channel_id,
5233 &source_loc,
5234 &metadata.location_id,
5235 &receiver_stream_ident,
5236 deserialize_fn.as_ref(),
5237 stmt_id,
5238 );
5239 }
5240 BuildersOrCallback::Callback(_, node_callback) => {
5241 node_callback(node, next_stmt_id);
5242 }
5243 }
5244
5245 ident_stack.push(receiver_stream_ident);
5246 }
5247 }
5248 },
5249 seen_tees,
5250 false,
5251 );
5252
5253 let ret = ident_stack
5254 .pop()
5255 .expect("ident_stack should have exactly one element after traversal");
5256 assert!(
5257 ident_stack.is_empty(),
5258 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5259 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5260 ident_stack.len()
5261 );
5262 ret
5263 }
5264
5265 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5266 match self {
5267 HydroNode::Placeholder => {
5268 panic!()
5269 }
5270 HydroNode::Cast { .. }
5271 | HydroNode::ObserveNonDet { .. }
5272 | HydroNode::UnboundSingleton { .. }
5273 | HydroNode::AssertIsConsistent { .. } => {}
5274 HydroNode::Source { source, .. } => match source {
5275 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5276 HydroSource::ExternalNetwork()
5277 | HydroSource::Spin()
5278 | HydroSource::ClusterMembers(_, _)
5279 | HydroSource::Embedded(_)
5280 | HydroSource::EmbeddedSingleton(_) => {} },
5282 HydroNode::SingletonSource { value, .. } => {
5283 transform(value);
5284 }
5285 HydroNode::CycleSource { .. }
5286 | HydroNode::Tee { .. }
5287 | HydroNode::Reference { .. }
5288 | HydroNode::YieldConcat { .. }
5289 | HydroNode::BeginAtomic { .. }
5290 | HydroNode::EndAtomic { .. }
5291 | HydroNode::Batch { .. }
5292 | HydroNode::Chain { .. }
5293 | HydroNode::MergeOrdered { .. }
5294 | HydroNode::ChainFirst { .. }
5295 | HydroNode::CrossProduct { .. }
5296 | HydroNode::CrossSingleton { .. }
5297 | HydroNode::ResolveFutures { .. }
5298 | HydroNode::ResolveFuturesBlocking { .. }
5299 | HydroNode::ResolveFuturesOrdered { .. }
5300 | HydroNode::Join { .. }
5301 | HydroNode::JoinHalf { .. }
5302 | HydroNode::Difference { .. }
5303 | HydroNode::AntiJoin { .. }
5304 | HydroNode::DeferTick { .. }
5305 | HydroNode::Enumerate { .. }
5306 | HydroNode::Unique { .. }
5307 | HydroNode::Sort { .. }
5308 | HydroNode::VersionedNetworkFork { .. }
5309 | HydroNode::VersionedNetwork { .. } => {}
5310 HydroNode::Map { f, .. }
5311 | HydroNode::FlatMap { f, .. }
5312 | HydroNode::FlatMapStreamBlocking { f, .. }
5313 | HydroNode::Filter { f, .. }
5314 | HydroNode::FilterMap { f, .. }
5315 | HydroNode::Inspect { f, .. }
5316 | HydroNode::Partition { f, .. }
5317 | HydroNode::Reduce { f, .. }
5318 | HydroNode::ReduceKeyed { f, .. }
5319 | HydroNode::ReduceKeyedWatermark { f, .. } => {
5320 transform(&mut f.expr);
5321 }
5322 HydroNode::Fold { init, acc, .. }
5323 | HydroNode::Scan { init, acc, .. }
5324 | HydroNode::ScanAsyncBlocking { init, acc, .. }
5325 | HydroNode::FoldKeyed { init, acc, .. } => {
5326 transform(&mut init.expr);
5327 transform(&mut acc.expr);
5328 }
5329 HydroNode::Network {
5330 serialize_fn,
5331 deserialize_fn,
5332 ..
5333 } => {
5334 if let Some(serialize_fn) = serialize_fn {
5335 transform(serialize_fn);
5336 }
5337 if let Some(deserialize_fn) = deserialize_fn {
5338 transform(deserialize_fn);
5339 }
5340 }
5341 HydroNode::ExternalInput { deserialize_fn, .. } => {
5342 if let Some(deserialize_fn) = deserialize_fn {
5343 transform(deserialize_fn);
5344 }
5345 }
5346 HydroNode::Counter { duration, .. } => {
5347 transform(duration);
5348 }
5349 }
5350 }
5351
5352 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5353 &self.metadata().op
5354 }
5355
5356 pub fn metadata(&self) -> &HydroIrMetadata {
5357 match self {
5358 HydroNode::Placeholder => {
5359 panic!()
5360 }
5361 HydroNode::VersionedNetworkFork { metadata, .. }
5362 | HydroNode::VersionedNetwork { metadata, .. } => metadata,
5363 HydroNode::Cast { metadata, .. }
5364 | HydroNode::ObserveNonDet { metadata, .. }
5365 | HydroNode::AssertIsConsistent { metadata, .. }
5366 | HydroNode::UnboundSingleton { metadata, .. }
5367 | HydroNode::Source { metadata, .. }
5368 | HydroNode::SingletonSource { metadata, .. }
5369 | HydroNode::CycleSource { metadata, .. }
5370 | HydroNode::Tee { metadata, .. }
5371 | HydroNode::Reference { metadata, .. }
5372 | HydroNode::Partition { metadata, .. }
5373 | HydroNode::YieldConcat { metadata, .. }
5374 | HydroNode::BeginAtomic { metadata, .. }
5375 | HydroNode::EndAtomic { metadata, .. }
5376 | HydroNode::Batch { metadata, .. }
5377 | HydroNode::Chain { metadata, .. }
5378 | HydroNode::MergeOrdered { metadata, .. }
5379 | HydroNode::ChainFirst { metadata, .. }
5380 | HydroNode::CrossProduct { metadata, .. }
5381 | HydroNode::CrossSingleton { metadata, .. }
5382 | HydroNode::Join { metadata, .. }
5383 | HydroNode::JoinHalf { metadata, .. }
5384 | HydroNode::Difference { metadata, .. }
5385 | HydroNode::AntiJoin { metadata, .. }
5386 | HydroNode::ResolveFutures { metadata, .. }
5387 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5388 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5389 | HydroNode::Map { metadata, .. }
5390 | HydroNode::FlatMap { metadata, .. }
5391 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5392 | HydroNode::Filter { metadata, .. }
5393 | HydroNode::FilterMap { metadata, .. }
5394 | HydroNode::DeferTick { metadata, .. }
5395 | HydroNode::Enumerate { metadata, .. }
5396 | HydroNode::Inspect { metadata, .. }
5397 | HydroNode::Unique { metadata, .. }
5398 | HydroNode::Sort { metadata, .. }
5399 | HydroNode::Scan { metadata, .. }
5400 | HydroNode::ScanAsyncBlocking { metadata, .. }
5401 | HydroNode::Fold { metadata, .. }
5402 | HydroNode::FoldKeyed { metadata, .. }
5403 | HydroNode::Reduce { metadata, .. }
5404 | HydroNode::ReduceKeyed { metadata, .. }
5405 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5406 | HydroNode::ExternalInput { metadata, .. }
5407 | HydroNode::Network { metadata, .. }
5408 | HydroNode::Counter { metadata, .. } => metadata,
5409 }
5410 }
5411
5412 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5413 &mut self.metadata_mut().op
5414 }
5415
5416 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5417 match self {
5418 HydroNode::Placeholder => {
5419 panic!()
5420 }
5421 HydroNode::VersionedNetworkFork { metadata, .. }
5422 | HydroNode::VersionedNetwork { metadata, .. } => metadata,
5423 HydroNode::Cast { metadata, .. }
5424 | HydroNode::ObserveNonDet { metadata, .. }
5425 | HydroNode::AssertIsConsistent { metadata, .. }
5426 | HydroNode::UnboundSingleton { metadata, .. }
5427 | HydroNode::Source { metadata, .. }
5428 | HydroNode::SingletonSource { metadata, .. }
5429 | HydroNode::CycleSource { metadata, .. }
5430 | HydroNode::Tee { metadata, .. }
5431 | HydroNode::Reference { metadata, .. }
5432 | HydroNode::Partition { metadata, .. }
5433 | HydroNode::YieldConcat { metadata, .. }
5434 | HydroNode::BeginAtomic { metadata, .. }
5435 | HydroNode::EndAtomic { metadata, .. }
5436 | HydroNode::Batch { metadata, .. }
5437 | HydroNode::Chain { metadata, .. }
5438 | HydroNode::MergeOrdered { metadata, .. }
5439 | HydroNode::ChainFirst { metadata, .. }
5440 | HydroNode::CrossProduct { metadata, .. }
5441 | HydroNode::CrossSingleton { metadata, .. }
5442 | HydroNode::Join { metadata, .. }
5443 | HydroNode::JoinHalf { metadata, .. }
5444 | HydroNode::Difference { metadata, .. }
5445 | HydroNode::AntiJoin { metadata, .. }
5446 | HydroNode::ResolveFutures { metadata, .. }
5447 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5448 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5449 | HydroNode::Map { metadata, .. }
5450 | HydroNode::FlatMap { metadata, .. }
5451 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5452 | HydroNode::Filter { metadata, .. }
5453 | HydroNode::FilterMap { metadata, .. }
5454 | HydroNode::DeferTick { metadata, .. }
5455 | HydroNode::Enumerate { metadata, .. }
5456 | HydroNode::Inspect { metadata, .. }
5457 | HydroNode::Unique { metadata, .. }
5458 | HydroNode::Sort { metadata, .. }
5459 | HydroNode::Scan { metadata, .. }
5460 | HydroNode::ScanAsyncBlocking { metadata, .. }
5461 | HydroNode::Fold { metadata, .. }
5462 | HydroNode::FoldKeyed { metadata, .. }
5463 | HydroNode::Reduce { metadata, .. }
5464 | HydroNode::ReduceKeyed { metadata, .. }
5465 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5466 | HydroNode::ExternalInput { metadata, .. }
5467 | HydroNode::Network { metadata, .. }
5468 | HydroNode::Counter { metadata, .. } => metadata,
5469 }
5470 }
5471
5472 pub fn input(&self) -> Vec<&HydroNode> {
5473 match self {
5474 HydroNode::Placeholder => {
5475 panic!()
5476 }
5477 HydroNode::Source { .. }
5478 | HydroNode::SingletonSource { .. }
5479 | HydroNode::ExternalInput { .. }
5480 | HydroNode::CycleSource { .. }
5481 | HydroNode::Tee { .. }
5482 | HydroNode::Reference { .. }
5483 | HydroNode::Partition { .. }
5484 | HydroNode::VersionedNetwork { .. } => {
5485 vec![]
5487 }
5488 HydroNode::Cast { inner, .. }
5489 | HydroNode::ObserveNonDet { inner, .. }
5490 | HydroNode::YieldConcat { inner, .. }
5491 | HydroNode::BeginAtomic { inner, .. }
5492 | HydroNode::EndAtomic { inner, .. }
5493 | HydroNode::Batch { inner, .. }
5494 | HydroNode::UnboundSingleton { inner, .. }
5495 | HydroNode::AssertIsConsistent { inner, .. } => {
5496 vec![inner]
5497 }
5498 HydroNode::Chain { first, second, .. } => {
5499 vec![first, second]
5500 }
5501 HydroNode::MergeOrdered { first, second, .. } => {
5502 vec![first, second]
5503 }
5504 HydroNode::ChainFirst { first, second, .. } => {
5505 vec![first, second]
5506 }
5507 HydroNode::CrossProduct { left, right, .. }
5508 | HydroNode::CrossSingleton { left, right, .. }
5509 | HydroNode::Join { left, right, .. }
5510 | HydroNode::JoinHalf { left, right, .. } => {
5511 vec![left, right]
5512 }
5513 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5514 vec![pos, neg]
5515 }
5516 HydroNode::Map { input, .. }
5517 | HydroNode::FlatMap { input, .. }
5518 | HydroNode::FlatMapStreamBlocking { input, .. }
5519 | HydroNode::Filter { input, .. }
5520 | HydroNode::FilterMap { input, .. }
5521 | HydroNode::Sort { input, .. }
5522 | HydroNode::DeferTick { input, .. }
5523 | HydroNode::Enumerate { input, .. }
5524 | HydroNode::Inspect { input, .. }
5525 | HydroNode::Unique { input, .. }
5526 | HydroNode::Network { input, .. }
5527 | HydroNode::Counter { input, .. }
5528 | HydroNode::ResolveFutures { input, .. }
5529 | HydroNode::ResolveFuturesBlocking { input, .. }
5530 | HydroNode::ResolveFuturesOrdered { input, .. }
5531 | HydroNode::Fold { input, .. }
5532 | HydroNode::FoldKeyed { input, .. }
5533 | HydroNode::Reduce { input, .. }
5534 | HydroNode::ReduceKeyed { input, .. }
5535 | HydroNode::Scan { input, .. }
5536 | HydroNode::ScanAsyncBlocking { input, .. } => {
5537 vec![input]
5538 }
5539 HydroNode::ReduceKeyedWatermark {
5540 input, watermark, ..
5541 } => {
5542 vec![input, watermark]
5543 }
5544 HydroNode::VersionedNetworkFork { senders, .. } => senders
5545 .iter()
5546 .map(|(_version, sender, _serialize)| sender.as_ref())
5547 .collect(),
5548 }
5549 }
5550
5551 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5552 self.input()
5553 .iter()
5554 .map(|input_node| input_node.metadata())
5555 .collect()
5556 }
5557
5558 pub fn is_shared_with_others(&self) -> bool {
5562 match self {
5563 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5564 Rc::strong_count(&inner.0) > 1
5565 }
5566 HydroNode::Reference { .. } => false,
5569 _ => false,
5570 }
5571 }
5572
5573 pub fn print_root(&self) -> String {
5574 match self {
5575 HydroNode::Placeholder => {
5576 panic!()
5577 }
5578 HydroNode::Cast { .. } => "Cast()".to_owned(),
5579 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5580 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5581 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5582 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5583 HydroNode::SingletonSource {
5584 value,
5585 first_tick_only,
5586 ..
5587 } => format!(
5588 "SingletonSource({:?}, first_tick_only={})",
5589 value, first_tick_only
5590 ),
5591 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5592 HydroNode::Tee { inner, .. } => {
5593 format!("Tee({})", inner.0.borrow().print_root())
5594 }
5595 HydroNode::Reference { inner, kind, .. } => {
5596 format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5597 }
5598 HydroNode::Partition { f, is_true, .. } => {
5599 format!("Partition({:?}, is_true={})", f, is_true)
5600 }
5601 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5602 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5603 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5604 HydroNode::Batch { .. } => "Batch()".to_owned(),
5605 HydroNode::Chain { first, second, .. } => {
5606 format!("Chain({}, {})", first.print_root(), second.print_root())
5607 }
5608 HydroNode::MergeOrdered { first, second, .. } => {
5609 format!(
5610 "MergeOrdered({}, {})",
5611 first.print_root(),
5612 second.print_root()
5613 )
5614 }
5615 HydroNode::ChainFirst { first, second, .. } => {
5616 format!(
5617 "ChainFirst({}, {})",
5618 first.print_root(),
5619 second.print_root()
5620 )
5621 }
5622 HydroNode::CrossProduct { left, right, .. } => {
5623 format!(
5624 "CrossProduct({}, {})",
5625 left.print_root(),
5626 right.print_root()
5627 )
5628 }
5629 HydroNode::CrossSingleton { left, right, .. } => {
5630 format!(
5631 "CrossSingleton({}, {})",
5632 left.print_root(),
5633 right.print_root()
5634 )
5635 }
5636 HydroNode::Join { left, right, .. } => {
5637 format!("Join({}, {})", left.print_root(), right.print_root())
5638 }
5639 HydroNode::JoinHalf { left, right, .. } => {
5640 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5641 }
5642 HydroNode::Difference { pos, neg, .. } => {
5643 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5644 }
5645 HydroNode::AntiJoin { pos, neg, .. } => {
5646 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5647 }
5648 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5649 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5650 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5651 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5652 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5653 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5654 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5655 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5656 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5657 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5658 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5659 HydroNode::Unique { .. } => "Unique()".to_owned(),
5660 HydroNode::Sort { .. } => "Sort()".to_owned(),
5661 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5662 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5663 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5664 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5665 }
5666 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5667 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5668 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5669 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5670 HydroNode::Network { .. } => "Network()".to_owned(),
5671 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5672 HydroNode::Counter { tag, duration, .. } => {
5673 format!("Counter({:?}, {:?})", tag, duration)
5674 }
5675 HydroNode::VersionedNetworkFork {
5676 channel_name,
5677 senders,
5678 ..
5679 } => {
5680 let versions: Vec<u32> = senders.iter().map(|(v, _, _)| *v).collect();
5681 format!(
5682 "VersionedNetworkFork({}, senders={:?})",
5683 channel_name, versions
5684 )
5685 }
5686 HydroNode::VersionedNetwork { version, .. } => {
5687 format!("VersionedNetwork(v{})", version)
5688 }
5689 }
5690 }
5691}
5692
/// Builds the sink/source expressions and the connect callback for one network edge.
///
/// The pair `(from_location, to_location)` selects which family of [`Deploy`] hooks
/// is used: process→process (`o2o_*`), process→cluster (`o2m_*`),
/// cluster→process (`m2o_*`) or cluster→cluster (`m2m_*`). In every case the
/// sending node's next port becomes the sink port and the receiving node's next
/// port becomes the source port.
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// `*_sink_source` hook and the closure produced by the matching `*_connect` hook.
///
/// # Panics
///
/// Panics if a referenced process or cluster is missing from `processes` /
/// `clusters`, or if either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Look up an instantiated process, failing loudly when the graph references
    // one that was never created.
    let process_node = |key: LocationKey| match processes.get(key) {
        Some(node) => node.clone(),
        None => panic!("A process used in the graph was not instantiated: {}", key),
    };
    // Same lookup for clusters.
    let cluster_node = |key: LocationKey| match clusters.get(key) {
        Some(node) => node.clone(),
        None => panic!("A cluster used in the graph was not instantiated: {}", key),
    };

    let (endpoints, connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_node(from);
            let receiver = process_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_node(from);
            let receiver = cluster_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_node(from);
            let receiver = process_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_node(from);
            let receiver = cluster_node(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let endpoints = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (endpoints, connect)
        }
        // Tick and atomic locations are not valid network endpoints here.
        (LocationId::Tick(_, _), _)
        | (_, LocationId::Tick(_, _))
        | (LocationId::Atomic(_), _)
        | (_, LocationId::Atomic(_)) => panic!(),
    };

    let (sink, source) = endpoints;
    (sink, source, connect_fn)
}
5834
5835#[cfg(test)]
5836mod serde_test;
5837
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode`; only meaningful with the `build`
    /// feature, which adds feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 264);
    }

    /// Pins the in-memory size of `HydroRoot`; only meaningful with the `build`
    /// feature, which adds feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// A plain expression that is not a `q!` call must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let expected = plain_expr.clone();
        let result = simplify_q_macro(plain_expr);
        assert_eq!(result, expected);
    }

    /// Snapshot of what `simplify_q_macro` produces for a real spliced closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let result = simplify_q_macro(q!(|x: usize| x + 1).splice_fn1_ctx(&()));
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot for a quoted block that ends in a closure instead of starting
    /// with one.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced_block = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced_block);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}