1use std::collections::hash_map::IntoIter;
16use std::sync::Arc;
17
18use ahash::{HashMap, HashMapExt};
19use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
20use session::context::{QueryContext, QueryContextRef};
21use snafu::OptionExt;
22
23use crate::error::{Result, ValueMustBeMapSnafu};
24use crate::tablesuffix::TableSuffixTemplate;
25use crate::Value;
26
27const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
28const GREPTIME_TTL: &str = "greptime_ttl";
29const GREPTIME_APPEND_MODE: &str = "greptime_append_mode";
30const GREPTIME_MERGE_MODE: &str = "greptime_merge_mode";
31const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";
32const GREPTIME_SKIP_WAL: &str = "greptime_skip_wal";
33const GREPTIME_TABLE_SUFFIX: &str = "greptime_table_suffix";
34
35pub(crate) const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
36pub(crate) const TTL_KEY: &str = "ttl";
37pub(crate) const APPEND_MODE_KEY: &str = "append_mode";
38pub(crate) const MERGE_MODE_KEY: &str = "merge_mode";
39pub(crate) const PHYSICAL_TABLE_KEY: &str = "physical_table";
40pub(crate) const SKIP_WAL_KEY: &str = "skip_wal";
41pub(crate) const TABLE_SUFFIX_KEY: &str = "table_suffix";
42
43pub const PIPELINE_HINT_KEYS: [&str; 7] = [
44 GREPTIME_AUTO_CREATE_TABLE,
45 GREPTIME_TTL,
46 GREPTIME_APPEND_MODE,
47 GREPTIME_MERGE_MODE,
48 GREPTIME_PHYSICAL_TABLE,
49 GREPTIME_SKIP_WAL,
50 GREPTIME_TABLE_SUFFIX,
51];
52
53const PIPELINE_HINT_PREFIX: &str = "greptime_";
54
55#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
61pub struct ContextOpt {
62 auto_create_table: Option<String>,
64 ttl: Option<String>,
65 append_mode: Option<String>,
66 merge_mode: Option<String>,
67 physical_table: Option<String>,
68 skip_wal: Option<String>,
69
70 schema: Option<String>,
72
73 table_suffix: Option<String>,
76}
77
78impl ContextOpt {
79 pub fn set_physical_table(&mut self, physical_table: String) {
80 self.physical_table = Some(physical_table);
81 }
82
83 pub fn set_schema(&mut self, schema: String) {
84 self.schema = Some(schema);
85 }
86}
87
88impl ContextOpt {
89 pub fn from_pipeline_map_to_opt(pipeline_map: &mut Value) -> Result<Self> {
90 let pipeline_map = pipeline_map.as_map_mut().context(ValueMustBeMapSnafu)?;
91 let mut opt = Self::default();
92 for k in PIPELINE_HINT_KEYS {
93 if let Some(v) = pipeline_map.remove(k) {
94 match k {
95 GREPTIME_AUTO_CREATE_TABLE => {
96 opt.auto_create_table = Some(v.to_str_value());
97 }
98 GREPTIME_TTL => {
99 opt.ttl = Some(v.to_str_value());
100 }
101 GREPTIME_APPEND_MODE => {
102 opt.append_mode = Some(v.to_str_value());
103 }
104 GREPTIME_MERGE_MODE => {
105 opt.merge_mode = Some(v.to_str_value());
106 }
107 GREPTIME_PHYSICAL_TABLE => {
108 opt.physical_table = Some(v.to_str_value());
109 }
110 GREPTIME_SKIP_WAL => {
111 opt.skip_wal = Some(v.to_str_value());
112 }
113 GREPTIME_TABLE_SUFFIX => {
114 opt.table_suffix = Some(v.to_str_value());
115 }
116 _ => {}
117 }
118 }
119 }
120 Ok(opt)
121 }
122
123 pub(crate) fn resolve_table_suffix(
124 &mut self,
125 table_suffix: Option<&TableSuffixTemplate>,
126 pipeline_map: &Value,
127 ) -> Option<String> {
128 self.table_suffix
129 .take()
130 .or_else(|| table_suffix.and_then(|s| s.apply(pipeline_map)))
131 }
132
133 pub fn set_query_context(self, ctx: &mut QueryContext) {
134 if let Some(auto_create_table) = &self.auto_create_table {
135 ctx.set_extension(AUTO_CREATE_TABLE_KEY, auto_create_table);
136 }
137 if let Some(ttl) = &self.ttl {
138 ctx.set_extension(TTL_KEY, ttl);
139 }
140 if let Some(append_mode) = &self.append_mode {
141 ctx.set_extension(APPEND_MODE_KEY, append_mode);
142 }
143 if let Some(merge_mode) = &self.merge_mode {
144 ctx.set_extension(MERGE_MODE_KEY, merge_mode);
145 }
146 if let Some(physical_table) = &self.physical_table {
147 ctx.set_extension(PHYSICAL_TABLE_KEY, physical_table);
148 }
149 if let Some(skip_wal) = &self.skip_wal {
150 ctx.set_extension(SKIP_WAL_KEY, skip_wal);
151 }
152 }
153}
154
155#[derive(Debug, Default)]
165pub struct ContextReq {
166 req: HashMap<ContextOpt, Vec<RowInsertRequest>>,
167}
168
169impl ContextReq {
170 pub fn from_opt_map(opt_map: HashMap<ContextOpt, Rows>, table_name: String) -> Self {
171 Self {
172 req: opt_map
173 .into_iter()
174 .map(|(opt, rows)| {
175 (
176 opt,
177 vec![RowInsertRequest {
178 table_name: table_name.clone(),
179 rows: Some(rows),
180 }],
181 )
182 })
183 .collect::<HashMap<ContextOpt, Vec<RowInsertRequest>>>(),
184 }
185 }
186
187 pub fn default_opt_with_reqs(reqs: Vec<RowInsertRequest>) -> Self {
188 let mut req_map = HashMap::new();
189 req_map.insert(ContextOpt::default(), reqs);
190 Self { req: req_map }
191 }
192
193 pub fn add_row(&mut self, opt: ContextOpt, req: RowInsertRequest) {
194 self.req.entry(opt).or_default().push(req);
195 }
196
197 pub fn add_rows(&mut self, opt: ContextOpt, reqs: impl IntoIterator<Item = RowInsertRequest>) {
198 self.req.entry(opt).or_default().extend(reqs);
199 }
200
201 pub fn merge(&mut self, other: Self) {
202 for (opt, req) in other.req {
203 self.req.entry(opt).or_default().extend(req);
204 }
205 }
206
207 pub fn as_req_iter(self, ctx: QueryContextRef) -> ContextReqIter {
208 let ctx = (*ctx).clone();
209
210 ContextReqIter {
211 opt_req: self.req.into_iter(),
212 ctx_template: ctx,
213 }
214 }
215
216 pub fn all_req(self) -> impl Iterator<Item = RowInsertRequest> {
217 self.req.into_iter().flat_map(|(_, req)| req)
218 }
219
220 pub fn ref_all_req(&self) -> impl Iterator<Item = &RowInsertRequest> {
221 self.req.values().flatten()
222 }
223
224 pub fn map_len(&self) -> usize {
225 self.req.len()
226 }
227}
228
229pub struct ContextReqIter {
234 opt_req: IntoIter<ContextOpt, Vec<RowInsertRequest>>,
235 ctx_template: QueryContext,
236}
237
238impl Iterator for ContextReqIter {
239 type Item = (QueryContextRef, RowInsertRequests);
240
241 fn next(&mut self) -> Option<Self::Item> {
242 let (mut opt, req_vec) = self.opt_req.next()?;
243 let mut ctx = self.ctx_template.clone();
244 if let Some(schema) = opt.schema.take() {
245 ctx.set_current_schema(&schema);
246 }
247 opt.set_query_context(&mut ctx);
248
249 Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
250 }
251}