pipeline/etl/
ctx_req.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::IntoIter;
16use std::sync::Arc;
17
18use ahash::{HashMap, HashMapExt};
19use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
20use session::context::{QueryContext, QueryContextRef};
21use snafu::OptionExt;
22
23use crate::error::{Result, ValueMustBeMapSnafu};
24use crate::tablesuffix::TableSuffixTemplate;
25use crate::Value;
26
// Hint keys that `ContextOpt::from_pipeline_map_to_opt` looks up in, and removes from,
// the pipeline map. Every key carries the `greptime_` prefix.
const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
const GREPTIME_TTL: &str = "greptime_ttl";
const GREPTIME_APPEND_MODE: &str = "greptime_append_mode";
const GREPTIME_MERGE_MODE: &str = "greptime_merge_mode";
const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";
const GREPTIME_SKIP_WAL: &str = "greptime_skip_wal";
const GREPTIME_TABLE_SUFFIX: &str = "greptime_table_suffix";

// Extension keys that `ContextOpt::set_query_context` writes into the query context.
// NOTE(review): `TABLE_SUFFIX_KEY` is not referenced in this file — presumably it is
// consumed elsewhere in the crate; confirm before removing.
pub(crate) const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
pub(crate) const TTL_KEY: &str = "ttl";
pub(crate) const APPEND_MODE_KEY: &str = "append_mode";
pub(crate) const MERGE_MODE_KEY: &str = "merge_mode";
pub(crate) const PHYSICAL_TABLE_KEY: &str = "physical_table";
pub(crate) const SKIP_WAL_KEY: &str = "skip_wal";
pub(crate) const TABLE_SUFFIX_KEY: &str = "table_suffix";

/// Every hint key recognized by [`ContextOpt::from_pipeline_map_to_opt`].
///
/// The extraction loop walks this array in order, so a new hint key must be added both
/// here and to the `match` in that function.
pub const PIPELINE_HINT_KEYS: [&str; 7] = [
    GREPTIME_AUTO_CREATE_TABLE,
    GREPTIME_TTL,
    GREPTIME_APPEND_MODE,
    GREPTIME_MERGE_MODE,
    GREPTIME_PHYSICAL_TABLE,
    GREPTIME_SKIP_WAL,
    GREPTIME_TABLE_SUFFIX,
];

