pipeline/etl/
ctx_req.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::IntoIter;
16use std::sync::Arc;
17
18use ahash::{HashMap, HashMapExt};
19use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
20use session::context::{QueryContext, QueryContextRef};
21use snafu::OptionExt;
22use vrl::value::Value as VrlValue;
23
24use crate::error::{Result, ValueMustBeMapSnafu};
25use crate::tablesuffix::TableSuffixTemplate;
26
// Hint keys that may be present in a pipeline map.
// `ContextOpt::from_pipeline_map_to_opt` removes each of them from the map and
// stores the value in the matching `ContextOpt` field.
const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
const GREPTIME_TTL: &str = "greptime_ttl";
const GREPTIME_APPEND_MODE: &str = "greptime_append_mode";
const GREPTIME_MERGE_MODE: &str = "greptime_merge_mode";
const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";
const GREPTIME_SKIP_WAL: &str = "greptime_skip_wal";
const GREPTIME_TABLE_SUFFIX: &str = "greptime_table_suffix";

// Extension keys written to the query context by `ContextOpt::set_query_context`.
// NOTE(review): `TABLE_SUFFIX_KEY` is not written by `set_query_context` in this file;
// presumably it is consumed elsewhere in the crate — confirm.
pub(crate) const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
pub(crate) const TTL_KEY: &str = "ttl";
pub(crate) const APPEND_MODE_KEY: &str = "append_mode";
pub(crate) const MERGE_MODE_KEY: &str = "merge_mode";
pub(crate) const PHYSICAL_TABLE_KEY: &str = "physical_table";
pub(crate) const SKIP_WAL_KEY: &str = "skip_wal";
pub(crate) const TABLE_SUFFIX_KEY: &str = "table_suffix";

/// Every hint key that is recognised in a pipeline map.
pub const PIPELINE_HINT_KEYS: [&str; 7] = [
    GREPTIME_AUTO_CREATE_TABLE,
    GREPTIME_TTL,
    GREPTIME_APPEND_MODE,
    GREPTIME_MERGE_MODE,
    GREPTIME_PHYSICAL_TABLE,
    GREPTIME_SKIP_WAL,
    GREPTIME_TABLE_SUFFIX,
];

