1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 AlterRequest, AlterRequests, BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest,
26 CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests,
27 OpenRequest, TruncateRequest, alter_request, compact_request, region_request, truncate_request,
28};
29use api::v1::{
30 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::{TimeToLive, Timestamp};
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use num_enum::TryFromPrimitive;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use strum::{AsRefStr, IntoStaticStr};
43
44use crate::logstore::entry;
45use crate::metadata::{
46 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
47 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
48 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
49 RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::metrics;
53use crate::mito_engine_options::{
54 TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
55};
56use crate::path_utils::table_dir;
57use crate::storage::{ColumnId, RegionId, ScanRequest};
58
/// Kind of path associated with a region directory.
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`, so a variant can
/// be recovered from its `u8` discriminant (`Bare` = 0, `Data` = 1,
/// `Metadata` = 2).
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// The path type assigned to create/open requests parsed from protobuf in
    /// this file.
    Bare,
    /// NOTE(review): presumably the data sub-directory of a region — confirm
    /// against the path utilities.
    Data,
    /// NOTE(review): presumably the metadata sub-directory of a region —
    /// confirm against the path utilities.
    Metadata,
}
76
/// A batch of DDL requests of a single kind, each paired with its target region.
///
/// `IntoStaticStr` is derived so the variant name can be obtained as a
/// `&'static str` (see `request_type`).
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Batch of region create requests.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Batch of region drop requests.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Batch of region alter requests.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
83
84impl BatchRegionDdlRequest {
85 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
87 match body {
88 region_request::Body::Creates(creates) => {
89 let requests = creates
90 .requests
91 .into_iter()
92 .map(parse_region_create)
93 .collect::<Result<Vec<_>>>()?;
94 Ok(Some(Self::Create(requests)))
95 }
96 region_request::Body::Drops(drops) => {
97 let requests = drops
98 .requests
99 .into_iter()
100 .map(parse_region_drop)
101 .collect::<Result<Vec<_>>>()?;
102 Ok(Some(Self::Drop(requests)))
103 }
104 region_request::Body::Alters(alters) => {
105 let requests = alters
106 .requests
107 .into_iter()
108 .map(parse_region_alter)
109 .collect::<Result<Vec<_>>>()?;
110 Ok(Some(Self::Alter(requests)))
111 }
112 _ => Ok(None),
113 }
114 }
115
116 pub fn request_type(&self) -> &'static str {
117 self.into()
118 }
119
120 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
121 match self {
122 Self::Create(requests) => requests
123 .into_iter()
124 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
125 .collect(),
126 Self::Drop(requests) => requests
127 .into_iter()
128 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
129 .collect(),
130 Self::Alter(requests) => requests
131 .into_iter()
132 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
133 .collect(),
134 }
135 }
136}
137
/// A request targeting a single region.
///
/// `IntoStaticStr` is derived so the variant name can be obtained as a
/// `&'static str` (see `request_type`).
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Writes rows into the region.
    Put(RegionPutRequest),
    /// Deletes rows from the region.
    Delete(RegionDeleteRequest),
    /// Creates the region.
    Create(RegionCreateRequest),
    /// Drops the region.
    Drop(RegionDropRequest),
    /// Opens the region.
    Open(RegionOpenRequest),
    /// Closes the region.
    Close(RegionCloseRequest),
    /// Alters the region.
    Alter(RegionAlterRequest),
    /// Flushes the region.
    Flush(RegionFlushRequest),
    /// Compacts the region.
    Compact(RegionCompactRequest),
    /// Truncates the region (entirely or by time ranges).
    Truncate(RegionTruncateRequest),
    /// NOTE(review): catch-up request; no protobuf body maps to it in
    /// `try_from_request_body` — confirm how it is constructed.
    Catchup(RegionCatchupRequest),
    /// Bulk insert carrying a decoded record batch.
    BulkInserts(RegionBulkInsertsRequest),
}
153
154impl RegionRequest {
155 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
158 match body {
159 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
160 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
161 region_request::Body::Create(create) => make_region_create(create),
162 region_request::Body::Drop(drop) => make_region_drop(drop),
163 region_request::Body::Open(open) => make_region_open(open),
164 region_request::Body::Close(close) => make_region_close(close),
165 region_request::Body::Alter(alter) => make_region_alter(alter),
166 region_request::Body::Flush(flush) => make_region_flush(flush),
167 region_request::Body::Compact(compact) => make_region_compact(compact),
168 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
169 region_request::Body::Creates(creates) => make_region_creates(creates),
170 region_request::Body::Drops(drops) => make_region_drops(drops),
171 region_request::Body::Alters(alters) => make_region_alters(alters),
172 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
173 region_request::Body::Sync(_) => UnexpectedSnafu {
174 reason: "Sync request should be handled separately by RegionServer",
175 }
176 .fail(),
177 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
178 reason: "ListMetadata request should be handled separately by RegionServer",
179 }
180 .fail(),
181 }
182 }
183
184 pub fn request_type(&self) -> &'static str {
186 self.into()
187 }
188}
189
190fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
191 let requests = inserts
192 .requests
193 .into_iter()
194 .filter_map(|r| {
195 let region_id = r.region_id.into();
196 r.rows.map(|rows| {
197 (
198 region_id,
199 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
200 )
201 })
202 })
203 .collect();
204 Ok(requests)
205}
206
207fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
208 let requests = deletes
209 .requests
210 .into_iter()
211 .filter_map(|r| {
212 let region_id = r.region_id.into();
213 r.rows.map(|rows| {
214 (
215 region_id,
216 RegionRequest::Delete(RegionDeleteRequest { rows }),
217 )
218 })
219 })
220 .collect();
221 Ok(requests)
222}
223
224fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
225 let column_metadatas = create
226 .column_defs
227 .into_iter()
228 .map(ColumnMetadata::try_from_column_def)
229 .collect::<Result<Vec<_>>>()?;
230 let region_id = RegionId::from(create.region_id);
231 let table_dir = table_dir(&create.path, region_id.table_id());
232 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
233 Ok((
234 region_id,
235 RegionCreateRequest {
236 engine: create.engine,
237 column_metadatas,
238 primary_key: create.primary_key,
239 options: create.options,
240 table_dir,
241 path_type: PathType::Bare,
242 partition_expr_json,
243 },
244 ))
245}
246
247fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
248 let (region_id, request) = parse_region_create(create)?;
249 Ok(vec![(region_id, RegionRequest::Create(request))])
250}
251
252fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
253 let mut requests = Vec::with_capacity(creates.requests.len());
254 for create in creates.requests {
255 requests.extend(make_region_create(create)?);
256 }
257 Ok(requests)
258}
259
260fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
261 let region_id = drop.region_id.into();
262 Ok((
263 region_id,
264 RegionDropRequest {
265 fast_path: drop.fast_path,
266 },
267 ))
268}
269
270fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
271 let (region_id, request) = parse_region_drop(drop)?;
272 Ok(vec![(region_id, RegionRequest::Drop(request))])
273}
274
275fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
276 let mut requests = Vec::with_capacity(drops.requests.len());
277 for drop in drops.requests {
278 requests.extend(make_region_drop(drop)?);
279 }
280 Ok(requests)
281}
282
283fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
284 let region_id = RegionId::from(open.region_id);
285 let table_dir = table_dir(&open.path, region_id.table_id());
286 Ok(vec![(
287 region_id,
288 RegionRequest::Open(RegionOpenRequest {
289 engine: open.engine,
290 table_dir,
291 path_type: PathType::Bare,
292 options: open.options,
293 skip_wal_replay: false,
294 checkpoint: None,
295 }),
296 )])
297}
298
299fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
300 let region_id = close.region_id.into();
301 Ok(vec![(
302 region_id,
303 RegionRequest::Close(RegionCloseRequest {}),
304 )])
305}
306
307fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
308 let region_id = alter.region_id.into();
309 let request = RegionAlterRequest::try_from(alter)?;
310 Ok((region_id, request))
311}
312
313fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
314 let (region_id, request) = parse_region_alter(alter)?;
315 Ok(vec![(region_id, RegionRequest::Alter(request))])
316}
317
318fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
319 let mut requests = Vec::with_capacity(alters.requests.len());
320 for alter in alters.requests {
321 requests.extend(make_region_alter(alter)?);
322 }
323 Ok(requests)
324}
325
326fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
327 let region_id = flush.region_id.into();
328 Ok(vec![(
329 region_id,
330 RegionRequest::Flush(RegionFlushRequest {
331 row_group_size: None,
332 }),
333 )])
334}
335
336fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
337 let region_id = compact.region_id.into();
338 let options = compact
339 .options
340 .unwrap_or(compact_request::Options::Regular(Default::default()));
341 Ok(vec![(
342 region_id,
343 RegionRequest::Compact(RegionCompactRequest { options }),
344 )])
345}
346
347fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
348 let region_id = truncate.region_id.into();
349 match truncate.kind {
350 None => InvalidRawRegionRequestSnafu {
351 err: "missing kind in TruncateRequest".to_string(),
352 }
353 .fail(),
354 Some(truncate_request::Kind::All(_)) => Ok(vec![(
355 region_id,
356 RegionRequest::Truncate(RegionTruncateRequest::All),
357 )]),
358 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
359 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
360
361 Ok(vec![(
362 region_id,
363 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
364 )])
365 }
366 }
367}
368
369fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
371 let region_id = request.region_id.into();
372 let Some(Body::ArrowIpc(request)) = request.body else {
373 return Ok(vec![]);
374 };
375
376 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
377 .with_label_values(&["decode"])
378 .start_timer();
379 let mut decoder =
380 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
381 let payload = decoder
382 .try_decode_record_batch(&request.data_header, &request.payload)
383 .context(FlightCodecSnafu)?;
384 decoder_timer.observe_duration();
385 Ok(vec![(
386 region_id,
387 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
388 region_id,
389 payload,
390 raw_data: request,
391 }),
392 )])
393}
394
/// Request to put rows into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint; requests built from protobuf inserts in this file
    /// leave it as `None`.
    pub hint: Option<WriteHint>,
}
403
/// Request to read from a region, wrapping a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
408
/// Request to delete rows from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Rows identifying what to delete.
    /// NOTE(review): presumably only key columns are required here — confirm.
    pub rows: Rows,
}
417
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Name of the engine that owns the region.
    pub engine: String,
    /// Metadata of every column in the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns; each id must appear in `column_metadatas`
    /// (checked by `validate`).
    pub primary_key: Vec<ColumnId>,
    /// Free-form region options.
    pub options: HashMap<String, String>,
    /// Directory of the table the region belongs to.
    pub table_dir: String,
    /// Path type of the region directory.
    pub path_type: PathType,
    /// Partition expression of the region, if any.
    /// NOTE(review): the field name suggests JSON encoding — confirm.
    pub partition_expr_json: Option<String>,
}
436
437impl RegionCreateRequest {
438 pub fn validate(&self) -> Result<()> {
440 ensure!(
442 self.column_metadatas
443 .iter()
444 .any(|x| x.semantic_type == SemanticType::Timestamp),
445 InvalidRegionRequestSnafu {
446 region_id: RegionId::new(0, 0),
447 err: "missing timestamp column in create region request".to_string(),
448 }
449 );
450
451 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
453 for (i, c) in self.column_metadatas.iter().enumerate() {
454 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
455 return InvalidRegionRequestSnafu {
456 region_id: RegionId::new(0, 0),
457 err: format!(
458 "duplicate column id {} (at position {} and {}) in create region request",
459 c.column_id, previous, i
460 ),
461 }
462 .fail();
463 }
464 }
465
466 for column_id in &self.primary_key {
468 ensure!(
469 column_id_to_indices.contains_key(column_id),
470 InvalidRegionRequestSnafu {
471 region_id: RegionId::new(0, 0),
472 err: format!(
473 "missing primary key column {} in create region request",
474 column_id
475 ),
476 }
477 );
478 }
479
480 Ok(())
481 }
482
483 pub fn is_physical_table(&self) -> bool {
485 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
486 }
487}
488
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Copied verbatim from the protobuf drop request.
    /// NOTE(review): the exact effect of the fast path is not visible in this
    /// file — confirm with the engine implementation.
    pub fast_path: bool,
}
493
/// Request to open a region.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Name of the engine that owns the region.
    pub engine: String,
    /// Directory of the table the region belongs to.
    pub table_dir: String,
    /// Path type of the region directory.
    pub path_type: PathType,
    /// Free-form region options.
    pub options: HashMap<String, String>,
    /// NOTE(review): presumably skips replaying the WAL when opening — confirm.
    /// Requests built from protobuf in this file set it to `false`.
    pub skip_wal_replay: bool,
    /// Optional replay checkpoint; requests built from protobuf in this file
    /// set it to `None`.
    pub checkpoint: Option<ReplayCheckpoint>,
}
510
/// Checkpoint attached to a [`RegionOpenRequest`].
///
/// NOTE(review): presumably marks the log position replay should start from —
/// confirm with the engine implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint.
    pub entry_id: u64,
    /// Entry id for metadata, when one exists.
    /// NOTE(review): which log this id refers to is not visible here — confirm.
    pub metadata_entry_id: Option<u64>,
}
516
impl RegionOpenRequest {
    /// Returns true when the options contain the physical table metadata key
    /// (`PHYSICAL_TABLE_METADATA_KEY`).
    pub fn is_physical_table(&self) -> bool {
        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
    }
}
523
/// Request to close a region; it carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
527
/// Request to alter a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// The alteration to apply.
    pub kind: AlterKind,
}
534
535impl RegionAlterRequest {
536 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
538 self.kind.validate(metadata)?;
539
540 Ok(())
541 }
542
543 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
547 debug_assert!(self.validate(metadata).is_ok());
548 self.kind.need_alter(metadata)
549 }
550}
551
552impl TryFrom<AlterRequest> for RegionAlterRequest {
553 type Error = MetadataError;
554
555 fn try_from(value: AlterRequest) -> Result<Self> {
556 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
557 err: "missing kind in AlterRequest",
558 })?;
559
560 let kind = AlterKind::try_from(kind)?;
561 Ok(RegionAlterRequest { kind })
562 }
563}
564
/// Kind of alteration applied to a region.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Adds columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drops columns from the region; only field columns may be dropped.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Changes the data type of columns.
    ModifyColumnTypes {
        /// Columns to change, with their target types.
        columns: Vec<ModifyColumnType>,
    },
    /// Sets region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unsets region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Sets index options on columns.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unsets index options on columns.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drops the default value of columns.
    DropDefaults {
        /// Names of the columns whose default is dropped.
        names: Vec<String>,
    },
    /// Sets the default value of columns.
    SetDefaults {
        /// Columns and their new default constraints.
        columns: Vec<SetDefault>,
    },
    /// Replaces the column metadata of the region; existing tag columns and
    /// the timestamp column must be preserved (see `validate`).
    SyncColumns {
        /// The full new column metadata.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// New default constraint for a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column.
    pub name: String,
    /// Raw bytes of the default constraint, copied from the protobuf request.
    /// NOTE(review): the encoding of these bytes is not visible here — confirm.
    pub default_constraint: Vec<u8>,
}
611
/// Index option to set on a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Fulltext index with its options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Inverted index; it takes no options.
    Inverted {
        column_name: String,
    },
    /// Skipping index with its options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
