pub async fn rewrite_incremental_aggregate_with_sink_merge(
delta_plan: &LogicalPlan,
analysis: &IncrementalAggregateAnalysis,
sink_table: TableRef,
sink_table_name: &[String; 3],
sink_dirty_filter: Option<Expr>,
) -> Result<LogicalPlan, Error>Expand description
Rewrites one incremental aggregate delta plan by left-joining it with the existing sink-table state and projecting merged aggregate outputs.
For a grouped aggregate such as:
SELECT max(number) AS number, ts FROM numbers_with_ts GROUP BY tsthe rewrite is roughly:
delta = SELECT ts, number FROM <delta_plan> AS __flow_delta
sink_scan = SELECT * FROM <sink_table> [WHERE <sink_dirty_filter>]
sink = SELECT ts, number FROM sink_scan AS __flow_sink
SELECT
CASE
WHEN __flow_sink.number IS NULL THEN __flow_delta.number
WHEN __flow_delta.number >= __flow_sink.number THEN __flow_delta.number
ELSE __flow_sink.number
END AS number,
__flow_delta.ts AS ts
FROM delta
LEFT JOIN sink
ON __flow_delta.ts IS NOT DISTINCT FROM __flow_sink.tsIf sink_dirty_filter is provided, it is applied to the sink table scan
before projection, aliasing, and the left join. The predicate must reference
raw sink table columns structurally (unqualified), before the __flow_sink
alias exists.