1use std::collections::hash_map::IntoIter;
16use std::sync::Arc;
17
18use ahash::{HashMap, HashMapExt};
19use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
20use session::context::{QueryContext, QueryContextRef};
21use snafu::OptionExt;
22use vrl::value::Value as VrlValue;
23
24use crate::error::{Result, ValueMustBeMapSnafu};
25use crate::tablesuffix::TableSuffixTemplate;
26
27const GREPTIME_AUTO_CREATE_TABLE: &str = "greptime_auto_create_table";
28const GREPTIME_TTL: &str = "greptime_ttl";
29const GREPTIME_APPEND_MODE: &str = "greptime_append_mode";
30const GREPTIME_MERGE_MODE: &str = "greptime_merge_mode";
31const GREPTIME_PHYSICAL_TABLE: &str = "greptime_physical_table";
32const GREPTIME_SKIP_WAL: &str = "greptime_skip_wal";
33const GREPTIME_TABLE_SUFFIX: &str = "greptime_table_suffix";
34
35pub(crate) const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
36pub(crate) const TTL_KEY: &str = "ttl";
37pub(crate) const APPEND_MODE_KEY: &str = "append_mode";
38pub(crate) const MERGE_MODE_KEY: &str = "merge_mode";
39pub(crate) const PHYSICAL_TABLE_KEY: &str = "physical_table";
40pub(crate) const SKIP_WAL_KEY: &str = "skip_wal";
41pub(crate) const TABLE_SUFFIX_KEY: &str = "table_suffix";
42
43pub const PIPELINE_HINT_KEYS: [&str; 7] = [
44 GREPTIME_AUTO_CREATE_TABLE,
45 GREPTIME_TTL,
46 GREPTIME_APPEND_MODE,
47 GREPTIME_MERGE_MODE,
48 GREPTIME_PHYSICAL_TABLE,
49 GREPTIME_SKIP_WAL,
50 GREPTIME_TABLE_SUFFIX,
51];
52
53const PIPELINE_HINT_PREFIX: &str = "greptime_";
54
55#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
61pub struct ContextOpt {
62 auto_create_table: Option<String>,
64 ttl: Option<String>,
65 append_mode: Option<String>,
66 merge_mode: Option<String>,
67 physical_table: Option<String>,
68 skip_wal: Option<String>,
69
70 schema: Option<String>,
72
73 table_suffix: Option<String>,
76}
77
78impl ContextOpt {
79 pub fn set_physical_table(&mut self, physical_table: String) {
80 self.physical_table = Some(physical_table);
81 }
82
83 pub fn set_schema(&mut self, schema: String) {
84 self.schema = Some(schema);
85 }
86}
87
88impl ContextOpt {
89 pub fn from_pipeline_map_to_opt(value: &mut VrlValue) -> Result<Self> {
90 let map = value.as_object_mut().context(ValueMustBeMapSnafu)?;
91
92 let mut opt = Self::default();
93 for k in PIPELINE_HINT_KEYS {
94 if let Some(v) = map.remove(k) {
95 let v = v.to_string_lossy().to_string();
96 match k {
97 GREPTIME_AUTO_CREATE_TABLE => {
98 opt.auto_create_table = Some(v);
99 }
100 GREPTIME_TTL => {
101 opt.ttl = Some(v);
102 }
103 GREPTIME_APPEND_MODE => {
104 opt.append_mode = Some(v);
105 }
106 GREPTIME_MERGE_MODE => {
107 opt.merge_mode = Some(v);
108 }
109 GREPTIME_PHYSICAL_TABLE => {
110 opt.physical_table = Some(v);
111 }
112 GREPTIME_SKIP_WAL => {
113 opt.skip_wal = Some(v);
114 }
115 GREPTIME_TABLE_SUFFIX => {
116 opt.table_suffix = Some(v);
117 }
118 _ => {}
119 }
120 }
121 }
122 Ok(opt)
123 }
124
125 pub(crate) fn resolve_table_suffix(
126 &mut self,
127 table_suffix: Option<&TableSuffixTemplate>,
128 pipeline_map: &VrlValue,
129 ) -> Option<String> {
130 self.table_suffix
131 .take()
132 .or_else(|| table_suffix.and_then(|s| s.apply(pipeline_map)))
133 }
134
135 pub fn set_query_context(self, ctx: &mut QueryContext) {
136 if let Some(auto_create_table) = &self.auto_create_table {
137 ctx.set_extension(AUTO_CREATE_TABLE_KEY, auto_create_table);
138 }
139 if let Some(ttl) = &self.ttl {
140 ctx.set_extension(TTL_KEY, ttl);
141 }
142 if let Some(append_mode) = &self.append_mode {
143 ctx.set_extension(APPEND_MODE_KEY, append_mode);
144 }
145 if let Some(merge_mode) = &self.merge_mode {
146 ctx.set_extension(MERGE_MODE_KEY, merge_mode);
147 }
148 if let Some(physical_table) = &self.physical_table {
149 ctx.set_extension(PHYSICAL_TABLE_KEY, physical_table);
150 }
151 if let Some(skip_wal) = &self.skip_wal {
152 ctx.set_extension(SKIP_WAL_KEY, skip_wal);
153 }
154 }
155}
156
157#[derive(Debug, Default)]
167pub struct ContextReq {
168 req: HashMap<ContextOpt, Vec<RowInsertRequest>>,
169}
170
171impl ContextReq {
172 pub fn from_opt_map(opt_map: HashMap<ContextOpt, Rows>, table_name: String) -> Self {
173 Self {
174 req: opt_map
175 .into_iter()
176 .map(|(opt, rows)| {
177 (
178 opt,
179 vec![RowInsertRequest {
180 table_name: table_name.clone(),
181 rows: Some(rows),
182 }],
183 )
184 })
185 .collect::<HashMap<ContextOpt, Vec<RowInsertRequest>>>(),
186 }
187 }
188
189 pub fn default_opt_with_reqs(reqs: Vec<RowInsertRequest>) -> Self {
190 let mut req_map = HashMap::new();
191 req_map.insert(ContextOpt::default(), reqs);
192 Self { req: req_map }
193 }
194
195 pub fn add_row(&mut self, opt: ContextOpt, req: RowInsertRequest) {
196 self.req.entry(opt).or_default().push(req);
197 }
198
199 pub fn add_rows(&mut self, opt: ContextOpt, reqs: impl IntoIterator<Item = RowInsertRequest>) {
200 self.req.entry(opt).or_default().extend(reqs);
201 }
202
203 pub fn merge(&mut self, other: Self) {
204 for (opt, req) in other.req {
205 self.req.entry(opt).or_default().extend(req);
206 }
207 }
208
209 pub fn as_req_iter(self, ctx: QueryContextRef) -> ContextReqIter {
210 let ctx = (*ctx).clone();
211
212 ContextReqIter {
213 opt_req: self.req.into_iter(),
214 ctx_template: ctx,
215 }
216 }
217
218 pub fn all_req(self) -> impl Iterator<Item = RowInsertRequest> {
219 self.req.into_iter().flat_map(|(_, req)| req)
220 }
221
222 pub fn ref_all_req(&self) -> impl Iterator<Item = &RowInsertRequest> {
223 self.req.values().flatten()
224 }
225
226 pub fn map_len(&self) -> usize {
227 self.req.len()
228 }
229}
230
231pub struct ContextReqIter {
236 opt_req: IntoIter<ContextOpt, Vec<RowInsertRequest>>,
237 ctx_template: QueryContext,
238}
239
240impl Iterator for ContextReqIter {
241 type Item = (QueryContextRef, RowInsertRequests);
242
243 fn next(&mut self) -> Option<Self::Item> {
244 let (mut opt, req_vec) = self.opt_req.next()?;
245 let mut ctx = self.ctx_template.clone();
246 if let Some(schema) = opt.schema.take() {
247 ctx.set_current_schema(&schema);
248 }
249 opt.set_query_context(&mut ctx);
250
251 Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
252 }
253}