Function rewrite_incremental_aggregate_with_sink_merge 

pub async fn rewrite_incremental_aggregate_with_sink_merge(
    delta_plan: &LogicalPlan,
    analysis: &IncrementalAggregateAnalysis,
    sink_table: TableRef,
    sink_table_name: &[String; 3],
    sink_dirty_filter: Option<Expr>,
) -> Result<LogicalPlan, Error>

Rewrites one incremental aggregate delta plan by left-joining it with the existing sink-table state and projecting merged aggregate outputs.

For a grouped aggregate such as:

SELECT max(number) AS number, ts FROM numbers_with_ts GROUP BY ts

the rewrite is roughly:

delta = SELECT ts, number FROM <delta_plan> AS __flow_delta
sink_scan = SELECT * FROM <sink_table> [WHERE <sink_dirty_filter>]
sink  = SELECT ts, number FROM sink_scan AS __flow_sink
SELECT
  CASE
    WHEN __flow_sink.number IS NULL THEN __flow_delta.number
    WHEN __flow_delta.number >= __flow_sink.number THEN __flow_delta.number
    ELSE __flow_sink.number
  END AS number,
  __flow_delta.ts AS ts
FROM delta
LEFT JOIN sink
  ON __flow_delta.ts IS NOT DISTINCT FROM __flow_sink.ts
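The merge semantics of the rewritten query can be modeled over in-memory rows. The following is only an illustrative sketch, not the function's implementation: `merge_max` and `merge_delta_with_sink` are hypothetical helpers, rows are `(ts, number)` pairs, `None` stands in for SQL NULL, and the sink is assumed to hold at most one row per group key.

```rust
use std::collections::HashMap;

/// Mirrors the CASE expression for a `max` aggregate; `None` models SQL NULL.
fn merge_max(delta: Option<i64>, sink: Option<i64>) -> Option<i64> {
    match (delta, sink) {
        // WHEN __flow_sink.number IS NULL THEN __flow_delta.number
        (d, None) => d,
        // WHEN __flow_delta.number >= __flow_sink.number THEN __flow_delta.number
        (Some(d), Some(s)) if d >= s => Some(d),
        // ELSE __flow_sink.number (also reached when the delta value is NULL)
        (_, s) => s,
    }
}

/// Left-joins delta rows to sink rows on `ts` and merges `number`.
/// Assumption: the sink has at most one row per `ts`.
fn merge_delta_with_sink(
    delta: &[(Option<i64>, Option<i64>)],
    sink: &[(Option<i64>, Option<i64>)],
) -> Vec<(Option<i64>, Option<i64>)> {
    // `Option<i64>` keys treat `None == None` as a match,
    // which is the behavior of IS NOT DISTINCT FROM.
    let sink_by_ts: HashMap<Option<i64>, Option<i64>> = sink.iter().copied().collect();
    delta
        .iter()
        .map(|&(ts, number)| {
            // An unmatched delta row sees a NULL sink value, as in a LEFT JOIN.
            let existing = sink_by_ts.get(&ts).copied().flatten();
            (ts, merge_max(number, existing))
        })
        .collect()
}
```

Every delta row produces exactly one output row. Sink rows with no matching delta row are not emitted, which is what the left join from `delta` implies.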

If sink_dirty_filter is provided, it is applied to the sink table scan before projection, aliasing, and the left join. Because the filter runs before the __flow_sink alias exists, the predicate must reference the raw sink table columns by their unqualified names, not through __flow_sink.
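The ordering described above (filter first, then alias and join) can be sketched as follows. This is a hedged illustration only: `SinkRow` and `scan_sink` are hypothetical names, and a closure over raw row fields stands in for the `Expr` predicate on unqualified sink columns.

```rust
/// A raw sink-table row; field names play the role of unqualified column names.
#[derive(Clone, Debug, PartialEq)]
struct SinkRow {
    ts: i64,
    number: i64,
}

/// Models `SELECT * FROM <sink_table> [WHERE <sink_dirty_filter>]`.
/// The optional predicate sees raw rows, before any alias or join is applied.
fn scan_sink(
    rows: &[SinkRow],
    dirty_filter: Option<&dyn Fn(&SinkRow) -> bool>,
) -> Vec<SinkRow> {
    rows.iter()
        // With no filter, every sink row is kept.
        .filter(|&row| dirty_filter.map_or(true, |keep| keep(row)))
        .cloned()
        .collect()
}
```

Only the rows returned by such a scan would then be projected, aliased as `__flow_sink`, and joined against the delta.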