// Prefix shared by all of the hint keys above.
// NOTE(review): not referenced in the visible part of this file — confirm it is still needed.
const PIPELINE_HINT_PREFIX: &str = "greptime_";
54
/// ContextOpt is a collection of options (including table options and pipeline options)
/// that should be extracted during the pipeline execution.
///
/// The options are set in the format of hint keys. See [`PIPELINE_HINT_KEYS`].
/// It is used as the key in [`ContextReq`] for grouping the row insert requests.
///
/// Every option is stored as an optional string; `None` means the option has not been set.
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContextOpt {
    // table options, that need to be set in the query context before making row insert requests
    auto_create_table: Option<String>,
    ttl: Option<String>,
    append_mode: Option<String>,
    merge_mode: Option<String>,
    physical_table: Option<String>,
    skip_wal: Option<String>,

    // reset the schema in query context
    // (applied through `set_current_schema` when `ContextReqIter` builds the per-group context)
    schema: Option<String>,

    // pipeline options, not set in query context
    // can be removed before the end of the pipeline execution
    // (`resolve_table_suffix` takes this value out of the struct)
    table_suffix: Option<String>,
}
77
78impl ContextOpt {
79    pub fn set_physical_table(&mut self, physical_table: String) {
80        self.physical_table = Some(physical_table);
81    }
82
83    pub fn set_schema(&mut self, schema: String) {
84        self.schema = Some(schema);
85    }
86}
87
88impl ContextOpt {
89    pub fn from_pipeline_map_to_opt(value: &mut VrlValue) -> Result<Self> {
90        let map = value.as_object_mut().context(ValueMustBeMapSnafu)?;
91
92        let mut opt = Self::default();
93        for k in PIPELINE_HINT_KEYS {
94            if let Some(v) = map.remove(k) {
95                let v = v.to_string_lossy().to_string();
96                match k {
97                    GREPTIME_AUTO_CREATE_TABLE => {
98                        opt.auto_create_table = Some(v);
99                    }
100                    GREPTIME_TTL => {
101                        opt.ttl = Some(v);
102                    }
103                    GREPTIME_APPEND_MODE => {
104                        opt.append_mode = Some(v);
105                    }
106                    GREPTIME_MERGE_MODE => {
107                        opt.merge_mode = Some(v);
108                    }
109                    GREPTIME_PHYSICAL_TABLE => {
110                        opt.physical_table = Some(v);
111                    }
112                    GREPTIME_SKIP_WAL => {
113                        opt.skip_wal = Some(v);
114                    }
115                    GREPTIME_TABLE_SUFFIX => {
116                        opt.table_suffix = Some(v);
117                    }
118                    _ => {}
119                }
120            }
121        }
122        Ok(opt)
123    }
124
125    pub(crate) fn resolve_table_suffix(
126        &mut self,
127        table_suffix: Option<&TableSuffixTemplate>,
128        pipeline_map: &VrlValue,
129    ) -> Option<String> {
130        self.table_suffix
131            .take()
132            .or_else(|| table_suffix.and_then(|s| s.apply(pipeline_map)))
133    }
134
135    pub fn set_query_context(self, ctx: &mut QueryContext) {
136        if let Some(auto_create_table) = &self.auto_create_table {
137            ctx.set_extension(AUTO_CREATE_TABLE_KEY, auto_create_table);
138        }
139        if let Some(ttl) = &self.ttl {
140            ctx.set_extension(TTL_KEY, ttl);
141        }
142        if let Some(append_mode) = &self.append_mode {
143            ctx.set_extension(APPEND_MODE_KEY, append_mode);
144        }
145        if let Some(merge_mode) = &self.merge_mode {
146            ctx.set_extension(MERGE_MODE_KEY, merge_mode);
147        }
148        if let Some(physical_table) = &self.physical_table {
149            ctx.set_extension(PHYSICAL_TABLE_KEY, physical_table);
150        }
151        if let Some(skip_wal) = &self.skip_wal {
152            ctx.set_extension(SKIP_WAL_KEY, skip_wal);
153        }
154    }
155}
156
/// ContextReq is a collection of row insert requests with different options.
/// The default option is all empty.
/// Because options are set in the query context, requests with different options have to
/// be split into sequential calls.
/// The key is a [`ContextOpt`] struct for strong typing.
/// e.g:
/// ```text
/// {
///     "skip_wal=true,ttl=1d": [RowInsertRequest],
///     "ttl=1d": [RowInsertRequest],
/// }
/// ```
#[derive(Debug, Default)]
pub struct ContextReq {
    // row insert requests grouped by the option set they belong to
    req: HashMap<ContextOpt, Vec<RowInsertRequest>>,
}
170
171impl ContextReq {
172    pub fn from_opt_map(opt_map: HashMap<ContextOpt, Rows>, table_name: String) -> Self {
173        Self {
174            req: opt_map
175                .into_iter()
176                .map(|(opt, rows)| {
177                    (
178                        opt,
179                        vec![RowInsertRequest {
180                            table_name: table_name.clone(),
181                            rows: Some(rows),
182                        }],
183                    )
184                })
185                .collect::<HashMap<ContextOpt, Vec<RowInsertRequest>>>(),
186        }
187    }
188
189    pub fn default_opt_with_reqs(reqs: Vec<RowInsertRequest>) -> Self {
190        let mut req_map = HashMap::new();
191        req_map.insert(ContextOpt::default(), reqs);
192        Self { req: req_map }
193    }
194
195    pub fn add_row(&mut self, opt: ContextOpt, req: RowInsertRequest) {
196        self.req.entry(opt).or_default().push(req);
197    }
198
199    pub fn add_rows(&mut self, opt: ContextOpt, reqs: impl IntoIterator<Item = RowInsertRequest>) {
200        self.req.entry(opt).or_default().extend(reqs);
201    }
202
203    pub fn merge(&mut self, other: Self) {
204        for (opt, req) in other.req {
205            self.req.entry(opt).or_default().extend(req);
206        }
207    }
208
209    pub fn as_req_iter(self, ctx: QueryContextRef) -> ContextReqIter {
210        let ctx = (*ctx).clone();
211
212        ContextReqIter {
213            opt_req: self.req.into_iter(),
214            ctx_template: ctx,
215        }
216    }
217
218    pub fn all_req(self) -> impl Iterator<Item = RowInsertRequest> {
219        self.req.into_iter().flat_map(|(_, req)| req)
220    }
221
222    pub fn ref_all_req(&self) -> impl Iterator<Item = &RowInsertRequest> {
223        self.req.values().flatten()
224    }
225
226    pub fn map_len(&self) -> usize {
227        self.req.len()
228    }
229}
230
/// ContextReqIter is an iterator that iterates over the [`ContextReq`].
/// The context template is cloned from the original query context.
/// It will clone the query context for each option and set the options to the context.
/// Then it will return the context and the row insert requests for actual insert.
pub struct ContextReqIter {
    // remaining (option set, requests) groups taken from the consumed `ContextReq`
    opt_req: IntoIter<ContextOpt, Vec<RowInsertRequest>>,
    // query context that is cloned for every group before the group's options are applied
    ctx_template: QueryContext,
}
239
240impl Iterator for ContextReqIter {
241    type Item = (QueryContextRef, RowInsertRequests);
242
243    fn next(&mut self) -> Option<Self::Item> {
244        let (mut opt, req_vec) = self.opt_req.next()?;
245        let mut ctx = self.ctx_template.clone();
246        if let Some(schema) = opt.schema.take() {
247            ctx.set_current_schema(&schema);
248        }
249        opt.set_query_context(&mut ctx);
250
251        Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
252    }
253}