1mod column_schema;
16pub mod constraint;
17mod raw;
18
19use std::collections::HashMap;
20use std::fmt;
21use std::sync::Arc;
22
23use arrow::datatypes::{Field, Schema as ArrowSchema};
24use datafusion_common::DFSchemaRef;
25use snafu::{ResultExt, ensure};
26
27use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
28use crate::prelude::ConcreteDataType;
29pub use crate::schema::column_schema::{
30 COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
31 COLUMN_FULLTEXT_OPT_KEY_BACKEND, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE,
32 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
33 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
34 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, ColumnExtType, ColumnSchema, FULLTEXT_KEY,
35 FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata,
36 SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, TIME_INDEX_KEY, VECTOR_INDEX_KEY,
37 VectorDistanceMetric, VectorIndexEngineType, VectorIndexOptions,
38};
39pub use crate::schema::constraint::ColumnDefaultConstraint;
40pub use crate::schema::raw::RawSchema;
41
/// Metadata key under which the schema version is stored in the arrow schema metadata.
pub const VERSION_KEY: &str = "greptime:version";
/// Field metadata key under which a column's extension type (JSON or vector) is stored.
pub const TYPE_KEY: &str = "greptime:type";
46
/// A schema made of [`ColumnSchema`]s, kept together with its arrow representation.
#[derive(Clone, PartialEq, Eq)]
pub struct Schema {
    /// Columns of the schema, in declaration order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow schema holding one field per column plus the schema-level metadata.
    arrow_schema: Arc<ArrowSchema>,
    /// Position of the time index column in `column_schemas`.
    ///
    /// `None` means the schema has no time index column.
    timestamp_index: Option<usize>,
    /// Version of the schema; starts at [`Schema::INITIAL_VERSION`].
    version: u32,
}
63
64impl fmt::Debug for Schema {
65 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
66 f.debug_struct("Schema")
67 .field("column_schemas", &self.column_schemas)
68 .field("name_to_index", &self.name_to_index)
69 .field("timestamp_index", &self.timestamp_index)
70 .field("version", &self.version)
71 .finish()
72 }
73}
74
75impl Schema {
76 pub const INITIAL_VERSION: u32 = 0;
78
79 pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
84 SchemaBuilder::try_from(column_schemas)
86 .unwrap()
87 .build()
88 .unwrap()
89 }
90
91 pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
93 SchemaBuilder::try_from(column_schemas)?.build()
94 }
95
96 pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
97 &self.arrow_schema
98 }
99
100 pub fn column_schemas(&self) -> &[ColumnSchema] {
101 &self.column_schemas
102 }
103
104 pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
105 self.name_to_index
106 .get(name)
107 .map(|index| &self.column_schemas[*index])
108 }
109
110 pub fn column_name_by_index(&self, idx: usize) -> &str {
114 &self.column_schemas[idx].name
115 }
116
117 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
118 self.name_to_index.get(name).copied()
119 }
120
121 pub fn contains_column(&self, name: &str) -> bool {
122 self.name_to_index.contains_key(name)
123 }
124
125 pub fn num_columns(&self) -> usize {
126 self.column_schemas.len()
127 }
128
129 pub fn is_empty(&self) -> bool {
130 self.column_schemas.is_empty()
131 }
132
133 pub fn timestamp_index(&self) -> Option<usize> {
135 self.timestamp_index
136 }
137
138 pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
139 self.timestamp_index.map(|idx| &self.column_schemas[idx])
140 }
141
142 pub fn version(&self) -> u32 {
143 self.version
144 }
145
146 pub fn metadata(&self) -> &HashMap<String, String> {
147 &self.arrow_schema.metadata
148 }
149
150 pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
156 let mut column_schemas = Vec::with_capacity(indices.len());
157 let mut timestamp_index = None;
158 for index in indices {
159 if let Some(ts_index) = self.timestamp_index
160 && ts_index == *index
161 {
162 timestamp_index = Some(column_schemas.len());
163 }
164 column_schemas.push(self.column_schemas[*index].clone());
165 }
166 let arrow_schema = self
167 .arrow_schema
168 .project(indices)
169 .context(ProjectArrowSchemaSnafu)?;
170 let name_to_index = column_schemas
171 .iter()
172 .enumerate()
173 .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
174 .collect();
175
176 Ok(Self {
177 column_schemas,
178 name_to_index,
179 arrow_schema: Arc::new(arrow_schema),
180 timestamp_index,
181 version: self.version,
182 })
183 }
184}
185
/// Builder for [`Schema`].
#[derive(Default)]
pub struct SchemaBuilder {
    /// Columns of the schema being built.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields converted from `column_schemas`, one per column.
    fields: Vec<Field>,
    /// Position of the time index column, if one was found.
    timestamp_index: Option<usize>,
    /// Version assigned to the built schema.
    version: u32,
    /// Key/value pairs stored as metadata of the built arrow schema.
    metadata: HashMap<String, String>,
}
195
196impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
197 type Error = Error;
198
199 fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
200 SchemaBuilder::try_from_columns(column_schemas)
201 }
202}
203
204impl SchemaBuilder {
205 pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
206 let FieldsAndIndices {
207 fields,
208 name_to_index,
209 timestamp_index,
210 } = collect_fields(&column_schemas)?;
211
212 Ok(Self {
213 column_schemas,
214 name_to_index,
215 fields,
216 timestamp_index,
217 ..Default::default()
218 })
219 }
220
221 pub fn version(mut self, version: u32) -> Self {
222 self.version = version;
223 self
224 }
225
226 pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
230 let _ = self.metadata.insert(key.into(), value.into());
231 self
232 }
233
234 pub fn build(mut self) -> Result<Schema> {
235 if let Some(timestamp_index) = self.timestamp_index {
236 validate_timestamp_index(&self.column_schemas, timestamp_index)?;
237 }
238
239 self.metadata
240 .insert(VERSION_KEY.to_string(), self.version.to_string());
241
242 let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
243
244 Ok(Schema {
245 column_schemas: self.column_schemas,
246 name_to_index: self.name_to_index,
247 arrow_schema: Arc::new(arrow_schema),
248 timestamp_index: self.timestamp_index,
249 version: self.version,
250 })
251 }
252}
253
/// Result of [`collect_fields`]: the arrow fields together with the lookup
/// information derived from the column schemas.
struct FieldsAndIndices {
    /// Arrow fields, one per column schema, in the same order.
    fields: Vec<Field>,
    /// Maps a column name to its position.
    name_to_index: HashMap<String, usize>,
    /// Position of the first column flagged as time index, if any.
    timestamp_index: Option<usize>,
}
259
260fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
261 let mut fields = Vec::with_capacity(column_schemas.len());
262 let mut name_to_index = HashMap::with_capacity(column_schemas.len());
263 let mut timestamp_index = None;
264 for (index, column_schema) in column_schemas.iter().enumerate() {
265 if column_schema.is_time_index() && timestamp_index.is_none() {
266 timestamp_index = Some(index);
267 }
268 let mut field = Field::try_from(column_schema)?;
269
270 let extype = match column_schema.data_type {
272 ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
273 ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
274 _ => None,
275 };
276 if let Some(extype) = extype {
277 field
278 .metadata_mut()
279 .insert(TYPE_KEY.to_string(), extype.to_string());
280 }
281 fields.push(field);
282 ensure!(
283 name_to_index
284 .insert(column_schema.name.clone(), index)
285 .is_none(),
286 DuplicateColumnSnafu {
287 column: &column_schema.name,
288 }
289 );
290 }
291
292 Ok(FieldsAndIndices {
293 fields,
294 name_to_index,
295 timestamp_index,
296 })
297}
298
299fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
300 ensure!(
301 timestamp_index < column_schemas.len(),
302 error::InvalidTimestampIndexSnafu {
303 index: timestamp_index,
304 }
305 );
306
307 let column_schema = &column_schemas[timestamp_index];
308 ensure!(
309 column_schema.data_type.is_timestamp(),
310 error::InvalidTimestampIndexSnafu {
311 index: timestamp_index,
312 }
313 );
314 ensure!(
315 column_schema.is_time_index(),
316 error::InvalidTimestampIndexSnafu {
317 index: timestamp_index,
318 }
319 );
320
321 Ok(())
322}
323
/// Shared, reference-counted handle to a [`Schema`].
pub type SchemaRef = Arc<Schema>;
325
326impl TryFrom<Arc<ArrowSchema>> for Schema {
327 type Error = Error;
328
329 fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
330 let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
331 let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
332 for field in &arrow_schema.fields {
333 let column_schema = ColumnSchema::try_from(field.as_ref())?;
334 let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
335 column_schemas.push(column_schema);
336 }
337
338 let mut timestamp_index = None;
339 for (index, column_schema) in column_schemas.iter().enumerate() {
340 if column_schema.is_time_index() {
341 validate_timestamp_index(&column_schemas, index)?;
342 timestamp_index = Some(index);
343 break;
344 }
345 }
346
347 let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
348
349 Ok(Self {
350 column_schemas,
351 name_to_index,
352 arrow_schema,
353 timestamp_index,
354 version,
355 })
356 }
357}
358
359impl TryFrom<ArrowSchema> for Schema {
360 type Error = Error;
361
362 fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
363 let arrow_schema = Arc::new(arrow_schema);
364
365 Schema::try_from(arrow_schema)
366 }
367}
368
369impl TryFrom<DFSchemaRef> for Schema {
370 type Error = Error;
371
372 fn try_from(value: DFSchemaRef) -> Result<Self> {
373 value.inner().clone().try_into()
374 }
375}
376
377fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
378 if let Some(value) = metadata.get(key) {
379 let version = value
380 .parse()
381 .context(error::ParseSchemaVersionSnafu { value })?;
382
383 Ok(version)
384 } else {
385 Ok(Schema::INITIAL_VERSION)
386 }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    /// Two-column fixture: a nullable int32 `col1` and a millisecond timestamp
    /// `ts` flagged as time index.
    fn columns_with_timestamp() -> Vec<ColumnSchema> {
        vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ]
    }

    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        // The schema must survive a round trip through its arrow representation.
        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = columns_with_timestamp();
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        // Check the round-tripped schema itself, not the one it was built from.
        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(schema, new_schema);
    }

    #[test]
    fn test_schema_wrong_timestamp() {
        // The time index flag is set on a non-timestamp column in first position.
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );

        // Same mistake, but on the second column.
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .build()
                .is_err()
        );
    }

    #[test]
    fn test_try_project() {
        let schema = SchemaBuilder::try_from(columns_with_timestamp())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        // Keeping only the time index column moves it to position 0.
        let projected = schema.try_project(&[1]).unwrap();
        assert_eq!(1, projected.num_columns());
        assert_eq!("ts", projected.column_name_by_index(0));
        assert_eq!(Some(0), projected.timestamp_index());
        assert_eq!(Some(0), projected.column_index_by_name("ts"));
        assert!(!projected.contains_column("col1"));
        assert_eq!(123, projected.version());

        // Dropping the time index column leaves the projection without one.
        let without_ts = schema.try_project(&[0]).unwrap();
        assert_eq!(1, without_ts.num_columns());
        assert!(without_ts.timestamp_index().is_none());
    }
}