use std::collections::BTreeMap;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
use super::state::Scheduler;
use crate::compute::state::DataflowState;
use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
use crate::expr::{self, Batch, GlobalId, LocalId};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow, RelationType};
mod map;
mod reduce;
mod src_sink;
pub struct Context<'referred, 'df> {
pub id: GlobalId,
pub df: &'referred mut Hydroflow<'df>,
pub compute_state: &'referred mut DataflowState,
pub input_collection: BTreeMap<GlobalId, CollectionBundle>,
pub local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
pub input_collection_batch: BTreeMap<GlobalId, CollectionBundle<Batch>>,
pub local_scope_batch: Vec<BTreeMap<LocalId, CollectionBundle<Batch>>>,
pub err_collector: ErrCollector,
}
impl Drop for Context<'_, '_> {
fn drop(&mut self) {
for bundle in std::mem::take(&mut self.input_collection)
.into_values()
.chain(
std::mem::take(&mut self.local_scope)
.into_iter()
.flat_map(|v| v.into_iter())
.map(|(_k, v)| v),
)
{
bundle.collection.into_inner().drop(self.df);
drop(bundle.arranged);
}
for bundle in std::mem::take(&mut self.input_collection_batch)
.into_values()
.chain(
std::mem::take(&mut self.local_scope_batch)
.into_iter()
.flat_map(|v| v.into_iter())
.map(|(_k, v)| v),
)
{
bundle.collection.into_inner().drop(self.df);
drop(bundle.arranged);
}
}
}
impl Context<'_, '_> {
pub fn insert_global(&mut self, id: GlobalId, collection: CollectionBundle) {
self.input_collection.insert(id, collection);
}
pub fn insert_local(&mut self, id: LocalId, collection: CollectionBundle) {
if let Some(last) = self.local_scope.last_mut() {
last.insert(id, collection);
} else {
let first = BTreeMap::from([(id, collection)]);
self.local_scope.push(first);
}
}
pub fn insert_global_batch(&mut self, id: GlobalId, collection: CollectionBundle<Batch>) {
self.input_collection_batch.insert(id, collection);
}
pub fn insert_local_batch(&mut self, id: LocalId, collection: CollectionBundle<Batch>) {
if let Some(last) = self.local_scope_batch.last_mut() {
last.insert(id, collection);
} else {
let first = BTreeMap::from([(id, collection)]);
self.local_scope_batch.push(first);
}
}
}
impl Context<'_, '_> {
pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant_batch(rows, &plan.schema.typ)),
Plan::Get { id } => self.get_batch_by_id(id),
Plan::Let { id, value, body } => self.eval_batch_let(id, value, body),
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp, &plan.schema.typ),
Plan::Reduce {
input,
key_val_plan,
reduce_plan,
} => self.render_reduce_batch(input, &key_val_plan, &reduce_plan, &plan.schema.typ),
Plan::Join { .. } => NotImplementedSnafu {
reason: "Join is still WIP",
}
.fail(),
Plan::Union { .. } => NotImplementedSnafu {
reason: "Union is still WIP",
}
.fail(),
}
}
pub fn render_plan(&mut self, plan: TypedPlan) -> Result<CollectionBundle, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant(rows)),
Plan::Get { id } => self.get_by_id(id),
Plan::Let { id, value, body } => self.eval_let(id, value, body),
Plan::Mfp { input, mfp } => self.render_mfp(input, mfp),
Plan::Reduce {
input,
key_val_plan,
reduce_plan,
} => self.render_reduce(input, key_val_plan, reduce_plan, plan.schema.typ),
Plan::Join { .. } => NotImplementedSnafu {
reason: "Join is still WIP",
}
.fail(),
Plan::Union { .. } => NotImplementedSnafu {
reason: "Union is still WIP",
}
.fail(),
}
}
pub fn render_constant_batch(
&mut self,
rows: Vec<DiffRow>,
output_type: &RelationType,
) -> CollectionBundle<Batch> {
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().chunk_by(|(_row, ts, _diff)| *ts) {
per_time.entry(key).or_default().extend(group);
}
let now = self.compute_state.current_time_ref();
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let err_collector = self.err_collector.clone();
let output_type = output_type.clone();
let subgraph_id =
self.df
.add_subgraph_source("ConstantBatch", send_port, move |_ctx, send_port| {
let mut after = per_time.split_off(&(*now.borrow() + 1));
std::mem::swap(&mut per_time, &mut after);
let not_great_than_now = after;
not_great_than_now.into_iter().for_each(|(_ts, rows)| {
err_collector.run(|| {
let rows = rows.into_iter().map(|(row, _ts, _diff)| row).collect();
let batch = Batch::try_from_rows_with_types(
rows,
&output_type
.column_types
.iter()
.map(|ty| ty.scalar_type().clone())
.collect_vec(),
)?;
send_port.give(vec![batch]);
Ok(())
});
});
if let Some(next_run_time) = per_time.keys().next().copied() {
scheduler_inner.schedule_at(next_run_time);
}
});
scheduler.set_cur_subgraph(subgraph_id);
CollectionBundle::from_collection(Collection::from_port(recv_port))
}
pub fn render_constant(&mut self, rows: Vec<DiffRow>) -> CollectionBundle {
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().chunk_by(|(_row, ts, _diff)| *ts) {
per_time.entry(key).or_default().extend(group);
}
let now = self.compute_state.current_time_ref();
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let subgraph_id =
self.df
.add_subgraph_source("Constant", send_port, move |_ctx, send_port| {
let mut after = per_time.split_off(&(*now.borrow() + 1));
std::mem::swap(&mut per_time, &mut after);
let not_great_than_now = after;
not_great_than_now.into_iter().for_each(|(_ts, rows)| {
send_port.give(rows);
});
if let Some(next_run_time) = per_time.keys().next().copied() {
scheduler_inner.schedule_at(next_run_time);
}
});
scheduler.set_cur_subgraph(subgraph_id);
CollectionBundle::from_collection(Collection::from_port(recv_port))
}
pub fn get_batch_by_id(&mut self, id: expr::Id) -> Result<CollectionBundle<Batch>, Error> {
let ret = match id {
expr::Id::Local(local) => {
let bundle = self
.local_scope_batch
.iter()
.rev()
.find_map(|scope| scope.get(&local))
.with_context(|| InvalidQuerySnafu {
reason: format!("Local variable {:?} not found", local),
})?;
bundle.clone(self.df)
}
expr::Id::Global(id) => {
let bundle =
self.input_collection_batch
.get(&id)
.with_context(|| InvalidQuerySnafu {
reason: format!("Collection {:?} not found", id),
})?;
bundle.clone(self.df)
}
};
Ok(ret)
}
pub fn get_by_id(&mut self, id: expr::Id) -> Result<CollectionBundle, Error> {
let ret = match id {
expr::Id::Local(local) => {
let bundle = self
.local_scope
.iter()
.rev()
.find_map(|scope| scope.get(&local))
.with_context(|| InvalidQuerySnafu {
reason: format!("Local variable {:?} not found", local),
})?;
bundle.clone(self.df)
}
expr::Id::Global(id) => {
let bundle = self
.input_collection
.get(&id)
.with_context(|| InvalidQuerySnafu {
reason: format!("Collection {:?} not found", id),
})?;
bundle.clone(self.df)
}
};
Ok(ret)
}
pub fn eval_batch_let(
&mut self,
id: LocalId,
value: Box<TypedPlan>,
body: Box<TypedPlan>,
) -> Result<CollectionBundle<Batch>, Error> {
let value = self.render_plan_batch(*value)?;
self.local_scope_batch.push(Default::default());
self.insert_local_batch(id, value);
let ret = self.render_plan_batch(*body)?;
Ok(ret)
}
pub fn eval_let(
&mut self,
id: LocalId,
value: Box<TypedPlan>,
body: Box<TypedPlan>,
) -> Result<CollectionBundle, Error> {
let value = self.render_plan(*value)?;
self.local_scope.push(Default::default());
self.insert_local(id, value);
let ret = self.render_plan(*body)?;
Ok(ret)
}
}
struct SubgraphArg<'a, T = Toff> {
now: repr::Timestamp,
err_collector: &'a ErrCollector,
scheduler: &'a Scheduler,
send: &'a PortCtx<SEND, T>,
}
#[cfg(test)]
mod test {
use std::cell::RefCell;
use std::rc::Rc;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::handoff::VecHandoff;
use pretty_assertions::assert_eq;
use super::*;
use crate::repr::Row;
pub fn run_and_check(
state: &mut DataflowState,
df: &mut Hydroflow,
time_range: std::ops::Range<i64>,
expected: BTreeMap<i64, Vec<DiffRow>>,
output: Rc<RefCell<Vec<DiffRow>>>,
) {
for now in time_range {
state.set_current_ts(now);
state.run_available_with_schedule(df);
if !state.get_err_collector().is_empty() {
panic!(
"Errors occur: {:?}",
state.get_err_collector().get_all_blocking()
)
}
assert!(state.get_err_collector().is_empty());
if let Some(expected) = expected.get(&now) {
assert_eq!(*output.borrow(), *expected, "at ts={}", now);
} else {
assert_eq!(*output.borrow(), vec![], "at ts={}", now);
};
output.borrow_mut().clear();
}
}
pub fn get_output_handle(
ctx: &mut Context,
mut bundle: CollectionBundle,
) -> Rc<RefCell<Vec<DiffRow>>> {
let collection = bundle.collection;
let _arranged = bundle.arranged.pop_first().unwrap().1;
let output = Rc::new(RefCell::new(vec![]));
let output_inner = output.clone();
let _subgraph = ctx.df.add_subgraph_sink(
"test_render_constant",
collection.into_inner(),
move |_ctx, recv| {
let data = recv.take_inner();
let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();
output_inner.borrow_mut().clear();
output_inner.borrow_mut().extend(res);
},
);
output
}
pub fn harness_test_ctx<'r, 'h>(
df: &'r mut Hydroflow<'h>,
state: &'r mut DataflowState,
) -> Context<'r, 'h> {
let err_collector = state.get_err_collector();
Context {
id: GlobalId::User(0),
df,
compute_state: state,
input_collection: BTreeMap::new(),
local_scope: Default::default(),
input_collection_batch: BTreeMap::new(),
local_scope_batch: Default::default(),
err_collector,
}
}
#[test]
fn test_render_constant() {
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
let rows = vec![
(Row::empty(), 1, 1),
(Row::empty(), 2, 1),
(Row::empty(), 3, 1),
];
let collection = ctx.render_constant(rows);
let collection = collection.collection.clone(ctx.df);
let cnt = Rc::new(RefCell::new(0));
let cnt_inner = cnt.clone();
let res_subgraph_id = ctx.df.add_subgraph_sink(
"test_render_constant",
collection.into_inner(),
move |_ctx, recv| {
let data = recv.take_inner();
*cnt_inner.borrow_mut() += data.iter().map(|v| v.len()).sum::<usize>();
},
);
ctx.compute_state.set_current_ts(2);
ctx.compute_state.run_available_with_schedule(ctx.df);
assert_eq!(*cnt.borrow(), 2);
ctx.compute_state.set_current_ts(3);
ctx.compute_state.run_available_with_schedule(ctx.df);
ctx.df.schedule_subgraph(res_subgraph_id);
ctx.df.run_available();
assert_eq!(*cnt.borrow(), 3);
}
#[test]
fn example_source_sink() {
let mut df = Hydroflow::new();
let (send_port, recv_port) = df.make_edge::<_, VecHandoff<i32>>("test_handoff");
df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {
send.give(vec![i]);
}
});
let sum = Rc::new(RefCell::new(0));
let sum_move = sum.clone();
let sink = df.add_subgraph_sink("test_handoff_sink", recv_port, move |_ctx, recv| {
let data = recv.take_inner();
*sum_move.borrow_mut() += data.iter().sum::<i32>();
});
df.run_available();
assert_eq!(sum.borrow().to_owned(), 45);
df.schedule_subgraph(sink);
df.run_available();
assert_eq!(sum.borrow().to_owned(), 45);
}
#[test]
fn test_tee_auto_schedule() {
use hydroflow::scheduled::handoff::TeeingHandoff as Toff;
let mut df = Hydroflow::new();
let (send_port, recv_port) = df.make_edge::<_, Toff<i32>>("test_handoff");
let source = df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {
send.give(vec![i]);
}
});
let teed_recv_port = recv_port.tee(&mut df);
let sum = Rc::new(RefCell::new(0));
let sum_move = sum.clone();
let _sink = df.add_subgraph_sink("test_handoff_sink", teed_recv_port, move |_ctx, recv| {
let data = recv.take_inner();
*sum_move.borrow_mut() += data.iter().flat_map(|i| i.iter()).sum::<i32>();
});
drop(recv_port);
df.run_available();
assert_eq!(sum.borrow().to_owned(), 45);
df.schedule_subgraph(source);
df.run_available();
assert_eq!(sum.borrow().to_owned(), 90);
}
}