Skip to main content

cli/data/export_v2/
extractor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Schema extraction from `information_schema`.
//!
//! For V2 DDL-only snapshots, the extractor persists only the schema index.
18
19use std::collections::{HashMap, HashSet};
20
21use serde_json::Value;
22use snafu::ResultExt;
23
24use crate::data::export_v2::error::{
25    DatabaseSnafu, EmptyResultSnafu, Result, SchemaNotFoundSnafu, UnexpectedValueTypeSnafu,
26};
27use crate::data::export_v2::schema::{SchemaDefinition, SchemaSnapshot};
28use crate::data::sql::escape_sql_literal;
29use crate::database::DatabaseClient;
30
/// Names of system schemas that are skipped when listing the schemas to
/// export; compared against `information_schema.schemata.schema_name`.
const SYSTEM_SCHEMAS: &[&str] = &["information_schema", "pg_catalog"];
33
34/// Extracts schema definitions from information_schema.
35pub struct SchemaExtractor<'a> {
36    client: &'a DatabaseClient,
37    catalog: &'a str,
38}
39
40impl<'a> SchemaExtractor<'a> {
41    /// Creates a new schema extractor.
42    pub fn new(client: &'a DatabaseClient, catalog: &'a str) -> Self {
43        Self { client, catalog }
44    }
45
46    /// Extracts the schema index for the given schemas.
47    ///
48    /// If `schemas` is None, extracts all non-system schemas.
49    pub async fn extract(&self, schemas: Option<&[String]>) -> Result<SchemaSnapshot> {
50        let mut snapshot = SchemaSnapshot::new();
51
52        let schema_names = match schemas {
53            Some(names) => self.validate_schemas(names).await?,
54            None => self.get_all_schemas().await?,
55        };
56
57        for schema_name in &schema_names {
58            let schema_def = self.extract_schema_definition(schema_name).await?;
59            snapshot.add_schema(schema_def);
60        }
61
62        Ok(snapshot)
63    }
64
65    /// Gets all non-system schemas in the catalog.
66    async fn get_all_schemas(&self) -> Result<Vec<String>> {
67        let sql = format!(
68            "SELECT schema_name FROM information_schema.schemata \
69             WHERE catalog_name = '{}'",
70            escape_sql_literal(self.catalog)
71        );
72
73        let records = self.query(&sql).await?;
74        let mut schemas = Vec::new();
75
76        for row in records {
77            let name = extract_string(&row, 0)?;
78            if !SYSTEM_SCHEMAS.contains(&name.as_str()) {
79                schemas.push(name);
80            }
81        }
82
83        Ok(schemas)
84    }
85
86    /// Validates that all specified schemas exist.
87    async fn validate_schemas(&self, schemas: &[String]) -> Result<Vec<String>> {
88        let all_schemas = self.get_all_schemas().await?;
89        dedupe_canonicalized_schemas(schemas, &all_schemas, self.catalog)
90    }
91
92    /// Extracts schema (database) definition.
93    async fn extract_schema_definition(&self, schema: &str) -> Result<SchemaDefinition> {
94        let sql = format!(
95            "SELECT schema_name, options FROM information_schema.schemata \
96             WHERE catalog_name = '{}' AND schema_name = '{}'",
97            escape_sql_literal(self.catalog),
98            escape_sql_literal(schema)
99        );
100
101        let records = self.query(&sql).await?;
102        if records.is_empty() {
103            return SchemaNotFoundSnafu {
104                catalog: self.catalog,
105                schema,
106            }
107            .fail();
108        }
109
110        let name = extract_string(&records[0], 0)?;
111        let options = extract_optional_string(&records[0], 1)
112            .map(|opts| parse_options(&opts))
113            .unwrap_or_default();
114
115        Ok(SchemaDefinition {
116            catalog: self.catalog.to_string(),
117            name,
118            options,
119        })
120    }
121
122    /// Executes a SQL query and returns the results.
123    async fn query(&self, sql: &str) -> Result<Vec<Vec<Value>>> {
124        self.client
125            .sql_in_public(sql)
126            .await
127            .context(DatabaseSnafu)?
128            .ok_or_else(|| EmptyResultSnafu.build())
129    }
130}
131
132/// Extracts a string value from a row.
133fn extract_string(row: &[Value], index: usize) -> Result<String> {
134    match row.get(index) {
135        Some(Value::String(s)) => Ok(s.clone()),
136        Some(Value::Null) => UnexpectedValueTypeSnafu.fail(),
137        _ => UnexpectedValueTypeSnafu.fail(),
138    }
139}
140
141/// Extracts an optional string value from a row.
142fn extract_optional_string(row: &[Value], index: usize) -> Option<String> {
143    match row.get(index) {
144        Some(Value::String(s)) if !s.is_empty() => Some(s.clone()),
145        _ => None,
146    }
147}
148
149/// Parses options string into a HashMap.
150fn parse_options(options_str: &str) -> HashMap<String, String> {
151    if let Ok(map) = serde_json::from_str::<HashMap<String, String>>(options_str) {
152        return map;
153    }
154
155    let mut options = HashMap::new();
156    for line in options_str.lines() {
157        let trimmed = line.trim();
158        if trimmed.is_empty() {
159            continue;
160        }
161
162        if let Some((key, value)) = parse_quoted_option_line(trimmed) {
163            options.insert(key, value);
164            continue;
165        }
166
167        for part in trimmed.split_whitespace() {
168            if let Some((key, value)) = part.split_once('=') {
169                options.insert(key.to_string(), value.to_string());
170            }
171        }
172    }
173    options
174}
175
/// Parses a single `'key'='value'` or `key='value'` option line.
///
/// The value must be wrapped in single quotes and may contain spaces.
///
/// * Quoted key: everything between the first `'='` and the final quote is
///   taken verbatim as the value, so the value may itself contain quotes.
/// * Bare key: the key must be non-empty and free of whitespace, quotes and
///   `=`, and the value must not contain a quote. Otherwise `None` is
///   returned so the caller can fall back to token-by-token parsing. This
///   keeps a line such as `a='x' b='y'` from being read as one pair.
fn parse_quoted_option_line(line: &str) -> Option<(String, String)> {
    if let Some(quoted) = line.strip_prefix('\'') {
        let (key, rest) = quoted.split_once("'='")?;
        let value = rest.strip_suffix('\'')?;
        return Some((key.to_string(), value.to_string()));
    }

    let (key, rest) = line.split_once("='")?;
    let value = rest.strip_suffix('\'')?;
    let key_is_bare = !key.is_empty()
        && !key.contains(|c: char| c == '\'' || c == '=' || c.is_whitespace());
    if !key_is_bare || value.contains('\'') {
        return None;
    }
    Some((key.to_string(), value.to_string()))
}
182
183fn dedupe_canonicalized_schemas(
184    requested: &[String],
185    available: &[String],
186    catalog: &str,
187) -> Result<Vec<String>> {
188    let mut canonicalized = Vec::new();
189    let mut seen = HashSet::new();
190
191    for schema in requested {
192        let Some(canonical) = available.iter().find(|s| s.eq_ignore_ascii_case(schema)) else {
193            return SchemaNotFoundSnafu { catalog, schema }.fail();
194        };
195
196        if seen.insert(canonical.to_ascii_lowercase()) {
197            canonicalized.push(canonical.clone());
198        }
199    }
200
201    Ok(canonicalized)
202}
203
#[cfg(test)]
mod tests {
    use serde_json::Value;

