1use std::collections::BTreeSet;
18use std::sync::Arc;
19
20use catalog::CatalogManagerRef;
21use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
22use common_error::ext::BoxedError;
23use common_meta::key::flow::FlowMetadataManagerRef;
24use common_recordbatch::{RecordBatch, RecordBatches, SendableRecordBatchStream};
25use common_runtime::JoinHandle;
26use common_telemetry::error;
27use datatypes::value::Value;
28use futures::StreamExt;
29use query::parser::QueryLanguageParser;
30use session::context::QueryContextBuilder;
31use snafu::{OptionExt, ResultExt, ensure};
32use table::metadata::TableId;
33
34use crate::adapter::table_source::ManagedTableSource;
35use crate::adapter::{FlowId, FlowStreamingEngineRef, StreamingEngine};
36use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu};
37use crate::expr::error::ExternalSnafu;
38use crate::expr::utils::find_plan_time_window_expr_lower_bound;
39use crate::repr::RelationDesc;
40use crate::server::get_all_flow_ids;
41use crate::{Error, FrontendInvoker};
42
43impl StreamingEngine {
44 pub async fn create_and_start_refill_flow_tasks(
46 self: &FlowStreamingEngineRef,
47 flow_metadata_manager: &FlowMetadataManagerRef,
48 catalog_manager: &CatalogManagerRef,
49 ) -> Result<(), Error> {
50 let tasks = self
51 .create_refill_flow_tasks(flow_metadata_manager, catalog_manager)
52 .await?;
53 self.starting_refill_flows(tasks).await?;
54 Ok(())
55 }
56
57 pub async fn create_refill_flow_tasks(
59 &self,
60 flow_metadata_manager: &FlowMetadataManagerRef,
61 catalog_manager: &CatalogManagerRef,
62 ) -> Result<Vec<RefillTask>, Error> {
63 let nodeid = self.node_id.map(|c| c as u64);
64
65 let flow_ids = get_all_flow_ids(flow_metadata_manager, catalog_manager, nodeid).await?;
66 let mut refill_tasks = Vec::new();
67 'flow_id_loop: for flow_id in flow_ids {
68 let info = flow_metadata_manager
69 .flow_info_manager()
70 .get(flow_id)
71 .await
72 .map_err(BoxedError::new)
73 .context(ExternalSnafu)?
74 .context(FlowNotFoundSnafu { id: flow_id })?;
75
76 for src_table in info.source_table_ids() {
78 if !self.table_info_source.check_table_exist(src_table).await? {
80 error!(
81 "Source table id = {:?} not found while refill flow_id={}, consider re-create the flow if necessary",
82 src_table, flow_id
83 );
84 continue 'flow_id_loop;
85 }
86 }
87
88 let expire_after = info.expire_after();
89 let now = self.tick_manager.tick();
91 let plan = self
92 .node_context
93 .read()
94 .await
95 .get_flow_plan(&FlowId::from(flow_id))
96 .context(FlowNotFoundSnafu { id: flow_id })?;
97 let time_range = if let Some(expire_after) = expire_after {
98 let low_bound = common_time::Timestamp::new_millisecond(now - expire_after);
99 let real_low_bound = find_plan_time_window_expr_lower_bound(&plan, low_bound)?;
100 real_low_bound.map(|l| (l, common_time::Timestamp::new_millisecond(now)))
101 } else {
102 None
103 };
104
105 common_telemetry::debug!(
106 "Time range for refill flow_id={} is {:?}",
107 flow_id,
108 time_range
109 );
110
111 for src_table in info.source_table_ids() {
112 let time_index_col = self
113 .table_info_source
114 .get_time_index_column_from_table_id(*src_table)
115 .await?
116 .1;
117 let time_index_name = time_index_col.name;
118 let task = RefillTask::create(
119 flow_id as u64,
120 *src_table,
121 time_range,
122 &time_index_name,
123 &self.table_info_source,
124 )
125 .await?;
126 refill_tasks.push(task);
127 }
128 }
129 Ok(refill_tasks)
130 }
131
132 pub(crate) async fn starting_refill_flows(
134 self: &FlowStreamingEngineRef,
135 tasks: Vec<RefillTask>,
136 ) -> Result<(), Error> {
137 let frontend_invoker =
139 self.frontend_invoker
140 .read()
141 .await
142 .clone()
143 .context(UnexpectedSnafu {
144 reason: "frontend invoker is not set",
145 })?;
146
147 for mut task in tasks {
148 task.start_running(self.clone(), &frontend_invoker).await?;
149 self.refill_tasks
152 .write()
153 .await
154 .insert(task.data.flow_id, task);
155 }
156 Ok(())
157 }
158}
159
160pub struct RefillTask {
162 data: TaskData,
163 state: TaskState<()>,
164}
165
166#[derive(Clone)]
167struct TaskData {
168 flow_id: FlowId,
169 table_id: TableId,
170 table_schema: RelationDesc,
171}
172
173impl TaskData {
174 fn validate_schema(table_schema: &RelationDesc, rb: &RecordBatch) -> Result<(), Error> {
176 let rb_schema = &rb.schema;
177 ensure!(
178 rb_schema.column_schemas().len() == table_schema.len()?,
179 UnexpectedSnafu {
180 reason: format!(
181 "RecordBatch schema length does not match table schema length, {}!={}",
182 rb_schema.column_schemas().len(),
183 table_schema.len()?
184 )
185 }
186 );
187 for (i, rb_col) in rb_schema.column_schemas().iter().enumerate() {
188 let (rb_name, rb_ty) = (rb_col.name.as_str(), &rb_col.data_type);
189 let (table_name, table_ty) = (
190 table_schema.names[i].as_ref(),
191 &table_schema.typ().column_types[i].scalar_type,
192 );
193 ensure!(
194 Some(rb_name) == table_name.map(|c| c.as_str()),
195 UnexpectedSnafu {
196 reason: format!(
197 "Mismatch in column names: expected {:?}, found {}",
198 table_name, rb_name
199 )
200 }
201 );
202
203 ensure!(
204 rb_ty == table_ty,
205 UnexpectedSnafu {
206 reason: format!(
207 "Mismatch in column types for {}: expected {:?}, found {:?}",
208 rb_name, table_ty, rb_ty
209 )
210 }
211 );
212 }
213 Ok(())
214 }
215}
216
217enum TaskState<T> {
219 Prepared { sql: String },
221 Running {
223 handle: JoinHandle<Result<T, Error>>,
224 },
225 Finished { res: Result<T, Error> },
227}
228
229impl<T> TaskState<T> {
230 fn new(sql: String) -> Self {
231 Self::Prepared { sql }
232 }
233}
234
235mod test_send {
236 use std::collections::BTreeMap;
237
238 use tokio::sync::RwLock;
239
240 use super::*;
241 fn is_send<T: Send + Sync>() {}
242 fn foo() {
243 is_send::<TaskState<()>>();
244 is_send::<RefillTask>();
245 is_send::<BTreeMap<FlowId, RefillTask>>();
246 is_send::<RwLock<BTreeMap<FlowId, RefillTask>>>();
247 }
248}
249
250impl TaskState<()> {
251 async fn is_finished(&mut self) -> Result<bool, Error> {
253 match self {
254 Self::Finished { .. } => Ok(true),
255 Self::Running { handle } => Ok(if handle.is_finished() {
256 *self = Self::Finished {
257 res: handle.await.context(JoinTaskSnafu)?,
258 };
259 true
260 } else {
261 false
262 }),
263 _ => Ok(false),
264 }
265 }
266
267 fn start_running(
268 &mut self,
269 task_data: &TaskData,
270 manager: FlowStreamingEngineRef,
271 mut output_stream: SendableRecordBatchStream,
272 ) -> Result<(), Error> {
273 let data = (*task_data).clone();
274 let handle: JoinHandle<Result<(), Error>> = common_runtime::spawn_global(async move {
275 while let Some(rb) = output_stream.next().await {
276 let rb = match rb {
277 Ok(rb) => rb,
278 Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu)?,
279 };
280 TaskData::validate_schema(&data.table_schema, &rb)?;
281
282 manager
284 .node_context
285 .read()
286 .await
287 .send_rb(data.table_id, rb)
288 .await?;
289 }
290 common_telemetry::info!(
291 "Refill successful for source table_id={}, flow_id={}",
292 data.table_id,
293 data.flow_id
294 );
295 Ok(())
296 });
297 *self = Self::Running { handle };
298
299 Ok(())
300 }
301}
302
303enum QueryStream {
305 Batches { batches: RecordBatches },
306 Stream { stream: SendableRecordBatchStream },
307}
308
309impl TryFrom<common_query::Output> for QueryStream {
310 type Error = Error;
311 fn try_from(value: common_query::Output) -> Result<Self, Self::Error> {
312 match value.data {
313 common_query::OutputData::Stream(stream) => Ok(QueryStream::Stream { stream }),
314 common_query::OutputData::RecordBatches(batches) => {
315 Ok(QueryStream::Batches { batches })
316 }
317 _ => UnexpectedSnafu {
318 reason: format!("Unexpected output data type: {:?}", value.data),
319 }
320 .fail(),
321 }
322 }
323}
324
325impl QueryStream {
326 fn try_into_stream(self) -> Result<SendableRecordBatchStream, Error> {
327 match self {
328 Self::Batches { batches } => Ok(batches.as_stream()),
329 Self::Stream { stream } => Ok(stream),
330 }
331 }
332}
333
334impl RefillTask {
335 pub async fn create(
337 flow_id: FlowId,
338 table_id: TableId,
339 time_range: Option<(common_time::Timestamp, common_time::Timestamp)>,
340 time_col_name: &str,
341 table_src: &ManagedTableSource,
342 ) -> Result<RefillTask, Error> {
343 let (table_name, table_schema) = table_src.get_table_name_schema(&table_id).await?;
344 let all_col_names: BTreeSet<_> = table_schema
345 .relation_desc
346 .iter_names()
347 .flatten()
348 .map(|s| s.as_str())
349 .collect();
350
351 if !all_col_names.contains(time_col_name) {
352 UnexpectedSnafu {
353 reason: format!(
354 "Can't find column {} in table {} while refill flow",
355 time_col_name,
356 table_name.join(".")
357 ),
358 }
359 .fail()?;
360 }
361
362 let sql = if let Some(time_range) = time_range {
363 format!(
364 "select * from {0} where {1} >= {2} and {1} < {3}",
365 table_name.join("."),
366 time_col_name,
367 Value::from(time_range.0),
368 Value::from(time_range.1),
369 )
370 } else {
371 format!("select * from {0}", table_name.join("."))
372 };
373
374 Ok(RefillTask {
375 data: TaskData {
376 flow_id,
377 table_id,
378 table_schema: table_schema.relation_desc,
379 },
380 state: TaskState::new(sql),
381 })
382 }
383
384 pub async fn start_running(
386 &mut self,
387 manager: FlowStreamingEngineRef,
388 invoker: &FrontendInvoker,
389 ) -> Result<(), Error> {
390 let TaskState::Prepared { sql } = &mut self.state else {
391 UnexpectedSnafu {
392 reason: "task is not prepared",
393 }
394 .fail()?
395 };
396
397 let query_ctx = Arc::new(
399 QueryContextBuilder::default()
400 .current_catalog(DEFAULT_CATALOG_NAME.to_string())
401 .current_schema(DEFAULT_SCHEMA_NAME.to_string())
402 .build(),
403 );
404
405 let stmt_exec = invoker.statement_executor();
406
407 let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
408 .map_err(BoxedError::new)
409 .context(ExternalSnafu)?;
410 let plan = stmt_exec
411 .plan(&stmt, query_ctx.clone())
412 .await
413 .map_err(BoxedError::new)
414 .context(ExternalSnafu)?;
415
416 let output_data = stmt_exec
417 .exec_plan(plan, query_ctx)
418 .await
419 .map_err(BoxedError::new)
420 .context(ExternalSnafu)?;
421
422 let output_stream = QueryStream::try_from(output_data)?;
423 let output_stream = output_stream.try_into_stream()?;
424
425 self.state
426 .start_running(&self.data, manager, output_stream)?;
427 Ok(())
428 }
429
430 pub async fn is_finished(&mut self) -> Result<bool, Error> {
431 self.state.is_finished().await
432 }
433}