626
627impl SetIndexOption {
628 pub fn column_name(&self) -> &String {
630 match self {
631 SetIndexOption::Fulltext { column_name, .. } => column_name,
632 SetIndexOption::Inverted { column_name } => column_name,
633 SetIndexOption::Skipping { column_name, .. } => column_name,
634 }
635 }
636
637 pub fn is_fulltext(&self) -> bool {
639 match self {
640 SetIndexOption::Fulltext { .. } => true,
641 SetIndexOption::Inverted { .. } => false,
642 SetIndexOption::Skipping { .. } => false,
643 }
644 }
645}
646
647impl TryFrom<v1::SetIndex> for SetIndexOption {
648 type Error = MetadataError;
649
650 fn try_from(value: v1::SetIndex) -> Result<Self> {
651 let option = value.options.context(InvalidRawRegionRequestSnafu {
652 err: "missing options in SetIndex",
653 })?;
654
655 let opt = match option {
656 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
657 column_name: x.column_name.clone(),
658 options: FulltextOptions::new(
659 x.enable,
660 as_fulltext_option_analyzer(
661 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
662 ),
663 x.case_sensitive,
664 as_fulltext_option_backend(
665 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
666 ),
667 x.granularity as u32,
668 x.false_positive_rate,
669 )
670 .context(InvalidIndexOptionSnafu)?,
671 },
672 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
673 column_name: i.column_name,
674 },
675 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
676 column_name: s.column_name,
677 options: SkippingIndexOptions::new(
678 s.granularity as u32,
679 s.false_positive_rate,
680 as_skipping_index_type(
681 PbSkippingIndexType::try_from(s.skipping_index_type)
682 .context(DecodeProtoSnafu)?,
683 ),
684 )
685 .context(InvalidIndexOptionSnafu)?,
686 },
687 };
688
689 Ok(opt)
690 }
691}
692
/// Index option to unset on a column; each variant names the target column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Unsets the fulltext index.
    Fulltext { column_name: String },
    /// Unsets the inverted index.
    Inverted { column_name: String },
    /// Unsets the skipping index.
    Skipping { column_name: String },
}
699
700impl UnsetIndexOption {
701 pub fn column_name(&self) -> &String {
702 match self {
703 UnsetIndexOption::Fulltext { column_name } => column_name,
704 UnsetIndexOption::Inverted { column_name } => column_name,
705 UnsetIndexOption::Skipping { column_name } => column_name,
706 }
707 }
708
709 pub fn is_fulltext(&self) -> bool {
710 match self {
711 UnsetIndexOption::Fulltext { .. } => true,
712 UnsetIndexOption::Inverted { .. } => false,
713 UnsetIndexOption::Skipping { .. } => false,
714 }
715 }
716}
717
718impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
719 type Error = MetadataError;
720
721 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
722 let option = value.options.context(InvalidRawRegionRequestSnafu {
723 err: "missing options in UnsetIndex",
724 })?;
725
726 let opt = match option {
727 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
728 column_name: f.column_name,
729 },
730 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
731 column_name: i.column_name,
732 },
733 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
734 column_name: s.column_name,
735 },
736 };
737
738 Ok(opt)
739 }
740}
741
742impl AlterKind {
743 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
747 match self {
748 AlterKind::AddColumns { columns } => {
749 for col_to_add in columns {
750 col_to_add.validate(metadata)?;
751 }
752 }
753 AlterKind::DropColumns { names } => {
754 for name in names {
755 Self::validate_column_to_drop(name, metadata)?;
756 }
757 }
758 AlterKind::ModifyColumnTypes { columns } => {
759 for col_to_change in columns {
760 col_to_change.validate(metadata)?;
761 }
762 }
763 AlterKind::SetRegionOptions { .. } => {}
764 AlterKind::UnsetRegionOptions { .. } => {}
765 AlterKind::SetIndexes { options } => {
766 for option in options {
767 Self::validate_column_alter_index_option(
768 option.column_name(),
769 metadata,
770 option.is_fulltext(),
771 )?;
772 }
773 }
774 AlterKind::UnsetIndexes { options } => {
775 for option in options {
776 Self::validate_column_alter_index_option(
777 option.column_name(),
778 metadata,
779 option.is_fulltext(),
780 )?;
781 }
782 }
783 AlterKind::DropDefaults { names } => {
784 names
785 .iter()
786 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
787 }
788 AlterKind::SetDefaults { columns } => {
789 columns
790 .iter()
791 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
792 }
793 AlterKind::SyncColumns { column_metadatas } => {
794 let new_primary_keys = column_metadatas
795 .iter()
796 .filter(|c| c.semantic_type == SemanticType::Tag)
797 .map(|c| (c.column_schema.name.as_str(), c.column_id))
798 .collect::<HashMap<_, _>>();
799
800 let old_primary_keys = metadata
801 .column_metadatas
802 .iter()
803 .filter(|c| c.semantic_type == SemanticType::Tag)
804 .map(|c| (c.column_schema.name.as_str(), c.column_id));
805
806 for (name, id) in old_primary_keys {
807 let primary_key =
808 new_primary_keys
809 .get(name)
810 .with_context(|| InvalidRegionRequestSnafu {
811 region_id: metadata.region_id,
812 err: format!("column {} is not a primary key", name),
813 })?;
814
815 ensure!(
816 *primary_key == id,
817 InvalidRegionRequestSnafu {
818 region_id: metadata.region_id,
819 err: format!(
820 "column with same name {} has different id, existing: {}, got: {}",
821 name, id, primary_key
822 ),
823 }
824 );
825 }
826
827 let new_ts_column = column_metadatas
828 .iter()
829 .find(|c| c.semantic_type == SemanticType::Timestamp)
830 .map(|c| (c.column_schema.name.as_str(), c.column_id))
831 .context(InvalidRegionRequestSnafu {
832 region_id: metadata.region_id,
833 err: "timestamp column not found",
834 })?;
835
836 let old_ts_column = metadata
838 .column_metadatas
839 .iter()
840 .find(|c| c.semantic_type == SemanticType::Timestamp)
841 .map(|c| (c.column_schema.name.as_str(), c.column_id))
842 .unwrap();
843
844 ensure!(
845 new_ts_column == old_ts_column,
846 InvalidRegionRequestSnafu {
847 region_id: metadata.region_id,
848 err: format!(
849 "timestamp column {} has different id, existing: {}, got: {}",
850 old_ts_column.0, old_ts_column.1, new_ts_column.1
851 ),
852 }
853 );
854 }
855 }
856 Ok(())
857 }
858
859 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
861 debug_assert!(self.validate(metadata).is_ok());
862 match self {
863 AlterKind::AddColumns { columns } => columns
864 .iter()
865 .any(|col_to_add| col_to_add.need_alter(metadata)),
866 AlterKind::DropColumns { names } => names
867 .iter()
868 .any(|name| metadata.column_by_name(name).is_some()),
869 AlterKind::ModifyColumnTypes { columns } => columns
870 .iter()
871 .any(|col_to_change| col_to_change.need_alter(metadata)),
872 AlterKind::SetRegionOptions { .. } => {
873 true
876 }
877 AlterKind::UnsetRegionOptions { .. } => true,
878 AlterKind::SetIndexes { options, .. } => options
879 .iter()
880 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
881 AlterKind::UnsetIndexes { options } => options
882 .iter()
883 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
884 AlterKind::DropDefaults { names } => names
885 .iter()
886 .any(|name| metadata.column_by_name(name).is_some()),
887
888 AlterKind::SetDefaults { columns } => columns
889 .iter()
890 .any(|x| metadata.column_by_name(&x.name).is_some()),
891 AlterKind::SyncColumns { column_metadatas } => {
892 metadata.column_metadatas != *column_metadatas
893 }
894 }
895 }
896
897 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
899 let Some(column) = metadata.column_by_name(name) else {
900 return Ok(());
901 };
902 ensure!(
903 column.semantic_type == SemanticType::Field,
904 InvalidRegionRequestSnafu {
905 region_id: metadata.region_id,
906 err: format!("column {} is not a field and could not be dropped", name),
907 }
908 );
909 Ok(())
910 }
911
912 fn validate_column_alter_index_option(
914 column_name: &String,
915 metadata: &RegionMetadata,
916 is_fulltext: bool,
917 ) -> Result<()> {
918 let column = metadata
919 .column_by_name(column_name)
920 .context(InvalidRegionRequestSnafu {
921 region_id: metadata.region_id,
922 err: format!("column {} not found", column_name),
923 })?;
924
925 if is_fulltext {
926 ensure!(
927 column.column_schema.data_type.is_string(),
928 InvalidRegionRequestSnafu {
929 region_id: metadata.region_id,
930 err: format!(
931 "cannot change alter index options for non-string column {}",
932 column_name
933 ),
934 }
935 );
936 }
937
938 Ok(())
939 }
940
941 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
943 metadata
944 .column_by_name(column_name)
945 .context(InvalidRegionRequestSnafu {
946 region_id: metadata.region_id,
947 err: format!("column {} not found", column_name),
948 })?;
949
950 Ok(())
951 }
952}
953
954impl TryFrom<alter_request::Kind> for AlterKind {
955 type Error = MetadataError;
956
957 fn try_from(kind: alter_request::Kind) -> Result<Self> {
958 let alter_kind = match kind {
959 alter_request::Kind::AddColumns(x) => {
960 let columns = x
961 .add_columns
962 .into_iter()
963 .map(|x| x.try_into())
964 .collect::<Result<Vec<_>>>()?;
965 AlterKind::AddColumns { columns }
966 }
967 alter_request::Kind::ModifyColumnTypes(x) => {
968 let columns = x
969 .modify_column_types
970 .into_iter()
971 .map(|x| x.into())
972 .collect::<Vec<_>>();
973 AlterKind::ModifyColumnTypes { columns }
974 }
975 alter_request::Kind::DropColumns(x) => {
976 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
977 AlterKind::DropColumns { names }
978 }
979 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
980 options: options
981 .table_options
982 .iter()
983 .map(TryFrom::try_from)
984 .collect::<Result<Vec<_>>>()?,
985 },
986 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
987 keys: options
988 .keys
989 .iter()
990 .map(|key| UnsetRegionOption::try_from(key.as_str()))
991 .collect::<Result<Vec<_>>>()?,
992 },
993 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
994 options: vec![SetIndexOption::try_from(o)?],
995 },
996 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
997 options: vec![UnsetIndexOption::try_from(o)?],
998 },
999 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1000 options: o
1001 .set_indexes
1002 .into_iter()
1003 .map(SetIndexOption::try_from)
1004 .collect::<Result<Vec<_>>>()?,
1005 },
1006 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1007 options: o
1008 .unset_indexes
1009 .into_iter()
1010 .map(UnsetIndexOption::try_from)
1011 .collect::<Result<Vec<_>>>()?,
1012 },
1013 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1014 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1015 },
1016 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1017 columns: x
1018 .set_defaults
1019 .into_iter()
1020 .map(|x| {
1021 Ok(SetDefault {
1022 name: x.column_name,
1023 default_constraint: x.default_constraint.clone(),
1024 })
1025 })
1026 .collect::<Result<Vec<_>>>()?,
1027 },
1028 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1029 column_metadatas: x
1030 .column_defs
1031 .into_iter()
1032 .map(ColumnMetadata::try_from_column_def)
1033 .collect::<Result<Vec<_>>>()?,
1034 },
1035 };
1036
1037 Ok(alter_kind)
1038 }
1039}
1040
/// A column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to place the column, if specified. It must be `None` when the
    /// column already exists (see `validate`).
    pub location: Option<AddColumnLocation>,
}
1050
1051impl AddColumn {
1052 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1057 ensure!(
1058 self.column_metadata.column_schema.is_nullable()
1059 || self
1060 .column_metadata
1061 .column_schema
1062 .default_constraint()
1063 .is_some(),
1064 InvalidRegionRequestSnafu {
1065 region_id: metadata.region_id,
1066 err: format!(
1067 "no default value for column {}",
1068 self.column_metadata.column_schema.name
1069 ),
1070 }
1071 );
1072
1073 if let Some(existing_column) =
1074 metadata.column_by_name(&self.column_metadata.column_schema.name)
1075 {
1076 ensure!(
1078 *existing_column == self.column_metadata,
1079 InvalidRegionRequestSnafu {
1080 region_id: metadata.region_id,
1081 err: format!(
1082 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1083 self.column_metadata.column_schema.name,
1084 existing_column,
1085 self.column_metadata,
1086 ),
1087 }
1088 );
1089 ensure!(
1090 self.location.is_none(),
1091 InvalidRegionRequestSnafu {
1092 region_id: metadata.region_id,
1093 err: format!(
1094 "column {} already exists, but location is specified",
1095 self.column_metadata.column_schema.name
1096 ),
1097 }
1098 );
1099 }
1100
1101 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1102 ensure!(
1104 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1105 InvalidRegionRequestSnafu {
1106 region_id: metadata.region_id,
1107 err: format!(
1108 "column id {} already exists with different name {}",
1109 self.column_metadata.column_id, existing_column.column_schema.name
1110 ),
1111 }
1112 );
1113 }
1114
1115 Ok(())
1116 }
1117
1118 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1120 debug_assert!(self.validate(metadata).is_ok());
1121 metadata
1122 .column_by_name(&self.column_metadata.column_schema.name)
1123 .is_none()
1124 }
1125}
1126
1127impl TryFrom<v1::region::AddColumn> for AddColumn {
1128 type Error = MetadataError;
1129
1130 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1131 let column_def = add_column
1132 .column_def
1133 .context(InvalidRawRegionRequestSnafu {
1134 err: "missing column_def in AddColumn",
1135 })?;
1136
1137 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1138 let location = add_column
1139 .location
1140 .map(AddColumnLocation::try_from)
1141 .transpose()?;
1142
1143 Ok(AddColumn {
1144 column_metadata,
1145 location,
1146 })
1147 }
1148}
1149
/// Position at which a new column is placed.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Place the column first.
    First,
    /// Place the column after another column.
    After {
        /// Name of the column the new column follows.
        column_name: String,
    },
}
1161
1162impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1163 type Error = MetadataError;
1164
1165 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1166 let location_type = LocationType::try_from(location.location_type)
1167 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1168 let add_column_location = match location_type {
1169 LocationType::First => AddColumnLocation::First,
1170 LocationType::After => AddColumnLocation::After {
1171 column_name: location.after_column_name,
1172 },
1173 };
1174
1175 Ok(add_column_location)
1176 }
1177}
1178
/// A change of a column's data type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to change.
    pub column_name: String,
    /// Data type the column is changed to.
    pub target_type: ConcreteDataType,
}
1187
1188impl ModifyColumnType {
1189 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1191 let column_meta = metadata
1192 .column_by_name(&self.column_name)
1193 .with_context(|| InvalidRegionRequestSnafu {
1194 region_id: metadata.region_id,
1195 err: format!("column {} not found", self.column_name),
1196 })?;
1197
1198 ensure!(
1199 matches!(column_meta.semantic_type, SemanticType::Field),
1200 InvalidRegionRequestSnafu {
1201 region_id: metadata.region_id,
1202 err: "'timestamp' or 'tag' column cannot change type".to_string()
1203 }
1204 );
1205 ensure!(
1206 column_meta
1207 .column_schema
1208 .data_type
1209 .can_arrow_type_cast_to(&self.target_type),
1210 InvalidRegionRequestSnafu {
1211 region_id: metadata.region_id,
1212 err: format!(
1213 "column '{}' cannot be cast automatically to type '{}'",
1214 self.column_name, self.target_type
1215 ),
1216 }
1217 );
1218
1219 Ok(())
1220 }
1221
1222 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1224 debug_assert!(self.validate(metadata).is_ok());
1225 metadata.column_by_name(&self.column_name).is_some()
1226 }
1227}
1228
1229impl From<v1::ModifyColumnType> for ModifyColumnType {
1230 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1231 let target_type = ColumnDataTypeWrapper::new(
1232 modify_column_type.target_type(),
1233 modify_column_type.target_type_extension,
1234 )
1235 .into();
1236
1237 ModifyColumnType {
1238 column_name: modify_column_type.column_name,
1239 target_type,
1240 }
1241 }
1242}
1243
1244#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1245pub enum SetRegionOption {
1246 Ttl(Option<TimeToLive>),
1247 Twsc(String, String),
1249}
1250
1251impl TryFrom<&PbOption> for SetRegionOption {
1252 type Error = MetadataError;
1253
1254 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1255 let PbOption { key, value } = value;
1256 match key.as_str() {
1257 TTL_KEY => {
1258 let ttl = TimeToLive::from_humantime_or_str(value)
1259 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1260
1261 Ok(Self::Ttl(Some(ttl)))
1262 }
1263 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1264 Ok(Self::Twsc(key.to_string(), value.to_string()))
1265 }
1266 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1267 }
1268 }
1269}
1270
1271impl From<&UnsetRegionOption> for SetRegionOption {
1272 fn from(unset_option: &UnsetRegionOption) -> Self {
1273 match unset_option {
1274 UnsetRegionOption::TwcsTriggerFileNum => {
1275 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1276 }
1277 UnsetRegionOption::TwcsMaxOutputFileSize => {
1278 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1279 }
1280 UnsetRegionOption::TwcsTimeWindow => {
1281 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1282 }
1283 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1284 }
1285 }
1286}
1287
1288impl TryFrom<&str> for UnsetRegionOption {
1289 type Error = MetadataError;
1290
1291 fn try_from(key: &str) -> Result<Self> {
1292 match key.to_ascii_lowercase().as_str() {
1293 TTL_KEY => Ok(Self::Ttl),
1294 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1295 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1296 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1297 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1298 }
1299 }
1300}
1301
1302#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1303pub enum UnsetRegionOption {
1304 TwcsTriggerFileNum,
1305 TwcsMaxOutputFileSize,
1306 TwcsTimeWindow,
1307 Ttl,
1308}
1309
1310impl UnsetRegionOption {
1311 pub fn as_str(&self) -> &str {
1312 match self {
1313 Self::Ttl => TTL_KEY,
1314 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1315 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1316 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1317 }
1318 }
1319}
1320
1321impl Display for UnsetRegionOption {
1322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1323 write!(f, "{}", self.as_str())
1324 }
1325}
1326
/// Request to flush a region.
#[derive(Clone, Debug, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; `None` by default.
    // NOTE(review): how the engine applies this value is not visible here.
    pub row_group_size: Option<usize>,
}
1331
1332#[derive(Debug)]
1333pub struct RegionCompactRequest {
1334 pub options: compact_request::Options,
1335}
1336
1337impl Default for RegionCompactRequest {
1338 fn default() -> Self {
1339 Self {
1340 options: compact_request::Options::Regular(Default::default()),
1342 }
1343 }
1344}
1345
1346#[derive(Debug)]
1348pub enum RegionTruncateRequest {
1349 All,
1351 ByTimeRanges {
1352 time_ranges: Vec<(Timestamp, Timestamp)>,
1356 },
1357}
1358
1359#[derive(Debug, Clone, Copy, Default)]
1364pub struct RegionCatchupRequest {
1365 pub set_writable: bool,
1367 pub entry_id: Option<entry::Id>,
1370 pub metadata_entry_id: Option<entry::Id>,
1374 pub location_id: Option<u64>,
1376 pub checkpoint: Option<ReplayCheckpoint>,
1378}
1379
1380#[derive(Debug, Clone)]
1382pub struct RegionSequencesRequest {
1383 pub region_ids: Vec<RegionId>,
1384}
1385
1386#[derive(Debug, Clone)]
1387pub struct RegionBulkInsertsRequest {
1388 pub region_id: RegionId,
1389 pub payload: DfRecordBatch,
1390 pub raw_data: ArrowIpc,
1391}
1392
1393impl RegionBulkInsertsRequest {
1394 pub fn estimated_size(&self) -> usize {
1395 self.payload.get_array_memory_size()
1396 }
1397}
1398
1399impl fmt::Display for RegionRequest {
1400 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1401 match self {
1402 RegionRequest::Put(_) => write!(f, "Put"),
1403 RegionRequest::Delete(_) => write!(f, "Delete"),
1404 RegionRequest::Create(_) => write!(f, "Create"),
1405 RegionRequest::Drop(_) => write!(f, "Drop"),
1406 RegionRequest::Open(_) => write!(f, "Open"),
1407 RegionRequest::Close(_) => write!(f, "Close"),
1408 RegionRequest::Alter(_) => write!(f, "Alter"),
1409 RegionRequest::Flush(_) => write!(f, "Flush"),
1410 RegionRequest::Compact(_) => write!(f, "Compact"),
1411 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1412 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1413 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1414 }
1415 }
1416}
1417
1418#[cfg(test)]
1419mod tests {
1420
1421 use api::v1::region::RegionColumnDef;
1422 use api::v1::{ColumnDataType, ColumnDef};
1423 use datatypes::prelude::ConcreteDataType;
1424 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1425
1426 use super::*;
1427 use crate::metadata::RegionMetadataBuilder;
1428
1429 #[test]
1430 fn test_from_proto_location() {
1431 let proto_location = v1::AddColumnLocation {
1432 location_type: LocationType::First as i32,
1433 after_column_name: String::default(),
1434 };
1435 let location = AddColumnLocation::try_from(proto_location).unwrap();
1436 assert_eq!(location, AddColumnLocation::First);
1437
1438 let proto_location = v1::AddColumnLocation {
1439 location_type: 10,
1440 after_column_name: String::default(),
1441 };
1442 AddColumnLocation::try_from(proto_location).unwrap_err();
1443
1444 let proto_location = v1::AddColumnLocation {
1445 location_type: LocationType::After as i32,
1446 after_column_name: "a".to_string(),
1447 };
1448 let location = AddColumnLocation::try_from(proto_location).unwrap();
1449 assert_eq!(
1450 location,
1451 AddColumnLocation::After {
1452 column_name: "a".to_string()
1453 }
1454 );
1455 }
1456
1457 #[test]
1458 fn test_from_none_proto_add_column() {
1459 AddColumn::try_from(v1::region::AddColumn {
1460 column_def: None,
1461 location: None,
1462 })
1463 .unwrap_err();
1464 }
1465
1466 #[test]
1467 fn test_from_proto_alter_request() {
1468 RegionAlterRequest::try_from(AlterRequest {
1469 region_id: 0,
1470 schema_version: 1,
1471 kind: None,
1472 })
1473 .unwrap_err();
1474
1475 let request = RegionAlterRequest::try_from(AlterRequest {
1476 region_id: 0,
1477 schema_version: 1,
1478 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1479 add_columns: vec![v1::region::AddColumn {
1480 column_def: Some(RegionColumnDef {
1481 column_def: Some(ColumnDef {
1482 name: "a".to_string(),
1483 data_type: ColumnDataType::String as i32,
1484 is_nullable: true,
1485 default_constraint: vec![],
1486 semantic_type: SemanticType::Field as i32,
1487 comment: String::new(),
1488 ..Default::default()
1489 }),
1490 column_id: 1,
1491 }),
1492 location: Some(v1::AddColumnLocation {
1493 location_type: LocationType::First as i32,
1494 after_column_name: String::default(),
1495 }),
1496 }],
1497 })),
1498 })
1499 .unwrap();
1500
1501 assert_eq!(
1502 request,
1503 RegionAlterRequest {
1504 kind: AlterKind::AddColumns {
1505 columns: vec![AddColumn {
1506 column_metadata: ColumnMetadata {
1507 column_schema: ColumnSchema::new(
1508 "a",
1509 ConcreteDataType::string_datatype(),
1510 true,
1511 ),
1512 semantic_type: SemanticType::Field,
1513 column_id: 1,
1514 },
1515 location: Some(AddColumnLocation::First),
1516 }]
1517 },
1518 }
1519 );
1520 }
1521
1522 fn new_metadata() -> RegionMetadata {
1525 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1526 builder
1527 .push_column_metadata(ColumnMetadata {
1528 column_schema: ColumnSchema::new(
1529 "ts",
1530 ConcreteDataType::timestamp_millisecond_datatype(),
1531 false,
1532 ),
1533 semantic_type: SemanticType::Timestamp,
1534 column_id: 1,
1535 })
1536 .push_column_metadata(ColumnMetadata {
1537 column_schema: ColumnSchema::new(
1538 "tag_0",
1539 ConcreteDataType::string_datatype(),
1540 true,
1541 ),
1542 semantic_type: SemanticType::Tag,
1543 column_id: 2,
1544 })
1545 .push_column_metadata(ColumnMetadata {
1546 column_schema: ColumnSchema::new(
1547 "field_0",
1548 ConcreteDataType::string_datatype(),
1549 true,
1550 ),
1551 semantic_type: SemanticType::Field,
1552 column_id: 3,
1553 })
1554 .push_column_metadata(ColumnMetadata {
1555 column_schema: ColumnSchema::new(
1556 "field_1",
1557 ConcreteDataType::boolean_datatype(),
1558 true,
1559 ),
1560 semantic_type: SemanticType::Field,
1561 column_id: 4,
1562 })
1563 .primary_key(vec![2]);
1564 builder.build().unwrap()
1565 }
1566
1567 #[test]
1568 fn test_add_column_validate() {
1569 let metadata = new_metadata();
1570 let add_column = AddColumn {
1571 column_metadata: ColumnMetadata {
1572 column_schema: ColumnSchema::new(
1573 "tag_1",
1574 ConcreteDataType::string_datatype(),
1575 true,
1576 ),
1577 semantic_type: SemanticType::Tag,
1578 column_id: 5,
1579 },
1580 location: None,
1581 };
1582 add_column.validate(&metadata).unwrap();
1583 assert!(add_column.need_alter(&metadata));
1584
1585 AddColumn {
1587 column_metadata: ColumnMetadata {
1588 column_schema: ColumnSchema::new(
1589 "tag_1",
1590 ConcreteDataType::string_datatype(),
1591 false,
1592 ),
1593 semantic_type: SemanticType::Tag,
1594 column_id: 5,
1595 },
1596 location: None,
1597 }
1598 .validate(&metadata)
1599 .unwrap_err();
1600
1601 let add_column = AddColumn {
1603 column_metadata: ColumnMetadata {
1604 column_schema: ColumnSchema::new(
1605 "tag_0",
1606 ConcreteDataType::string_datatype(),
1607 true,
1608 ),
1609 semantic_type: SemanticType::Tag,
1610 column_id: 2,
1611 },
1612 location: None,
1613 };
1614 add_column.validate(&metadata).unwrap();
1615 assert!(!add_column.need_alter(&metadata));
1616 }
1617
1618 #[test]
1619 fn test_add_duplicate_columns() {
1620 let kind = AlterKind::AddColumns {
1621 columns: vec![
1622 AddColumn {
1623 column_metadata: ColumnMetadata {
1624 column_schema: ColumnSchema::new(
1625 "tag_1",
1626 ConcreteDataType::string_datatype(),
1627 true,
1628 ),
1629 semantic_type: SemanticType::Tag,
1630 column_id: 5,
1631 },
1632 location: None,
1633 },
1634 AddColumn {
1635 column_metadata: ColumnMetadata {
1636 column_schema: ColumnSchema::new(
1637 "tag_1",
1638 ConcreteDataType::string_datatype(),
1639 true,
1640 ),
1641 semantic_type: SemanticType::Field,
1642 column_id: 6,
1643 },
1644 location: None,
1645 },
1646 ],
1647 };
1648 let metadata = new_metadata();
1649 kind.validate(&metadata).unwrap();
1650 assert!(kind.need_alter(&metadata));
1651 }
1652
1653 #[test]
1654 fn test_add_existing_column_different_metadata() {
1655 let metadata = new_metadata();
1656
1657 let kind = AlterKind::AddColumns {
1659 columns: vec![AddColumn {
1660 column_metadata: ColumnMetadata {
1661 column_schema: ColumnSchema::new(
1662 "tag_0",
1663 ConcreteDataType::string_datatype(),
1664 true,
1665 ),
1666 semantic_type: SemanticType::Tag,
1667 column_id: 4,
1668 },
1669 location: None,
1670 }],
1671 };
1672 kind.validate(&metadata).unwrap_err();
1673
1674 let kind = AlterKind::AddColumns {
1676 columns: vec![AddColumn {
1677 column_metadata: ColumnMetadata {
1678 column_schema: ColumnSchema::new(
1679 "tag_0",
1680 ConcreteDataType::int64_datatype(),
1681 true,
1682 ),
1683 semantic_type: SemanticType::Tag,
1684 column_id: 2,
1685 },
1686 location: None,
1687 }],
1688 };
1689 kind.validate(&metadata).unwrap_err();
1690
1691 let kind = AlterKind::AddColumns {
1693 columns: vec![AddColumn {
1694 column_metadata: ColumnMetadata {
1695 column_schema: ColumnSchema::new(
1696 "tag_1",
1697 ConcreteDataType::string_datatype(),
1698 true,
1699 ),
1700 semantic_type: SemanticType::Tag,
1701 column_id: 2,
1702 },
1703 location: None,
1704 }],
1705 };
1706 kind.validate(&metadata).unwrap_err();
1707 }
1708
1709 #[test]
1710 fn test_add_existing_column_with_location() {
1711 let metadata = new_metadata();
1712 let kind = AlterKind::AddColumns {
1713 columns: vec![AddColumn {
1714 column_metadata: ColumnMetadata {
1715 column_schema: ColumnSchema::new(
1716 "tag_0",
1717 ConcreteDataType::string_datatype(),
1718 true,
1719 ),
1720 semantic_type: SemanticType::Tag,
1721 column_id: 2,
1722 },
1723 location: Some(AddColumnLocation::First),
1724 }],
1725 };
1726 kind.validate(&metadata).unwrap_err();
1727 }
1728
1729 #[test]
1730 fn test_validate_drop_column() {
1731 let metadata = new_metadata();
1732 let kind = AlterKind::DropColumns {
1733 names: vec!["xxxx".to_string()],
1734 };
1735 kind.validate(&metadata).unwrap();
1736 assert!(!kind.need_alter(&metadata));
1737
1738 AlterKind::DropColumns {
1739 names: vec!["tag_0".to_string()],
1740 }
1741 .validate(&metadata)
1742 .unwrap_err();
1743
1744 let kind = AlterKind::DropColumns {
1745 names: vec!["field_0".to_string()],
1746 };
1747 kind.validate(&metadata).unwrap();
1748 assert!(kind.need_alter(&metadata));
1749 }
1750
1751 #[test]
1752 fn test_validate_modify_column_type() {
1753 let metadata = new_metadata();
1754 AlterKind::ModifyColumnTypes {
1755 columns: vec![ModifyColumnType {
1756 column_name: "xxxx".to_string(),
1757 target_type: ConcreteDataType::string_datatype(),
1758 }],
1759 }
1760 .validate(&metadata)
1761 .unwrap_err();
1762
1763 AlterKind::ModifyColumnTypes {
1764 columns: vec![ModifyColumnType {
1765 column_name: "field_1".to_string(),
1766 target_type: ConcreteDataType::date_datatype(),
1767 }],
1768 }
1769 .validate(&metadata)
1770 .unwrap_err();
1771
1772 AlterKind::ModifyColumnTypes {
1773 columns: vec![ModifyColumnType {
1774 column_name: "ts".to_string(),
1775 target_type: ConcreteDataType::date_datatype(),
1776 }],
1777 }
1778 .validate(&metadata)
1779 .unwrap_err();
1780
1781 AlterKind::ModifyColumnTypes {
1782 columns: vec![ModifyColumnType {
1783 column_name: "tag_0".to_string(),
1784 target_type: ConcreteDataType::date_datatype(),
1785 }],
1786 }
1787 .validate(&metadata)
1788 .unwrap_err();
1789
1790 let kind = AlterKind::ModifyColumnTypes {
1791 columns: vec![ModifyColumnType {
1792 column_name: "field_0".to_string(),
1793 target_type: ConcreteDataType::int32_datatype(),
1794 }],
1795 };
1796 kind.validate(&metadata).unwrap();
1797 assert!(kind.need_alter(&metadata));
1798 }
1799
1800 #[test]
1801 fn test_validate_add_columns() {
1802 let kind = AlterKind::AddColumns {
1803 columns: vec![
1804 AddColumn {
1805 column_metadata: ColumnMetadata {
1806 column_schema: ColumnSchema::new(
1807 "tag_1",
1808 ConcreteDataType::string_datatype(),
1809 true,
1810 ),
1811 semantic_type: SemanticType::Tag,
1812 column_id: 5,
1813 },
1814 location: None,
1815 },
1816 AddColumn {
1817 column_metadata: ColumnMetadata {
1818 column_schema: ColumnSchema::new(
1819 "field_2",
1820 ConcreteDataType::string_datatype(),
1821 true,
1822 ),
1823 semantic_type: SemanticType::Field,
1824 column_id: 6,
1825 },
1826 location: None,
1827 },
1828 ],
1829 };
1830 let request = RegionAlterRequest { kind };
1831 let mut metadata = new_metadata();
1832 metadata.schema_version = 1;
1833 request.validate(&metadata).unwrap();
1834 }
1835
1836 #[test]
1837 fn test_validate_create_region() {
1838 let column_metadatas = vec![
1839 ColumnMetadata {
1840 column_schema: ColumnSchema::new(
1841 "ts",
1842 ConcreteDataType::timestamp_millisecond_datatype(),
1843 false,
1844 ),
1845 semantic_type: SemanticType::Timestamp,
1846 column_id: 1,
1847 },
1848 ColumnMetadata {
1849 column_schema: ColumnSchema::new(
1850 "tag_0",
1851 ConcreteDataType::string_datatype(),
1852 true,
1853 ),
1854 semantic_type: SemanticType::Tag,
1855 column_id: 2,
1856 },
1857 ColumnMetadata {
1858 column_schema: ColumnSchema::new(
1859 "field_0",
1860 ConcreteDataType::string_datatype(),
1861 true,
1862 ),
1863 semantic_type: SemanticType::Field,
1864 column_id: 3,
1865 },
1866 ];
1867 let create = RegionCreateRequest {
1868 engine: "mito".to_string(),
1869 column_metadatas,
1870 primary_key: vec![3, 4],
1871 options: HashMap::new(),
1872 table_dir: "path".to_string(),
1873 path_type: PathType::Bare,
1874 partition_expr_json: Some("".to_string()),
1875 };
1876
1877 assert!(create.validate().is_err());
1878 }
1879
1880 #[test]
1881 fn test_validate_modify_column_fulltext_options() {
1882 let kind = AlterKind::SetIndexes {
1883 options: vec![SetIndexOption::Fulltext {
1884 column_name: "tag_0".to_string(),
1885 options: FulltextOptions::new_unchecked(
1886 true,
1887 FulltextAnalyzer::Chinese,
1888 false,
1889 FulltextBackend::Bloom,
1890 1000,
1891 0.01,
1892 ),
1893 }],
1894 };
1895 let request = RegionAlterRequest { kind };
1896 let mut metadata = new_metadata();
1897 metadata.schema_version = 1;
1898 request.validate(&metadata).unwrap();
1899
1900 let kind = AlterKind::UnsetIndexes {
1901 options: vec![UnsetIndexOption::Fulltext {
1902 column_name: "tag_0".to_string(),
1903 }],
1904 };
1905 let request = RegionAlterRequest { kind };
1906 let mut metadata = new_metadata();
1907 metadata.schema_version = 1;
1908 request.validate(&metadata).unwrap();
1909 }
1910
1911 #[test]
1912 fn test_validate_sync_columns() {
1913 let metadata = new_metadata();
1914 let kind = AlterKind::SyncColumns {
1915 column_metadatas: vec![
1916 ColumnMetadata {
1917 column_schema: ColumnSchema::new(
1918 "tag_1",
1919 ConcreteDataType::string_datatype(),
1920 true,
1921 ),
1922 semantic_type: SemanticType::Tag,
1923 column_id: 5,
1924 },
1925 ColumnMetadata {
1926 column_schema: ColumnSchema::new(
1927 "field_2",
1928 ConcreteDataType::string_datatype(),
1929 true,
1930 ),
1931 semantic_type: SemanticType::Field,
1932 column_id: 6,
1933 },
1934 ],
1935 };
1936 let err = kind.validate(&metadata).unwrap_err();
1937 assert!(err.to_string().contains("not a primary key"));
1938
1939 let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
1941 let ts_column = column_metadatas_with_different_ts_column
1942 .iter_mut()
1943 .find(|c| c.semantic_type == SemanticType::Timestamp)
1944 .unwrap();
1945 ts_column.column_schema.name = "ts1".to_string();
1946
1947 let kind = AlterKind::SyncColumns {
1948 column_metadatas: column_metadatas_with_different_ts_column,
1949 };
1950 let err = kind.validate(&metadata).unwrap_err();
1951 assert!(
1952 err.to_string()
1953 .contains("timestamp column ts has different id")
1954 );
1955
1956 let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
1958 let pk_column = column_metadatas_with_different_pk_column
1959 .iter_mut()
1960 .find(|c| c.column_schema.name == "tag_0")
1961 .unwrap();
1962 pk_column.column_id = 100;
1963 let kind = AlterKind::SyncColumns {
1964 column_metadatas: column_metadatas_with_different_pk_column,
1965 };
1966 let err = kind.validate(&metadata).unwrap_err();
1967 assert!(
1968 err.to_string()
1969 .contains("column with same name tag_0 has different id")
1970 );
1971
1972 let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
1974 column_metadatas_with_new_field_column.push(ColumnMetadata {
1975 column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
1976 semantic_type: SemanticType::Field,
1977 column_id: 4,
1978 });
1979 let kind = AlterKind::SyncColumns {
1980 column_metadatas: column_metadatas_with_new_field_column,
1981 };
1982 kind.validate(&metadata).unwrap();
1983 }
1984
1985 #[test]
1986 fn test_cast_path_type_to_primitive() {
1987 assert_eq!(PathType::Bare as u8, 0);
1988 assert_eq!(PathType::Data as u8, 1);
1989 assert_eq!(PathType::Metadata as u8, 2);
1990 assert_eq!(
1991 PathType::try_from(PathType::Bare as u8).unwrap(),
1992 PathType::Bare
1993 );
1994 assert_eq!(
1995 PathType::try_from(PathType::Data as u8).unwrap(),
1996 PathType::Data
1997 );
1998 assert_eq!(
1999 PathType::try_from(PathType::Metadata as u8).unwrap(),
2000 PathType::Metadata
2001 );
2002 }
2003}