cli/data/export_v2/
extractor.rs1use std::collections::{HashMap, HashSet};
20
21use serde_json::Value;
22use snafu::ResultExt;
23
24use crate::data::export_v2::error::{
25 DatabaseSnafu, EmptyResultSnafu, Result, SchemaNotFoundSnafu, UnexpectedValueTypeSnafu,
26};
27use crate::data::export_v2::schema::{SchemaDefinition, SchemaSnapshot};
28use crate::data::sql::escape_sql_literal;
29use crate::database::DatabaseClient;
30
/// Built-in schema names that are skipped when listing the schemas of a catalog.
const SYSTEM_SCHEMAS: &[&str] = &["information_schema", "pg_catalog"];
33
34pub struct SchemaExtractor<'a> {
36 client: &'a DatabaseClient,
37 catalog: &'a str,
38}
39
40impl<'a> SchemaExtractor<'a> {
41 pub fn new(client: &'a DatabaseClient, catalog: &'a str) -> Self {
43 Self { client, catalog }
44 }
45
46 pub async fn extract(&self, schemas: Option<&[String]>) -> Result<SchemaSnapshot> {
50 let mut snapshot = SchemaSnapshot::new();
51
52 let schema_names = match schemas {
53 Some(names) => self.validate_schemas(names).await?,
54 None => self.get_all_schemas().await?,
55 };
56
57 for schema_name in &schema_names {
58 let schema_def = self.extract_schema_definition(schema_name).await?;
59 snapshot.add_schema(schema_def);
60 }
61
62 Ok(snapshot)
63 }
64
65 async fn get_all_schemas(&self) -> Result<Vec<String>> {
67 let sql = format!(
68 "SELECT schema_name FROM information_schema.schemata \
69 WHERE catalog_name = '{}'",
70 escape_sql_literal(self.catalog)
71 );
72
73 let records = self.query(&sql).await?;
74 let mut schemas = Vec::new();
75
76 for row in records {
77 let name = extract_string(&row, 0)?;
78 if !SYSTEM_SCHEMAS.contains(&name.as_str()) {
79 schemas.push(name);
80 }
81 }
82
83 Ok(schemas)
84 }
85
86 async fn validate_schemas(&self, schemas: &[String]) -> Result<Vec<String>> {
88 let all_schemas = self.get_all_schemas().await?;
89 dedupe_canonicalized_schemas(schemas, &all_schemas, self.catalog)
90 }
91
92 async fn extract_schema_definition(&self, schema: &str) -> Result<SchemaDefinition> {
94 let sql = format!(
95 "SELECT schema_name, options FROM information_schema.schemata \
96 WHERE catalog_name = '{}' AND schema_name = '{}'",
97 escape_sql_literal(self.catalog),
98 escape_sql_literal(schema)
99 );
100
101 let records = self.query(&sql).await?;
102 if records.is_empty() {
103 return SchemaNotFoundSnafu {
104 catalog: self.catalog,
105 schema,
106 }
107 .fail();
108 }
109
110 let name = extract_string(&records[0], 0)?;
111 let options = extract_optional_string(&records[0], 1)
112 .map(|opts| parse_options(&opts))
113 .unwrap_or_default();
114
115 Ok(SchemaDefinition {
116 catalog: self.catalog.to_string(),
117 name,
118 options,
119 })
120 }
121
122 async fn query(&self, sql: &str) -> Result<Vec<Vec<Value>>> {
124 self.client
125 .sql_in_public(sql)
126 .await
127 .context(DatabaseSnafu)?
128 .ok_or_else(|| EmptyResultSnafu.build())
129 }
130}
131
132fn extract_string(row: &[Value], index: usize) -> Result<String> {
134 match row.get(index) {
135 Some(Value::String(s)) => Ok(s.clone()),
136 Some(Value::Null) => UnexpectedValueTypeSnafu.fail(),
137 _ => UnexpectedValueTypeSnafu.fail(),
138 }
139}
140
141fn extract_optional_string(row: &[Value], index: usize) -> Option<String> {
143 match row.get(index) {
144 Some(Value::String(s)) if !s.is_empty() => Some(s.clone()),
145 _ => None,
146 }
147}
148
149fn parse_options(options_str: &str) -> HashMap<String, String> {
151 if let Ok(map) = serde_json::from_str::<HashMap<String, String>>(options_str) {
152 return map;
153 }
154
155 let mut options = HashMap::new();
156 for line in options_str.lines() {
157 let trimmed = line.trim();
158 if trimmed.is_empty() {
159 continue;
160 }
161
162 if let Some((key, value)) = parse_quoted_option_line(trimmed) {
163 options.insert(key, value);
164 continue;
165 }
166
167 for part in trimmed.split_whitespace() {
168 if let Some((key, value)) = part.split_once('=') {
169 options.insert(key.to_string(), value.to_string());
170 }
171 }
172 }
173 options
174}
175
/// Parses one option line of the form `'key'='value'` or `key='value'`.
///
/// The value must be wrapped in single quotes. Everything between those quotes
/// is taken verbatim, with no unescaping.
///
/// In the bare-key form, the key must be a single non-empty token with no
/// whitespace, `=` or `'`, and the value may not itself contain `'`. These
/// guards keep lines holding several pairs (e.g. `a='x' b='y'`) out of this
/// parser, so the whitespace-separated fallback in `parse_options` handles them.
///
/// Returns `None` when the line matches neither form.
fn parse_quoted_option_line(line: &str) -> Option<(String, String)> {
    let (key, value) = if let Some(quoted) = line.strip_prefix('\'') {
        // `'key'='value'`
        let (key, rest) = quoted.split_once("'='")?;
        (key, rest.strip_suffix('\'')?)
    } else {
        // `key='value'`
        let (key, rest) = line.split_once("='")?;
        let value = rest.strip_suffix('\'')?;
        let plain_key = !key.is_empty()
            && !key.contains(|c: char| c.is_whitespace() || c == '=' || c == '\'');
        if !plain_key || value.contains('\'') {
            return None;
        }
        (key, value)
    };
    Some((key.to_string(), value.to_string()))
}
182
183fn dedupe_canonicalized_schemas(
184 requested: &[String],
185 available: &[String],
186 catalog: &str,
187) -> Result<Vec<String>> {
188 let mut canonicalized = Vec::new();
189 let mut seen = HashSet::new();
190
191 for schema in requested {
192 let Some(canonical) = available.iter().find(|s| s.eq_ignore_ascii_case(schema)) else {
193 return SchemaNotFoundSnafu { catalog, schema }.fail();
194 };
195
196 if seen.insert(canonical.to_ascii_lowercase()) {
197 canonicalized.push(canonical.clone());
198 }
199 }
200
201 Ok(canonicalized)
202}
203
#[cfg(test)]
mod tests {
    use serde_json::Value;

