flow/adapter/
refill.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module contains the refill flow task, which is used to refill flow with given table id and a time range.
16
17use std::collections::BTreeSet;
18use std::sync::Arc;
19
20use catalog::CatalogManagerRef;
21use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
22use common_error::ext::BoxedError;
23use common_meta::key::flow::FlowMetadataManagerRef;
24use common_recordbatch::{RecordBatch, RecordBatches, SendableRecordBatchStream};
25use common_runtime::JoinHandle;
26use common_telemetry::error;
27use datatypes::value::Value;
28use futures::StreamExt;
29use query::parser::QueryLanguageParser;
30use session::context::QueryContextBuilder;
31use snafu::{OptionExt, ResultExt, ensure};
32use table::metadata::TableId;
33
34use crate::adapter::table_source::ManagedTableSource;
35use crate::adapter::{FlowId, FlowStreamingEngineRef, StreamingEngine};
36use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu};
37use crate::expr::error::ExternalSnafu;
38use crate::expr::utils::find_plan_time_window_expr_lower_bound;
39use crate::repr::RelationDesc;
40use crate::server::get_all_flow_ids;
41use crate::{Error, FrontendInvoker};
42
43impl StreamingEngine {
44    /// Create and start refill flow tasks in background
45    pub async fn create_and_start_refill_flow_tasks(
46        self: &FlowStreamingEngineRef,
47        flow_metadata_manager: &FlowMetadataManagerRef,
48        catalog_manager: &CatalogManagerRef,
49    ) -> Result<(), Error> {
50        let tasks = self
51            .create_refill_flow_tasks(flow_metadata_manager, catalog_manager)
52            .await?;
53        self.starting_refill_flows(tasks).await?;
54        Ok(())
55    }
56
57    /// Create a series of tasks to refill flow
58    pub async fn create_refill_flow_tasks(
59        &self,
60        flow_metadata_manager: &FlowMetadataManagerRef,
61        catalog_manager: &CatalogManagerRef,
62    ) -> Result<Vec<RefillTask>, Error> {
63        let nodeid = self.node_id.map(|c| c as u64);
64
65        let flow_ids = get_all_flow_ids(flow_metadata_manager, catalog_manager, nodeid).await?;
66        let mut refill_tasks = Vec::new();
67        'flow_id_loop: for flow_id in flow_ids {
68            let info = flow_metadata_manager
69                .flow_info_manager()
70                .get(flow_id)
71                .await
72                .map_err(BoxedError::new)
73                .context(ExternalSnafu)?
74                .context(FlowNotFoundSnafu { id: flow_id })?;
75
76            // TODO(discord9): also check flow is already running
77            for src_table in info.source_table_ids() {
78                // check if source table still exists
79                if !self.table_info_source.check_table_exist(src_table).await? {
80                    error!(
81                        "Source table id = {:?} not found while refill flow_id={}, consider re-create the flow if necessary",
82                        src_table, flow_id
83                    );
84                    continue 'flow_id_loop;
85                }
86            }
87
88            let expire_after = info.expire_after();
89            // TODO(discord9): better way to get last point
90            let now = self.tick_manager.tick();
91            let plan = self
92                .node_context
93                .read()
94                .await
95                .get_flow_plan(&FlowId::from(flow_id))
96                .context(FlowNotFoundSnafu { id: flow_id })?;
97            let time_range = if let Some(expire_after) = expire_after {
98                let low_bound = common_time::Timestamp::new_millisecond(now - expire_after);
99                let real_low_bound = find_plan_time_window_expr_lower_bound(&plan, low_bound)?;
100                real_low_bound.map(|l| (l, common_time::Timestamp::new_millisecond(now)))
101            } else {
102                None
103            };
104
105            common_telemetry::debug!(
106                "Time range for refill flow_id={} is {:?}",
107                flow_id,
108                time_range
109            );
110
111            for src_table in info.source_table_ids() {
112                let time_index_col = self
113                    .table_info_source
114                    .get_time_index_column_from_table_id(*src_table)
115                    .await?
116                    .1;
117                let time_index_name = time_index_col.name;
118                let task = RefillTask::create(
119                    flow_id as u64,
120                    *src_table,
121                    time_range,
122                    &time_index_name,
123                    &self.table_info_source,
124                )
125                .await?;
126                refill_tasks.push(task);
127            }
128        }
129        Ok(refill_tasks)
130    }
131
132    /// Starting to refill flows, if any error occurs, will rebuild the flow and retry
133    pub(crate) async fn starting_refill_flows(
134        self: &FlowStreamingEngineRef,
135        tasks: Vec<RefillTask>,
136    ) -> Result<(), Error> {
137        // TODO(discord9): add a back pressure mechanism
138        let frontend_invoker =
139            self.frontend_invoker
140                .read()
141                .await
142                .clone()
143                .context(UnexpectedSnafu {
144                    reason: "frontend invoker is not set",
145                })?;
146
147        for mut task in tasks {
148            task.start_running(self.clone(), &frontend_invoker).await?;
149            // TODO(discord9): save refill tasks to a map and check if it's finished when necessary
150            // i.e. when system table need query it's state
151            self.refill_tasks
152                .write()
153                .await
154                .insert(task.data.flow_id, task);
155        }
156        Ok(())
157    }
158}
159
160/// Task to refill flow with given table id and a time range
161pub struct RefillTask {
162    data: TaskData,
163    state: TaskState<()>,
164}
165
166#[derive(Clone)]
167struct TaskData {
168    flow_id: FlowId,
169    table_id: TableId,
170    table_schema: RelationDesc,
171}
172
173impl TaskData {
174    /// validate that incoming batch's schema is the same as table schema(by comparing types&names)
175    fn validate_schema(table_schema: &RelationDesc, rb: &RecordBatch) -> Result<(), Error> {
176        let rb_schema = &rb.schema;
177        ensure!(
178            rb_schema.column_schemas().len() == table_schema.len()?,
179            UnexpectedSnafu {
180                reason: format!(
181                    "RecordBatch schema length does not match table schema length, {}!={}",
182                    rb_schema.column_schemas().len(),
183                    table_schema.len()?
184                )
185            }
186        );
187        for (i, rb_col) in rb_schema.column_schemas().iter().enumerate() {
188            let (rb_name, rb_ty) = (rb_col.name.as_str(), &rb_col.data_type);
189            let (table_name, table_ty) = (
190                table_schema.names[i].as_ref(),
191                &table_schema.typ().column_types[i].scalar_type,
192            );
193            ensure!(
194                Some(rb_name) == table_name.map(|c| c.as_str()),
195                UnexpectedSnafu {
196                    reason: format!(
197                        "Mismatch in column names: expected {:?}, found {}",
198                        table_name, rb_name
199                    )
200                }
201            );
202
203            ensure!(
204                rb_ty == table_ty,
205                UnexpectedSnafu {
206                    reason: format!(
207                        "Mismatch in column types for {}: expected {:?}, found {:?}",
208                        rb_name, table_ty, rb_ty
209                    )
210                }
211            );
212        }
213        Ok(())
214    }
215}
216
217/// Refill task state
218enum TaskState<T> {
219    /// Task is not started
220    Prepared { sql: String },
221    /// Task is running
222    Running {
223        handle: JoinHandle<Result<T, Error>>,
224    },
225    /// Task is finished
226    Finished { res: Result<T, Error> },
227}
228
229impl<T> TaskState<T> {
230    fn new(sql: String) -> Self {
231        Self::Prepared { sql }
232    }
233}
234
235mod test_send {
236    use std::collections::BTreeMap;
237
238    use tokio::sync::RwLock;
239
240    use super::*;
241    fn is_send<T: Send + Sync>() {}
242    fn foo() {
243        is_send::<TaskState<()>>();
244        is_send::<RefillTask>();
245        is_send::<BTreeMap<FlowId, RefillTask>>();
246        is_send::<RwLock<BTreeMap<FlowId, RefillTask>>>();
247    }
248}
249
250impl TaskState<()> {
251    /// check if task is finished
252    async fn is_finished(&mut self) -> Result<bool, Error> {
253        match self {
254            Self::Finished { .. } => Ok(true),
255            Self::Running { handle } => Ok(if handle.is_finished() {
256                *self = Self::Finished {
257                    res: handle.await.context(JoinTaskSnafu)?,
258                };
259                true
260            } else {
261                false
262            }),
263            _ => Ok(false),
264        }
265    }
266
267    fn start_running(
268        &mut self,
269        task_data: &TaskData,
270        manager: FlowStreamingEngineRef,
271        mut output_stream: SendableRecordBatchStream,
272    ) -> Result<(), Error> {
273        let data = (*task_data).clone();
274        let handle: JoinHandle<Result<(), Error>> = common_runtime::spawn_global(async move {
275            while let Some(rb) = output_stream.next().await {
276                let rb = match rb {
277                    Ok(rb) => rb,
278                    Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu)?,
279                };
280                TaskData::validate_schema(&data.table_schema, &rb)?;
281
282                // send rb into flow node
283                manager
284                    .node_context
285                    .read()
286                    .await
287                    .send_rb(data.table_id, rb)
288                    .await?;
289            }
290            common_telemetry::info!(
291                "Refill successful for source table_id={}, flow_id={}",
292                data.table_id,
293                data.flow_id
294            );
295            Ok(())
296        });
297        *self = Self::Running { handle };
298
299        Ok(())
300    }
301}
302
303/// Query stream of RefillTask, simply wrap RecordBatches and RecordBatchStream and check output is not `AffectedRows`
304enum QueryStream {
305    Batches { batches: RecordBatches },
306    Stream { stream: SendableRecordBatchStream },
307}
308
309impl TryFrom<common_query::Output> for QueryStream {
310    type Error = Error;
311    fn try_from(value: common_query::Output) -> Result<Self, Self::Error> {
312        match value.data {
313            common_query::OutputData::Stream(stream) => Ok(QueryStream::Stream { stream }),
314            common_query::OutputData::RecordBatches(batches) => {
315                Ok(QueryStream::Batches { batches })
316            }
317            _ => UnexpectedSnafu {
318                reason: format!("Unexpected output data type: {:?}", value.data),
319            }
320            .fail(),
321        }
322    }
323}
324
325impl QueryStream {
326    fn try_into_stream(self) -> Result<SendableRecordBatchStream, Error> {
327        match self {
328            Self::Batches { batches } => Ok(batches.as_stream()),
329            Self::Stream { stream } => Ok(stream),
330        }
331    }
332}
333
334impl RefillTask {
335    /// Query with "select * from table WHERE time >= range_start and time < range_end"
336    pub async fn create(
337        flow_id: FlowId,
338        table_id: TableId,
339        time_range: Option<(common_time::Timestamp, common_time::Timestamp)>,
340        time_col_name: &str,
341        table_src: &ManagedTableSource,
342    ) -> Result<RefillTask, Error> {
343        let (table_name, table_schema) = table_src.get_table_name_schema(&table_id).await?;
344        let all_col_names: BTreeSet<_> = table_schema
345            .relation_desc
346            .iter_names()
347            .flatten()
348            .map(|s| s.as_str())
349            .collect();
350
351        if !all_col_names.contains(time_col_name) {
352            UnexpectedSnafu {
353                reason: format!(
354                    "Can't find column {} in table {} while refill flow",
355                    time_col_name,
356                    table_name.join(".")
357                ),
358            }
359            .fail()?;
360        }
361
362        let sql = if let Some(time_range) = time_range {
363            format!(
364                "select * from {0} where {1} >= {2} and {1} < {3}",
365                table_name.join("."),
366                time_col_name,
367                Value::from(time_range.0),
368                Value::from(time_range.1),
369            )
370        } else {
371            format!("select * from {0}", table_name.join("."))
372        };
373
374        Ok(RefillTask {
375            data: TaskData {
376                flow_id,
377                table_id,
378                table_schema: table_schema.relation_desc,
379            },
380            state: TaskState::new(sql),
381        })
382    }
383
384    /// Start running the task in background, non-blocking
385    pub async fn start_running(
386        &mut self,
387        manager: FlowStreamingEngineRef,
388        invoker: &FrontendInvoker,
389    ) -> Result<(), Error> {
390        let TaskState::Prepared { sql } = &mut self.state else {
391            UnexpectedSnafu {
392                reason: "task is not prepared",
393            }
394            .fail()?
395        };
396
397        // we don't need information from query context in this query so a default query context is enough
398        let query_ctx = Arc::new(
399            QueryContextBuilder::default()
400                .current_catalog(DEFAULT_CATALOG_NAME.to_string())
401                .current_schema(DEFAULT_SCHEMA_NAME.to_string())
402                .build(),
403        );
404
405        let stmt_exec = invoker.statement_executor();
406
407        let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
408            .map_err(BoxedError::new)
409            .context(ExternalSnafu)?;
410        let plan = stmt_exec
411            .plan(&stmt, query_ctx.clone())
412            .await
413            .map_err(BoxedError::new)
414            .context(ExternalSnafu)?;
415
416        let output_data = stmt_exec
417            .exec_plan(plan, query_ctx)
418            .await
419            .map_err(BoxedError::new)
420            .context(ExternalSnafu)?;
421
422        let output_stream = QueryStream::try_from(output_data)?;
423        let output_stream = output_stream.try_into_stream()?;
424
425        self.state
426            .start_running(&self.data, manager, output_stream)?;
427        Ok(())
428    }
429
430    pub async fn is_finished(&mut self) -> Result<bool, Error> {
431        self.state.is_finished().await
432    }
433}