1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26 CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27 FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28 region_request, truncate_request,
29};
30use api::v1::{
31 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50 RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55 SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
56};
57use crate::path_utils::table_dir;
58use crate::storage::{ColumnId, RegionId, ScanRequest};
59
60#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
62#[repr(u8)]
63pub enum PathType {
64 Bare,
68 Data,
72 Metadata,
76}
77
78#[derive(Debug, IntoStaticStr)]
79pub enum BatchRegionDdlRequest {
80 Create(Vec<(RegionId, RegionCreateRequest)>),
81 Drop(Vec<(RegionId, RegionDropRequest)>),
82 Alter(Vec<(RegionId, RegionAlterRequest)>),
83}
84
85impl BatchRegionDdlRequest {
86 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
88 match body {
89 region_request::Body::Creates(creates) => {
90 let requests = creates
91 .requests
92 .into_iter()
93 .map(parse_region_create)
94 .collect::<Result<Vec<_>>>()?;
95 Ok(Some(Self::Create(requests)))
96 }
97 region_request::Body::Drops(drops) => {
98 let requests = drops
99 .requests
100 .into_iter()
101 .map(parse_region_drop)
102 .collect::<Result<Vec<_>>>()?;
103 Ok(Some(Self::Drop(requests)))
104 }
105 region_request::Body::Alters(alters) => {
106 let requests = alters
107 .requests
108 .into_iter()
109 .map(parse_region_alter)
110 .collect::<Result<Vec<_>>>()?;
111 Ok(Some(Self::Alter(requests)))
112 }
113 _ => Ok(None),
114 }
115 }
116
117 pub fn request_type(&self) -> &'static str {
118 self.into()
119 }
120
121 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
122 match self {
123 Self::Create(requests) => requests
124 .into_iter()
125 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
126 .collect(),
127 Self::Drop(requests) => requests
128 .into_iter()
129 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
130 .collect(),
131 Self::Alter(requests) => requests
132 .into_iter()
133 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
134 .collect(),
135 }
136 }
137}
138
139#[derive(Debug, IntoStaticStr)]
140pub enum RegionRequest {
141 Put(RegionPutRequest),
142 Delete(RegionDeleteRequest),
143 Create(RegionCreateRequest),
144 Drop(RegionDropRequest),
145 Open(RegionOpenRequest),
146 Close(RegionCloseRequest),
147 Alter(RegionAlterRequest),
148 Flush(RegionFlushRequest),
149 Compact(RegionCompactRequest),
150 BuildIndex(RegionBuildIndexRequest),
151 Truncate(RegionTruncateRequest),
152 Catchup(RegionCatchupRequest),
153 BulkInserts(RegionBulkInsertsRequest),
154 EnterStaging(EnterStagingRequest),
155}
156
157impl RegionRequest {
158 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
161 match body {
162 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
163 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
164 region_request::Body::Create(create) => make_region_create(create),
165 region_request::Body::Drop(drop) => make_region_drop(drop),
166 region_request::Body::Open(open) => make_region_open(open),
167 region_request::Body::Close(close) => make_region_close(close),
168 region_request::Body::Alter(alter) => make_region_alter(alter),
169 region_request::Body::Flush(flush) => make_region_flush(flush),
170 region_request::Body::Compact(compact) => make_region_compact(compact),
171 region_request::Body::BuildIndex(index) => make_region_build_index(index),
172 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
173 region_request::Body::Creates(creates) => make_region_creates(creates),
174 region_request::Body::Drops(drops) => make_region_drops(drops),
175 region_request::Body::Alters(alters) => make_region_alters(alters),
176 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
177 region_request::Body::Sync(_) => UnexpectedSnafu {
178 reason: "Sync request should be handled separately by RegionServer",
179 }
180 .fail(),
181 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
182 reason: "ListMetadata request should be handled separately by RegionServer",
183 }
184 .fail(),
185 }
186 }
187
188 pub fn request_type(&self) -> &'static str {
190 self.into()
191 }
192}
193
194fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
195 let requests = inserts
196 .requests
197 .into_iter()
198 .filter_map(|r| {
199 let region_id = r.region_id.into();
200 r.rows.map(|rows| {
201 (
202 region_id,
203 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
204 )
205 })
206 })
207 .collect();
208 Ok(requests)
209}
210
211fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
212 let requests = deletes
213 .requests
214 .into_iter()
215 .filter_map(|r| {
216 let region_id = r.region_id.into();
217 r.rows.map(|rows| {
218 (
219 region_id,
220 RegionRequest::Delete(RegionDeleteRequest { rows, hint: None }),
221 )
222 })
223 })
224 .collect();
225 Ok(requests)
226}
227
228fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
229 let column_metadatas = create
230 .column_defs
231 .into_iter()
232 .map(ColumnMetadata::try_from_column_def)
233 .collect::<Result<Vec<_>>>()?;
234 let region_id = RegionId::from(create.region_id);
235 let table_dir = table_dir(&create.path, region_id.table_id());
236 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
237 Ok((
238 region_id,
239 RegionCreateRequest {
240 engine: create.engine,
241 column_metadatas,
242 primary_key: create.primary_key,
243 options: create.options,
244 table_dir,
245 path_type: PathType::Bare,
246 partition_expr_json,
247 },
248 ))
249}
250
251fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
252 let (region_id, request) = parse_region_create(create)?;
253 Ok(vec![(region_id, RegionRequest::Create(request))])
254}
255
256fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
257 let mut requests = Vec::with_capacity(creates.requests.len());
258 for create in creates.requests {
259 requests.extend(make_region_create(create)?);
260 }
261 Ok(requests)
262}
263
264fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
265 let region_id = drop.region_id.into();
266 Ok((
267 region_id,
268 RegionDropRequest {
269 fast_path: drop.fast_path,
270 },
271 ))
272}
273
274fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
275 let (region_id, request) = parse_region_drop(drop)?;
276 Ok(vec![(region_id, RegionRequest::Drop(request))])
277}
278
279fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
280 let mut requests = Vec::with_capacity(drops.requests.len());
281 for drop in drops.requests {
282 requests.extend(make_region_drop(drop)?);
283 }
284 Ok(requests)
285}
286
287fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
288 let region_id = RegionId::from(open.region_id);
289 let table_dir = table_dir(&open.path, region_id.table_id());
290 Ok(vec![(
291 region_id,
292 RegionRequest::Open(RegionOpenRequest {
293 engine: open.engine,
294 table_dir,
295 path_type: PathType::Bare,
296 options: open.options,
297 skip_wal_replay: false,
298 checkpoint: None,
299 }),
300 )])
301}
302
303fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
304 let region_id = close.region_id.into();
305 Ok(vec![(
306 region_id,
307 RegionRequest::Close(RegionCloseRequest {}),
308 )])
309}
310
311fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
312 let region_id = alter.region_id.into();
313 let request = RegionAlterRequest::try_from(alter)?;
314 Ok((region_id, request))
315}
316
317fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
318 let (region_id, request) = parse_region_alter(alter)?;
319 Ok(vec![(region_id, RegionRequest::Alter(request))])
320}
321
322fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
323 let mut requests = Vec::with_capacity(alters.requests.len());
324 for alter in alters.requests {
325 requests.extend(make_region_alter(alter)?);
326 }
327 Ok(requests)
328}
329
330fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
331 let region_id = flush.region_id.into();
332 Ok(vec![(
333 region_id,
334 RegionRequest::Flush(RegionFlushRequest {
335 row_group_size: None,
336 }),
337 )])
338}
339
340fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
341 let region_id = compact.region_id.into();
342 let options = compact
343 .options
344 .unwrap_or(compact_request::Options::Regular(Default::default()));
345 let parallelism = if compact.parallelism == 0 {
347 None
348 } else {
349 Some(compact.parallelism)
350 };
351 Ok(vec![(
352 region_id,
353 RegionRequest::Compact(RegionCompactRequest {
354 options,
355 parallelism,
356 }),
357 )])
358}
359
360fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
361 let region_id = index.region_id.into();
362 Ok(vec![(
363 region_id,
364 RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
365 )])
366}
367
368fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
369 let region_id = truncate.region_id.into();
370 match truncate.kind {
371 None => InvalidRawRegionRequestSnafu {
372 err: "missing kind in TruncateRequest".to_string(),
373 }
374 .fail(),
375 Some(truncate_request::Kind::All(_)) => Ok(vec![(
376 region_id,
377 RegionRequest::Truncate(RegionTruncateRequest::All),
378 )]),
379 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
380 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
381
382 Ok(vec![(
383 region_id,
384 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
385 )])
386 }
387 }
388}
389
390fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
392 let region_id = request.region_id.into();
393 let Some(Body::ArrowIpc(request)) = request.body else {
394 return Ok(vec![]);
395 };
396
397 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
398 .with_label_values(&["decode"])
399 .start_timer();
400 let mut decoder =
401 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
402 let payload = decoder
403 .try_decode_record_batch(&request.data_header, &request.payload)
404 .context(FlightCodecSnafu)?;
405 decoder_timer.observe_duration();
406 Ok(vec![(
407 region_id,
408 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
409 region_id,
410 payload,
411 raw_data: request,
412 }),
413 )])
414}
415
416#[derive(Debug)]
418pub struct RegionPutRequest {
419 pub rows: Rows,
421 pub hint: Option<WriteHint>,
423}
424
425#[derive(Debug)]
426pub struct RegionReadRequest {
427 pub request: ScanRequest,
428}
429
430#[derive(Debug)]
432pub struct RegionDeleteRequest {
433 pub rows: Rows,
437 pub hint: Option<WriteHint>,
439}
440
441#[derive(Debug, Clone)]
442pub struct RegionCreateRequest {
443 pub engine: String,
445 pub column_metadatas: Vec<ColumnMetadata>,
447 pub primary_key: Vec<ColumnId>,
449 pub options: HashMap<String, String>,
451 pub table_dir: String,
453 pub path_type: PathType,
455 pub partition_expr_json: Option<String>,
458}
459
460impl RegionCreateRequest {
461 pub fn validate(&self) -> Result<()> {
463 ensure!(
465 self.column_metadatas
466 .iter()
467 .any(|x| x.semantic_type == SemanticType::Timestamp),
468 InvalidRegionRequestSnafu {
469 region_id: RegionId::new(0, 0),
470 err: "missing timestamp column in create region request".to_string(),
471 }
472 );
473
474 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
476 for (i, c) in self.column_metadatas.iter().enumerate() {
477 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
478 return InvalidRegionRequestSnafu {
479 region_id: RegionId::new(0, 0),
480 err: format!(
481 "duplicate column id {} (at position {} and {}) in create region request",
482 c.column_id, previous, i
483 ),
484 }
485 .fail();
486 }
487 }
488
489 for column_id in &self.primary_key {
491 ensure!(
492 column_id_to_indices.contains_key(column_id),
493 InvalidRegionRequestSnafu {
494 region_id: RegionId::new(0, 0),
495 err: format!(
496 "missing primary key column {} in create region request",
497 column_id
498 ),
499 }
500 );
501 }
502
503 Ok(())
504 }
505
506 pub fn is_physical_table(&self) -> bool {
508 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
509 }
510}
511
512#[derive(Debug, Clone)]
513pub struct RegionDropRequest {
514 pub fast_path: bool,
515}
516
517#[derive(Debug, Clone)]
519pub struct RegionOpenRequest {
520 pub engine: String,
522 pub table_dir: String,
524 pub path_type: PathType,
526 pub options: HashMap<String, String>,
528 pub skip_wal_replay: bool,
530 pub checkpoint: Option<ReplayCheckpoint>,
532}
533
534#[derive(Debug, Clone, Copy, PartialEq, Eq)]
535pub struct ReplayCheckpoint {
536 pub entry_id: u64,
537 pub metadata_entry_id: Option<u64>,
538}
539
540impl RegionOpenRequest {
541 pub fn is_physical_table(&self) -> bool {
543 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
544 }
545}
546
547#[derive(Debug)]
549pub struct RegionCloseRequest {}
550
551#[derive(Debug, PartialEq, Eq, Clone)]
553pub struct RegionAlterRequest {
554 pub kind: AlterKind,
556}
557
558impl RegionAlterRequest {
559 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
561 self.kind.validate(metadata)?;
562
563 Ok(())
564 }
565
566 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
570 debug_assert!(self.validate(metadata).is_ok());
571 self.kind.need_alter(metadata)
572 }
573}
574
575impl TryFrom<AlterRequest> for RegionAlterRequest {
576 type Error = MetadataError;
577
578 fn try_from(value: AlterRequest) -> Result<Self> {
579 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
580 err: "missing kind in AlterRequest",
581 })?;
582
583 let kind = AlterKind::try_from(kind)?;
584 Ok(RegionAlterRequest { kind })
585 }
586}
587
588#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
590pub enum AlterKind {
591 AddColumns {
593 columns: Vec<AddColumn>,
595 },
596 DropColumns {
598 names: Vec<String>,
600 },
601 ModifyColumnTypes {
603 columns: Vec<ModifyColumnType>,
605 },
606 SetRegionOptions { options: Vec<SetRegionOption> },
608 UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
610 SetIndexes { options: Vec<SetIndexOption> },
612 UnsetIndexes { options: Vec<UnsetIndexOption> },
614 DropDefaults {
616 names: Vec<String>,
618 },
619 SetDefaults {
621 columns: Vec<SetDefault>,
623 },
624 SyncColumns {
626 column_metadatas: Vec<ColumnMetadata>,
627 },
628}
629#[derive(Debug, PartialEq, Eq, Clone)]
630pub struct SetDefault {
631 pub name: String,
632 pub default_constraint: Vec<u8>,
633}
634
635#[derive(Debug, PartialEq, Eq, Clone)]
636pub enum SetIndexOption {
637 Fulltext {
638 column_name: String,
639 options: FulltextOptions,
640 },
641 Inverted {
642 column_name: String,
643 },
644 Skipping {
645 column_name: String,
646 options: SkippingIndexOptions,
647 },
648}
649
650impl SetIndexOption {
651 pub fn column_name(&self) -> &String {
653 match self {
654 SetIndexOption::Fulltext { column_name, .. } => column_name,
655 SetIndexOption::Inverted { column_name } => column_name,
656 SetIndexOption::Skipping { column_name, .. } => column_name,
657 }
658 }
659
660 pub fn is_fulltext(&self) -> bool {
662 match self {
663 SetIndexOption::Fulltext { .. } => true,
664 SetIndexOption::Inverted { .. } => false,
665 SetIndexOption::Skipping { .. } => false,
666 }
667 }
668}
669
670impl TryFrom<v1::SetIndex> for SetIndexOption {
671 type Error = MetadataError;
672
673 fn try_from(value: v1::SetIndex) -> Result<Self> {
674 let option = value.options.context(InvalidRawRegionRequestSnafu {
675 err: "missing options in SetIndex",
676 })?;
677
678 let opt = match option {
679 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
680 column_name: x.column_name.clone(),
681 options: FulltextOptions::new(
682 x.enable,
683 as_fulltext_option_analyzer(
684 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
685 ),
686 x.case_sensitive,
687 as_fulltext_option_backend(
688 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
689 ),
690 x.granularity as u32,
691 x.false_positive_rate,
692 )
693 .context(InvalidIndexOptionSnafu)?,
694 },
695 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
696 column_name: i.column_name,
697 },
698 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
699 column_name: s.column_name,
700 options: SkippingIndexOptions::new(
701 s.granularity as u32,
702 s.false_positive_rate,
703 as_skipping_index_type(
704 PbSkippingIndexType::try_from(s.skipping_index_type)
705 .context(DecodeProtoSnafu)?,
706 ),
707 )
708 .context(InvalidIndexOptionSnafu)?,
709 },
710 };
711
712 Ok(opt)
713 }
714}
715
716#[derive(Debug, PartialEq, Eq, Clone)]
717pub enum UnsetIndexOption {
718 Fulltext { column_name: String },
719 Inverted { column_name: String },
720 Skipping { column_name: String },
721}
722
723impl UnsetIndexOption {
724 pub fn column_name(&self) -> &String {
725 match self {
726 UnsetIndexOption::Fulltext { column_name } => column_name,
727 UnsetIndexOption::Inverted { column_name } => column_name,
728 UnsetIndexOption::Skipping { column_name } => column_name,
729 }
730 }
731
732 pub fn is_fulltext(&self) -> bool {
733 match self {
734 UnsetIndexOption::Fulltext { .. } => true,
735 UnsetIndexOption::Inverted { .. } => false,
736 UnsetIndexOption::Skipping { .. } => false,
737 }
738 }
739}
740
741impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
742 type Error = MetadataError;
743
744 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
745 let option = value.options.context(InvalidRawRegionRequestSnafu {
746 err: "missing options in UnsetIndex",
747 })?;
748
749 let opt = match option {
750 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
751 column_name: f.column_name,
752 },
753 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
754 column_name: i.column_name,
755 },
756 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
757 column_name: s.column_name,
758 },
759 };
760
761 Ok(opt)
762 }
763}
764
765impl AlterKind {
766 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
770 match self {
771 AlterKind::AddColumns { columns } => {
772 for col_to_add in columns {
773 col_to_add.validate(metadata)?;
774 }
775 }
776 AlterKind::DropColumns { names } => {
777 for name in names {
778 Self::validate_column_to_drop(name, metadata)?;
779 }
780 }
781 AlterKind::ModifyColumnTypes { columns } => {
782 for col_to_change in columns {
783 col_to_change.validate(metadata)?;
784 }
785 }
786 AlterKind::SetRegionOptions { .. } => {}
787 AlterKind::UnsetRegionOptions { .. } => {}
788 AlterKind::SetIndexes { options } => {
789 for option in options {
790 Self::validate_column_alter_index_option(
791 option.column_name(),
792 metadata,
793 option.is_fulltext(),
794 )?;
795 }
796 }
797 AlterKind::UnsetIndexes { options } => {
798 for option in options {
799 Self::validate_column_alter_index_option(
800 option.column_name(),
801 metadata,
802 option.is_fulltext(),
803 )?;
804 }
805 }
806 AlterKind::DropDefaults { names } => {
807 names
808 .iter()
809 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
810 }
811 AlterKind::SetDefaults { columns } => {
812 columns
813 .iter()
814 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
815 }
816 AlterKind::SyncColumns { column_metadatas } => {
817 let new_primary_keys = column_metadatas
818 .iter()
819 .filter(|c| c.semantic_type == SemanticType::Tag)
820 .map(|c| (c.column_schema.name.as_str(), c.column_id))
821 .collect::<HashMap<_, _>>();
822
823 let old_primary_keys = metadata
824 .column_metadatas
825 .iter()
826 .filter(|c| c.semantic_type == SemanticType::Tag)
827 .map(|c| (c.column_schema.name.as_str(), c.column_id));
828
829 for (name, id) in old_primary_keys {
830 let primary_key =
831 new_primary_keys
832 .get(name)
833 .with_context(|| InvalidRegionRequestSnafu {
834 region_id: metadata.region_id,
835 err: format!("column {} is not a primary key", name),
836 })?;
837
838 ensure!(
839 *primary_key == id,
840 InvalidRegionRequestSnafu {
841 region_id: metadata.region_id,
842 err: format!(
843 "column with same name {} has different id, existing: {}, got: {}",
844 name, id, primary_key
845 ),
846 }
847 );
848 }
849
850 let new_ts_column = column_metadatas
851 .iter()
852 .find(|c| c.semantic_type == SemanticType::Timestamp)
853 .map(|c| (c.column_schema.name.as_str(), c.column_id))
854 .context(InvalidRegionRequestSnafu {
855 region_id: metadata.region_id,
856 err: "timestamp column not found",
857 })?;
858
859 let old_ts_column = metadata
861 .column_metadatas
862 .iter()
863 .find(|c| c.semantic_type == SemanticType::Timestamp)
864 .map(|c| (c.column_schema.name.as_str(), c.column_id))
865 .unwrap();
866
867 ensure!(
868 new_ts_column == old_ts_column,
869 InvalidRegionRequestSnafu {
870 region_id: metadata.region_id,
871 err: format!(
872 "timestamp column {} has different id, existing: {}, got: {}",
873 old_ts_column.0, old_ts_column.1, new_ts_column.1
874 ),
875 }
876 );
877 }
878 }
879 Ok(())
880 }
881
882 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
884 debug_assert!(self.validate(metadata).is_ok());
885 match self {
886 AlterKind::AddColumns { columns } => columns
887 .iter()
888 .any(|col_to_add| col_to_add.need_alter(metadata)),
889 AlterKind::DropColumns { names } => names
890 .iter()
891 .any(|name| metadata.column_by_name(name).is_some()),
892 AlterKind::ModifyColumnTypes { columns } => columns
893 .iter()
894 .any(|col_to_change| col_to_change.need_alter(metadata)),
895 AlterKind::SetRegionOptions { .. } => true,
896 AlterKind::UnsetRegionOptions { .. } => true,
897 AlterKind::SetIndexes { options, .. } => options
898 .iter()
899 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
900 AlterKind::UnsetIndexes { options } => options
901 .iter()
902 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
903 AlterKind::DropDefaults { names } => names
904 .iter()
905 .any(|name| metadata.column_by_name(name).is_some()),
906
907 AlterKind::SetDefaults { columns } => columns
908 .iter()
909 .any(|x| metadata.column_by_name(&x.name).is_some()),
910 AlterKind::SyncColumns { column_metadatas } => {
911 metadata.column_metadatas != *column_metadatas
912 }
913 }
914 }
915
916 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
918 let Some(column) = metadata.column_by_name(name) else {
919 return Ok(());
920 };
921 ensure!(
922 column.semantic_type == SemanticType::Field,
923 InvalidRegionRequestSnafu {
924 region_id: metadata.region_id,
925 err: format!("column {} is not a field and could not be dropped", name),
926 }
927 );
928 Ok(())
929 }
930
931 fn validate_column_alter_index_option(
933 column_name: &String,
934 metadata: &RegionMetadata,
935 is_fulltext: bool,
936 ) -> Result<()> {
937 let column = metadata
938 .column_by_name(column_name)
939 .context(InvalidRegionRequestSnafu {
940 region_id: metadata.region_id,
941 err: format!("column {} not found", column_name),
942 })?;
943
944 if is_fulltext {
945 ensure!(
946 column.column_schema.data_type.is_string(),
947 InvalidRegionRequestSnafu {
948 region_id: metadata.region_id,
949 err: format!(
950 "cannot change alter index options for non-string column {}",
951 column_name
952 ),
953 }
954 );
955 }
956
957 Ok(())
958 }
959
960 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
962 metadata
963 .column_by_name(column_name)
964 .context(InvalidRegionRequestSnafu {
965 region_id: metadata.region_id,
966 err: format!("column {} not found", column_name),
967 })?;
968
969 Ok(())
970 }
971}
972
973impl TryFrom<alter_request::Kind> for AlterKind {
974 type Error = MetadataError;
975
976 fn try_from(kind: alter_request::Kind) -> Result<Self> {
977 let alter_kind = match kind {
978 alter_request::Kind::AddColumns(x) => {
979 let columns = x
980 .add_columns
981 .into_iter()
982 .map(|x| x.try_into())
983 .collect::<Result<Vec<_>>>()?;
984 AlterKind::AddColumns { columns }
985 }
986 alter_request::Kind::ModifyColumnTypes(x) => {
987 let columns = x
988 .modify_column_types
989 .into_iter()
990 .map(|x| x.into())
991 .collect::<Vec<_>>();
992 AlterKind::ModifyColumnTypes { columns }
993 }
994 alter_request::Kind::DropColumns(x) => {
995 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
996 AlterKind::DropColumns { names }
997 }
998 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
999 options: options
1000 .table_options
1001 .iter()
1002 .map(TryFrom::try_from)
1003 .collect::<Result<Vec<_>>>()?,
1004 },
1005 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1006 keys: options
1007 .keys
1008 .iter()
1009 .map(|key| UnsetRegionOption::try_from(key.as_str()))
1010 .collect::<Result<Vec<_>>>()?,
1011 },
1012 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1013 options: vec![SetIndexOption::try_from(o)?],
1014 },
1015 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1016 options: vec![UnsetIndexOption::try_from(o)?],
1017 },
1018 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1019 options: o
1020 .set_indexes
1021 .into_iter()
1022 .map(SetIndexOption::try_from)
1023 .collect::<Result<Vec<_>>>()?,
1024 },
1025 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1026 options: o
1027 .unset_indexes
1028 .into_iter()
1029 .map(UnsetIndexOption::try_from)
1030 .collect::<Result<Vec<_>>>()?,
1031 },
1032 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1033 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1034 },
1035 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1036 columns: x
1037 .set_defaults
1038 .into_iter()
1039 .map(|x| {
1040 Ok(SetDefault {
1041 name: x.column_name,
1042 default_constraint: x.default_constraint.clone(),
1043 })
1044 })
1045 .collect::<Result<Vec<_>>>()?,
1046 },
1047 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1048 column_metadatas: x
1049 .column_defs
1050 .into_iter()
1051 .map(ColumnMetadata::try_from_column_def)
1052 .collect::<Result<Vec<_>>>()?,
1053 },
1054 };
1055
1056 Ok(alter_kind)
1057 }
1058}
1059
1060#[derive(Debug, PartialEq, Eq, Clone)]
1062pub struct AddColumn {
1063 pub column_metadata: ColumnMetadata,
1065 pub location: Option<AddColumnLocation>,
1068}
1069
1070impl AddColumn {
1071 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1076 ensure!(
1077 self.column_metadata.column_schema.is_nullable()
1078 || self
1079 .column_metadata
1080 .column_schema
1081 .default_constraint()
1082 .is_some(),
1083 InvalidRegionRequestSnafu {
1084 region_id: metadata.region_id,
1085 err: format!(
1086 "no default value for column {}",
1087 self.column_metadata.column_schema.name
1088 ),
1089 }
1090 );
1091
1092 if let Some(existing_column) =
1093 metadata.column_by_name(&self.column_metadata.column_schema.name)
1094 {
1095 ensure!(
1097 *existing_column == self.column_metadata,
1098 InvalidRegionRequestSnafu {
1099 region_id: metadata.region_id,
1100 err: format!(
1101 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1102 self.column_metadata.column_schema.name,
1103 existing_column,
1104 self.column_metadata,
1105 ),
1106 }
1107 );
1108 ensure!(
1109 self.location.is_none(),
1110 InvalidRegionRequestSnafu {
1111 region_id: metadata.region_id,
1112 err: format!(
1113 "column {} already exists, but location is specified",
1114 self.column_metadata.column_schema.name
1115 ),
1116 }
1117 );
1118 }
1119
1120 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1121 ensure!(
1123 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1124 InvalidRegionRequestSnafu {
1125 region_id: metadata.region_id,
1126 err: format!(
1127 "column id {} already exists with different name {}",
1128 self.column_metadata.column_id, existing_column.column_schema.name
1129 ),
1130 }
1131 );
1132 }
1133
1134 Ok(())
1135 }
1136
1137 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1139 debug_assert!(self.validate(metadata).is_ok());
1140 metadata
1141 .column_by_name(&self.column_metadata.column_schema.name)
1142 .is_none()
1143 }
1144}
1145
1146impl TryFrom<v1::region::AddColumn> for AddColumn {
1147 type Error = MetadataError;
1148
1149 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1150 let column_def = add_column
1151 .column_def
1152 .context(InvalidRawRegionRequestSnafu {
1153 err: "missing column_def in AddColumn",
1154 })?;
1155
1156 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1157 let location = add_column
1158 .location
1159 .map(AddColumnLocation::try_from)
1160 .transpose()?;
1161
1162 Ok(AddColumn {
1163 column_metadata,
1164 location,
1165 })
1166 }
1167}
1168
1169#[derive(Debug, PartialEq, Eq, Clone)]
1171pub enum AddColumnLocation {
1172 First,
1174 After {
1176 column_name: String,
1178 },
1179}
1180
1181impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1182 type Error = MetadataError;
1183
1184 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1185 let location_type = LocationType::try_from(location.location_type)
1186 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1187 let add_column_location = match location_type {
1188 LocationType::First => AddColumnLocation::First,
1189 LocationType::After => AddColumnLocation::After {
1190 column_name: location.after_column_name,
1191 },
1192 };
1193
1194 Ok(add_column_location)
1195 }
1196}
1197
1198#[derive(Debug, PartialEq, Eq, Clone)]
1200pub struct ModifyColumnType {
1201 pub column_name: String,
1203 pub target_type: ConcreteDataType,
1205}
1206
1207impl ModifyColumnType {
1208 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1210 let column_meta = metadata
1211 .column_by_name(&self.column_name)
1212 .with_context(|| InvalidRegionRequestSnafu {
1213 region_id: metadata.region_id,
1214 err: format!("column {} not found", self.column_name),
1215 })?;
1216
1217 ensure!(
1218 matches!(column_meta.semantic_type, SemanticType::Field),
1219 InvalidRegionRequestSnafu {
1220 region_id: metadata.region_id,
1221 err: "'timestamp' or 'tag' column cannot change type".to_string()
1222 }
1223 );
1224 ensure!(
1225 column_meta
1226 .column_schema
1227 .data_type
1228 .can_arrow_type_cast_to(&self.target_type),
1229 InvalidRegionRequestSnafu {
1230 region_id: metadata.region_id,
1231 err: format!(
1232 "column '{}' cannot be cast automatically to type '{}'",
1233 self.column_name, self.target_type
1234 ),
1235 }
1236 );
1237
1238 Ok(())
1239 }
1240
1241 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1243 debug_assert!(self.validate(metadata).is_ok());
1244 metadata.column_by_name(&self.column_name).is_some()
1245 }
1246}
1247
1248impl From<v1::ModifyColumnType> for ModifyColumnType {
1249 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1250 let target_type = ColumnDataTypeWrapper::new(
1251 modify_column_type.target_type(),
1252 modify_column_type.target_type_extension,
1253 )
1254 .into();
1255
1256 ModifyColumnType {
1257 column_name: modify_column_type.column_name,
1258 target_type,
1259 }
1260 }
1261}
1262
1263#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1264pub enum SetRegionOption {
1265 Ttl(Option<TimeToLive>),
1266 Twsc(String, String),
1268 Format(String),
1270}
1271
1272impl TryFrom<&PbOption> for SetRegionOption {
1273 type Error = MetadataError;
1274
1275 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1276 let PbOption { key, value } = value;
1277 match key.as_str() {
1278 TTL_KEY => {
1279 let ttl = TimeToLive::from_humantime_or_str(value)
1280 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1281
1282 Ok(Self::Ttl(Some(ttl)))
1283 }
1284 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1285 Ok(Self::Twsc(key.clone(), value.clone()))
1286 }
1287 SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1288 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1289 }
1290 }
1291}
1292
1293impl From<&UnsetRegionOption> for SetRegionOption {
1294 fn from(unset_option: &UnsetRegionOption) -> Self {
1295 match unset_option {
1296 UnsetRegionOption::TwcsTriggerFileNum => {
1297 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1298 }
1299 UnsetRegionOption::TwcsMaxOutputFileSize => {
1300 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1301 }
1302 UnsetRegionOption::TwcsTimeWindow => {
1303 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1304 }
1305 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1306 }
1307 }
1308}
1309
1310impl TryFrom<&str> for UnsetRegionOption {
1311 type Error = MetadataError;
1312
1313 fn try_from(key: &str) -> Result<Self> {
1314 match key.to_ascii_lowercase().as_str() {
1315 TTL_KEY => Ok(Self::Ttl),
1316 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1317 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1318 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1319 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1320 }
1321 }
1322}
1323
1324#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1325pub enum UnsetRegionOption {
1326 TwcsTriggerFileNum,
1327 TwcsMaxOutputFileSize,
1328 TwcsTimeWindow,
1329 Ttl,
1330}
1331
1332impl UnsetRegionOption {
1333 pub fn as_str(&self) -> &str {
1334 match self {
1335 Self::Ttl => TTL_KEY,
1336 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1337 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1338 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1339 }
1340 }
1341}
1342
1343impl Display for UnsetRegionOption {
1344 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1345 write!(f, "{}", self.as_str())
1346 }
1347}
1348
1349#[derive(Debug, Clone, Default)]
1350pub struct RegionFlushRequest {
1351 pub row_group_size: Option<usize>,
1352}
1353
1354#[derive(Debug)]
1355pub struct RegionCompactRequest {
1356 pub options: compact_request::Options,
1357 pub parallelism: Option<u32>,
1358}
1359
1360impl Default for RegionCompactRequest {
1361 fn default() -> Self {
1362 Self {
1363 options: compact_request::Options::Regular(Default::default()),
1365 parallelism: None,
1366 }
1367 }
1368}
1369
1370#[derive(Debug, Clone, Default)]
1371pub struct RegionBuildIndexRequest {}
1372
1373#[derive(Debug)]
1375pub enum RegionTruncateRequest {
1376 All,
1378 ByTimeRanges {
1379 time_ranges: Vec<(Timestamp, Timestamp)>,
1383 },
1384}
1385
1386#[derive(Debug, Clone, Copy, Default)]
1391pub struct RegionCatchupRequest {
1392 pub set_writable: bool,
1394 pub entry_id: Option<entry::Id>,
1397 pub metadata_entry_id: Option<entry::Id>,
1401 pub location_id: Option<u64>,
1403 pub checkpoint: Option<ReplayCheckpoint>,
1405}
1406
1407#[derive(Debug, Clone)]
1408pub struct RegionBulkInsertsRequest {
1409 pub region_id: RegionId,
1410 pub payload: DfRecordBatch,
1411 pub raw_data: ArrowIpc,
1412}
1413
1414impl RegionBulkInsertsRequest {
1415 pub fn estimated_size(&self) -> usize {
1416 self.payload.get_array_memory_size()
1417 }
1418}
1419
1420#[derive(Debug, Clone)]
1426pub struct EnterStagingRequest {
1427 pub partition_expr: String,
1429}
1430
1431impl fmt::Display for RegionRequest {
1432 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1433 match self {
1434 RegionRequest::Put(_) => write!(f, "Put"),
1435 RegionRequest::Delete(_) => write!(f, "Delete"),
1436 RegionRequest::Create(_) => write!(f, "Create"),
1437 RegionRequest::Drop(_) => write!(f, "Drop"),
1438 RegionRequest::Open(_) => write!(f, "Open"),
1439 RegionRequest::Close(_) => write!(f, "Close"),
1440 RegionRequest::Alter(_) => write!(f, "Alter"),
1441 RegionRequest::Flush(_) => write!(f, "Flush"),
1442 RegionRequest::Compact(_) => write!(f, "Compact"),
1443 RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1444 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1445 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1446 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1447 RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1448 }
1449 }
1450}
1451
1452#[cfg(test)]
1453mod tests {
1454
1455 use api::v1::region::RegionColumnDef;
1456 use api::v1::{ColumnDataType, ColumnDef};
1457 use datatypes::prelude::ConcreteDataType;
1458 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1459
1460 use super::*;
1461 use crate::metadata::RegionMetadataBuilder;
1462
1463 #[test]
1464 fn test_from_proto_location() {
1465 let proto_location = v1::AddColumnLocation {
1466 location_type: LocationType::First as i32,
1467 after_column_name: String::default(),
1468 };
1469 let location = AddColumnLocation::try_from(proto_location).unwrap();
1470 assert_eq!(location, AddColumnLocation::First);
1471
1472 let proto_location = v1::AddColumnLocation {
1473 location_type: 10,
1474 after_column_name: String::default(),
1475 };
1476 AddColumnLocation::try_from(proto_location).unwrap_err();
1477
1478 let proto_location = v1::AddColumnLocation {
1479 location_type: LocationType::After as i32,
1480 after_column_name: "a".to_string(),
1481 };
1482 let location = AddColumnLocation::try_from(proto_location).unwrap();
1483 assert_eq!(
1484 location,
1485 AddColumnLocation::After {
1486 column_name: "a".to_string()
1487 }
1488 );
1489 }
1490
1491 #[test]
1492 fn test_from_none_proto_add_column() {
1493 AddColumn::try_from(v1::region::AddColumn {
1494 column_def: None,
1495 location: None,
1496 })
1497 .unwrap_err();
1498 }
1499
1500 #[test]
1501 fn test_from_proto_alter_request() {
1502 RegionAlterRequest::try_from(AlterRequest {
1503 region_id: 0,
1504 schema_version: 1,
1505 kind: None,
1506 })
1507 .unwrap_err();
1508
1509 let request = RegionAlterRequest::try_from(AlterRequest {
1510 region_id: 0,
1511 schema_version: 1,
1512 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1513 add_columns: vec![v1::region::AddColumn {
1514 column_def: Some(RegionColumnDef {
1515 column_def: Some(ColumnDef {
1516 name: "a".to_string(),
1517 data_type: ColumnDataType::String as i32,
1518 is_nullable: true,
1519 default_constraint: vec![],
1520 semantic_type: SemanticType::Field as i32,
1521 comment: String::new(),
1522 ..Default::default()
1523 }),
1524 column_id: 1,
1525 }),
1526 location: Some(v1::AddColumnLocation {
1527 location_type: LocationType::First as i32,
1528 after_column_name: String::default(),
1529 }),
1530 }],
1531 })),
1532 })
1533 .unwrap();
1534
1535 assert_eq!(
1536 request,
1537 RegionAlterRequest {
1538 kind: AlterKind::AddColumns {
1539 columns: vec![AddColumn {
1540 column_metadata: ColumnMetadata {
1541 column_schema: ColumnSchema::new(
1542 "a",
1543 ConcreteDataType::string_datatype(),
1544 true,
1545 ),
1546 semantic_type: SemanticType::Field,
1547 column_id: 1,
1548 },
1549 location: Some(AddColumnLocation::First),
1550 }]
1551 },
1552 }
1553 );
1554 }
1555
1556 fn new_metadata() -> RegionMetadata {
1559 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1560 builder
1561 .push_column_metadata(ColumnMetadata {
1562 column_schema: ColumnSchema::new(
1563 "ts",
1564 ConcreteDataType::timestamp_millisecond_datatype(),
1565 false,
1566 ),
1567 semantic_type: SemanticType::Timestamp,
1568 column_id: 1,
1569 })
1570 .push_column_metadata(ColumnMetadata {
1571 column_schema: ColumnSchema::new(
1572 "tag_0",
1573 ConcreteDataType::string_datatype(),
1574 true,
1575 ),
1576 semantic_type: SemanticType::Tag,
1577 column_id: 2,
1578 })
1579 .push_column_metadata(ColumnMetadata {
1580 column_schema: ColumnSchema::new(
1581 "field_0",
1582 ConcreteDataType::string_datatype(),
1583 true,
1584 ),
1585 semantic_type: SemanticType::Field,
1586 column_id: 3,
1587 })
1588 .push_column_metadata(ColumnMetadata {
1589 column_schema: ColumnSchema::new(
1590 "field_1",
1591 ConcreteDataType::boolean_datatype(),
1592 true,
1593 ),
1594 semantic_type: SemanticType::Field,
1595 column_id: 4,
1596 })
1597 .primary_key(vec![2]);
1598 builder.build().unwrap()
1599 }
1600
1601 #[test]
1602 fn test_add_column_validate() {
1603 let metadata = new_metadata();
1604 let add_column = AddColumn {
1605 column_metadata: ColumnMetadata {
1606 column_schema: ColumnSchema::new(
1607 "tag_1",
1608 ConcreteDataType::string_datatype(),
1609 true,
1610 ),
1611 semantic_type: SemanticType::Tag,
1612 column_id: 5,
1613 },
1614 location: None,
1615 };
1616 add_column.validate(&metadata).unwrap();
1617 assert!(add_column.need_alter(&metadata));
1618
1619 AddColumn {
1621 column_metadata: ColumnMetadata {
1622 column_schema: ColumnSchema::new(
1623 "tag_1",
1624 ConcreteDataType::string_datatype(),
1625 false,
1626 ),
1627 semantic_type: SemanticType::Tag,
1628 column_id: 5,
1629 },
1630 location: None,
1631 }
1632 .validate(&metadata)
1633 .unwrap_err();
1634
1635 let add_column = AddColumn {
1637 column_metadata: ColumnMetadata {
1638 column_schema: ColumnSchema::new(
1639 "tag_0",
1640 ConcreteDataType::string_datatype(),
1641 true,
1642 ),
1643 semantic_type: SemanticType::Tag,
1644 column_id: 2,
1645 },
1646 location: None,
1647 };
1648 add_column.validate(&metadata).unwrap();
1649 assert!(!add_column.need_alter(&metadata));
1650 }
1651
1652 #[test]
1653 fn test_add_duplicate_columns() {
1654 let kind = AlterKind::AddColumns {
1655 columns: vec![
1656 AddColumn {
1657 column_metadata: ColumnMetadata {
1658 column_schema: ColumnSchema::new(
1659 "tag_1",
1660 ConcreteDataType::string_datatype(),
1661 true,
1662 ),
1663 semantic_type: SemanticType::Tag,
1664 column_id: 5,
1665 },
1666 location: None,
1667 },
1668 AddColumn {
1669 column_metadata: ColumnMetadata {
1670 column_schema: ColumnSchema::new(
1671 "tag_1",
1672 ConcreteDataType::string_datatype(),
1673 true,
1674 ),
1675 semantic_type: SemanticType::Field,
1676 column_id: 6,
1677 },
1678 location: None,
1679 },
1680 ],
1681 };
1682 let metadata = new_metadata();
1683 kind.validate(&metadata).unwrap();
1684 assert!(kind.need_alter(&metadata));
1685 }
1686
1687 #[test]
1688 fn test_add_existing_column_different_metadata() {
1689 let metadata = new_metadata();
1690
1691 let kind = AlterKind::AddColumns {
1693 columns: vec![AddColumn {
1694 column_metadata: ColumnMetadata {
1695 column_schema: ColumnSchema::new(
1696 "tag_0",
1697 ConcreteDataType::string_datatype(),
1698 true,
1699 ),
1700 semantic_type: SemanticType::Tag,
1701 column_id: 4,
1702 },
1703 location: None,
1704 }],
1705 };
1706 kind.validate(&metadata).unwrap_err();
1707
1708 let kind = AlterKind::AddColumns {
1710 columns: vec![AddColumn {
1711 column_metadata: ColumnMetadata {
1712 column_schema: ColumnSchema::new(
1713 "tag_0",
1714 ConcreteDataType::int64_datatype(),
1715 true,
1716 ),
1717 semantic_type: SemanticType::Tag,
1718 column_id: 2,
1719 },
1720 location: None,
1721 }],
1722 };
1723 kind.validate(&metadata).unwrap_err();
1724
1725 let kind = AlterKind::AddColumns {
1727 columns: vec![AddColumn {
1728 column_metadata: ColumnMetadata {
1729 column_schema: ColumnSchema::new(
1730 "tag_1",
1731 ConcreteDataType::string_datatype(),
1732 true,
1733 ),
1734 semantic_type: SemanticType::Tag,
1735 column_id: 2,
1736 },
1737 location: None,
1738 }],
1739 };
1740 kind.validate(&metadata).unwrap_err();
1741 }
1742
1743 #[test]
1744 fn test_add_existing_column_with_location() {
1745 let metadata = new_metadata();
1746 let kind = AlterKind::AddColumns {
1747 columns: vec![AddColumn {
1748 column_metadata: ColumnMetadata {
1749 column_schema: ColumnSchema::new(
1750 "tag_0",
1751 ConcreteDataType::string_datatype(),
1752 true,
1753 ),
1754 semantic_type: SemanticType::Tag,
1755 column_id: 2,
1756 },
1757 location: Some(AddColumnLocation::First),
1758 }],
1759 };
1760 kind.validate(&metadata).unwrap_err();
1761 }
1762
1763 #[test]
1764 fn test_validate_drop_column() {
1765 let metadata = new_metadata();
1766 let kind = AlterKind::DropColumns {
1767 names: vec!["xxxx".to_string()],
1768 };
1769 kind.validate(&metadata).unwrap();
1770 assert!(!kind.need_alter(&metadata));
1771
1772 AlterKind::DropColumns {
1773 names: vec!["tag_0".to_string()],
1774 }
1775 .validate(&metadata)
1776 .unwrap_err();
1777
1778 let kind = AlterKind::DropColumns {
1779 names: vec!["field_0".to_string()],
1780 };
1781 kind.validate(&metadata).unwrap();
1782 assert!(kind.need_alter(&metadata));
1783 }
1784
1785 #[test]
1786 fn test_validate_modify_column_type() {
1787 let metadata = new_metadata();
1788 AlterKind::ModifyColumnTypes {
1789 columns: vec![ModifyColumnType {
1790 column_name: "xxxx".to_string(),
1791 target_type: ConcreteDataType::string_datatype(),
1792 }],
1793 }
1794 .validate(&metadata)
1795 .unwrap_err();
1796
1797 AlterKind::ModifyColumnTypes {
1798 columns: vec![ModifyColumnType {
1799 column_name: "field_1".to_string(),
1800 target_type: ConcreteDataType::date_datatype(),
1801 }],
1802 }
1803 .validate(&metadata)
1804 .unwrap_err();
1805
1806 AlterKind::ModifyColumnTypes {
1807 columns: vec![ModifyColumnType {
1808 column_name: "ts".to_string(),
1809 target_type: ConcreteDataType::date_datatype(),
1810 }],
1811 }
1812 .validate(&metadata)
1813 .unwrap_err();
1814
1815 AlterKind::ModifyColumnTypes {
1816 columns: vec![ModifyColumnType {
1817 column_name: "tag_0".to_string(),
1818 target_type: ConcreteDataType::date_datatype(),
1819 }],
1820 }
1821 .validate(&metadata)
1822 .unwrap_err();
1823
1824 let kind = AlterKind::ModifyColumnTypes {
1825 columns: vec![ModifyColumnType {
1826 column_name: "field_0".to_string(),
1827 target_type: ConcreteDataType::int32_datatype(),
1828 }],
1829 };
1830 kind.validate(&metadata).unwrap();
1831 assert!(kind.need_alter(&metadata));
1832 }
1833
1834 #[test]
1835 fn test_validate_add_columns() {
1836 let kind = AlterKind::AddColumns {
1837 columns: vec![
1838 AddColumn {
1839 column_metadata: ColumnMetadata {
1840 column_schema: ColumnSchema::new(
1841 "tag_1",
1842 ConcreteDataType::string_datatype(),
1843 true,
1844 ),
1845 semantic_type: SemanticType::Tag,
1846 column_id: 5,
1847 },
1848 location: None,
1849 },
1850 AddColumn {
1851 column_metadata: ColumnMetadata {
1852 column_schema: ColumnSchema::new(
1853 "field_2",
1854 ConcreteDataType::string_datatype(),
1855 true,
1856 ),
1857 semantic_type: SemanticType::Field,
1858 column_id: 6,
1859 },
1860 location: None,
1861 },
1862 ],
1863 };
1864 let request = RegionAlterRequest { kind };
1865 let mut metadata = new_metadata();
1866 metadata.schema_version = 1;
1867 request.validate(&metadata).unwrap();
1868 }
1869
1870 #[test]
1871 fn test_validate_create_region() {
1872 let column_metadatas = vec![
1873 ColumnMetadata {
1874 column_schema: ColumnSchema::new(
1875 "ts",
1876 ConcreteDataType::timestamp_millisecond_datatype(),
1877 false,
1878 ),
1879 semantic_type: SemanticType::Timestamp,
1880 column_id: 1,
1881 },
1882 ColumnMetadata {
1883 column_schema: ColumnSchema::new(
1884 "tag_0",
1885 ConcreteDataType::string_datatype(),
1886 true,
1887 ),
1888 semantic_type: SemanticType::Tag,
1889 column_id: 2,
1890 },
1891 ColumnMetadata {
1892 column_schema: ColumnSchema::new(
1893 "field_0",
1894 ConcreteDataType::string_datatype(),
1895 true,
1896 ),
1897 semantic_type: SemanticType::Field,
1898 column_id: 3,
1899 },
1900 ];
1901 let create = RegionCreateRequest {
1902 engine: "mito".to_string(),
1903 column_metadatas,
1904 primary_key: vec![3, 4],
1905 options: HashMap::new(),
1906 table_dir: "path".to_string(),
1907 path_type: PathType::Bare,
1908 partition_expr_json: Some("".to_string()),
1909 };
1910
1911 assert!(create.validate().is_err());
1912 }
1913
1914 #[test]
1915 fn test_validate_modify_column_fulltext_options() {
1916 let kind = AlterKind::SetIndexes {
1917 options: vec![SetIndexOption::Fulltext {
1918 column_name: "tag_0".to_string(),
1919 options: FulltextOptions::new_unchecked(
1920 true,
1921 FulltextAnalyzer::Chinese,
1922 false,
1923 FulltextBackend::Bloom,
1924 1000,
1925 0.01,
1926 ),
1927 }],
1928 };
1929 let request = RegionAlterRequest { kind };
1930 let mut metadata = new_metadata();
1931 metadata.schema_version = 1;
1932 request.validate(&metadata).unwrap();
1933
1934 let kind = AlterKind::UnsetIndexes {
1935 options: vec![UnsetIndexOption::Fulltext {
1936 column_name: "tag_0".to_string(),
1937 }],
1938 };
1939 let request = RegionAlterRequest { kind };
1940 let mut metadata = new_metadata();
1941 metadata.schema_version = 1;
1942 request.validate(&metadata).unwrap();
1943 }
1944
1945 #[test]
1946 fn test_validate_sync_columns() {
1947 let metadata = new_metadata();
1948 let kind = AlterKind::SyncColumns {
1949 column_metadatas: vec![
1950 ColumnMetadata {
1951 column_schema: ColumnSchema::new(
1952 "tag_1",
1953 ConcreteDataType::string_datatype(),
1954 true,
1955 ),
1956 semantic_type: SemanticType::Tag,
1957 column_id: 5,
1958 },
1959 ColumnMetadata {
1960 column_schema: ColumnSchema::new(
1961 "field_2",
1962 ConcreteDataType::string_datatype(),
1963 true,
1964 ),
1965 semantic_type: SemanticType::Field,
1966 column_id: 6,
1967 },
1968 ],
1969 };
1970 let err = kind.validate(&metadata).unwrap_err();
1971 assert!(err.to_string().contains("not a primary key"));
1972
1973 let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
1975 let ts_column = column_metadatas_with_different_ts_column
1976 .iter_mut()
1977 .find(|c| c.semantic_type == SemanticType::Timestamp)
1978 .unwrap();
1979 ts_column.column_schema.name = "ts1".to_string();
1980
1981 let kind = AlterKind::SyncColumns {
1982 column_metadatas: column_metadatas_with_different_ts_column,
1983 };
1984 let err = kind.validate(&metadata).unwrap_err();
1985 assert!(
1986 err.to_string()
1987 .contains("timestamp column ts has different id")
1988 );
1989
1990 let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
1992 let pk_column = column_metadatas_with_different_pk_column
1993 .iter_mut()
1994 .find(|c| c.column_schema.name == "tag_0")
1995 .unwrap();
1996 pk_column.column_id = 100;
1997 let kind = AlterKind::SyncColumns {
1998 column_metadatas: column_metadatas_with_different_pk_column,
1999 };
2000 let err = kind.validate(&metadata).unwrap_err();
2001 assert!(
2002 err.to_string()
2003 .contains("column with same name tag_0 has different id")
2004 );
2005
2006 let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2008 column_metadatas_with_new_field_column.push(ColumnMetadata {
2009 column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2010 semantic_type: SemanticType::Field,
2011 column_id: 4,
2012 });
2013 let kind = AlterKind::SyncColumns {
2014 column_metadatas: column_metadatas_with_new_field_column,
2015 };
2016 kind.validate(&metadata).unwrap();
2017 }
2018
2019 #[test]
2020 fn test_cast_path_type_to_primitive() {
2021 assert_eq!(PathType::Bare as u8, 0);
2022 assert_eq!(PathType::Data as u8, 1);
2023 assert_eq!(PathType::Metadata as u8, 2);
2024 assert_eq!(
2025 PathType::try_from(PathType::Bare as u8).unwrap(),
2026 PathType::Bare
2027 );
2028 assert_eq!(
2029 PathType::try_from(PathType::Data as u8).unwrap(),
2030 PathType::Data
2031 );
2032 assert_eq!(
2033 PathType::try_from(PathType::Metadata as u8).unwrap(),
2034 PathType::Metadata
2035 );
2036 }
2037}