    use super::*;

    /// Looks up `key` in a parsed option map as a `&str` for terse assertions.
    fn option_of<'m>(
        map: &'m std::collections::HashMap<String, String>,
        key: &str,
    ) -> Option<&'m str> {
        map.get(key).map(String::as_str)
    }

    #[test]
    fn test_parse_options_json() {
        let parsed = parse_options(r#"{"ttl": "30d", "custom": "value"}"#);
        assert_eq!(option_of(&parsed, "ttl"), Some("30d"));
        assert_eq!(option_of(&parsed, "custom"), Some("value"));
    }

    #[test]
    fn test_parse_options_key_value() {
        let parsed = parse_options("ttl=30d custom=value");
        assert_eq!(option_of(&parsed, "ttl"), Some("30d"));
        assert_eq!(option_of(&parsed, "custom"), Some("value"));
    }

    #[test]
    fn test_parse_options_schema_display_format() {
        let parsed = parse_options("'ttl'='30d'\n'custom'='value with spaces'\n");
        assert_eq!(option_of(&parsed, "ttl"), Some("30d"));
        assert_eq!(option_of(&parsed, "custom"), Some("value with spaces"));
    }

    #[test]
    fn test_extract_string_rejects_null() {
        assert!(extract_string(&[Value::Null], 0).is_err());
    }

    #[test]
    fn test_dedupe_canonicalized_schemas() {
        let available = ["public", "test_db"].map(String::from);
        let requested = ["PUBLIC", "public", "Test_Db"].map(String::from);

        let canonicalized = dedupe_canonicalized_schemas(&requested, &available, "greptime")
            .expect("schemas should be canonicalized");

        assert_eq!(canonicalized, ["public", "test_db"]);
    }
}