1mod column_schema;
16pub mod constraint;
17mod raw;
18
19use std::collections::HashMap;
20use std::fmt;
21use std::sync::Arc;
22
23use arrow::datatypes::{Field, Schema as ArrowSchema};
24use column_schema::ColumnExtType;
25use datafusion_common::DFSchemaRef;
26use snafu::{ensure, ResultExt};
27
28use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
29use crate::prelude::ConcreteDataType;
30pub use crate::schema::column_schema::{
31 ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, Metadata,
32 SkippingIndexOptions, SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE,
33 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
34 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
35 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
36 SKIPPING_INDEX_KEY, TIME_INDEX_KEY,
37};
38pub use crate::schema::constraint::ColumnDefaultConstraint;
39pub use crate::schema::raw::RawSchema;
40
/// Metadata key under which the schema version is stored in the Arrow schema metadata.
pub const VERSION_KEY: &str = "greptime:version";
/// Metadata key under which a column's extension type is stored in the Arrow field metadata.
pub const TYPE_KEY: &str = "greptime:type";
45
/// An ordered collection of columns together with lookup data derived from it.
///
/// Besides the column definitions it keeps a name-to-position map, the
/// equivalent Arrow schema, the position of the time index column (if any)
/// and a version number.
#[derive(Clone, PartialEq, Eq)]
pub struct Schema {
    /// Column definitions, in schema order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow representation of this schema; its metadata backs [`Schema::metadata`].
    arrow_schema: Arc<ArrowSchema>,
    /// Position of the time index column in `column_schemas`, or `None` if
    /// the schema has no time index column.
    timestamp_index: Option<usize>,
    /// Version number of the schema.
    version: u32,
}
62
63impl fmt::Debug for Schema {
64 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
65 f.debug_struct("Schema")
66 .field("column_schemas", &self.column_schemas)
67 .field("name_to_index", &self.name_to_index)
68 .field("timestamp_index", &self.timestamp_index)
69 .field("version", &self.version)
70 .finish()
71 }
72}
73
74impl Schema {
75 pub const INITIAL_VERSION: u32 = 0;
77
78 pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
83 SchemaBuilder::try_from(column_schemas)
85 .unwrap()
86 .build()
87 .unwrap()
88 }
89
90 pub fn try_new(column_schemas: Vec<ColumnSchema>) -> Result<Schema> {
92 SchemaBuilder::try_from(column_schemas)?.build()
93 }
94
95 pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
96 &self.arrow_schema
97 }
98
99 pub fn column_schemas(&self) -> &[ColumnSchema] {
100 &self.column_schemas
101 }
102
103 pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
104 self.name_to_index
105 .get(name)
106 .map(|index| &self.column_schemas[*index])
107 }
108
109 pub fn column_name_by_index(&self, idx: usize) -> &str {
113 &self.column_schemas[idx].name
114 }
115
116 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
117 self.name_to_index.get(name).copied()
118 }
119
120 pub fn contains_column(&self, name: &str) -> bool {
121 self.name_to_index.contains_key(name)
122 }
123
124 pub fn num_columns(&self) -> usize {
125 self.column_schemas.len()
126 }
127
128 pub fn is_empty(&self) -> bool {
129 self.column_schemas.is_empty()
130 }
131
132 pub fn timestamp_index(&self) -> Option<usize> {
134 self.timestamp_index
135 }
136
137 pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
138 self.timestamp_index.map(|idx| &self.column_schemas[idx])
139 }
140
141 pub fn version(&self) -> u32 {
142 self.version
143 }
144
145 pub fn metadata(&self) -> &HashMap<String, String> {
146 &self.arrow_schema.metadata
147 }
148
149 pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
155 let mut column_schemas = Vec::with_capacity(indices.len());
156 let mut timestamp_index = None;
157 for index in indices {
158 if let Some(ts_index) = self.timestamp_index
159 && ts_index == *index
160 {
161 timestamp_index = Some(column_schemas.len());
162 }
163 column_schemas.push(self.column_schemas[*index].clone());
164 }
165 let arrow_schema = self
166 .arrow_schema
167 .project(indices)
168 .context(ProjectArrowSchemaSnafu)?;
169 let name_to_index = column_schemas
170 .iter()
171 .enumerate()
172 .map(|(pos, column_schema)| (column_schema.name.clone(), pos))
173 .collect();
174
175 Ok(Self {
176 column_schemas,
177 name_to_index,
178 arrow_schema: Arc::new(arrow_schema),
179 timestamp_index,
180 version: self.version,
181 })
182 }
183}
184
/// Builder that assembles a [`Schema`] from column definitions, an optional
/// version and optional schema-level metadata.
#[derive(Default)]
pub struct SchemaBuilder {
    /// Column definitions, in schema order.
    column_schemas: Vec<ColumnSchema>,
    /// Maps a column name to its position in `column_schemas`.
    name_to_index: HashMap<String, usize>,
    /// Arrow fields converted from `column_schemas`.
    fields: Vec<Field>,
    /// Position of the time index column, if one was found.
    timestamp_index: Option<usize>,
    /// Version to assign to the built schema.
    version: u32,
    /// Schema-level metadata to attach to the Arrow schema.
    metadata: HashMap<String, String>,
}
194
195impl TryFrom<Vec<ColumnSchema>> for SchemaBuilder {
196 type Error = Error;
197
198 fn try_from(column_schemas: Vec<ColumnSchema>) -> Result<SchemaBuilder> {
199 SchemaBuilder::try_from_columns(column_schemas)
200 }
201}
202
203impl SchemaBuilder {
204 pub fn try_from_columns(column_schemas: Vec<ColumnSchema>) -> Result<Self> {
205 let FieldsAndIndices {
206 fields,
207 name_to_index,
208 timestamp_index,
209 } = collect_fields(&column_schemas)?;
210
211 Ok(Self {
212 column_schemas,
213 name_to_index,
214 fields,
215 timestamp_index,
216 ..Default::default()
217 })
218 }
219
220 pub fn version(mut self, version: u32) -> Self {
221 self.version = version;
222 self
223 }
224
225 pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
229 let _ = self.metadata.insert(key.into(), value.into());
230 self
231 }
232
233 pub fn build(mut self) -> Result<Schema> {
234 if let Some(timestamp_index) = self.timestamp_index {
235 validate_timestamp_index(&self.column_schemas, timestamp_index)?;
236 }
237
238 self.metadata
239 .insert(VERSION_KEY.to_string(), self.version.to_string());
240
241 let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
242
243 Ok(Schema {
244 column_schemas: self.column_schemas,
245 name_to_index: self.name_to_index,
246 arrow_schema: Arc::new(arrow_schema),
247 timestamp_index: self.timestamp_index,
248 version: self.version,
249 })
250 }
251}
252
/// Result of `collect_fields`: the Arrow fields plus the indices derived
/// while walking the column definitions.
struct FieldsAndIndices {
    /// Arrow fields, one per column, in schema order.
    fields: Vec<Field>,
    /// Maps a column name to its position.
    name_to_index: HashMap<String, usize>,
    /// Position of the first column flagged as time index, if any.
    timestamp_index: Option<usize>,
}
258
259fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
260 let mut fields = Vec::with_capacity(column_schemas.len());
261 let mut name_to_index = HashMap::with_capacity(column_schemas.len());
262 let mut timestamp_index = None;
263 for (index, column_schema) in column_schemas.iter().enumerate() {
264 if column_schema.is_time_index() && timestamp_index.is_none() {
265 timestamp_index = Some(index);
266 }
267 let mut field = Field::try_from(column_schema)?;
268
269 let extype = match column_schema.data_type {
271 ConcreteDataType::Json(_) => Some(ColumnExtType::Json),
272 ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)),
273 _ => None,
274 };
275 if let Some(extype) = extype {
276 let metadata = HashMap::from([(TYPE_KEY.to_string(), extype.to_string())]);
277 field = field.with_metadata(metadata);
278 }
279 fields.push(field);
280 ensure!(
281 name_to_index
282 .insert(column_schema.name.clone(), index)
283 .is_none(),
284 DuplicateColumnSnafu {
285 column: &column_schema.name,
286 }
287 );
288 }
289
290 Ok(FieldsAndIndices {
291 fields,
292 name_to_index,
293 timestamp_index,
294 })
295}
296
297fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
298 ensure!(
299 timestamp_index < column_schemas.len(),
300 error::InvalidTimestampIndexSnafu {
301 index: timestamp_index,
302 }
303 );
304
305 let column_schema = &column_schemas[timestamp_index];
306 ensure!(
307 column_schema.data_type.is_timestamp(),
308 error::InvalidTimestampIndexSnafu {
309 index: timestamp_index,
310 }
311 );
312 ensure!(
313 column_schema.is_time_index(),
314 error::InvalidTimestampIndexSnafu {
315 index: timestamp_index,
316 }
317 );
318
319 Ok(())
320}
321
/// Shared, reference-counted handle to a [`Schema`].
pub type SchemaRef = Arc<Schema>;
323
324impl TryFrom<Arc<ArrowSchema>> for Schema {
325 type Error = Error;
326
327 fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
328 let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
329 let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
330 for field in &arrow_schema.fields {
331 let column_schema = ColumnSchema::try_from(field.as_ref())?;
332 let _ = name_to_index.insert(field.name().to_string(), column_schemas.len());
333 column_schemas.push(column_schema);
334 }
335
336 let mut timestamp_index = None;
337 for (index, column_schema) in column_schemas.iter().enumerate() {
338 if column_schema.is_time_index() {
339 validate_timestamp_index(&column_schemas, index)?;
340 timestamp_index = Some(index);
341 break;
342 }
343 }
344
345 let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
346
347 Ok(Self {
348 column_schemas,
349 name_to_index,
350 arrow_schema,
351 timestamp_index,
352 version,
353 })
354 }
355}
356
357impl TryFrom<ArrowSchema> for Schema {
358 type Error = Error;
359
360 fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
361 let arrow_schema = Arc::new(arrow_schema);
362
363 Schema::try_from(arrow_schema)
364 }
365}
366
367impl TryFrom<DFSchemaRef> for Schema {
368 type Error = Error;
369
370 fn try_from(value: DFSchemaRef) -> Result<Self> {
371 let s: ArrowSchema = value.as_ref().into();
372 s.try_into()
373 }
374}
375
376fn try_parse_version(metadata: &HashMap<String, String>, key: &str) -> Result<u32> {
377 if let Some(value) = metadata.get(key) {
378 let version = value
379 .parse()
380 .context(error::ParseSchemaVersionSnafu { value })?;
381
382 Ok(version)
383 } else {
384 Ok(Schema::INITIAL_VERSION)
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_type::ConcreteDataType;

    /// A builder without columns yields an empty schema.
    #[test]
    fn test_build_empty_schema() {
        let schema = SchemaBuilder::default().build().unwrap();
        assert_eq!(0, schema.num_columns());
        assert!(schema.is_empty());
    }

    /// A schema without a time index column round-trips through Arrow.
    #[test]
    fn test_schema_no_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        assert_eq!(2, schema.num_columns());
        assert!(!schema.is_empty());
        assert!(schema.timestamp_index().is_none());
        assert!(schema.timestamp_column().is_none());
        assert_eq!(Schema::INITIAL_VERSION, schema.version());

        for column_schema in &column_schemas {
            let found = schema.column_schema_by_name(&column_schema.name).unwrap();
            assert_eq!(column_schema, found);
        }
        assert!(schema.column_schema_by_name("col3").is_none());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();

        assert_eq!(schema, new_schema);
        assert_eq!(column_schemas, schema.column_schemas());
    }

    /// Two columns with the same name are rejected.
    #[test]
    fn test_schema_duplicate_column() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
        ];
        let err = Schema::try_new(column_schemas).unwrap_err();

        assert!(
            matches!(err, Error::DuplicateColumn { .. }),
            "expect DuplicateColumn, found {}",
            err
        );
    }

    /// Metadata added through the builder is visible on the built schema.
    #[test]
    fn test_metadata() {
        let column_schemas = vec![ColumnSchema::new(
            "col1",
            ConcreteDataType::int32_datatype(),
            false,
        )];
        let schema = SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .add_metadata("k1", "v1")
            .build()
            .unwrap();

        assert_eq!("v1", schema.metadata().get("k1").unwrap());
    }

    /// Time index and version survive a round trip through Arrow.
    #[test]
    fn test_schema_with_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = SchemaBuilder::try_from(column_schemas.clone())
            .unwrap()
            .version(123)
            .build()
            .unwrap();

        assert_eq!(1, schema.timestamp_index().unwrap());
        assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
        assert_eq!(123, schema.version());

        let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
        // Check the rebuilt schema rather than re-checking the original one.
        assert_eq!(1, new_schema.timestamp_index().unwrap());
        assert_eq!(123, new_schema.version());
        assert_eq!(schema, new_schema);
    }

    /// A time index flag on a non-timestamp column is rejected.
    #[test]
    fn test_schema_wrong_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true)
                .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
        ];
        assert!(SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .build()
            .is_err());

        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false)
                .with_time_index(true),
        ];

        assert!(SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .build()
            .is_err());
    }

    /// Projection keeps the requested columns in order and remaps the time index.
    #[test]
    fn test_try_project() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
        ];
        let schema = Schema::new(column_schemas.clone());

        let projected = schema.try_project(&[2, 1]).unwrap();
        assert_eq!(2, projected.num_columns());
        assert_eq!(&column_schemas[2], &projected.column_schemas()[0]);
        assert_eq!(&column_schemas[1], &projected.column_schemas()[1]);
        assert_eq!(Some(1), projected.timestamp_index());
        assert_eq!(Some(0), projected.column_index_by_name("col2"));
        assert!(!projected.contains_column("col1"));
        assert_eq!(schema.version(), projected.version());

        // Dropping the time index column leaves the projection without one.
        let projected = schema.try_project(&[0]).unwrap();
        assert!(projected.timestamp_index().is_none());
    }
}