1use std::collections::BTreeSet;
18use std::sync::Arc;
19
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_meta::key::flow::FlowMetadataManagerRef;
23use common_recordbatch::{RecordBatch, RecordBatches, SendableRecordBatchStream};
24use common_runtime::JoinHandle;
25use common_telemetry::error;
26use datatypes::value::Value;
27use futures::StreamExt;
28use query::parser::QueryLanguageParser;
29use session::context::QueryContextBuilder;
30use snafu::{ensure, OptionExt, ResultExt};
31use table::metadata::TableId;
32
33use crate::adapter::table_source::ManagedTableSource;
34use crate::adapter::{FlowId, FlowStreamingEngineRef, StreamingEngine};
35use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu};
36use crate::expr::error::ExternalSnafu;
37use crate::expr::utils::find_plan_time_window_expr_lower_bound;
38use crate::repr::RelationDesc;
39use crate::server::get_all_flow_ids;
40use crate::{Error, FrontendInvoker};
41
42impl StreamingEngine {
43 pub async fn create_and_start_refill_flow_tasks(
45 self: &FlowStreamingEngineRef,
46 flow_metadata_manager: &FlowMetadataManagerRef,
47 catalog_manager: &CatalogManagerRef,
48 ) -> Result<(), Error> {
49 let tasks = self
50 .create_refill_flow_tasks(flow_metadata_manager, catalog_manager)
51 .await?;
52 self.starting_refill_flows(tasks).await?;
53 Ok(())
54 }
55
56 pub async fn create_refill_flow_tasks(
58 &self,
59 flow_metadata_manager: &FlowMetadataManagerRef,
60 catalog_manager: &CatalogManagerRef,
61 ) -> Result<Vec<RefillTask>, Error> {
62 let nodeid = self.node_id.map(|c| c as u64);
63
64 let flow_ids = get_all_flow_ids(flow_metadata_manager, catalog_manager, nodeid).await?;
65 let mut refill_tasks = Vec::new();
66 'flow_id_loop: for flow_id in flow_ids {
67 let info = flow_metadata_manager
68 .flow_info_manager()
69 .get(flow_id)
70 .await
71 .map_err(BoxedError::new)
72 .context(ExternalSnafu)?
73 .context(FlowNotFoundSnafu { id: flow_id })?;
74
75 for src_table in info.source_table_ids() {
77 if !self.table_info_source.check_table_exist(src_table).await? {
79 error!(
80 "Source table id = {:?} not found while refill flow_id={}, consider re-create the flow if necessary",
81 src_table, flow_id
82 );
83 continue 'flow_id_loop;
84 }
85 }
86
87 let expire_after = info.expire_after();
88 let now = self.tick_manager.tick();
90 let plan = self
91 .node_context
92 .read()
93 .await
94 .get_flow_plan(&FlowId::from(flow_id))
95 .context(FlowNotFoundSnafu { id: flow_id })?;
96 let time_range = if let Some(expire_after) = expire_after {
97 let low_bound = common_time::Timestamp::new_millisecond(now - expire_after);
98 let real_low_bound = find_plan_time_window_expr_lower_bound(&plan, low_bound)?;
99 real_low_bound.map(|l| (l, common_time::Timestamp::new_millisecond(now)))
100 } else {
101 None
102 };
103
104 common_telemetry::debug!(
105 "Time range for refill flow_id={} is {:?}",
106 flow_id,
107 time_range
108 );
109
110 for src_table in info.source_table_ids() {
111 let time_index_col = self
112 .table_info_source
113 .get_time_index_column_from_table_id(*src_table)
114 .await?
115 .1;
116 let time_index_name = time_index_col.name;
117 let task = RefillTask::create(
118 flow_id as u64,
119 *src_table,
120 time_range,
121 &time_index_name,
122 &self.table_info_source,
123 )
124 .await?;
125 refill_tasks.push(task);
126 }
127 }
128 Ok(refill_tasks)
129 }
130
131 pub(crate) async fn starting_refill_flows(
133 self: &FlowStreamingEngineRef,
134 tasks: Vec<RefillTask>,
135 ) -> Result<(), Error> {
136 let frontend_invoker =
138 self.frontend_invoker
139 .read()
140 .await
141 .clone()
142 .context(UnexpectedSnafu {
143 reason: "frontend invoker is not set",
144 })?;
145
146 for mut task in tasks {
147 task.start_running(self.clone(), &frontend_invoker).await?;
148 self.refill_tasks
151 .write()
152 .await
153 .insert(task.data.flow_id, task);
154 }
155 Ok(())
156 }
157}
158
/// A task that re-reads rows of one source table and feeds them to a flow.
///
/// Pairs the description of the task with its current execution state.
pub struct RefillTask {
    /// Flow id, source table id and the schema expected for the refilled data.
    data: TaskData,
    /// Current lifecycle state of the task (prepared, running or finished).
    state: TaskState<()>,
}
164
/// Identifying data of a refill task; a clone of it is moved into the spawned task.
#[derive(Clone)]
struct TaskData {
    /// Flow the refill is performed for.
    flow_id: FlowId,
    /// Source table whose record batches are forwarded.
    table_id: TableId,
    /// Schema that every incoming record batch is validated against.
    table_schema: RelationDesc,
}
171
172impl TaskData {
173 fn validate_schema(table_schema: &RelationDesc, rb: &RecordBatch) -> Result<(), Error> {
175 let rb_schema = &rb.schema;
176 ensure!(
177 rb_schema.column_schemas().len() == table_schema.len()?,
178 UnexpectedSnafu {
179 reason: format!(
180 "RecordBatch schema length does not match table schema length, {}!={}",
181 rb_schema.column_schemas().len(),
182 table_schema.len()?
183 )
184 }
185 );
186 for (i, rb_col) in rb_schema.column_schemas().iter().enumerate() {
187 let (rb_name, rb_ty) = (rb_col.name.as_str(), &rb_col.data_type);
188 let (table_name, table_ty) = (
189 table_schema.names[i].as_ref(),
190 &table_schema.typ().column_types[i].scalar_type,
191 );
192 ensure!(
193 Some(rb_name) == table_name.map(|c| c.as_str()),
194 UnexpectedSnafu {
195 reason: format!(
196 "Mismatch in column names: expected {:?}, found {}",
197 table_name, rb_name
198 )
199 }
200 );
201
202 ensure!(
203 rb_ty == table_ty,
204 UnexpectedSnafu {
205 reason: format!(
206 "Mismatch in column types for {}: expected {:?}, found {:?}",
207 rb_name, table_ty, rb_ty
208 )
209 }
210 );
211 }
212 Ok(())
213 }
214}
215
/// Lifecycle of a refill task.
enum TaskState<T> {
    /// The query text has been built but nothing has been executed yet.
    Prepared { sql: String },
    /// The task has been spawned and is tracked through its join handle.
    Running {
        handle: JoinHandle<Result<T, Error>>,
    },
    /// The spawned task is done; `res` holds its outcome.
    Finished { res: Result<T, Error> },
}
227
228impl<T> TaskState<T> {
229 fn new(sql: String) -> Self {
230 Self::Prepared { sql }
231 }
232}
233
234mod test_send {
235 use std::collections::BTreeMap;
236
237 use tokio::sync::RwLock;
238
239 use super::*;
240 fn is_send<T: Send + Sync>() {}
241 fn foo() {
242 is_send::<TaskState<()>>();
243 is_send::<RefillTask>();
244 is_send::<BTreeMap<FlowId, RefillTask>>();
245 is_send::<RwLock<BTreeMap<FlowId, RefillTask>>>();
246 }
247}
248
249impl TaskState<()> {
250 async fn is_finished(&mut self) -> Result<bool, Error> {
252 match self {
253 Self::Finished { .. } => Ok(true),
254 Self::Running { handle } => Ok(if handle.is_finished() {
255 *self = Self::Finished {
256 res: handle.await.context(JoinTaskSnafu)?,
257 };
258 true
259 } else {
260 false
261 }),
262 _ => Ok(false),
263 }
264 }
265
266 fn start_running(
267 &mut self,
268 task_data: &TaskData,
269 manager: FlowStreamingEngineRef,
270 mut output_stream: SendableRecordBatchStream,
271 ) -> Result<(), Error> {
272 let data = (*task_data).clone();
273 let handle: JoinHandle<Result<(), Error>> = common_runtime::spawn_global(async move {
274 while let Some(rb) = output_stream.next().await {
275 let rb = match rb {
276 Ok(rb) => rb,
277 Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu)?,
278 };
279 TaskData::validate_schema(&data.table_schema, &rb)?;
280
281 manager
283 .node_context
284 .read()
285 .await
286 .send_rb(data.table_id, rb)
287 .await?;
288 }
289 common_telemetry::info!(
290 "Refill successful for source table_id={}, flow_id={}",
291 data.table_id,
292 data.flow_id
293 );
294 Ok(())
295 });
296 *self = Self::Running { handle };
297
298 Ok(())
299 }
300}
301
/// Query output in one of the two forms that can be turned into a record batch stream.
enum QueryStream {
    /// Output that is already fully collected into record batches.
    Batches { batches: RecordBatches },
    /// Output that is produced as a stream of record batches.
    Stream { stream: SendableRecordBatchStream },
}
307
308impl TryFrom<common_query::Output> for QueryStream {
309 type Error = Error;
310 fn try_from(value: common_query::Output) -> Result<Self, Self::Error> {
311 match value.data {
312 common_query::OutputData::Stream(stream) => Ok(QueryStream::Stream { stream }),
313 common_query::OutputData::RecordBatches(batches) => {
314 Ok(QueryStream::Batches { batches })
315 }
316 _ => UnexpectedSnafu {
317 reason: format!("Unexpected output data type: {:?}", value.data),
318 }
319 .fail(),
320 }
321 }
322}
323
324impl QueryStream {
325 fn try_into_stream(self) -> Result<SendableRecordBatchStream, Error> {
326 match self {
327 Self::Batches { batches } => Ok(batches.as_stream()),
328 Self::Stream { stream } => Ok(stream),
329 }
330 }
331}
332
333impl RefillTask {
334 pub async fn create(
336 flow_id: FlowId,
337 table_id: TableId,
338 time_range: Option<(common_time::Timestamp, common_time::Timestamp)>,
339 time_col_name: &str,
340 table_src: &ManagedTableSource,
341 ) -> Result<RefillTask, Error> {
342 let (table_name, table_schema) = table_src.get_table_name_schema(&table_id).await?;
343 let all_col_names: BTreeSet<_> = table_schema
344 .relation_desc
345 .iter_names()
346 .flatten()
347 .map(|s| s.as_str())
348 .collect();
349
350 if !all_col_names.contains(time_col_name) {
351 UnexpectedSnafu {
352 reason: format!(
353 "Can't find column {} in table {} while refill flow",
354 time_col_name,
355 table_name.join(".")
356 ),
357 }
358 .fail()?;
359 }
360
361 let sql = if let Some(time_range) = time_range {
362 format!(
363 "select * from {0} where {1} >= {2} and {1} < {3}",
364 table_name.join("."),
365 time_col_name,
366 Value::from(time_range.0),
367 Value::from(time_range.1),
368 )
369 } else {
370 format!("select * from {0}", table_name.join("."))
371 };
372
373 Ok(RefillTask {
374 data: TaskData {
375 flow_id,
376 table_id,
377 table_schema: table_schema.relation_desc,
378 },
379 state: TaskState::new(sql),
380 })
381 }
382
383 pub async fn start_running(
385 &mut self,
386 manager: FlowStreamingEngineRef,
387 invoker: &FrontendInvoker,
388 ) -> Result<(), Error> {
389 let TaskState::Prepared { sql } = &mut self.state else {
390 UnexpectedSnafu {
391 reason: "task is not prepared",
392 }
393 .fail()?
394 };
395
396 let query_ctx = Arc::new(
398 QueryContextBuilder::default()
399 .current_catalog("greptime".to_string())
400 .current_schema("public".to_string())
401 .build(),
402 );
403
404 let stmt_exec = invoker.statement_executor();
405
406 let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
407 .map_err(BoxedError::new)
408 .context(ExternalSnafu)?;
409 let plan = stmt_exec
410 .plan(&stmt, query_ctx.clone())
411 .await
412 .map_err(BoxedError::new)
413 .context(ExternalSnafu)?;
414
415 let output_data = stmt_exec
416 .exec_plan(plan, query_ctx)
417 .await
418 .map_err(BoxedError::new)
419 .context(ExternalSnafu)?;
420
421 let output_stream = QueryStream::try_from(output_data)?;
422 let output_stream = output_stream.try_into_stream()?;
423
424 self.state
425 .start_running(&self.data, manager, output_stream)?;
426 Ok(())
427 }
428
429 pub async fn is_finished(&mut self) -> Result<bool, Error> {
430 self.state.is_finished().await
431 }
432}