1mod column_schema;
16pub mod constraint;
17
18use std::collections::HashMap;
19use std::fmt;
20use std::sync::Arc;
21
22use arrow::datatypes::{Field, Schema as ArrowSchema};
23use datafusion_common::DFSchemaRef;
24use serde::{Deserialize, Deserializer, Serialize};
25use snafu::{ResultExt, ensure};
26
27use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
28use crate::prelude::ConcreteDataType;
29pub use crate::schema::column_schema::{
30 COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
31 COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
32 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
33 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
34 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COLUMN_VECTOR_INDEX_OPT_KEY_CONNECTIVITY,
35 COLUMN_VECTOR_INDEX_OPT_KEY_ENGINE, COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_ADD,
36 COLUMN_VECTOR_INDEX_OPT_KEY_EXPANSION_SEARCH, COLUMN_VECTOR_INDEX_OPT_KEY_METRIC, COMMENT_KEY,
37 ColumnExtType, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, FulltextBackend, FulltextOptions,
38 INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
39 TIME_INDEX_KEY, VECTOR_INDEX_KEY, VectorDistanceMetric, VectorIndexEngineType,
40 VectorIndexOptions,
41};
42pub use crate::schema::constraint::ColumnDefaultConstraint;
43
/// Key in the Arrow schema metadata under which the schema version is stored.
///
/// Written by `SchemaBuilder::build` and read back when converting an Arrow schema into a
/// [`Schema`]; a missing key means [`Schema::INITIAL_VERSION`].
pub const VERSION_KEY: &str = "greptime:version";
/// Key in an Arrow field's metadata recording the column's extension type
/// (set for JSON and vector columns when building fields from column schemas).
pub const TYPE_KEY: &str = "greptime:type";
48
/// A table/record schema: an ordered list of columns plus derived lookup structures.
///
/// Only `column_schemas` and `version` are serialized; the remaining fields are rebuilt
/// from them when deserializing (see the `Deserialize` impl, which goes through
/// `SchemaBuilder`).
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct Schema {
    /// Columns of the schema, in order.
    column_schemas: Vec<ColumnSchema>,
    /// Column name -> position in `column_schemas`.
    #[serde(skip_serializing)]
    name_to_index: HashMap<String, usize>,
    /// Arrow representation of this schema (fields plus schema-level metadata).
    #[serde(skip_serializing)]
    arrow_schema: Arc<ArrowSchema>,
    /// Position of the time index column in `column_schemas`, if the schema has one.
    #[serde(skip_serializing)]
    timestamp_index: Option<usize>,
    /// Version of the schema; also mirrored into the Arrow metadata under `VERSION_KEY`
    /// when built via `SchemaBuilder`.
    version: u32,
}
68
69impl<'de> Deserialize<'de> for Schema {
70 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
71 where
72 D: Deserializer<'de>,
73 {
74 use serde::de::Error;
75
76 #[derive(Deserialize)]
77 struct RawSchema {
78 column_schemas: Vec<ColumnSchema>,
79 version: u32,
80 }
81 let raw = RawSchema::deserialize(deserializer)?;
82
83 SchemaBuilder::try_from(raw.column_schemas)
84 .and_then(|x| x.version(raw.version).build())
85 .map_err(D::Error::custom)
86 }
87}
88
89impl fmt::Debug for Schema {
90 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
91 f.debug_struct("Schema")
92 .field("column_schemas", &self.column_schemas)
93 .field("name_to_index", &self.name_to_index)
94 .field("timestamp_index", &self.timestamp_index)
95 .field("version", &self.version)
96 .finish()
97 }
98}
99
100impl Schema {
101 pub const INITIAL_VERSION: u32 = 0;
103
104 pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
109 Self::new_with_version(column_schemas, Self::INITIAL_VERSION)
111 }
112
113 pub fn new_with_version(column_schemas: Vec<ColumnSchema>, version: u32) -> Schema {
114 SchemaBuilder::try_from(column_schemas)
115 .unwrap()
116 .version(version)
117 .build()
118 .unwrap()
119 }
120
121 pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
123 SchemaBuilder::try_from(column_schemas)?.build()
124 }
125
126 pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
127 &self.arrow_schema
128 }
129
130 pub fn column_schemas(&self) -> &[ColumnSchema] {
131 &self.column_schemas
132 }
133
134 pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
135 self.name_to_index
136 .get(name)
137 .map(|index| &self.column_schemas[*index])
138 }
139
140 pub fn column_name_by_index(&self, idx: usize) -> &str {
144 &self.column_schemas[idx].name
145 }
146
147 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
148 self.name_to_index.get(name).copied()
149 }
150
151 pub fn contains_column(&self, name: &str) -> bool {
152 self.name_to_index.contains_key(name)
153 }
154
155 pub fn num_columns(&self) -> usize {
156 self.column_schemas.len()
157 }
158
159 pub fn is_empty(&self) -> bool {
160 self.column_schemas.is_empty()
161 }
162
163 pub fn timestamp_index(&self) -> Option<usize> {
165 self.timestamp_index
166 }
167
168 pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
169 self.timestamp_index.map(|idx| &self.column_schemas[idx])
170 }
171
172 pub fn version(&self) -> u32 {
173 self.version
174 }
175
176 pub fn metadata(&self) -> &HashMap<String, String> {
177 &self.arrow_schema.metadata
178 }
179
180 pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
186 let mut column_schemas = Vec::with_capacity(indices.len());
187 let mut timestamp_index = None;
188 for index in indices {
189 if let Some(ts_index) = self.timestamp_index
190 && ts_index == *index
191 {
192 timestamp_index = Some(column_schemas.len());
193 }
194 column_schemas.push(self.column_schemas[*index].clone());
195 }
196 let arrow_schema = self
197 .arrow_schema
198 .project(indices)
199 .context(ProjectArrowSchemaSnafu)?;
200 let name_to_index = column_schemas
201 .iter()
202 .enumerate()
203 .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
204 .collect();
205
206 Ok(Self {
207 column_schemas,
208 name_to_index,
209 arrow_schema: Arc::new(arrow_schema),
210 timestamp_index,
211 version: self.version,
212 })
213 }
214}
215
/// Builder for [`Schema`], typically created from a list of column schemas via
/// `SchemaBuilder::try_from_columns` or `TryFrom<Vec<ColumnSchema>>`.
#[derive(Default)]
pub struct SchemaBuilder {
    /// Columns of the schema being built, in order.
    column_schemas: Vec<ColumnSchema>,
    /// Column name -> position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields, one per column.
    fields: Vec<Field>,
    /// Position of the first time index column, if any.
    timestamp_index: Option<usize>,
    /// Version stored in the built schema (0 unless set via `version`).
    version: u32,
    /// Extra schema-level metadata for the Arrow schema.
    metadata: HashMap<String, String>,
}
225
226impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
227 type Error = Error;
228
229 fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
230 SchemaBuilder::try_from_columns(column_schemas)
231 }
232}
233
234impl SchemaBuilder {
235 pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
236 let FieldsAndIndices {
237 fields,
238 name_to_index,
239 timestamp_index,
240 } = collect_fields(&column_schemas)?;
241
242 Ok(Self {
243 column_schemas,
244 name_to_index,
245 fields,
246 timestamp_index,
247 ..Default::default()
248 })
249 }
250
251 pub fn version(mut self, version: u32) -> Self {
252 self.version = version;
253 self
254 }
255
256 pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
260 let _ = self.metadata.insert(key.into(), value.into());
261 self
262 }
263
264 pub fn build(mut self) -> Result<Schema> {
265 if let Some(timestamp_index) = self.timestamp_index {
266 validate_timestamp_index(&self.column_schemas, timestamp_index)?;
267 }
268
269 self.metadata
270 .insert(VERSION_KEY.to_string(), self.version.to_string());
271
272 let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
273
274 Ok(Schema {
275 column_schemas: self.column_schemas,
276 name_to_index: self.name_to_index,
277 arrow_schema: Arc::new(arrow_schema),
278 timestamp_index: self.timestamp_index,
279 version: self.version,
280 })
281 }
282}
283
/// Intermediate result of `collect_fields`: the Arrow fields and lookup indices derived
/// from a list of column schemas.
struct FieldsAndIndices {
    /// Arrow fields, one per column, in column order.
    fields: Vec<Field>,
    /// Column name -> position in the column list.
    name_to_index: HashMap<String, usize>,
    /// Position of the first column marked as time index, if any.
    timestamp_index: Option<usize>,
}
289
290fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
291 let mut fields = Vec::with_capacity(column_schemas.len());
292 let mut name_to_index = HashMap::with_capacity(column_schemas.len());
293 let mut timestamp_index = None;
294 for (index, column_schema) in column_schemas.iter().enumerate() {
295 if column_schema.is_time_index() && timestamp_index.is_none() {
296 timestamp_index = Some(index);
297 }
298 let mut field = Field::try_from(column_schema)?;
299
300 let extype = match column_schema.data_type {
302 ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
303 ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
304 _ => None,
305 };
306 if let Some(extype) = extype {
307 field
308 .metadata_mut()
309 .insert(TYPE_KEY.to_string(), extype.to_string());
310 }
311 fields.push(field);
312 ensure!(
313 name_to_index
314 .insert(column_schema.name.clone(), index)
315 .is_none(),
316 DuplicateColumnSnafu {
317 column: &column_schema.name,
318 }
319 );
320 }
321
322 Ok(FieldsAndIndices {
323 fields,
324 name_to_index,
325 timestamp_index,
326 })
327}
328
329fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
330 ensure!(
331 timestamp_index < column_schemas.len(),
332 error::InvalidTimestampIndexSnafu {
333 index: timestamp_index,
334 }
335 );
336
337 let column_schema = &column_schemas[timestamp_index];
338 ensure!(
339 column_schema.data_type.is_timestamp(),
340 error::InvalidTimestampIndexSnafu {
341 index: timestamp_index,
342 }
343 );
344 ensure!(
345 column_schema.is_time_index(),
346 error::InvalidTimestampIndexSnafu {
347 index: timestamp_index,
348 }
349 );
350
351 Ok(())
352}
353
/// Shared, reference-counted handle to a [`Schema`].
pub type SchemaRef = Arc<Schema>;
355
356impl TryFrom<Arc<ArrowSchema>> for Schema {
357 type Error = Error;
358
359 fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
360 let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
361 let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
362 for field in &arrow_schema.fields {
363 let column_schema = ColumnSchema::try_from(field.as_ref())?;
364 let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
365 column_schemas.push(column_schema);
366 }
367
368 let mut timestamp_index = None;
369 for (index, column_schema) in column_schemas.iter().enumerate() {
370 if column_schema.is_time_index() {
371 validate_timestamp_index(&column_schemas, index)?;
372 timestamp_index = Some(index);
373 break;
374 }
375 }
376
377 let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
378
379 Ok(Self {
380 column_schemas,
381 name_to_index,
382 arrow_schema,
383 timestamp_index,
384 version,
385 })
386 }
387}
388
389impl TryFrom<ArrowSchema> for Schema {
390 type Error = Error;
391
392 fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
393 let arrow_schema = Arc::new(arrow_schema);
394
395 Schema::try_from(arrow_schema)
396 }
397}
398
399impl TryFrom<DFSchemaRef> for Schema {
400 type Error = Error;
401
402 fn try_from(value: DFSchemaRef) -> Result<Self> {
403 value.inner().clone().try_into()
404 }
405}
406
407fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
408 if let Some(value) = metadata.get(key) {
409 let version = value
410 .parse()
411 .context(error::ParseSchemaVersionSnafu { value })?;
412
413 Ok(version)
414 } else {
415 Ok(Schema::INITIAL_VERSION)
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        // Check the schema recovered from Arrow, not the original one.
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(schema, new_schema);
    }

    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );
    }

    /// Projection reorders columns, remaps the time index and keeps the version.
    #[test]
    fn test_schema_try_project() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(7)
            .build()
            .unwrap();

        let projected = schema.try_project(&[2, 1]).unwrap();
        assert_eq!(2, projected.num_columns());
        assert_eq!(&column_schemas[2], &projected.column_schemas()[0]);
        assert_eq!(&column_schemas[1], &projected.column_schemas()[1]);
        assert_eq!(Some(1), projected.timestamp_index());
        assert_eq!(Some(0), projected.column_index_by_name("col2"));
        assert!(!projected.contains_column("col1"));
        assert_eq!(7, projected.version());
        assert_eq!(2, projected.arrow_schema().fields().len());
    }
}