1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 AlterRequest, AlterRequests, BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest,
26 CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests,
27 OpenRequest, TruncateRequest, alter_request, compact_request, region_request, truncate_request,
28};
29use api::v1::{
30 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::{TimeToLive, Timestamp};
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use num_enum::TryFromPrimitive;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use strum::{AsRefStr, IntoStaticStr};
43
44use crate::logstore::entry;
45use crate::metadata::{
46 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
47 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
48 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
49 RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::metrics;
53use crate::mito_engine_options::{
54 SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
55};
56use crate::path_utils::table_dir;
57use crate::storage::{ColumnId, RegionId, ScanRequest};
58
59#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
61#[repr(u8)]
62pub enum PathType {
63 Bare,
67 Data,
71 Metadata,
75}
76
77#[derive(Debug, IntoStaticStr)]
78pub enum BatchRegionDdlRequest {
79 Create(Vec<(RegionId, RegionCreateRequest)>),
80 Drop(Vec<(RegionId, RegionDropRequest)>),
81 Alter(Vec<(RegionId, RegionAlterRequest)>),
82}
83
84impl BatchRegionDdlRequest {
85 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
87 match body {
88 region_request::Body::Creates(creates) => {
89 let requests = creates
90 .requests
91 .into_iter()
92 .map(parse_region_create)
93 .collect::<Result<Vec<_>>>()?;
94 Ok(Some(Self::Create(requests)))
95 }
96 region_request::Body::Drops(drops) => {
97 let requests = drops
98 .requests
99 .into_iter()
100 .map(parse_region_drop)
101 .collect::<Result<Vec<_>>>()?;
102 Ok(Some(Self::Drop(requests)))
103 }
104 region_request::Body::Alters(alters) => {
105 let requests = alters
106 .requests
107 .into_iter()
108 .map(parse_region_alter)
109 .collect::<Result<Vec<_>>>()?;
110 Ok(Some(Self::Alter(requests)))
111 }
112 _ => Ok(None),
113 }
114 }
115
116 pub fn request_type(&self) -> &'static str {
117 self.into()
118 }
119
120 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
121 match self {
122 Self::Create(requests) => requests
123 .into_iter()
124 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
125 .collect(),
126 Self::Drop(requests) => requests
127 .into_iter()
128 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
129 .collect(),
130 Self::Alter(requests) => requests
131 .into_iter()
132 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
133 .collect(),
134 }
135 }
136}
137
138#[derive(Debug, IntoStaticStr)]
139pub enum RegionRequest {
140 Put(RegionPutRequest),
141 Delete(RegionDeleteRequest),
142 Create(RegionCreateRequest),
143 Drop(RegionDropRequest),
144 Open(RegionOpenRequest),
145 Close(RegionCloseRequest),
146 Alter(RegionAlterRequest),
147 Flush(RegionFlushRequest),
148 Compact(RegionCompactRequest),
149 BuildIndex(RegionBuildIndexRequest),
150 Truncate(RegionTruncateRequest),
151 Catchup(RegionCatchupRequest),
152 BulkInserts(RegionBulkInsertsRequest),
153}
154
155impl RegionRequest {
156 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
159 match body {
160 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
161 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
162 region_request::Body::Create(create) => make_region_create(create),
163 region_request::Body::Drop(drop) => make_region_drop(drop),
164 region_request::Body::Open(open) => make_region_open(open),
165 region_request::Body::Close(close) => make_region_close(close),
166 region_request::Body::Alter(alter) => make_region_alter(alter),
167 region_request::Body::Flush(flush) => make_region_flush(flush),
168 region_request::Body::Compact(compact) => make_region_compact(compact),
169 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
170 region_request::Body::Creates(creates) => make_region_creates(creates),
171 region_request::Body::Drops(drops) => make_region_drops(drops),
172 region_request::Body::Alters(alters) => make_region_alters(alters),
173 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
174 region_request::Body::Sync(_) => UnexpectedSnafu {
175 reason: "Sync request should be handled separately by RegionServer",
176 }
177 .fail(),
178 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
179 reason: "ListMetadata request should be handled separately by RegionServer",
180 }
181 .fail(),
182 }
183 }
184
185 pub fn request_type(&self) -> &'static str {
187 self.into()
188 }
189}
190
191fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
192 let requests = inserts
193 .requests
194 .into_iter()
195 .filter_map(|r| {
196 let region_id = r.region_id.into();
197 r.rows.map(|rows| {
198 (
199 region_id,
200 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
201 )
202 })
203 })
204 .collect();
205 Ok(requests)
206}
207
208fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
209 let requests = deletes
210 .requests
211 .into_iter()
212 .filter_map(|r| {
213 let region_id = r.region_id.into();
214 r.rows.map(|rows| {
215 (
216 region_id,
217 RegionRequest::Delete(RegionDeleteRequest { rows, hint: None }),
218 )
219 })
220 })
221 .collect();
222 Ok(requests)
223}
224
225fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
226 let column_metadatas = create
227 .column_defs
228 .into_iter()
229 .map(ColumnMetadata::try_from_column_def)
230 .collect::<Result<Vec<_>>>()?;
231 let region_id = RegionId::from(create.region_id);
232 let table_dir = table_dir(&create.path, region_id.table_id());
233 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
234 Ok((
235 region_id,
236 RegionCreateRequest {
237 engine: create.engine,
238 column_metadatas,
239 primary_key: create.primary_key,
240 options: create.options,
241 table_dir,
242 path_type: PathType::Bare,
243 partition_expr_json,
244 },
245 ))
246}
247
248fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
249 let (region_id, request) = parse_region_create(create)?;
250 Ok(vec![(region_id, RegionRequest::Create(request))])
251}
252
253fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
254 let mut requests = Vec::with_capacity(creates.requests.len());
255 for create in creates.requests {
256 requests.extend(make_region_create(create)?);
257 }
258 Ok(requests)
259}
260
261fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
262 let region_id = drop.region_id.into();
263 Ok((
264 region_id,
265 RegionDropRequest {
266 fast_path: drop.fast_path,
267 },
268 ))
269}
270
271fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
272 let (region_id, request) = parse_region_drop(drop)?;
273 Ok(vec![(region_id, RegionRequest::Drop(request))])
274}
275
276fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
277 let mut requests = Vec::with_capacity(drops.requests.len());
278 for drop in drops.requests {
279 requests.extend(make_region_drop(drop)?);
280 }
281 Ok(requests)
282}
283
284fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
285 let region_id = RegionId::from(open.region_id);
286 let table_dir = table_dir(&open.path, region_id.table_id());
287 Ok(vec![(
288 region_id,
289 RegionRequest::Open(RegionOpenRequest {
290 engine: open.engine,
291 table_dir,
292 path_type: PathType::Bare,
293 options: open.options,
294 skip_wal_replay: false,
295 checkpoint: None,
296 }),
297 )])
298}
299
300fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
301 let region_id = close.region_id.into();
302 Ok(vec![(
303 region_id,
304 RegionRequest::Close(RegionCloseRequest {}),
305 )])
306}
307
308fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
309 let region_id = alter.region_id.into();
310 let request = RegionAlterRequest::try_from(alter)?;
311 Ok((region_id, request))
312}
313
314fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
315 let (region_id, request) = parse_region_alter(alter)?;
316 Ok(vec![(region_id, RegionRequest::Alter(request))])
317}
318
319fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
320 let mut requests = Vec::with_capacity(alters.requests.len());
321 for alter in alters.requests {
322 requests.extend(make_region_alter(alter)?);
323 }
324 Ok(requests)
325}
326
327fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
328 let region_id = flush.region_id.into();
329 Ok(vec![(
330 region_id,
331 RegionRequest::Flush(RegionFlushRequest {
332 row_group_size: None,
333 }),
334 )])
335}
336
337fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
338 let region_id = compact.region_id.into();
339 let options = compact
340 .options
341 .unwrap_or(compact_request::Options::Regular(Default::default()));
342 let parallelism = if compact.parallelism == 0 {
344 None
345 } else {
346 Some(compact.parallelism)
347 };
348 Ok(vec![(
349 region_id,
350 RegionRequest::Compact(RegionCompactRequest {
351 options,
352 parallelism,
353 }),
354 )])
355}
356
357fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
358 let region_id = truncate.region_id.into();
359 match truncate.kind {
360 None => InvalidRawRegionRequestSnafu {
361 err: "missing kind in TruncateRequest".to_string(),
362 }
363 .fail(),
364 Some(truncate_request::Kind::All(_)) => Ok(vec![(
365 region_id,
366 RegionRequest::Truncate(RegionTruncateRequest::All),
367 )]),
368 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
369 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
370
371 Ok(vec![(
372 region_id,
373 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
374 )])
375 }
376 }
377}
378
379fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
381 let region_id = request.region_id.into();
382 let Some(Body::ArrowIpc(request)) = request.body else {
383 return Ok(vec![]);
384 };
385
386 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
387 .with_label_values(&["decode"])
388 .start_timer();
389 let mut decoder =
390 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
391 let payload = decoder
392 .try_decode_record_batch(&request.data_header, &request.payload)
393 .context(FlightCodecSnafu)?;
394 decoder_timer.observe_duration();
395 Ok(vec![(
396 region_id,
397 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
398 region_id,
399 payload,
400 raw_data: request,
401 }),
402 )])
403}
404
405#[derive(Debug)]
407pub struct RegionPutRequest {
408 pub rows: Rows,
410 pub hint: Option<WriteHint>,
412}
413
414#[derive(Debug)]
415pub struct RegionReadRequest {
416 pub request: ScanRequest,
417}
418
419#[derive(Debug)]
421pub struct RegionDeleteRequest {
422 pub rows: Rows,
426 pub hint: Option<WriteHint>,
428}
429
430#[derive(Debug, Clone)]
431pub struct RegionCreateRequest {
432 pub engine: String,
434 pub column_metadatas: Vec<ColumnMetadata>,
436 pub primary_key: Vec<ColumnId>,
438 pub options: HashMap<String, String>,
440 pub table_dir: String,
442 pub path_type: PathType,
444 pub partition_expr_json: Option<String>,
447}
448
449impl RegionCreateRequest {
450 pub fn validate(&self) -> Result<()> {
452 ensure!(
454 self.column_metadatas
455 .iter()
456 .any(|x| x.semantic_type == SemanticType::Timestamp),
457 InvalidRegionRequestSnafu {
458 region_id: RegionId::new(0, 0),
459 err: "missing timestamp column in create region request".to_string(),
460 }
461 );
462
463 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
465 for (i, c) in self.column_metadatas.iter().enumerate() {
466 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
467 return InvalidRegionRequestSnafu {
468 region_id: RegionId::new(0, 0),
469 err: format!(
470 "duplicate column id {} (at position {} and {}) in create region request",
471 c.column_id, previous, i
472 ),
473 }
474 .fail();
475 }
476 }
477
478 for column_id in &self.primary_key {
480 ensure!(
481 column_id_to_indices.contains_key(column_id),
482 InvalidRegionRequestSnafu {
483 region_id: RegionId::new(0, 0),
484 err: format!(
485 "missing primary key column {} in create region request",
486 column_id
487 ),
488 }
489 );
490 }
491
492 Ok(())
493 }
494
495 pub fn is_physical_table(&self) -> bool {
497 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
498 }
499}
500
501#[derive(Debug, Clone)]
502pub struct RegionDropRequest {
503 pub fast_path: bool,
504}
505
506#[derive(Debug, Clone)]
508pub struct RegionOpenRequest {
509 pub engine: String,
511 pub table_dir: String,
513 pub path_type: PathType,
515 pub options: HashMap<String, String>,
517 pub skip_wal_replay: bool,
519 pub checkpoint: Option<ReplayCheckpoint>,
521}
522
523#[derive(Debug, Clone, Copy, PartialEq, Eq)]
524pub struct ReplayCheckpoint {
525 pub entry_id: u64,
526 pub metadata_entry_id: Option<u64>,
527}
528
529impl RegionOpenRequest {
530 pub fn is_physical_table(&self) -> bool {
532 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
533 }
534}
535
536#[derive(Debug)]
538pub struct RegionCloseRequest {}
539
540#[derive(Debug, PartialEq, Eq, Clone)]
542pub struct RegionAlterRequest {
543 pub kind: AlterKind,
545}
546
547impl RegionAlterRequest {
548 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
550 self.kind.validate(metadata)?;
551
552 Ok(())
553 }
554
555 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
559 debug_assert!(self.validate(metadata).is_ok());
560 self.kind.need_alter(metadata)
561 }
562}
563
564impl TryFrom<AlterRequest> for RegionAlterRequest {
565 type Error = MetadataError;
566
567 fn try_from(value: AlterRequest) -> Result<Self> {
568 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
569 err: "missing kind in AlterRequest",
570 })?;
571
572 let kind = AlterKind::try_from(kind)?;
573 Ok(RegionAlterRequest { kind })
574 }
575}
576
577#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
579pub enum AlterKind {
580 AddColumns {
582 columns: Vec<AddColumn>,
584 },
585 DropColumns {
587 names: Vec<String>,
589 },
590 ModifyColumnTypes {
592 columns: Vec<ModifyColumnType>,
594 },
595 SetRegionOptions { options: Vec<SetRegionOption> },
597 UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
599 SetIndexes { options: Vec<SetIndexOption> },
601 UnsetIndexes { options: Vec<UnsetIndexOption> },
603 DropDefaults {
605 names: Vec<String>,
607 },
608 SetDefaults {
610 columns: Vec<SetDefault>,
612 },
613 SyncColumns {
615 column_metadatas: Vec<ColumnMetadata>,
616 },
617}
618#[derive(Debug, PartialEq, Eq, Clone)]
619pub struct SetDefault {
620 pub name: String,
621 pub default_constraint: Vec<u8>,
622}
623
624#[derive(Debug, PartialEq, Eq, Clone)]
625pub enum SetIndexOption {
626 Fulltext {
627 column_name: String,
628 options: FulltextOptions,
629 },
630 Inverted {
631 column_name: String,
632 },
633 Skipping {
634 column_name: String,
635 options: SkippingIndexOptions,
636 },
637}
638
639impl SetIndexOption {
640 pub fn column_name(&self) -> &String {
642 match self {
643 SetIndexOption::Fulltext { column_name, .. } => column_name,
644 SetIndexOption::Inverted { column_name } => column_name,
645 SetIndexOption::Skipping { column_name, .. } => column_name,
646 }
647 }
648
649 pub fn is_fulltext(&self) -> bool {
651 match self {
652 SetIndexOption::Fulltext { .. } => true,
653 SetIndexOption::Inverted { .. } => false,
654 SetIndexOption::Skipping { .. } => false,
655 }
656 }
657}
658
659impl TryFrom<v1::SetIndex> for SetIndexOption {
660 type Error = MetadataError;
661
662 fn try_from(value: v1::SetIndex) -> Result<Self> {
663 let option = value.options.context(InvalidRawRegionRequestSnafu {
664 err: "missing options in SetIndex",
665 })?;
666
667 let opt = match option {
668 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
669 column_name: x.column_name.clone(),
670 options: FulltextOptions::new(
671 x.enable,
672 as_fulltext_option_analyzer(
673 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
674 ),
675 x.case_sensitive,
676 as_fulltext_option_backend(
677 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
678 ),
679 x.granularity as u32,
680 x.false_positive_rate,
681 )
682 .context(InvalidIndexOptionSnafu)?,
683 },
684 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
685 column_name: i.column_name,
686 },
687 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
688 column_name: s.column_name,
689 options: SkippingIndexOptions::new(
690 s.granularity as u32,
691 s.false_positive_rate,
692 as_skipping_index_type(
693 PbSkippingIndexType::try_from(s.skipping_index_type)
694 .context(DecodeProtoSnafu)?,
695 ),
696 )
697 .context(InvalidIndexOptionSnafu)?,
698 },
699 };
700
701 Ok(opt)
702 }
703}
704
705#[derive(Debug, PartialEq, Eq, Clone)]
706pub enum UnsetIndexOption {
707 Fulltext { column_name: String },
708 Inverted { column_name: String },
709 Skipping { column_name: String },
710}
711
712impl UnsetIndexOption {
713 pub fn column_name(&self) -> &String {
714 match self {
715 UnsetIndexOption::Fulltext { column_name } => column_name,
716 UnsetIndexOption::Inverted { column_name } => column_name,
717 UnsetIndexOption::Skipping { column_name } => column_name,
718 }
719 }
720
721 pub fn is_fulltext(&self) -> bool {
722 match self {
723 UnsetIndexOption::Fulltext { .. } => true,
724 UnsetIndexOption::Inverted { .. } => false,
725 UnsetIndexOption::Skipping { .. } => false,
726 }
727 }
728}
729
730impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
731 type Error = MetadataError;
732
733 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
734 let option = value.options.context(InvalidRawRegionRequestSnafu {
735 err: "missing options in UnsetIndex",
736 })?;
737
738 let opt = match option {
739 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
740 column_name: f.column_name,
741 },
742 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
743 column_name: i.column_name,
744 },
745 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
746 column_name: s.column_name,
747 },
748 };
749
750 Ok(opt)
751 }
752}
753
754impl AlterKind {
755 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
759 match self {
760 AlterKind::AddColumns { columns } => {
761 for col_to_add in columns {
762 col_to_add.validate(metadata)?;
763 }
764 }
765 AlterKind::DropColumns { names } => {
766 for name in names {
767 Self::validate_column_to_drop(name, metadata)?;
768 }
769 }
770 AlterKind::ModifyColumnTypes { columns } => {
771 for col_to_change in columns {
772 col_to_change.validate(metadata)?;
773 }
774 }
775 AlterKind::SetRegionOptions { .. } => {}
776 AlterKind::UnsetRegionOptions { .. } => {}
777 AlterKind::SetIndexes { options } => {
778 for option in options {
779 Self::validate_column_alter_index_option(
780 option.column_name(),
781 metadata,
782 option.is_fulltext(),
783 )?;
784 }
785 }
786 AlterKind::UnsetIndexes { options } => {
787 for option in options {
788 Self::validate_column_alter_index_option(
789 option.column_name(),
790 metadata,
791 option.is_fulltext(),
792 )?;
793 }
794 }
795 AlterKind::DropDefaults { names } => {
796 names
797 .iter()
798 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
799 }
800 AlterKind::SetDefaults { columns } => {
801 columns
802 .iter()
803 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
804 }
805 AlterKind::SyncColumns { column_metadatas } => {
806 let new_primary_keys = column_metadatas
807 .iter()
808 .filter(|c| c.semantic_type == SemanticType::Tag)
809 .map(|c| (c.column_schema.name.as_str(), c.column_id))
810 .collect::<HashMap<_, _>>();
811
812 let old_primary_keys = metadata
813 .column_metadatas
814 .iter()
815 .filter(|c| c.semantic_type == SemanticType::Tag)
816 .map(|c| (c.column_schema.name.as_str(), c.column_id));
817
818 for (name, id) in old_primary_keys {
819 let primary_key =
820 new_primary_keys
821 .get(name)
822 .with_context(|| InvalidRegionRequestSnafu {
823 region_id: metadata.region_id,
824 err: format!("column {} is not a primary key", name),
825 })?;
826
827 ensure!(
828 *primary_key == id,
829 InvalidRegionRequestSnafu {
830 region_id: metadata.region_id,
831 err: format!(
832 "column with same name {} has different id, existing: {}, got: {}",
833 name, id, primary_key
834 ),
835 }
836 );
837 }
838
839 let new_ts_column = column_metadatas
840 .iter()
841 .find(|c| c.semantic_type == SemanticType::Timestamp)
842 .map(|c| (c.column_schema.name.as_str(), c.column_id))
843 .context(InvalidRegionRequestSnafu {
844 region_id: metadata.region_id,
845 err: "timestamp column not found",
846 })?;
847
848 let old_ts_column = metadata
850 .column_metadatas
851 .iter()
852 .find(|c| c.semantic_type == SemanticType::Timestamp)
853 .map(|c| (c.column_schema.name.as_str(), c.column_id))
854 .unwrap();
855
856 ensure!(
857 new_ts_column == old_ts_column,
858 InvalidRegionRequestSnafu {
859 region_id: metadata.region_id,
860 err: format!(
861 "timestamp column {} has different id, existing: {}, got: {}",
862 old_ts_column.0, old_ts_column.1, new_ts_column.1
863 ),
864 }
865 );
866 }
867 }
868 Ok(())
869 }
870
871 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
873 debug_assert!(self.validate(metadata).is_ok());
874 match self {
875 AlterKind::AddColumns { columns } => columns
876 .iter()
877 .any(|col_to_add| col_to_add.need_alter(metadata)),
878 AlterKind::DropColumns { names } => names
879 .iter()
880 .any(|name| metadata.column_by_name(name).is_some()),
881 AlterKind::ModifyColumnTypes { columns } => columns
882 .iter()
883 .any(|col_to_change| col_to_change.need_alter(metadata)),
884 AlterKind::SetRegionOptions { .. } => true,
885 AlterKind::UnsetRegionOptions { .. } => true,
886 AlterKind::SetIndexes { options, .. } => options
887 .iter()
888 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
889 AlterKind::UnsetIndexes { options } => options
890 .iter()
891 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
892 AlterKind::DropDefaults { names } => names
893 .iter()
894 .any(|name| metadata.column_by_name(name).is_some()),
895
896 AlterKind::SetDefaults { columns } => columns
897 .iter()
898 .any(|x| metadata.column_by_name(&x.name).is_some()),
899 AlterKind::SyncColumns { column_metadatas } => {
900 metadata.column_metadatas != *column_metadatas
901 }
902 }
903 }
904
905 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
907 let Some(column) = metadata.column_by_name(name) else {
908 return Ok(());
909 };
910 ensure!(
911 column.semantic_type == SemanticType::Field,
912 InvalidRegionRequestSnafu {
913 region_id: metadata.region_id,
914 err: format!("column {} is not a field and could not be dropped", name),
915 }
916 );
917 Ok(())
918 }
919
920 fn validate_column_alter_index_option(
922 column_name: &String,
923 metadata: &RegionMetadata,
924 is_fulltext: bool,
925 ) -> Result<()> {
926 let column = metadata
927 .column_by_name(column_name)
928 .context(InvalidRegionRequestSnafu {
929 region_id: metadata.region_id,
930 err: format!("column {} not found", column_name),
931 })?;
932
933 if is_fulltext {
934 ensure!(
935 column.column_schema.data_type.is_string(),
936 InvalidRegionRequestSnafu {
937 region_id: metadata.region_id,
938 err: format!(
939 "cannot change alter index options for non-string column {}",
940 column_name
941 ),
942 }
943 );
944 }
945
946 Ok(())
947 }
948
949 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
951 metadata
952 .column_by_name(column_name)
953 .context(InvalidRegionRequestSnafu {
954 region_id: metadata.region_id,
955 err: format!("column {} not found", column_name),
956 })?;
957
958 Ok(())
959 }
960}
961
962impl TryFrom<alter_request::Kind> for AlterKind {
963 type Error = MetadataError;
964
965 fn try_from(kind: alter_request::Kind) -> Result<Self> {
966 let alter_kind = match kind {
967 alter_request::Kind::AddColumns(x) => {
968 let columns = x
969 .add_columns
970 .into_iter()
971 .map(|x| x.try_into())
972 .collect::<Result<Vec<_>>>()?;
973 AlterKind::AddColumns { columns }
974 }
975 alter_request::Kind::ModifyColumnTypes(x) => {
976 let columns = x
977 .modify_column_types
978 .into_iter()
979 .map(|x| x.into())
980 .collect::<Vec<_>>();
981 AlterKind::ModifyColumnTypes { columns }
982 }
983 alter_request::Kind::DropColumns(x) => {
984 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
985 AlterKind::DropColumns { names }
986 }
987 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
988 options: options
989 .table_options
990 .iter()
991 .map(TryFrom::try_from)
992 .collect::<Result<Vec<_>>>()?,
993 },
994 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
995 keys: options
996 .keys
997 .iter()
998 .map(|key| UnsetRegionOption::try_from(key.as_str()))
999 .collect::<Result<Vec<_>>>()?,
1000 },
1001 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1002 options: vec![SetIndexOption::try_from(o)?],
1003 },
1004 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1005 options: vec![UnsetIndexOption::try_from(o)?],
1006 },
1007 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1008 options: o
1009 .set_indexes
1010 .into_iter()
1011 .map(SetIndexOption::try_from)
1012 .collect::<Result<Vec<_>>>()?,
1013 },
1014 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1015 options: o
1016 .unset_indexes
1017 .into_iter()
1018 .map(UnsetIndexOption::try_from)
1019 .collect::<Result<Vec<_>>>()?,
1020 },
1021 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1022 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1023 },
1024 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1025 columns: x
1026 .set_defaults
1027 .into_iter()
1028 .map(|x| {
1029 Ok(SetDefault {
1030 name: x.column_name,
1031 default_constraint: x.default_constraint.clone(),
1032 })
1033 })
1034 .collect::<Result<Vec<_>>>()?,
1035 },
1036 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1037 column_metadatas: x
1038 .column_defs
1039 .into_iter()
1040 .map(ColumnMetadata::try_from_column_def)
1041 .collect::<Result<Vec<_>>>()?,
1042 },
1043 };
1044
1045 Ok(alter_kind)
1046 }
1047}
1048
1049#[derive(Debug, PartialEq, Eq, Clone)]
1051pub struct AddColumn {
1052 pub column_metadata: ColumnMetadata,
1054 pub location: Option<AddColumnLocation>,
1057}
1058
1059impl AddColumn {
1060 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1065 ensure!(
1066 self.column_metadata.column_schema.is_nullable()
1067 || self
1068 .column_metadata
1069 .column_schema
1070 .default_constraint()
1071 .is_some(),
1072 InvalidRegionRequestSnafu {
1073 region_id: metadata.region_id,
1074 err: format!(
1075 "no default value for column {}",
1076 self.column_metadata.column_schema.name
1077 ),
1078 }
1079 );
1080
1081 if let Some(existing_column) =
1082 metadata.column_by_name(&self.column_metadata.column_schema.name)
1083 {
1084 ensure!(
1086 *existing_column == self.column_metadata,
1087 InvalidRegionRequestSnafu {
1088 region_id: metadata.region_id,
1089 err: format!(
1090 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1091 self.column_metadata.column_schema.name,
1092 existing_column,
1093 self.column_metadata,
1094 ),
1095 }
1096 );
1097 ensure!(
1098 self.location.is_none(),
1099 InvalidRegionRequestSnafu {
1100 region_id: metadata.region_id,
1101 err: format!(
1102 "column {} already exists, but location is specified",
1103 self.column_metadata.column_schema.name
1104 ),
1105 }
1106 );
1107 }
1108
1109 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1110 ensure!(
1112 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1113 InvalidRegionRequestSnafu {
1114 region_id: metadata.region_id,
1115 err: format!(
1116 "column id {} already exists with different name {}",
1117 self.column_metadata.column_id, existing_column.column_schema.name
1118 ),
1119 }
1120 );
1121 }
1122
1123 Ok(())
1124 }
1125
1126 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1128 debug_assert!(self.validate(metadata).is_ok());
1129 metadata
1130 .column_by_name(&self.column_metadata.column_schema.name)
1131 .is_none()
1132 }
1133}
1134
1135impl TryFrom<v1::region::AddColumn> for AddColumn {
1136 type Error = MetadataError;
1137
1138 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1139 let column_def = add_column
1140 .column_def
1141 .context(InvalidRawRegionRequestSnafu {
1142 err: "missing column_def in AddColumn",
1143 })?;
1144
1145 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1146 let location = add_column
1147 .location
1148 .map(AddColumnLocation::try_from)
1149 .transpose()?;
1150
1151 Ok(AddColumn {
1152 column_metadata,
1153 location,
1154 })
1155 }
1156}
1157
1158#[derive(Debug, PartialEq, Eq, Clone)]
1160pub enum AddColumnLocation {
1161 First,
1163 After {
1165 column_name: String,
1167 },
1168}
1169
1170impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1171 type Error = MetadataError;
1172
1173 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1174 let location_type = LocationType::try_from(location.location_type)
1175 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1176 let add_column_location = match location_type {
1177 LocationType::First => AddColumnLocation::First,
1178 LocationType::After => AddColumnLocation::After {
1179 column_name: location.after_column_name,
1180 },
1181 };
1182
1183 Ok(add_column_location)
1184 }
1185}
1186
1187#[derive(Debug, PartialEq, Eq, Clone)]
1189pub struct ModifyColumnType {
1190 pub column_name: String,
1192 pub target_type: ConcreteDataType,
1194}
1195
1196impl ModifyColumnType {
1197 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1199 let column_meta = metadata
1200 .column_by_name(&self.column_name)
1201 .with_context(|| InvalidRegionRequestSnafu {
1202 region_id: metadata.region_id,
1203 err: format!("column {} not found", self.column_name),
1204 })?;
1205
1206 ensure!(
1207 matches!(column_meta.semantic_type, SemanticType::Field),
1208 InvalidRegionRequestSnafu {
1209 region_id: metadata.region_id,
1210 err: "'timestamp' or 'tag' column cannot change type".to_string()
1211 }
1212 );
1213 ensure!(
1214 column_meta
1215 .column_schema
1216 .data_type
1217 .can_arrow_type_cast_to(&self.target_type),
1218 InvalidRegionRequestSnafu {
1219 region_id: metadata.region_id,
1220 err: format!(
1221 "column '{}' cannot be cast automatically to type '{}'",
1222 self.column_name, self.target_type
1223 ),
1224 }
1225 );
1226
1227 Ok(())
1228 }
1229
1230 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1232 debug_assert!(self.validate(metadata).is_ok());
1233 metadata.column_by_name(&self.column_name).is_some()
1234 }
1235}
1236
1237impl From<v1::ModifyColumnType> for ModifyColumnType {
1238 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1239 let target_type = ColumnDataTypeWrapper::new(
1240 modify_column_type.target_type(),
1241 modify_column_type.target_type_extension,
1242 )
1243 .into();
1244
1245 ModifyColumnType {
1246 column_name: modify_column_type.column_name,
1247 target_type,
1248 }
1249 }
1250}
1251
1252#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1253pub enum SetRegionOption {
1254 Ttl(Option<TimeToLive>),
1255 Twsc(String, String),
1257 Format(String),
1259}
1260
1261impl TryFrom<&PbOption> for SetRegionOption {
1262 type Error = MetadataError;
1263
1264 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1265 let PbOption { key, value } = value;
1266 match key.as_str() {
1267 TTL_KEY => {
1268 let ttl = TimeToLive::from_humantime_or_str(value)
1269 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1270
1271 Ok(Self::Ttl(Some(ttl)))
1272 }
1273 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1274 Ok(Self::Twsc(key.clone(), value.clone()))
1275 }
1276 SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1277 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1278 }
1279 }
1280}
1281
1282impl From<&UnsetRegionOption> for SetRegionOption {
1283 fn from(unset_option: &UnsetRegionOption) -> Self {
1284 match unset_option {
1285 UnsetRegionOption::TwcsTriggerFileNum => {
1286 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1287 }
1288 UnsetRegionOption::TwcsMaxOutputFileSize => {
1289 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1290 }
1291 UnsetRegionOption::TwcsTimeWindow => {
1292 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1293 }
1294 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1295 }
1296 }
1297}
1298
1299impl TryFrom<&str> for UnsetRegionOption {
1300 type Error = MetadataError;
1301
1302 fn try_from(key: &str) -> Result<Self> {
1303 match key.to_ascii_lowercase().as_str() {
1304 TTL_KEY => Ok(Self::Ttl),
1305 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1306 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1307 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1308 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1309 }
1310 }
1311}
1312
1313#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1314pub enum UnsetRegionOption {
1315 TwcsTriggerFileNum,
1316 TwcsMaxOutputFileSize,
1317 TwcsTimeWindow,
1318 Ttl,
1319}
1320
1321impl UnsetRegionOption {
1322 pub fn as_str(&self) -> &str {
1323 match self {
1324 Self::Ttl => TTL_KEY,
1325 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1326 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1327 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1328 }
1329 }
1330}
1331
1332impl Display for UnsetRegionOption {
1333 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1334 write!(f, "{}", self.as_str())
1335 }
1336}
1337
1338#[derive(Debug, Clone, Default)]
1339pub struct RegionFlushRequest {
1340 pub row_group_size: Option<usize>,
1341}
1342
1343#[derive(Debug)]
1344pub struct RegionCompactRequest {
1345 pub options: compact_request::Options,
1346 pub parallelism: Option<u32>,
1347}
1348
1349impl Default for RegionCompactRequest {
1350 fn default() -> Self {
1351 Self {
1352 options: compact_request::Options::Regular(Default::default()),
1354 parallelism: None,
1355 }
1356 }
1357}
1358
1359#[derive(Debug, Clone, Default)]
1360pub struct RegionBuildIndexRequest {}
1361
1362#[derive(Debug)]
1364pub enum RegionTruncateRequest {
1365 All,
1367 ByTimeRanges {
1368 time_ranges: Vec<(Timestamp, Timestamp)>,
1372 },
1373}
1374
1375#[derive(Debug, Clone, Copy, Default)]
1380pub struct RegionCatchupRequest {
1381 pub set_writable: bool,
1383 pub entry_id: Option<entry::Id>,
1386 pub metadata_entry_id: Option<entry::Id>,
1390 pub location_id: Option<u64>,
1392 pub checkpoint: Option<ReplayCheckpoint>,
1394}
1395
1396#[derive(Debug, Clone)]
1397pub struct RegionBulkInsertsRequest {
1398 pub region_id: RegionId,
1399 pub payload: DfRecordBatch,
1400 pub raw_data: ArrowIpc,
1401}
1402
1403impl RegionBulkInsertsRequest {
1404 pub fn estimated_size(&self) -> usize {
1405 self.payload.get_array_memory_size()
1406 }
1407}
1408
1409impl fmt::Display for RegionRequest {
1410 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1411 match self {
1412 RegionRequest::Put(_) => write!(f, "Put"),
1413 RegionRequest::Delete(_) => write!(f, "Delete"),
1414 RegionRequest::Create(_) => write!(f, "Create"),
1415 RegionRequest::Drop(_) => write!(f, "Drop"),
1416 RegionRequest::Open(_) => write!(f, "Open"),
1417 RegionRequest::Close(_) => write!(f, "Close"),
1418 RegionRequest::Alter(_) => write!(f, "Alter"),
1419 RegionRequest::Flush(_) => write!(f, "Flush"),
1420 RegionRequest::Compact(_) => write!(f, "Compact"),
1421 RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1422 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1423 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1424 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1425 }
1426 }
1427}
1428
1429#[cfg(test)]
1430mod tests {
1431
1432 use api::v1::region::RegionColumnDef;
1433 use api::v1::{ColumnDataType, ColumnDef};
1434 use datatypes::prelude::ConcreteDataType;
1435 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1436
1437 use super::*;
1438 use crate::metadata::RegionMetadataBuilder;
1439
1440 #[test]
1441 fn test_from_proto_location() {
1442 let proto_location = v1::AddColumnLocation {
1443 location_type: LocationType::First as i32,
1444 after_column_name: String::default(),
1445 };
1446 let location = AddColumnLocation::try_from(proto_location).unwrap();
1447 assert_eq!(location, AddColumnLocation::First);
1448
1449 let proto_location = v1::AddColumnLocation {
1450 location_type: 10,
1451 after_column_name: String::default(),
1452 };
1453 AddColumnLocation::try_from(proto_location).unwrap_err();
1454
1455 let proto_location = v1::AddColumnLocation {
1456 location_type: LocationType::After as i32,
1457 after_column_name: "a".to_string(),
1458 };
1459 let location = AddColumnLocation::try_from(proto_location).unwrap();
1460 assert_eq!(
1461 location,
1462 AddColumnLocation::After {
1463 column_name: "a".to_string()
1464 }
1465 );
1466 }
1467
1468 #[test]
1469 fn test_from_none_proto_add_column() {
1470 AddColumn::try_from(v1::region::AddColumn {
1471 column_def: None,
1472 location: None,
1473 })
1474 .unwrap_err();
1475 }
1476
1477 #[test]
1478 fn test_from_proto_alter_request() {
1479 RegionAlterRequest::try_from(AlterRequest {
1480 region_id: 0,
1481 schema_version: 1,
1482 kind: None,
1483 })
1484 .unwrap_err();
1485
1486 let request = RegionAlterRequest::try_from(AlterRequest {
1487 region_id: 0,
1488 schema_version: 1,
1489 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1490 add_columns: vec![v1::region::AddColumn {
1491 column_def: Some(RegionColumnDef {
1492 column_def: Some(ColumnDef {
1493 name: "a".to_string(),
1494 data_type: ColumnDataType::String as i32,
1495 is_nullable: true,
1496 default_constraint: vec![],
1497 semantic_type: SemanticType::Field as i32,
1498 comment: String::new(),
1499 ..Default::default()
1500 }),
1501 column_id: 1,
1502 }),
1503 location: Some(v1::AddColumnLocation {
1504 location_type: LocationType::First as i32,
1505 after_column_name: String::default(),
1506 }),
1507 }],
1508 })),
1509 })
1510 .unwrap();
1511
1512 assert_eq!(
1513 request,
1514 RegionAlterRequest {
1515 kind: AlterKind::AddColumns {
1516 columns: vec![AddColumn {
1517 column_metadata: ColumnMetadata {
1518 column_schema: ColumnSchema::new(
1519 "a",
1520 ConcreteDataType::string_datatype(),
1521 true,
1522 ),
1523 semantic_type: SemanticType::Field,
1524 column_id: 1,
1525 },
1526 location: Some(AddColumnLocation::First),
1527 }]
1528 },
1529 }
1530 );
1531 }
1532
1533 fn new_metadata() -> RegionMetadata {
1536 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1537 builder
1538 .push_column_metadata(ColumnMetadata {
1539 column_schema: ColumnSchema::new(
1540 "ts",
1541 ConcreteDataType::timestamp_millisecond_datatype(),
1542 false,
1543 ),
1544 semantic_type: SemanticType::Timestamp,
1545 column_id: 1,
1546 })
1547 .push_column_metadata(ColumnMetadata {
1548 column_schema: ColumnSchema::new(
1549 "tag_0",
1550 ConcreteDataType::string_datatype(),
1551 true,
1552 ),
1553 semantic_type: SemanticType::Tag,
1554 column_id: 2,
1555 })
1556 .push_column_metadata(ColumnMetadata {
1557 column_schema: ColumnSchema::new(
1558 "field_0",
1559 ConcreteDataType::string_datatype(),
1560 true,
1561 ),
1562 semantic_type: SemanticType::Field,
1563 column_id: 3,
1564 })
1565 .push_column_metadata(ColumnMetadata {
1566 column_schema: ColumnSchema::new(
1567 "field_1",
1568 ConcreteDataType::boolean_datatype(),
1569 true,
1570 ),
1571 semantic_type: SemanticType::Field,
1572 column_id: 4,
1573 })
1574 .primary_key(vec![2]);
1575 builder.build().unwrap()
1576 }
1577
1578 #[test]
1579 fn test_add_column_validate() {
1580 let metadata = new_metadata();
1581 let add_column = AddColumn {
1582 column_metadata: ColumnMetadata {
1583 column_schema: ColumnSchema::new(
1584 "tag_1",
1585 ConcreteDataType::string_datatype(),
1586 true,
1587 ),
1588 semantic_type: SemanticType::Tag,
1589 column_id: 5,
1590 },
1591 location: None,
1592 };
1593 add_column.validate(&metadata).unwrap();
1594 assert!(add_column.need_alter(&metadata));
1595
1596 AddColumn {
1598 column_metadata: ColumnMetadata {
1599 column_schema: ColumnSchema::new(
1600 "tag_1",
1601 ConcreteDataType::string_datatype(),
1602 false,
1603 ),
1604 semantic_type: SemanticType::Tag,
1605 column_id: 5,
1606 },
1607 location: None,
1608 }
1609 .validate(&metadata)
1610 .unwrap_err();
1611
1612 let add_column = AddColumn {
1614 column_metadata: ColumnMetadata {
1615 column_schema: ColumnSchema::new(
1616 "tag_0",
1617 ConcreteDataType::string_datatype(),
1618 true,
1619 ),
1620 semantic_type: SemanticType::Tag,
1621 column_id: 2,
1622 },
1623 location: None,
1624 };
1625 add_column.validate(&metadata).unwrap();
1626 assert!(!add_column.need_alter(&metadata));
1627 }
1628
1629 #[test]
1630 fn test_add_duplicate_columns() {
1631 let kind = AlterKind::AddColumns {
1632 columns: vec![
1633 AddColumn {
1634 column_metadata: ColumnMetadata {
1635 column_schema: ColumnSchema::new(
1636 "tag_1",
1637 ConcreteDataType::string_datatype(),
1638 true,
1639 ),
1640 semantic_type: SemanticType::Tag,
1641 column_id: 5,
1642 },
1643 location: None,
1644 },
1645 AddColumn {
1646 column_metadata: ColumnMetadata {
1647 column_schema: ColumnSchema::new(
1648 "tag_1",
1649 ConcreteDataType::string_datatype(),
1650 true,
1651 ),
1652 semantic_type: SemanticType::Field,
1653 column_id: 6,
1654 },
1655 location: None,
1656 },
1657 ],
1658 };
1659 let metadata = new_metadata();
1660 kind.validate(&metadata).unwrap();
1661 assert!(kind.need_alter(&metadata));
1662 }
1663
1664 #[test]
1665 fn test_add_existing_column_different_metadata() {
1666 let metadata = new_metadata();
1667
1668 let kind = AlterKind::AddColumns {
1670 columns: vec![AddColumn {
1671 column_metadata: ColumnMetadata {
1672 column_schema: ColumnSchema::new(
1673 "tag_0",
1674 ConcreteDataType::string_datatype(),
1675 true,
1676 ),
1677 semantic_type: SemanticType::Tag,
1678 column_id: 4,
1679 },
1680 location: None,
1681 }],
1682 };
1683 kind.validate(&metadata).unwrap_err();
1684
1685 let kind = AlterKind::AddColumns {
1687 columns: vec![AddColumn {
1688 column_metadata: ColumnMetadata {
1689 column_schema: ColumnSchema::new(
1690 "tag_0",
1691 ConcreteDataType::int64_datatype(),
1692 true,
1693 ),
1694 semantic_type: SemanticType::Tag,
1695 column_id: 2,
1696 },
1697 location: None,
1698 }],
1699 };
1700 kind.validate(&metadata).unwrap_err();
1701
1702 let kind = AlterKind::AddColumns {
1704 columns: vec![AddColumn {
1705 column_metadata: ColumnMetadata {
1706 column_schema: ColumnSchema::new(
1707 "tag_1",
1708 ConcreteDataType::string_datatype(),
1709 true,
1710 ),
1711 semantic_type: SemanticType::Tag,
1712 column_id: 2,
1713 },
1714 location: None,
1715 }],
1716 };
1717 kind.validate(&metadata).unwrap_err();
1718 }
1719
1720 #[test]
1721 fn test_add_existing_column_with_location() {
1722 let metadata = new_metadata();
1723 let kind = AlterKind::AddColumns {
1724 columns: vec![AddColumn {
1725 column_metadata: ColumnMetadata {
1726 column_schema: ColumnSchema::new(
1727 "tag_0",
1728 ConcreteDataType::string_datatype(),
1729 true,
1730 ),
1731 semantic_type: SemanticType::Tag,
1732 column_id: 2,
1733 },
1734 location: Some(AddColumnLocation::First),
1735 }],
1736 };
1737 kind.validate(&metadata).unwrap_err();
1738 }
1739
1740 #[test]
1741 fn test_validate_drop_column() {
1742 let metadata = new_metadata();
1743 let kind = AlterKind::DropColumns {
1744 names: vec!["xxxx".to_string()],
1745 };
1746 kind.validate(&metadata).unwrap();
1747 assert!(!kind.need_alter(&metadata));
1748
1749 AlterKind::DropColumns {
1750 names: vec!["tag_0".to_string()],
1751 }
1752 .validate(&metadata)
1753 .unwrap_err();
1754
1755 let kind = AlterKind::DropColumns {
1756 names: vec!["field_0".to_string()],
1757 };
1758 kind.validate(&metadata).unwrap();
1759 assert!(kind.need_alter(&metadata));
1760 }
1761
1762 #[test]
1763 fn test_validate_modify_column_type() {
1764 let metadata = new_metadata();
1765 AlterKind::ModifyColumnTypes {
1766 columns: vec![ModifyColumnType {
1767 column_name: "xxxx".to_string(),
1768 target_type: ConcreteDataType::string_datatype(),
1769 }],
1770 }
1771 .validate(&metadata)
1772 .unwrap_err();
1773
1774 AlterKind::ModifyColumnTypes {
1775 columns: vec![ModifyColumnType {
1776 column_name: "field_1".to_string(),
1777 target_type: ConcreteDataType::date_datatype(),
1778 }],
1779 }
1780 .validate(&metadata)
1781 .unwrap_err();
1782
1783 AlterKind::ModifyColumnTypes {
1784 columns: vec![ModifyColumnType {
1785 column_name: "ts".to_string(),
1786 target_type: ConcreteDataType::date_datatype(),
1787 }],
1788 }
1789 .validate(&metadata)
1790 .unwrap_err();
1791
1792 AlterKind::ModifyColumnTypes {
1793 columns: vec![ModifyColumnType {
1794 column_name: "tag_0".to_string(),
1795 target_type: ConcreteDataType::date_datatype(),
1796 }],
1797 }
1798 .validate(&metadata)
1799 .unwrap_err();
1800
1801 let kind = AlterKind::ModifyColumnTypes {
1802 columns: vec![ModifyColumnType {
1803 column_name: "field_0".to_string(),
1804 target_type: ConcreteDataType::int32_datatype(),
1805 }],
1806 };
1807 kind.validate(&metadata).unwrap();
1808 assert!(kind.need_alter(&metadata));
1809 }
1810
1811 #[test]
1812 fn test_validate_add_columns() {
1813 let kind = AlterKind::AddColumns {
1814 columns: vec![
1815 AddColumn {
1816 column_metadata: ColumnMetadata {
1817 column_schema: ColumnSchema::new(
1818 "tag_1",
1819 ConcreteDataType::string_datatype(),
1820 true,
1821 ),
1822 semantic_type: SemanticType::Tag,
1823 column_id: 5,
1824 },
1825 location: None,
1826 },
1827 AddColumn {
1828 column_metadata: ColumnMetadata {
1829 column_schema: ColumnSchema::new(
1830 "field_2",
1831 ConcreteDataType::string_datatype(),
1832 true,
1833 ),
1834 semantic_type: SemanticType::Field,
1835 column_id: 6,
1836 },
1837 location: None,
1838 },
1839 ],
1840 };
1841 let request = RegionAlterRequest { kind };
1842 let mut metadata = new_metadata();
1843 metadata.schema_version = 1;
1844 request.validate(&metadata).unwrap();
1845 }
1846
1847 #[test]
1848 fn test_validate_create_region() {
1849 let column_metadatas = vec![
1850 ColumnMetadata {
1851 column_schema: ColumnSchema::new(
1852 "ts",
1853 ConcreteDataType::timestamp_millisecond_datatype(),
1854 false,
1855 ),
1856 semantic_type: SemanticType::Timestamp,
1857 column_id: 1,
1858 },
1859 ColumnMetadata {
1860 column_schema: ColumnSchema::new(
1861 "tag_0",
1862 ConcreteDataType::string_datatype(),
1863 true,
1864 ),
1865 semantic_type: SemanticType::Tag,
1866 column_id: 2,
1867 },
1868 ColumnMetadata {
1869 column_schema: ColumnSchema::new(
1870 "field_0",
1871 ConcreteDataType::string_datatype(),
1872 true,
1873 ),
1874 semantic_type: SemanticType::Field,
1875 column_id: 3,
1876 },
1877 ];
1878 let create = RegionCreateRequest {
1879 engine: "mito".to_string(),
1880 column_metadatas,
1881 primary_key: vec![3, 4],
1882 options: HashMap::new(),
1883 table_dir: "path".to_string(),
1884 path_type: PathType::Bare,
1885 partition_expr_json: Some("".to_string()),
1886 };
1887
1888 assert!(create.validate().is_err());
1889 }
1890
1891 #[test]
1892 fn test_validate_modify_column_fulltext_options() {
1893 let kind = AlterKind::SetIndexes {
1894 options: vec![SetIndexOption::Fulltext {
1895 column_name: "tag_0".to_string(),
1896 options: FulltextOptions::new_unchecked(
1897 true,
1898 FulltextAnalyzer::Chinese,
1899 false,
1900 FulltextBackend::Bloom,
1901 1000,
1902 0.01,
1903 ),
1904 }],
1905 };
1906 let request = RegionAlterRequest { kind };
1907 let mut metadata = new_metadata();
1908 metadata.schema_version = 1;
1909 request.validate(&metadata).unwrap();
1910
1911 let kind = AlterKind::UnsetIndexes {
1912 options: vec![UnsetIndexOption::Fulltext {
1913 column_name: "tag_0".to_string(),
1914 }],
1915 };
1916 let request = RegionAlterRequest { kind };
1917 let mut metadata = new_metadata();
1918 metadata.schema_version = 1;
1919 request.validate(&metadata).unwrap();
1920 }
1921
1922 #[test]
1923 fn test_validate_sync_columns() {
1924 let metadata = new_metadata();
1925 let kind = AlterKind::SyncColumns {
1926 column_metadatas: vec![
1927 ColumnMetadata {
1928 column_schema: ColumnSchema::new(
1929 "tag_1",
1930 ConcreteDataType::string_datatype(),
1931 true,
1932 ),
1933 semantic_type: SemanticType::Tag,
1934 column_id: 5,
1935 },
1936 ColumnMetadata {
1937 column_schema: ColumnSchema::new(
1938 "field_2",
1939 ConcreteDataType::string_datatype(),
1940 true,
1941 ),
1942 semantic_type: SemanticType::Field,
1943 column_id: 6,
1944 },
1945 ],
1946 };
1947 let err = kind.validate(&metadata).unwrap_err();
1948 assert!(err.to_string().contains("not a primary key"));
1949
1950 let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
1952 let ts_column = column_metadatas_with_different_ts_column
1953 .iter_mut()
1954 .find(|c| c.semantic_type == SemanticType::Timestamp)
1955 .unwrap();
1956 ts_column.column_schema.name = "ts1".to_string();
1957
1958 let kind = AlterKind::SyncColumns {
1959 column_metadatas: column_metadatas_with_different_ts_column,
1960 };
1961 let err = kind.validate(&metadata).unwrap_err();
1962 assert!(
1963 err.to_string()
1964 .contains("timestamp column ts has different id")
1965 );
1966
1967 let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
1969 let pk_column = column_metadatas_with_different_pk_column
1970 .iter_mut()
1971 .find(|c| c.column_schema.name == "tag_0")
1972 .unwrap();
1973 pk_column.column_id = 100;
1974 let kind = AlterKind::SyncColumns {
1975 column_metadatas: column_metadatas_with_different_pk_column,
1976 };
1977 let err = kind.validate(&metadata).unwrap_err();
1978 assert!(
1979 err.to_string()
1980 .contains("column with same name tag_0 has different id")
1981 );
1982
1983 let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
1985 column_metadatas_with_new_field_column.push(ColumnMetadata {
1986 column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
1987 semantic_type: SemanticType::Field,
1988 column_id: 4,
1989 });
1990 let kind = AlterKind::SyncColumns {
1991 column_metadatas: column_metadatas_with_new_field_column,
1992 };
1993 kind.validate(&metadata).unwrap();
1994 }
1995
1996 #[test]
1997 fn test_cast_path_type_to_primitive() {
1998 assert_eq!(PathType::Bare as u8, 0);
1999 assert_eq!(PathType::Data as u8, 1);
2000 assert_eq!(PathType::Metadata as u8, 2);
2001 assert_eq!(
2002 PathType::try_from(PathType::Bare as u8).unwrap(),
2003 PathType::Bare
2004 );
2005 assert_eq!(
2006 PathType::try_from(PathType::Data as u8).unwrap(),
2007 PathType::Data
2008 );
2009 assert_eq!(
2010 PathType::try_from(PathType::Metadata as u8).unwrap(),
2011 PathType::Metadata
2012 );
2013 }
2014}