// Prefix shared by all of the hint keys above.
// NOTE(review): not referenced in the visible part of this file — confirm it is still used.
const PIPELINE_HINT_PREFIX: &str = "greptime_";
54
/// ContextOpt is a collection of options (including table options and pipeline options)
/// that should be extracted during the pipeline execution.
///
/// The options are set in the format of hint keys. See [`PIPELINE_HINT_KEYS`].
/// It is used as the key in [`ContextReq`] for grouping the row insert requests,
/// which is why it derives `Eq` and `Hash`.
///
/// All values are kept as plain strings; a field left as `None` means the
/// corresponding option was not provided.
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContextOpt {
    // table options, that need to be set in the query context before making row insert requests
    // (see `set_query_context`, which copies each present value into a context extension)
    auto_create_table: Option<String>,
    ttl: Option<String>,
    append_mode: Option<String>,
    merge_mode: Option<String>,
    physical_table: Option<String>,
    skip_wal: Option<String>,

    // reset the schema in query context
    // (applied by `ContextReqIter::next` via `set_current_schema`, not by `set_query_context`)
    schema: Option<String>,

    // pipeline options, not set in query context
    // can be removed before the end of the pipeline execution
    // (`resolve_table_suffix` takes this value out of the struct)
    table_suffix: Option<String>,
}
77
78impl ContextOpt {
79    pub fn set_physical_table(&mut self, physical_table: String) {
80        self.physical_table = Some(physical_table);
81    }
82
83    pub fn set_schema(&mut self, schema: String) {
84        self.schema = Some(schema);
85    }
86}
87
88impl ContextOpt {
89    pub fn from_pipeline_map_to_opt(pipeline_map: &mut Value) -> Result<Self> {
90        let pipeline_map = pipeline_map.as_map_mut().context(ValueMustBeMapSnafu)?;
91        let mut opt = Self::default();
92        for k in PIPELINE_HINT_KEYS {
93            if let Some(v) = pipeline_map.remove(k) {
94                match k {
95                    GREPTIME_AUTO_CREATE_TABLE => {
96                        opt.auto_create_table = Some(v.to_str_value());
97                    }
98                    GREPTIME_TTL => {
99                        opt.ttl = Some(v.to_str_value());
100                    }
101                    GREPTIME_APPEND_MODE => {
102                        opt.append_mode = Some(v.to_str_value());
103                    }
104                    GREPTIME_MERGE_MODE => {
105                        opt.merge_mode = Some(v.to_str_value());
106                    }
107                    GREPTIME_PHYSICAL_TABLE => {
108                        opt.physical_table = Some(v.to_str_value());
109                    }
110                    GREPTIME_SKIP_WAL => {
111                        opt.skip_wal = Some(v.to_str_value());
112                    }
113                    GREPTIME_TABLE_SUFFIX => {
114                        opt.table_suffix = Some(v.to_str_value());
115                    }
116                    _ => {}
117                }
118            }
119        }
120        Ok(opt)
121    }
122
123    pub(crate) fn resolve_table_suffix(
124        &mut self,
125        table_suffix: Option<&TableSuffixTemplate>,
126        pipeline_map: &Value,
127    ) -> Option<String> {
128        self.table_suffix
129            .take()
130            .or_else(|| table_suffix.and_then(|s| s.apply(pipeline_map)))
131    }
132
133    pub fn set_query_context(self, ctx: &mut QueryContext) {
134        if let Some(auto_create_table) = &self.auto_create_table {
135            ctx.set_extension(AUTO_CREATE_TABLE_KEY, auto_create_table);
136        }
137        if let Some(ttl) = &self.ttl {
138            ctx.set_extension(TTL_KEY, ttl);
139        }
140        if let Some(append_mode) = &self.append_mode {
141            ctx.set_extension(APPEND_MODE_KEY, append_mode);
142        }
143        if let Some(merge_mode) = &self.merge_mode {
144            ctx.set_extension(MERGE_MODE_KEY, merge_mode);
145        }
146        if let Some(physical_table) = &self.physical_table {
147            ctx.set_extension(PHYSICAL_TABLE_KEY, physical_table);
148        }
149        if let Some(skip_wal) = &self.skip_wal {
150            ctx.set_extension(SKIP_WAL_KEY, skip_wal);
151        }
152    }
153}
154
/// ContextReq is a collection of row insert requests grouped by their options.
///
/// The default option has every field unset.
/// Because options are set in the query context, requests with different options
/// have to be split into sequential calls, one per distinct [`ContextOpt`].
/// The key is a [`ContextOpt`] struct for strong typing.
///
/// Conceptually, e.g.:
/// ```text
/// {
///     "skip_wal=true,ttl=1d": [RowInsertRequest],
///     "ttl=1d": [RowInsertRequest],
/// }
/// ```
#[derive(Debug, Default)]
pub struct ContextReq {
    // Insert requests bucketed by the option set they must be executed with.
    req: HashMap<ContextOpt, Vec<RowInsertRequest>>,
}
168
169impl ContextReq {
170    pub fn from_opt_map(opt_map: HashMap<ContextOpt, Rows>, table_name: String) -> Self {
171        Self {
172            req: opt_map
173                .into_iter()
174                .map(|(opt, rows)| {
175                    (
176                        opt,
177                        vec![RowInsertRequest {
178                            table_name: table_name.clone(),
179                            rows: Some(rows),
180                        }],
181                    )
182                })
183                .collect::<HashMap<ContextOpt, Vec<RowInsertRequest>>>(),
184        }
185    }
186
187    pub fn default_opt_with_reqs(reqs: Vec<RowInsertRequest>) -> Self {
188        let mut req_map = HashMap::new();
189        req_map.insert(ContextOpt::default(), reqs);
190        Self { req: req_map }
191    }
192
193    pub fn add_row(&mut self, opt: ContextOpt, req: RowInsertRequest) {
194        self.req.entry(opt).or_default().push(req);
195    }
196
197    pub fn add_rows(&mut self, opt: ContextOpt, reqs: impl IntoIterator<Item = RowInsertRequest>) {
198        self.req.entry(opt).or_default().extend(reqs);
199    }
200
201    pub fn merge(&mut self, other: Self) {
202        for (opt, req) in other.req {
203            self.req.entry(opt).or_default().extend(req);
204        }
205    }
206
207    pub fn as_req_iter(self, ctx: QueryContextRef) -> ContextReqIter {
208        let ctx = (*ctx).clone();
209
210        ContextReqIter {
211            opt_req: self.req.into_iter(),
212            ctx_template: ctx,
213        }
214    }
215
216    pub fn all_req(self) -> impl Iterator<Item = RowInsertRequest> {
217        self.req.into_iter().flat_map(|(_, req)| req)
218    }
219
220    pub fn ref_all_req(&self) -> impl Iterator<Item = &RowInsertRequest> {
221        self.req.values().flatten()
222    }
223
224    pub fn map_len(&self) -> usize {
225        self.req.len()
226    }
227}
228
/// ContextReqIter is an iterator that iterates over the [`ContextReq`].
///
/// The context template is cloned from the original query context.
/// For each option group it clones the template and applies the group's options to
/// the clone, then returns that context together with the group's row insert
/// requests for the actual insert.
pub struct ContextReqIter {
    // Remaining (options, requests) groups, taken from the consumed `ContextReq` map.
    opt_req: IntoIter<ContextOpt, Vec<RowInsertRequest>>,
    // Base query context that each yielded context is cloned from.
    ctx_template: QueryContext,
}
237
238impl Iterator for ContextReqIter {
239    type Item = (QueryContextRef, RowInsertRequests);
240
241    fn next(&mut self) -> Option<Self::Item> {
242        let (mut opt, req_vec) = self.opt_req.next()?;
243        let mut ctx = self.ctx_template.clone();
244        if let Some(schema) = opt.schema.take() {
245            ctx.set_current_schema(&schema);
246        }
247        opt.set_query_context(&mut ctx);
248
249        Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
250    }
251}