1mod column_schema;
16pub mod constraint;
17mod raw;
18
19use std::collections::HashMap;
20use std::fmt;
21use std::sync::Arc;
22
23use arrow::datatypes::{Field, Schema as ArrowSchema};
24use datafusion_common::DFSchemaRef;
25use snafu::{ResultExt, ensure};
26
27use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
28use crate::prelude::ConcreteDataType;
29pub use crate::schema::column_schema::{
30 COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
31 COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
32 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
33 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
34 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COLUMN_VECTOR_INDEX_OPT_KEY_CONNECTIVITY,
35 COLUMN_VECTOR_INDEX_OPT_KEY_ENGINE, COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_ADD,
36 COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_SEARCH, COLUMN_VECTOR_INDEX_OPT_KEY_METRIC, COMMENT_KEY,
37 ColumnExtType, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, FulltextBackend, FulltextOptions,
38 INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
39 TIME_INDEX_KEY, VECTOR_INDEX_KEY, VectorDistanceMetric, VectorIndexEngineType,
40 VectorIndexOptions,
41};
42pub use crate::schema::constraint::ColumnDefaultConstraint;
43pub use crate::schema::raw::RawSchema;
44
/// Key under which the schema version is stored in the arrow schema metadata.
pub const VERSION_KEY: &str = "greptime:version";
/// Key under which a column's extension type is stored in the arrow field metadata.
pub const TYPE_KEY: &str = "greptime:type";
49
/// A schema: an ordered list of [`ColumnSchema`]s together with name lookup
/// information and the equivalent arrow schema.
#[derive(Clone, PartialEq, Eq)]
pub struct Schema {
    /// Column definitions, in column order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow representation of this schema; its metadata map is what
    /// [`Schema::metadata`] exposes.
    arrow_schema: Arc<ArrowSchema>,
    /// Position of the time index column in `column_schemas`, if there is one.
    timestamp_index: Option<usize>,
    /// Version of the schema. The builder also writes it into the arrow
    /// metadata under [`VERSION_KEY`].
    version: u32,
}
66
67impl fmt::Debug for Schema {
68 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
69 f.debug_struct("Schema")
70 .field("column_schemas", &self.column_schemas)
71 .field("name_to_index", &self.name_to_index)
72 .field("timestamp_index", &self.timestamp_index)
73 .field("version", &self.version)
74 .finish()
75 }
76}
77
78impl Schema {
79 pub const INITIAL_VERSION: u32 = 0;
81
82 pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
87 SchemaBuilder::try_from(column_schemas)
89 .unwrap()
90 .build()
91 .unwrap()
92 }
93
94 pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
96 SchemaBuilder::try_from(column_schemas)?.build()
97 }
98
99 pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
100 &self.arrow_schema
101 }
102
103 pub fn column_schemas(&self) -> &[ColumnSchema] {
104 &self.column_schemas
105 }
106
107 pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
108 self.name_to_index
109 .get(name)
110 .map(|index| &self.column_schemas[*index])
111 }
112
113 pub fn column_name_by_index(&self, idx: usize) -> &str {
117 &self.column_schemas[idx].name
118 }
119
120 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
121 self.name_to_index.get(name).copied()
122 }
123
124 pub fn contains_column(&self, name: &str) -> bool {
125 self.name_to_index.contains_key(name)
126 }
127
128 pub fn num_columns(&self) -> usize {
129 self.column_schemas.len()
130 }
131
132 pub fn is_empty(&self) -> bool {
133 self.column_schemas.is_empty()
134 }
135
136 pub fn timestamp_index(&self) -> Option<usize> {
138 self.timestamp_index
139 }
140
141 pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
142 self.timestamp_index.map(|idx| &self.column_schemas[idx])
143 }
144
145 pub fn version(&self) -> u32 {
146 self.version
147 }
148
149 pub fn metadata(&self) -> &HashMap<String, String> {
150 &self.arrow_schema.metadata
151 }
152
153 pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
159 let mut column_schemas = Vec::with_capacity(indices.len());
160 let mut timestamp_index = None;
161 for index in indices {
162 if let Some(ts_index) = self.timestamp_index
163 && ts_index == *index
164 {
165 timestamp_index = Some(column_schemas.len());
166 }
167 column_schemas.push(self.column_schemas[*index].clone());
168 }
169 let arrow_schema = self
170 .arrow_schema
171 .project(indices)
172 .context(ProjectArrowSchemaSnafu)?;
173 let name_to_index = column_schemas
174 .iter()
175 .enumerate()
176 .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
177 .collect();
178
179 Ok(Self {
180 column_schemas,
181 name_to_index,
182 arrow_schema: Arc::new(arrow_schema),
183 timestamp_index,
184 version: self.version,
185 })
186 }
187}
188
/// Builder for [`Schema`].
#[derive(Default)]
pub struct SchemaBuilder {
    /// Column definitions, in column order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields converted from `column_schemas`.
    fields: Vec<Field>,
    /// Position of the time index column, if one was found.
    timestamp_index: Option<usize>,
    /// Version to assign to the built schema.
    version: u32,
    /// Metadata to attach to the built arrow schema.
    metadata: HashMap<String, String>,
}
198
199impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
200 type Error = Error;
201
202 fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
203 SchemaBuilder::try_from_columns(column_schemas)
204 }
205}
206
207impl SchemaBuilder {
208 pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
209 let FieldsAndIndices {
210 fields,
211 name_to_index,
212 timestamp_index,
213 } = collect_fields(&column_schemas)?;
214
215 Ok(Self {
216 column_schemas,
217 name_to_index,
218 fields,
219 timestamp_index,
220 ..Default::default()
221 })
222 }
223
224 pub fn version(mut self, version: u32) -> Self {
225 self.version = version;
226 self
227 }
228
229 pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
233 let _ = self.metadata.insert(key.into(), value.into());
234 self
235 }
236
237 pub fn build(mut self) -> Result<Schema> {
238 if let Some(timestamp_index) = self.timestamp_index {
239 validate_timestamp_index(&self.column_schemas, timestamp_index)?;
240 }
241
242 self.metadata
243 .insert(VERSION_KEY.to_string(), self.version.to_string());
244
245 let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
246
247 Ok(Schema {
248 column_schemas: self.column_schemas,
249 name_to_index: self.name_to_index,
250 arrow_schema: Arc::new(arrow_schema),
251 timestamp_index: self.timestamp_index,
252 version: self.version,
253 })
254 }
255}
256
/// Result of [`collect_fields`]: the arrow fields plus the lookup indices
/// derived from a list of column schemas.
struct FieldsAndIndices {
    /// Arrow fields, one per column schema, in column order.
    fields: Vec<Field>,
    /// Maps a column name to its position.
    name_to_index: HashMap<String, usize>,
    /// Position of the first column flagged as time index, if any.
    timestamp_index: Option<usize>,
}
262
263fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
264 let mut fields = Vec::with_capacity(column_schemas.len());
265 let mut name_to_index = HashMap::with_capacity(column_schemas.len());
266 let mut timestamp_index = None;
267 for (index, column_schema) in column_schemas.iter().enumerate() {
268 if column_schema.is_time_index() && timestamp_index.is_none() {
269 timestamp_index = Some(index);
270 }
271 let mut field = Field::try_from(column_schema)?;
272
273 let extype = match column_schema.data_type {
275 ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
276 ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
277 _ => None,
278 };
279 if let Some(extype) = extype {
280 field
281 .metadata_mut()
282 .insert(TYPE_KEY.to_string(), extype.to_string());
283 }
284 fields.push(field);
285 ensure!(
286 name_to_index
287 .insert(column_schema.name.clone(), index)
288 .is_none(),
289 DuplicateColumnSnafu {
290 column: &column_schema.name,
291 }
292 );
293 }
294
295 Ok(FieldsAndIndices {
296 fields,
297 name_to_index,
298 timestamp_index,
299 })
300}
301
302fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
303 ensure!(
304 timestamp_index < column_schemas.len(),
305 error::InvalidTimestampIndexSnafu {
306 index: timestamp_index,
307 }
308 );
309
310 let column_schema = &column_schemas[timestamp_index];
311 ensure!(
312 column_schema.data_type.is_timestamp(),
313 error::InvalidTimestampIndexSnafu {
314 index: timestamp_index,
315 }
316 );
317 ensure!(
318 column_schema.is_time_index(),
319 error::InvalidTimestampIndexSnafu {
320 index: timestamp_index,
321 }
322 );
323
324 Ok(())
325}
326
/// Shared, reference-counted [`Schema`].
pub type SchemaRef = Arc<Schema>;
328
329impl TryFrom<Arc<ArrowSchema>> for Schema {
330 type Error = Error;
331
332 fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
333 let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
334 let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
335 for field in &arrow_schema.fields {
336 let column_schema = ColumnSchema::try_from(field.as_ref())?;
337 let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
338 column_schemas.push(column_schema);
339 }
340
341 let mut timestamp_index = None;
342 for (index, column_schema) in column_schemas.iter().enumerate() {
343 if column_schema.is_time_index() {
344 validate_timestamp_index(&column_schemas, index)?;
345 timestamp_index = Some(index);
346 break;
347 }
348 }
349
350 let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
351
352 Ok(Self {
353 column_schemas,
354 name_to_index,
355 arrow_schema,
356 timestamp_index,
357 version,
358 })
359 }
360}
361
362impl TryFrom<ArrowSchema> for Schema {
363 type Error = Error;
364
365 fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
366 let arrow_schema = Arc::new(arrow_schema);
367
368 Schema::try_from(arrow_schema)
369 }
370}
371
372impl TryFrom<DFSchemaRef> for Schema {
373 type Error = Error;
374
375 fn try_from(value: DFSchemaRef) -> Result<Self> {
376 value.inner().clone().try_into()
377 }
378}
379
380fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
381 if let Some(value) = metadata.get(key) {
382 let version = value
383 .parse()
384 .context(error::ParseSchemaVersionSnafu { value })?;
385
386 Ok(version)
387 } else {
388 Ok(Schema::INITIAL_VERSION)
389 }
390}
391
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    /// A default builder yields an empty schema.
    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    /// A schema without a time index round-trips through its arrow schema.
    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    /// Two columns with the same name are rejected.
    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    /// Metadata added through the builder is visible on the built schema.
    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    /// Time index and version survive a round trip through the arrow schema.
    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        // Check the rebuilt schema itself, not the one it was rebuilt from.
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(schema, new_schema);
    }

    /// A time index on a non-timestamp column is rejected wherever it sits.
    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );
    }
}