    use super::*;

    #[test]
    fn test_parse_options_json() {
        let opts = r#"{"ttl": "30d", "custom": "value"}"#;
        let parsed = parse_options(opts);
        assert_eq!(parsed.get("ttl"), Some(&"30d".to_string()));
        assert_eq!(parsed.get("custom"), Some(&"value".to_string()));
    }

    #[test]
    fn test_parse_options_key_value() {
        let opts = "ttl=30d custom=value";
        let parsed = parse_options(opts);
        assert_eq!(parsed.get("ttl"), Some(&"30d".to_string()));
        assert_eq!(parsed.get("custom"), Some(&"value".to_string()));
    }

    #[test]
    fn test_parse_options_schema_display_format() {
        let opts = "'ttl'='30d'\n'custom'='value with spaces'\n";
        let parsed = parse_options(opts);
        assert_eq!(parsed.get("ttl"), Some(&"30d".to_string()));
        assert_eq!(parsed.get("custom"), Some(&"value with spaces".to_string()));
    }

    #[test]
    fn test_parse_options_empty_input() {
        assert!(parse_options("").is_empty());
        assert!(parse_options("\n   \n").is_empty());
    }

    #[test]
    fn test_extract_string_rejects_null() {
        let row = vec![Value::Null];
        assert!(extract_string(&row, 0).is_err());
    }

    #[test]
    fn test_extract_string_accepts_string_rejects_others() {
        let row = vec![Value::String("public".to_string()), Value::Bool(true)];
        let name = extract_string(&row, 0).expect("string column should be extracted");
        assert_eq!(name, "public");
        // Non-string JSON values and out-of-range columns are both errors.
        assert!(extract_string(&row, 1).is_err());
        assert!(extract_string(&row, 2).is_err());
    }

    #[test]
    fn test_extract_optional_string_filters_empty_and_null() {
        let row = vec![
            Value::String(String::new()),
            Value::Null,
            Value::String("ttl=30d".to_string()),
        ];
        assert_eq!(extract_optional_string(&row, 0), None);
        assert_eq!(extract_optional_string(&row, 1), None);
        assert_eq!(
            extract_optional_string(&row, 2),
            Some("ttl=30d".to_string())
        );
        assert_eq!(extract_optional_string(&row, 3), None);
    }

    #[test]
    fn test_dedupe_canonicalized_schemas() {
        let available = vec!["public".to_string(), "test_db".to_string()];
        let requested = vec![
            "PUBLIC".to_string(),
            "public".to_string(),
            "Test_Db".to_string(),
        ];

        let canonicalized = dedupe_canonicalized_schemas(&requested, &available, "greptime")
            .expect("schemas should be canonicalized");

        assert_eq!(canonicalized, vec!["public", "test_db"]);
    }

    #[test]
    fn test_dedupe_canonicalized_schemas_rejects_unknown() {
        let available = vec!["public".to_string()];
        let requested = vec!["public".to_string(), "missing".to_string()];

        assert!(dedupe_canonicalized_schemas(&requested, &available, "greptime").is_err());
    }
}