flow/adapter/
refill.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module contains the refill flow task, which is used to refill flow with given table id and a time range.
16
17use std::collections::BTreeSet;
18use std::sync::Arc;
19
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_meta::key::flow::FlowMetadataManagerRef;
23use common_recordbatch::{RecordBatch, RecordBatches, SendableRecordBatchStream};
24use common_runtime::JoinHandle;
25use common_telemetry::error;
26use datatypes::value::Value;
27use futures::StreamExt;
28use query::parser::QueryLanguageParser;
29use session::context::QueryContextBuilder;
30use snafu::{ensure, OptionExt, ResultExt};
31use table::metadata::TableId;
32
33use crate::adapter::table_source::ManagedTableSource;
34use crate::adapter::{FlowId, FlowStreamingEngineRef, StreamingEngine};
35use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu};
36use crate::expr::error::ExternalSnafu;
37use crate::expr::utils::find_plan_time_window_expr_lower_bound;
38use crate::repr::RelationDesc;
39use crate::server::get_all_flow_ids;
40use crate::{Error, FrontendInvoker};
41
42impl StreamingEngine {
43    /// Create and start refill flow tasks in background
44    pub async fn create_and_start_refill_flow_tasks(
45        self: &FlowStreamingEngineRef,
46        flow_metadata_manager: &FlowMetadataManagerRef,
47        catalog_manager: &CatalogManagerRef,
48    ) -> Result<(), Error> {
49        let tasks = self
50            .create_refill_flow_tasks(flow_metadata_manager, catalog_manager)
51            .await?;
52        self.starting_refill_flows(tasks).await?;
53        Ok(())
54    }
55
56    /// Create a series of tasks to refill flow
57    pub async fn create_refill_flow_tasks(
58        &self,
59        flow_metadata_manager: &FlowMetadataManagerRef,
60        catalog_manager: &CatalogManagerRef,
61    ) -> Result<Vec<RefillTask>, Error> {
62        let nodeid = self.node_id.map(|c| c as u64);
63
64        let flow_ids = get_all_flow_ids(flow_metadata_manager, catalog_manager, nodeid).await?;
65        let mut refill_tasks = Vec::new();
66        'flow_id_loop: for flow_id in flow_ids {
67            let info = flow_metadata_manager
68                .flow_info_manager()
69                .get(flow_id)
70                .await
71                .map_err(BoxedError::new)
72                .context(ExternalSnafu)?
73                .context(FlowNotFoundSnafu { id: flow_id })?;
74
75            // TODO(discord9): also check flow is already running
76            for src_table in info.source_table_ids() {
77                // check if source table still exists
78                if !self.table_info_source.check_table_exist(src_table).await? {
79                    error!(
80                        "Source table id = {:?} not found while refill flow_id={}, consider re-create the flow if necessary",
81                        src_table, flow_id
82                    );
83                    continue 'flow_id_loop;
84                }
85            }
86
87            let expire_after = info.expire_after();
88            // TODO(discord9): better way to get last point
89            let now = self.tick_manager.tick();
90            let plan = self
91                .node_context
92                .read()
93                .await
94                .get_flow_plan(&FlowId::from(flow_id))
95                .context(FlowNotFoundSnafu { id: flow_id })?;
96            let time_range = if let Some(expire_after) = expire_after {
97                let low_bound = common_time::Timestamp::new_millisecond(now - expire_after);
98                let real_low_bound = find_plan_time_window_expr_lower_bound(&plan, low_bound)?;
99                real_low_bound.map(|l| (l, common_time::Timestamp::new_millisecond(now)))
100            } else {
101                None
102            };
103
104            common_telemetry::debug!(
105                "Time range for refill flow_id={} is {:?}",
106                flow_id,
107                time_range
108            );
109
110            for src_table in info.source_table_ids() {
111                let time_index_col = self
112                    .table_info_source
113                    .get_time_index_column_from_table_id(*src_table)
114                    .await?
115                    .1;
116                let time_index_name = time_index_col.name;
117                let task = RefillTask::create(
118                    flow_id as u64,
119                    *src_table,
120                    time_range,
121                    &time_index_name,
122                    &self.table_info_source,
123                )
124                .await?;
125                refill_tasks.push(task);
126            }
127        }
128        Ok(refill_tasks)
129    }
130
131    /// Starting to refill flows, if any error occurs, will rebuild the flow and retry
132    pub(crate) async fn starting_refill_flows(
133        self: &FlowStreamingEngineRef,
134        tasks: Vec<RefillTask>,
135    ) -> Result<(), Error> {
136        // TODO(discord9): add a back pressure mechanism
137        let frontend_invoker =
138            self.frontend_invoker
139                .read()
140                .await
141                .clone()
142                .context(UnexpectedSnafu {
143                    reason: "frontend invoker is not set",
144                })?;
145
146        for mut task in tasks {
147            task.start_running(self.clone(), &frontend_invoker).await?;
148            // TODO(discord9): save refill tasks to a map and check if it's finished when necessary
149            // i.e. when system table need query it's state
150            self.refill_tasks
151                .write()
152                .await
153                .insert(task.data.flow_id, task);
154        }
155        Ok(())
156    }
157}
158
/// Task to refill flow with given table id and a time range
pub struct RefillTask {
    /// What is being refilled: the flow id, the source table id and that table's schema.
    data: TaskData,
    /// Lifecycle state of the task; it carries the prepared SQL, the handle of the
    /// running background task, or the final result.
    state: TaskState<()>,
}
164
/// Data describing a refill task; cloned into the background task when it starts.
#[derive(Clone)]
struct TaskData {
    /// Id of the flow this task refills.
    flow_id: FlowId,
    /// Id of the source table whose record batches are sent into the flow node.
    table_id: TableId,
    /// Schema of the source table; every incoming record batch is validated against it.
    table_schema: RelationDesc,
}
171
172impl TaskData {
173    /// validate that incoming batch's schema is the same as table schema(by comparing types&names)
174    fn validate_schema(table_schema: &RelationDesc, rb: &RecordBatch) -> Result<(), Error> {
175        let rb_schema = &rb.schema;
176        ensure!(
177            rb_schema.column_schemas().len() == table_schema.len()?,
178            UnexpectedSnafu {
179                reason: format!(
180                    "RecordBatch schema length does not match table schema length, {}!={}",
181                    rb_schema.column_schemas().len(),
182                    table_schema.len()?
183                )
184            }
185        );
186        for (i, rb_col) in rb_schema.column_schemas().iter().enumerate() {
187            let (rb_name, rb_ty) = (rb_col.name.as_str(), &rb_col.data_type);
188            let (table_name, table_ty) = (
189                table_schema.names[i].as_ref(),
190                &table_schema.typ().column_types[i].scalar_type,
191            );
192            ensure!(
193                Some(rb_name) == table_name.map(|c| c.as_str()),
194                UnexpectedSnafu {
195                    reason: format!(
196                        "Mismatch in column names: expected {:?}, found {}",
197                        table_name, rb_name
198                    )
199                }
200            );
201
202            ensure!(
203                rb_ty == table_ty,
204                UnexpectedSnafu {
205                    reason: format!(
206                        "Mismatch in column types for {}: expected {:?}, found {:?}",
207                        rb_name, table_ty, rb_ty
208                    )
209                }
210            );
211        }
212        Ok(())
213    }
214}
215
/// Refill task state
///
/// A task moves from `Prepared` to `Running` when it is started, and from `Running` to
/// `Finished` once its background task is observed to have completed.
enum TaskState<T> {
    /// Task is not started; `sql` is the query that is executed when the task starts
    Prepared { sql: String },
    /// Task is running; `handle` is the join handle of the spawned background task
    Running {
        handle: JoinHandle<Result<T, Error>>,
    },
    /// Task is finished; `res` is the value the background task returned
    Finished { res: Result<T, Error> },
}
227
228impl<T> TaskState<T> {
229    fn new(sql: String) -> Self {
230        Self::Prepared { sql }
231    }
232}
233
234mod test_send {
235    use std::collections::BTreeMap;
236
237    use tokio::sync::RwLock;
238
239    use super::*;
240    fn is_send<T: Send + Sync>() {}
241    fn foo() {
242        is_send::<TaskState<()>>();
243        is_send::<RefillTask>();
244        is_send::<BTreeMap<FlowId, RefillTask>>();
245        is_send::<RwLock<BTreeMap<FlowId, RefillTask>>>();
246    }
247}
248
249impl TaskState<()> {
250    /// check if task is finished
251    async fn is_finished(&mut self) -> Result<bool, Error> {
252        match self {
253            Self::Finished { .. } => Ok(true),
254            Self::Running { handle } => Ok(if handle.is_finished() {
255                *self = Self::Finished {
256                    res: handle.await.context(JoinTaskSnafu)?,
257                };
258                true
259            } else {
260                false
261            }),
262            _ => Ok(false),
263        }
264    }
265
266    fn start_running(
267        &mut self,
268        task_data: &TaskData,
269        manager: FlowStreamingEngineRef,
270        mut output_stream: SendableRecordBatchStream,
271    ) -> Result<(), Error> {
272        let data = (*task_data).clone();
273        let handle: JoinHandle<Result<(), Error>> = common_runtime::spawn_global(async move {
274            while let Some(rb) = output_stream.next().await {
275                let rb = match rb {
276                    Ok(rb) => rb,
277                    Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu)?,
278                };
279                TaskData::validate_schema(&data.table_schema, &rb)?;
280
281                // send rb into flow node
282                manager
283                    .node_context
284                    .read()
285                    .await
286                    .send_rb(data.table_id, rb)
287                    .await?;
288            }
289            common_telemetry::info!(
290                "Refill successful for source table_id={}, flow_id={}",
291                data.table_id,
292                data.flow_id
293            );
294            Ok(())
295        });
296        *self = Self::Running { handle };
297
298        Ok(())
299    }
300}
301
/// Query stream of RefillTask, simply wrap RecordBatches and RecordBatchStream and check output is not `AffectedRows`
///
/// Built from a `common_query::Output` through `TryFrom`, which rejects every output
/// kind other than the two wrapped here.
enum QueryStream {
    /// The query result is already materialized as record batches
    Batches { batches: RecordBatches },
    /// The query result is delivered as a stream of record batches
    Stream { stream: SendableRecordBatchStream },
}
307
308impl TryFrom<common_query::Output> for QueryStream {
309    type Error = Error;
310    fn try_from(value: common_query::Output) -> Result<Self, Self::Error> {
311        match value.data {
312            common_query::OutputData::Stream(stream) => Ok(QueryStream::Stream { stream }),
313            common_query::OutputData::RecordBatches(batches) => {
314                Ok(QueryStream::Batches { batches })
315            }
316            _ => UnexpectedSnafu {
317                reason: format!("Unexpected output data type: {:?}", value.data),
318            }
319            .fail(),
320        }
321    }
322}
323
324impl QueryStream {
325    fn try_into_stream(self) -> Result<SendableRecordBatchStream, Error> {
326        match self {
327            Self::Batches { batches } => Ok(batches.as_stream()),
328            Self::Stream { stream } => Ok(stream),
329        }
330    }
331}
332
333impl RefillTask {
334    /// Query with "select * from table WHERE time >= range_start and time < range_end"
335    pub async fn create(
336        flow_id: FlowId,
337        table_id: TableId,
338        time_range: Option<(common_time::Timestamp, common_time::Timestamp)>,
339        time_col_name: &str,
340        table_src: &ManagedTableSource,
341    ) -> Result<RefillTask, Error> {
342        let (table_name, table_schema) = table_src.get_table_name_schema(&table_id).await?;
343        let all_col_names: BTreeSet<_> = table_schema
344            .relation_desc
345            .iter_names()
346            .flatten()
347            .map(|s| s.as_str())
348            .collect();
349
350        if !all_col_names.contains(time_col_name) {
351            UnexpectedSnafu {
352                reason: format!(
353                    "Can't find column {} in table {} while refill flow",
354                    time_col_name,
355                    table_name.join(".")
356                ),
357            }
358            .fail()?;
359        }
360
361        let sql = if let Some(time_range) = time_range {
362            format!(
363                "select * from {0} where {1} >= {2} and {1} < {3}",
364                table_name.join("."),
365                time_col_name,
366                Value::from(time_range.0),
367                Value::from(time_range.1),
368            )
369        } else {
370            format!("select * from {0}", table_name.join("."))
371        };
372
373        Ok(RefillTask {
374            data: TaskData {
375                flow_id,
376                table_id,
377                table_schema: table_schema.relation_desc,
378            },
379            state: TaskState::new(sql),
380        })
381    }
382
383    /// Start running the task in background, non-blocking
384    pub async fn start_running(
385        &mut self,
386        manager: FlowStreamingEngineRef,
387        invoker: &FrontendInvoker,
388    ) -> Result<(), Error> {
389        let TaskState::Prepared { sql } = &mut self.state else {
390            UnexpectedSnafu {
391                reason: "task is not prepared",
392            }
393            .fail()?
394        };
395
396        // we don't need information from query context in this query so a default query context is enough
397        let query_ctx = Arc::new(
398            QueryContextBuilder::default()
399                .current_catalog("greptime".to_string())
400                .current_schema("public".to_string())
401                .build(),
402        );
403
404        let stmt_exec = invoker.statement_executor();
405
406        let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
407            .map_err(BoxedError::new)
408            .context(ExternalSnafu)?;
409        let plan = stmt_exec
410            .plan(&stmt, query_ctx.clone())
411            .await
412            .map_err(BoxedError::new)
413            .context(ExternalSnafu)?;
414
415        let output_data = stmt_exec
416            .exec_plan(plan, query_ctx)
417            .await
418            .map_err(BoxedError::new)
419            .context(ExternalSnafu)?;
420
421        let output_stream = QueryStream::try_from(output_data)?;
422        let output_stream = output_stream.try_into_stream()?;
423
424        self.state
425            .start_running(&self.data, manager, output_stream)?;
426        Ok(())
427    }
428
429    pub async fn is_finished(&mut self) -> Result<bool, Error> {
430        self.state.is_finished().await
431    }
432}