1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26 CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27 FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28 region_request, truncate_request,
29};
30use api::v1::{
31 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50 RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55 SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
56};
57use crate::path_utils::table_dir;
58use crate::storage::{ColumnId, RegionId, ScanRequest};
59
/// Kind of path a region request addresses.
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`, so a raw `u8`
/// discriminant can be converted back into a variant.
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// Used by the gRPC conversion helpers in this file for create/open requests.
    /// NOTE(review): presumably means "no sub-directory under the region dir" — confirm.
    Bare,
    /// NOTE(review): presumably the data sub-directory of a region — confirm.
    Data,
    /// NOTE(review): presumably the metadata sub-directory of a region — confirm.
    Metadata,
}
77
/// A batch of DDL requests of one kind, each paired with its target region.
///
/// `IntoStaticStr` is derived so the variant name can be obtained as a
/// `&'static str` (see `request_type`).
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Create requests, one per region.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Drop requests, one per region.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Alter requests, one per region.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
84
85impl BatchRegionDdlRequest {
86 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
88 match body {
89 region_request::Body::Creates(creates) => {
90 let requests = creates
91 .requests
92 .into_iter()
93 .map(parse_region_create)
94 .collect::<Result<Vec<_>>>()?;
95 Ok(Some(Self::Create(requests)))
96 }
97 region_request::Body::Drops(drops) => {
98 let requests = drops
99 .requests
100 .into_iter()
101 .map(parse_region_drop)
102 .collect::<Result<Vec<_>>>()?;
103 Ok(Some(Self::Drop(requests)))
104 }
105 region_request::Body::Alters(alters) => {
106 let requests = alters
107 .requests
108 .into_iter()
109 .map(parse_region_alter)
110 .collect::<Result<Vec<_>>>()?;
111 Ok(Some(Self::Alter(requests)))
112 }
113 _ => Ok(None),
114 }
115 }
116
117 pub fn request_type(&self) -> &'static str {
118 self.into()
119 }
120
121 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
122 match self {
123 Self::Create(requests) => requests
124 .into_iter()
125 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
126 .collect(),
127 Self::Drop(requests) => requests
128 .into_iter()
129 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
130 .collect(),
131 Self::Alter(requests) => requests
132 .into_iter()
133 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
134 .collect(),
135 }
136 }
137}
138
/// A request addressed to a single region.
///
/// `IntoStaticStr` is derived so the variant name can be obtained as a
/// `&'static str` (see `request_type`).
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Writes rows into the region.
    Put(RegionPutRequest),
    /// Deletes rows from the region.
    Delete(RegionDeleteRequest),
    /// Creates the region.
    Create(RegionCreateRequest),
    /// Drops the region.
    Drop(RegionDropRequest),
    /// Opens the region.
    Open(RegionOpenRequest),
    /// Closes the region.
    Close(RegionCloseRequest),
    /// Alters the region.
    Alter(RegionAlterRequest),
    /// Flushes the region.
    Flush(RegionFlushRequest),
    /// Compacts the region.
    Compact(RegionCompactRequest),
    /// Builds indexes for the region.
    BuildIndex(RegionBuildIndexRequest),
    /// Truncates the region, entirely or by time ranges.
    Truncate(RegionTruncateRequest),
    /// NOTE(review): presumably replays the WAL to catch up with a leader — confirm.
    Catchup(RegionCatchupRequest),
    /// Bulk-inserts a decoded record batch into the region.
    BulkInserts(RegionBulkInsertsRequest),
    /// NOTE(review): presumably switches the region into staging mode — confirm.
    EnterStaging(EnterStagingRequest),
    /// Applies a staging manifest to the region.
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
157
impl RegionRequest {
    /// Converts a protobuf request body into `(region id, request)` pairs.
    ///
    /// Each body variant is delegated to its `make_region_*` helper. A single
    /// body may expand to several region requests (e.g. `Inserts`, `Creates`).
    ///
    /// # Errors
    /// - `Sync` and `ListMetadata` bodies are rejected with an `Unexpected`
    ///   error: they are expected to be handled by the region server itself.
    /// - Any error returned by the delegated helper is propagated.
    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
        match body {
            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
            region_request::Body::Create(create) => make_region_create(create),
            region_request::Body::Drop(drop) => make_region_drop(drop),
            region_request::Body::Open(open) => make_region_open(open),
            region_request::Body::Close(close) => make_region_close(close),
            region_request::Body::Alter(alter) => make_region_alter(alter),
            region_request::Body::Flush(flush) => make_region_flush(flush),
            region_request::Body::Compact(compact) => make_region_compact(compact),
            region_request::Body::BuildIndex(index) => make_region_build_index(index),
            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
            region_request::Body::Creates(creates) => make_region_creates(creates),
            region_request::Body::Drops(drops) => make_region_drops(drops),
            region_request::Body::Alters(alters) => make_region_alters(alters),
            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
            // These two bodies never become a `RegionRequest`.
            region_request::Body::Sync(_) => UnexpectedSnafu {
                reason: "Sync request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
                reason: "ListMetadata request should be handled separately by RegionServer",
            }
            .fail(),
            region_request::Body::ApplyStagingManifest(apply) => {
                make_region_apply_staging_manifest(apply)
            }
        }
    }

    /// Returns the variant name of this request as a static string
    /// (via the derived `IntoStaticStr`).
    pub fn request_type(&self) -> &'static str {
        self.into()
    }
}
197
198fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
199 let requests = inserts
200 .requests
201 .into_iter()
202 .filter_map(|r| {
203 let region_id = r.region_id.into();
204 r.rows.map(|rows| {
205 (
206 region_id,
207 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
208 )
209 })
210 })
211 .collect();
212 Ok(requests)
213}
214
215fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
216 let requests = deletes
217 .requests
218 .into_iter()
219 .filter_map(|r| {
220 let region_id = r.region_id.into();
221 r.rows.map(|rows| {
222 (
223 region_id,
224 RegionRequest::Delete(RegionDeleteRequest { rows, hint: None }),
225 )
226 })
227 })
228 .collect();
229 Ok(requests)
230}
231
232fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
233 let column_metadatas = create
234 .column_defs
235 .into_iter()
236 .map(ColumnMetadata::try_from_column_def)
237 .collect::<Result<Vec<_>>>()?;
238 let region_id = RegionId::from(create.region_id);
239 let table_dir = table_dir(&create.path, region_id.table_id());
240 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
241 Ok((
242 region_id,
243 RegionCreateRequest {
244 engine: create.engine,
245 column_metadatas,
246 primary_key: create.primary_key,
247 options: create.options,
248 table_dir,
249 path_type: PathType::Bare,
250 partition_expr_json,
251 },
252 ))
253}
254
255fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
256 let (region_id, request) = parse_region_create(create)?;
257 Ok(vec![(region_id, RegionRequest::Create(request))])
258}
259
260fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
261 let mut requests = Vec::with_capacity(creates.requests.len());
262 for create in creates.requests {
263 requests.extend(make_region_create(create)?);
264 }
265 Ok(requests)
266}
267
268fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
269 let region_id = drop.region_id.into();
270 Ok((
271 region_id,
272 RegionDropRequest {
273 fast_path: drop.fast_path,
274 force: drop.force,
275 partial_drop: drop.partial_drop,
276 },
277 ))
278}
279
280fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
281 let (region_id, request) = parse_region_drop(drop)?;
282 Ok(vec![(region_id, RegionRequest::Drop(request))])
283}
284
285fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
286 let mut requests = Vec::with_capacity(drops.requests.len());
287 for drop in drops.requests {
288 requests.extend(make_region_drop(drop)?);
289 }
290 Ok(requests)
291}
292
293fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
294 let region_id = RegionId::from(open.region_id);
295 let table_dir = table_dir(&open.path, region_id.table_id());
296 Ok(vec![(
297 region_id,
298 RegionRequest::Open(RegionOpenRequest {
299 engine: open.engine,
300 table_dir,
301 path_type: PathType::Bare,
302 options: open.options,
303 skip_wal_replay: false,
304 checkpoint: None,
305 }),
306 )])
307}
308
309fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
310 let region_id = close.region_id.into();
311 Ok(vec![(
312 region_id,
313 RegionRequest::Close(RegionCloseRequest {}),
314 )])
315}
316
317fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
318 let region_id = alter.region_id.into();
319 let request = RegionAlterRequest::try_from(alter)?;
320 Ok((region_id, request))
321}
322
323fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
324 let (region_id, request) = parse_region_alter(alter)?;
325 Ok(vec![(region_id, RegionRequest::Alter(request))])
326}
327
328fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
329 let mut requests = Vec::with_capacity(alters.requests.len());
330 for alter in alters.requests {
331 requests.extend(make_region_alter(alter)?);
332 }
333 Ok(requests)
334}
335
336fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
337 let region_id = flush.region_id.into();
338 Ok(vec![(
339 region_id,
340 RegionRequest::Flush(RegionFlushRequest {
341 row_group_size: None,
342 }),
343 )])
344}
345
346fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
347 let region_id = compact.region_id.into();
348 let options = compact
349 .options
350 .unwrap_or(compact_request::Options::Regular(Default::default()));
351 let parallelism = if compact.parallelism == 0 {
353 None
354 } else {
355 Some(compact.parallelism)
356 };
357 Ok(vec![(
358 region_id,
359 RegionRequest::Compact(RegionCompactRequest {
360 options,
361 parallelism,
362 }),
363 )])
364}
365
366fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
367 let region_id = index.region_id.into();
368 Ok(vec![(
369 region_id,
370 RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
371 )])
372}
373
374fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
375 let region_id = truncate.region_id.into();
376 match truncate.kind {
377 None => InvalidRawRegionRequestSnafu {
378 err: "missing kind in TruncateRequest".to_string(),
379 }
380 .fail(),
381 Some(truncate_request::Kind::All(_)) => Ok(vec![(
382 region_id,
383 RegionRequest::Truncate(RegionTruncateRequest::All),
384 )]),
385 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
386 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
387
388 Ok(vec![(
389 region_id,
390 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
391 )])
392 }
393 }
394}
395
/// Converts a protobuf bulk insert request into a single `BulkInserts` region request.
///
/// A request whose body is not an Arrow IPC payload (or is missing) yields an
/// empty list. Otherwise the record batch is decoded from the schema bytes,
/// data header and payload of the request. The original Arrow IPC message is
/// kept alongside the decoded batch in `raw_data`.
///
/// # Errors
/// Returns a flight codec error if the schema or the record batch cannot be decoded.
fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
    let region_id = request.region_id.into();
    // From here on `request` is shadowed by the inner Arrow IPC message.
    let Some(Body::ArrowIpc(request)) = request.body else {
        return Ok(vec![]);
    };

    // Times the decoding step under the "decode" label.
    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
        .with_label_values(&["decode"])
        .start_timer();
    let mut decoder =
        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
    let payload = decoder
        .try_decode_record_batch(&request.data_header, &request.payload)
        .context(FlightCodecSnafu)?;
    decoder_timer.observe_duration();
    Ok(vec![(
        region_id,
        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id,
            payload,
            raw_data: request,
        }),
    )])
}
421
422fn make_region_apply_staging_manifest(
423 api::v1::region::ApplyStagingManifestRequest {
424 region_id,
425 partition_expr,
426 central_region_id,
427 manifest_path,
428 }: api::v1::region::ApplyStagingManifestRequest,
429) -> Result<Vec<(RegionId, RegionRequest)>> {
430 let region_id = region_id.into();
431 Ok(vec![(
432 region_id,
433 RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
434 partition_expr,
435 central_region_id: central_region_id.into(),
436 manifest_path,
437 }),
438 )])
439}
440
/// Request to put (write) rows into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to write.
    pub rows: Rows,
    /// Optional write hint; left as `None` when the request is built from a
    /// protobuf insert in this file.
    pub hint: Option<WriteHint>,
}
449
/// Request to read from a region; wraps a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
454
/// Request to delete rows from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Rows identifying what to delete.
    /// NOTE(review): presumably only key columns are required — confirm.
    pub rows: Rows,
    /// Optional write hint; left as `None` when the request is built from a
    /// protobuf delete in this file.
    pub hint: Option<WriteHint>,
}
465
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Name of the region engine.
    pub engine: String,
    /// Metadata of every column in the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns; each must appear in `column_metadatas`
    /// (checked by `validate`).
    pub primary_key: Vec<ColumnId>,
    /// Free-form region options.
    pub options: HashMap<String, String>,
    /// Directory of the table the region belongs to.
    pub table_dir: String,
    /// Path type of the region.
    pub path_type: PathType,
    /// Partition expression of the region, if any.
    /// NOTE(review): name suggests JSON encoding — confirm.
    pub partition_expr_json: Option<String>,
}
484
485impl RegionCreateRequest {
486 pub fn validate(&self) -> Result<()> {
488 ensure!(
490 self.column_metadatas
491 .iter()
492 .any(|x| x.semantic_type == SemanticType::Timestamp),
493 InvalidRegionRequestSnafu {
494 region_id: RegionId::new(0, 0),
495 err: "missing timestamp column in create region request".to_string(),
496 }
497 );
498
499 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
501 for (i, c) in self.column_metadatas.iter().enumerate() {
502 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
503 return InvalidRegionRequestSnafu {
504 region_id: RegionId::new(0, 0),
505 err: format!(
506 "duplicate column id {} (at position {} and {}) in create region request",
507 c.column_id, previous, i
508 ),
509 }
510 .fail();
511 }
512 }
513
514 for column_id in &self.primary_key {
516 ensure!(
517 column_id_to_indices.contains_key(column_id),
518 InvalidRegionRequestSnafu {
519 region_id: RegionId::new(0, 0),
520 err: format!(
521 "missing primary key column {} in create region request",
522 column_id
523 ),
524 }
525 );
526 }
527
528 Ok(())
529 }
530
531 pub fn is_physical_table(&self) -> bool {
533 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
534 }
535}
536
/// Request to drop a region.
///
/// All flags are copied verbatim from the protobuf drop request.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// NOTE(review): presumably enables a faster drop path — confirm semantics.
    pub fast_path: bool,

    /// NOTE(review): presumably forces the drop even if it would otherwise be refused — confirm.
    pub force: bool,

    /// NOTE(review): presumably drops only part of the region's data — confirm.
    pub partial_drop: bool,
}
551
/// Request to open a region.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Name of the region engine.
    pub engine: String,
    /// Directory of the table the region belongs to.
    pub table_dir: String,
    /// Path type of the region.
    pub path_type: PathType,
    /// Free-form region options.
    pub options: HashMap<String, String>,
    /// Whether to skip WAL replay; `false` when built from a protobuf open request.
    pub skip_wal_replay: bool,
    /// Optional replay checkpoint; `None` when built from a protobuf open request.
    pub checkpoint: Option<ReplayCheckpoint>,
}
568
/// Checkpoint carried by an open request.
/// NOTE(review): presumably the WAL entry ids from which replay may start — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint.
    pub entry_id: u64,
    /// Entry id for metadata, if any.
    /// NOTE(review): presumably applies to a separate metadata region — confirm.
    pub metadata_entry_id: Option<u64>,
}
574
impl RegionOpenRequest {
    /// Returns `true` if the options contain the physical table metadata key.
    pub fn is_physical_table(&self) -> bool {
        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
    }
}
581
/// Request to close a region. Carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
585
/// Request to alter a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// The alteration to apply.
    pub kind: AlterKind,
}
592
593impl RegionAlterRequest {
594 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
596 self.kind.validate(metadata)?;
597
598 Ok(())
599 }
600
601 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
605 debug_assert!(self.validate(metadata).is_ok());
606 self.kind.need_alter(metadata)
607 }
608}
609
610impl TryFrom<AlterRequest> for RegionAlterRequest {
611 type Error = MetadataError;
612
613 fn try_from(value: AlterRequest) -> Result<Self> {
614 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
615 err: "missing kind in AlterRequest",
616 })?;
617
618 let kind = AlterKind::try_from(kind)?;
619 Ok(RegionAlterRequest { kind })
620 }
621}
622
/// Kind of alteration to apply to a region.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region. Only field columns may be dropped
    /// (enforced by `validate`).
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data type of columns.
    ModifyColumnTypes {
        /// Columns to change, with their target types.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options on columns.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options on columns.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop the default values of columns.
    DropDefaults {
        /// Names of the columns whose default is dropped.
        names: Vec<String>,
    },
    /// Set the default values of columns.
    SetDefaults {
        /// Columns and their new default constraints.
        columns: Vec<SetDefault>,
    },
    /// Replace the column set with the given one. The tag columns and the
    /// timestamp column must be preserved (enforced by `validate`).
    SyncColumns {
        /// The full set of column metadata to sync to.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A new default constraint for one column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column.
    pub name: String,
    /// The default constraint as raw bytes, copied from the protobuf message.
    /// NOTE(review): encoding is not visible here — confirm.
    pub default_constraint: Vec<u8>,
}
669
/// Index option to set on a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Set a fulltext index.
    Fulltext {
        /// Name of the target column.
        column_name: String,
        /// Fulltext index options.
        options: FulltextOptions,
    },
    /// Set an inverted index.
    Inverted {
        /// Name of the target column.
        column_name: String,
    },
    /// Set a skipping index.
    Skipping {
        /// Name of the target column.
        column_name: String,
        /// Skipping index options.
        options: SkippingIndexOptions,
    },
}
684
685impl SetIndexOption {
686 pub fn column_name(&self) -> &String {
688 match self {
689 SetIndexOption::Fulltext { column_name, .. } => column_name,
690 SetIndexOption::Inverted { column_name } => column_name,
691 SetIndexOption::Skipping { column_name, .. } => column_name,
692 }
693 }
694
695 pub fn is_fulltext(&self) -> bool {
697 match self {
698 SetIndexOption::Fulltext { .. } => true,
699 SetIndexOption::Inverted { .. } => false,
700 SetIndexOption::Skipping { .. } => false,
701 }
702 }
703}
704
705impl TryFrom<v1::SetIndex> for SetIndexOption {
706 type Error = MetadataError;
707
708 fn try_from(value: v1::SetIndex) -> Result<Self> {
709 let option = value.options.context(InvalidRawRegionRequestSnafu {
710 err: "missing options in SetIndex",
711 })?;
712
713 let opt = match option {
714 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
715 column_name: x.column_name.clone(),
716 options: FulltextOptions::new(
717 x.enable,
718 as_fulltext_option_analyzer(
719 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
720 ),
721 x.case_sensitive,
722 as_fulltext_option_backend(
723 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
724 ),
725 x.granularity as u32,
726 x.false_positive_rate,
727 )
728 .context(InvalidIndexOptionSnafu)?,
729 },
730 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
731 column_name: i.column_name,
732 },
733 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
734 column_name: s.column_name,
735 options: SkippingIndexOptions::new(
736 s.granularity as u32,
737 s.false_positive_rate,
738 as_skipping_index_type(
739 PbSkippingIndexType::try_from(s.skipping_index_type)
740 .context(DecodeProtoSnafu)?,
741 ),
742 )
743 .context(InvalidIndexOptionSnafu)?,
744 },
745 };
746
747 Ok(opt)
748 }
749}
750
/// Index option to unset on a column. Every variant carries the target column name.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Unset the fulltext index.
    Fulltext { column_name: String },
    /// Unset the inverted index.
    Inverted { column_name: String },
    /// Unset the skipping index.
    Skipping { column_name: String },
}
757
758impl UnsetIndexOption {
759 pub fn column_name(&self) -> &String {
760 match self {
761 UnsetIndexOption::Fulltext { column_name } => column_name,
762 UnsetIndexOption::Inverted { column_name } => column_name,
763 UnsetIndexOption::Skipping { column_name } => column_name,
764 }
765 }
766
767 pub fn is_fulltext(&self) -> bool {
768 match self {
769 UnsetIndexOption::Fulltext { .. } => true,
770 UnsetIndexOption::Inverted { .. } => false,
771 UnsetIndexOption::Skipping { .. } => false,
772 }
773 }
774}
775
776impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
777 type Error = MetadataError;
778
779 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
780 let option = value.options.context(InvalidRawRegionRequestSnafu {
781 err: "missing options in UnsetIndex",
782 })?;
783
784 let opt = match option {
785 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
786 column_name: f.column_name,
787 },
788 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
789 column_name: i.column_name,
790 },
791 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
792 column_name: s.column_name,
793 },
794 };
795
796 Ok(opt)
797 }
798}
799
800impl AlterKind {
801 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
805 match self {
806 AlterKind::AddColumns { columns } => {
807 for col_to_add in columns {
808 col_to_add.validate(metadata)?;
809 }
810 }
811 AlterKind::DropColumns { names } => {
812 for name in names {
813 Self::validate_column_to_drop(name, metadata)?;
814 }
815 }
816 AlterKind::ModifyColumnTypes { columns } => {
817 for col_to_change in columns {
818 col_to_change.validate(metadata)?;
819 }
820 }
821 AlterKind::SetRegionOptions { .. } => {}
822 AlterKind::UnsetRegionOptions { .. } => {}
823 AlterKind::SetIndexes { options } => {
824 for option in options {
825 Self::validate_column_alter_index_option(
826 option.column_name(),
827 metadata,
828 option.is_fulltext(),
829 )?;
830 }
831 }
832 AlterKind::UnsetIndexes { options } => {
833 for option in options {
834 Self::validate_column_alter_index_option(
835 option.column_name(),
836 metadata,
837 option.is_fulltext(),
838 )?;
839 }
840 }
841 AlterKind::DropDefaults { names } => {
842 names
843 .iter()
844 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
845 }
846 AlterKind::SetDefaults { columns } => {
847 columns
848 .iter()
849 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
850 }
851 AlterKind::SyncColumns { column_metadatas } => {
852 let new_primary_keys = column_metadatas
853 .iter()
854 .filter(|c| c.semantic_type == SemanticType::Tag)
855 .map(|c| (c.column_schema.name.as_str(), c.column_id))
856 .collect::<HashMap<_, _>>();
857
858 let old_primary_keys = metadata
859 .column_metadatas
860 .iter()
861 .filter(|c| c.semantic_type == SemanticType::Tag)
862 .map(|c| (c.column_schema.name.as_str(), c.column_id));
863
864 for (name, id) in old_primary_keys {
865 let primary_key =
866 new_primary_keys
867 .get(name)
868 .with_context(|| InvalidRegionRequestSnafu {
869 region_id: metadata.region_id,
870 err: format!("column {} is not a primary key", name),
871 })?;
872
873 ensure!(
874 *primary_key == id,
875 InvalidRegionRequestSnafu {
876 region_id: metadata.region_id,
877 err: format!(
878 "column with same name {} has different id, existing: {}, got: {}",
879 name, id, primary_key
880 ),
881 }
882 );
883 }
884
885 let new_ts_column = column_metadatas
886 .iter()
887 .find(|c| c.semantic_type == SemanticType::Timestamp)
888 .map(|c| (c.column_schema.name.as_str(), c.column_id))
889 .context(InvalidRegionRequestSnafu {
890 region_id: metadata.region_id,
891 err: "timestamp column not found",
892 })?;
893
894 let old_ts_column = metadata
896 .column_metadatas
897 .iter()
898 .find(|c| c.semantic_type == SemanticType::Timestamp)
899 .map(|c| (c.column_schema.name.as_str(), c.column_id))
900 .unwrap();
901
902 ensure!(
903 new_ts_column == old_ts_column,
904 InvalidRegionRequestSnafu {
905 region_id: metadata.region_id,
906 err: format!(
907 "timestamp column {} has different id, existing: {}, got: {}",
908 old_ts_column.0, old_ts_column.1, new_ts_column.1
909 ),
910 }
911 );
912 }
913 }
914 Ok(())
915 }
916
917 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
919 debug_assert!(self.validate(metadata).is_ok());
920 match self {
921 AlterKind::AddColumns { columns } => columns
922 .iter()
923 .any(|col_to_add| col_to_add.need_alter(metadata)),
924 AlterKind::DropColumns { names } => names
925 .iter()
926 .any(|name| metadata.column_by_name(name).is_some()),
927 AlterKind::ModifyColumnTypes { columns } => columns
928 .iter()
929 .any(|col_to_change| col_to_change.need_alter(metadata)),
930 AlterKind::SetRegionOptions { .. } => true,
931 AlterKind::UnsetRegionOptions { .. } => true,
932 AlterKind::SetIndexes { options, .. } => options
933 .iter()
934 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
935 AlterKind::UnsetIndexes { options } => options
936 .iter()
937 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
938 AlterKind::DropDefaults { names } => names
939 .iter()
940 .any(|name| metadata.column_by_name(name).is_some()),
941
942 AlterKind::SetDefaults { columns } => columns
943 .iter()
944 .any(|x| metadata.column_by_name(&x.name).is_some()),
945 AlterKind::SyncColumns { column_metadatas } => {
946 metadata.column_metadatas != *column_metadatas
947 }
948 }
949 }
950
951 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
953 let Some(column) = metadata.column_by_name(name) else {
954 return Ok(());
955 };
956 ensure!(
957 column.semantic_type == SemanticType::Field,
958 InvalidRegionRequestSnafu {
959 region_id: metadata.region_id,
960 err: format!("column {} is not a field and could not be dropped", name),
961 }
962 );
963 Ok(())
964 }
965
966 fn validate_column_alter_index_option(
968 column_name: &String,
969 metadata: &RegionMetadata,
970 is_fulltext: bool,
971 ) -> Result<()> {
972 let column = metadata
973 .column_by_name(column_name)
974 .context(InvalidRegionRequestSnafu {
975 region_id: metadata.region_id,
976 err: format!("column {} not found", column_name),
977 })?;
978
979 if is_fulltext {
980 ensure!(
981 column.column_schema.data_type.is_string(),
982 InvalidRegionRequestSnafu {
983 region_id: metadata.region_id,
984 err: format!(
985 "cannot change alter index options for non-string column {}",
986 column_name
987 ),
988 }
989 );
990 }
991
992 Ok(())
993 }
994
995 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
997 metadata
998 .column_by_name(column_name)
999 .context(InvalidRegionRequestSnafu {
1000 region_id: metadata.region_id,
1001 err: format!("column {} not found", column_name),
1002 })?;
1003
1004 Ok(())
1005 }
1006}
1007
1008impl TryFrom<alter_request::Kind> for AlterKind {
1009 type Error = MetadataError;
1010
1011 fn try_from(kind: alter_request::Kind) -> Result<Self> {
1012 let alter_kind = match kind {
1013 alter_request::Kind::AddColumns(x) => {
1014 let columns = x
1015 .add_columns
1016 .into_iter()
1017 .map(|x| x.try_into())
1018 .collect::<Result<Vec<_>>>()?;
1019 AlterKind::AddColumns { columns }
1020 }
1021 alter_request::Kind::ModifyColumnTypes(x) => {
1022 let columns = x
1023 .modify_column_types
1024 .into_iter()
1025 .map(|x| x.into())
1026 .collect::<Vec<_>>();
1027 AlterKind::ModifyColumnTypes { columns }
1028 }
1029 alter_request::Kind::DropColumns(x) => {
1030 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
1031 AlterKind::DropColumns { names }
1032 }
1033 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
1034 options: options
1035 .table_options
1036 .iter()
1037 .map(TryFrom::try_from)
1038 .collect::<Result<Vec<_>>>()?,
1039 },
1040 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1041 keys: options
1042 .keys
1043 .iter()
1044 .map(|key| UnsetRegionOption::try_from(key.as_str()))
1045 .collect::<Result<Vec<_>>>()?,
1046 },
1047 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1048 options: vec![SetIndexOption::try_from(o)?],
1049 },
1050 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1051 options: vec![UnsetIndexOption::try_from(o)?],
1052 },
1053 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1054 options: o
1055 .set_indexes
1056 .into_iter()
1057 .map(SetIndexOption::try_from)
1058 .collect::<Result<Vec<_>>>()?,
1059 },
1060 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1061 options: o
1062 .unset_indexes
1063 .into_iter()
1064 .map(UnsetIndexOption::try_from)
1065 .collect::<Result<Vec<_>>>()?,
1066 },
1067 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1068 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1069 },
1070 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1071 columns: x
1072 .set_defaults
1073 .into_iter()
1074 .map(|x| {
1075 Ok(SetDefault {
1076 name: x.column_name,
1077 default_constraint: x.default_constraint.clone(),
1078 })
1079 })
1080 .collect::<Result<Vec<_>>>()?,
1081 },
1082 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1083 column_metadatas: x
1084 .column_defs
1085 .into_iter()
1086 .map(ColumnMetadata::try_from_column_def)
1087 .collect::<Result<Vec<_>>>()?,
1088 },
1089 };
1090
1091 Ok(alter_kind)
1092 }
1093}
1094
/// A column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to place the new column. Must be `None` if the column already
    /// exists (enforced by `validate`).
    pub location: Option<AddColumnLocation>,
}
1104
1105impl AddColumn {
1106 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1111 ensure!(
1112 self.column_metadata.column_schema.is_nullable()
1113 || self
1114 .column_metadata
1115 .column_schema
1116 .default_constraint()
1117 .is_some(),
1118 InvalidRegionRequestSnafu {
1119 region_id: metadata.region_id,
1120 err: format!(
1121 "no default value for column {}",
1122 self.column_metadata.column_schema.name
1123 ),
1124 }
1125 );
1126
1127 if let Some(existing_column) =
1128 metadata.column_by_name(&self.column_metadata.column_schema.name)
1129 {
1130 ensure!(
1132 *existing_column == self.column_metadata,
1133 InvalidRegionRequestSnafu {
1134 region_id: metadata.region_id,
1135 err: format!(
1136 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1137 self.column_metadata.column_schema.name,
1138 existing_column,
1139 self.column_metadata,
1140 ),
1141 }
1142 );
1143 ensure!(
1144 self.location.is_none(),
1145 InvalidRegionRequestSnafu {
1146 region_id: metadata.region_id,
1147 err: format!(
1148 "column {} already exists, but location is specified",
1149 self.column_metadata.column_schema.name
1150 ),
1151 }
1152 );
1153 }
1154
1155 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1156 ensure!(
1158 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1159 InvalidRegionRequestSnafu {
1160 region_id: metadata.region_id,
1161 err: format!(
1162 "column id {} already exists with different name {}",
1163 self.column_metadata.column_id, existing_column.column_schema.name
1164 ),
1165 }
1166 );
1167 }
1168
1169 Ok(())
1170 }
1171
1172 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1174 debug_assert!(self.validate(metadata).is_ok());
1175 metadata
1176 .column_by_name(&self.column_metadata.column_schema.name)
1177 .is_none()
1178 }
1179}
1180
1181impl TryFrom<v1::region::AddColumn> for AddColumn {
1182 type Error = MetadataError;
1183
1184 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1185 let column_def = add_column
1186 .column_def
1187 .context(InvalidRawRegionRequestSnafu {
1188 err: "missing column_def in AddColumn",
1189 })?;
1190
1191 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1192 let location = add_column
1193 .location
1194 .map(AddColumnLocation::try_from)
1195 .transpose()?;
1196
1197 Ok(AddColumn {
1198 column_metadata,
1199 location,
1200 })
1201 }
1202}
1203
/// Position at which a new column is added.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Add the column as the first column.
    First,
    /// Add the column after an existing column.
    After {
        /// Name of the column to add after.
        column_name: String,
    },
}
1215
1216impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1217 type Error = MetadataError;
1218
1219 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1220 let location_type = LocationType::try_from(location.location_type)
1221 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1222 let add_column_location = match location_type {
1223 LocationType::First => AddColumnLocation::First,
1224 LocationType::After => AddColumnLocation::After {
1225 column_name: location.after_column_name,
1226 },
1227 };
1228
1229 Ok(add_column_location)
1230 }
1231}
1232
/// Request to change the data type of one column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column whose type is changed.
    pub column_name: String,
    /// Data type the column is changed to.
    pub target_type: ConcreteDataType,
}
1241
1242impl ModifyColumnType {
1243 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1245 let column_meta = metadata
1246 .column_by_name(&self.column_name)
1247 .with_context(|| InvalidRegionRequestSnafu {
1248 region_id: metadata.region_id,
1249 err: format!("column {} not found", self.column_name),
1250 })?;
1251
1252 ensure!(
1253 matches!(column_meta.semantic_type, SemanticType::Field),
1254 InvalidRegionRequestSnafu {
1255 region_id: metadata.region_id,
1256 err: "'timestamp' or 'tag' column cannot change type".to_string()
1257 }
1258 );
1259 ensure!(
1260 column_meta
1261 .column_schema
1262 .data_type
1263 .can_arrow_type_cast_to(&self.target_type),
1264 InvalidRegionRequestSnafu {
1265 region_id: metadata.region_id,
1266 err: format!(
1267 "column '{}' cannot be cast automatically to type '{}'",
1268 self.column_name, self.target_type
1269 ),
1270 }
1271 );
1272
1273 Ok(())
1274 }
1275
1276 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1278 debug_assert!(self.validate(metadata).is_ok());
1279 metadata.column_by_name(&self.column_name).is_some()
1280 }
1281}
1282
1283impl From<v1::ModifyColumnType> for ModifyColumnType {
1284 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1285 let target_type = ColumnDataTypeWrapper::new(
1286 modify_column_type.target_type(),
1287 modify_column_type.target_type_extension,
1288 )
1289 .into();
1290
1291 ModifyColumnType {
1292 column_name: modify_column_type.column_name,
1293 target_type,
1294 }
1295 }
1296}
1297
/// A region option to set.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Time-to-live option. `None` is what the conversion from
    /// `UnsetRegionOption::Ttl` produces.
    Ttl(Option<TimeToLive>),
    /// A TWCS option as a `(key, value)` pair.
    /// NOTE(review): the variant is spelled `Twsc` while the keys are the
    /// `TWCS_*` constants; renaming it would break the public interface.
    Twsc(String, String),
    /// Value supplied for the `SST_FORMAT_KEY` option.
    Format(String),
}
1306
1307impl TryFrom<&PbOption> for SetRegionOption {
1308 type Error = MetadataError;
1309
1310 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1311 let PbOption { key, value } = value;
1312 match key.as_str() {
1313 TTL_KEY => {
1314 let ttl = TimeToLive::from_humantime_or_str(value)
1315 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1316
1317 Ok(Self::Ttl(Some(ttl)))
1318 }
1319 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1320 Ok(Self::Twsc(key.clone(), value.clone()))
1321 }
1322 SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1323 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1324 }
1325 }
1326}
1327
1328impl From<&UnsetRegionOption> for SetRegionOption {
1329 fn from(unset_option: &UnsetRegionOption) -> Self {
1330 match unset_option {
1331 UnsetRegionOption::TwcsTriggerFileNum => {
1332 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1333 }
1334 UnsetRegionOption::TwcsMaxOutputFileSize => {
1335 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1336 }
1337 UnsetRegionOption::TwcsTimeWindow => {
1338 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1339 }
1340 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1341 }
1342 }
1343}
1344
1345impl TryFrom<&str> for UnsetRegionOption {
1346 type Error = MetadataError;
1347
1348 fn try_from(key: &str) -> Result<Self> {
1349 match key.to_ascii_lowercase().as_str() {
1350 TTL_KEY => Ok(Self::Ttl),
1351 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1352 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1353 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1354 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1355 }
1356 }
1357}
1358
/// A region option to unset.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// The option keyed by `TWCS_TRIGGER_FILE_NUM`.
    TwcsTriggerFileNum,
    /// The option keyed by `TWCS_MAX_OUTPUT_FILE_SIZE`.
    TwcsMaxOutputFileSize,
    /// The option keyed by `TWCS_TIME_WINDOW`.
    TwcsTimeWindow,
    /// The option keyed by `TTL_KEY`.
    Ttl,
}
1366
1367impl UnsetRegionOption {
1368 pub fn as_str(&self) -> &str {
1369 match self {
1370 Self::Ttl => TTL_KEY,
1371 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1372 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1373 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1374 }
1375 }
1376}
1377
1378impl Display for UnsetRegionOption {
1379 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1380 write!(f, "{}", self.as_str())
1381 }
1382}
1383
/// Request to flush a region.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size.
    /// NOTE(review): presumably overrides the row group size used when
    /// writing the flushed data — confirm against the engine.
    pub row_group_size: Option<usize>,
}
1388
/// Request to compact a region.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options; the `Default` impl uses the `Regular` variant.
    pub options: compact_request::Options,
    /// Optional parallelism; `None` by default.
    /// NOTE(review): presumably limits how many compaction tasks run
    /// concurrently — confirm against the engine.
    pub parallelism: Option<u32>,
}
1394
1395impl Default for RegionCompactRequest {
1396 fn default() -> Self {
1397 Self {
1398 options: compact_request::Options::Regular(Default::default()),
1400 parallelism: None,
1401 }
1402 }
1403}
1404
/// Request to build indexes for a region. It currently carries no parameters.
#[derive(Debug, Clone, Default)]
pub struct RegionBuildIndexRequest {}
1407
/// Request to truncate a region.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// Truncate without restricting to time ranges.
    All,
    /// Truncate only the given time ranges.
    ByTimeRanges {
        /// `(start, end)` timestamp pairs.
        /// NOTE(review): whether the bounds are inclusive is not visible
        /// here — confirm against the consumer of this request.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1420
/// Request to make a region catch up.
///
/// NOTE(review): the field notes below are inferred from names and types
/// only; confirm them against the engine that handles this request.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// Presumably makes the region writable once it has caught up.
    pub set_writable: bool,
    /// Optional log entry id; presumably the entry to replay up to.
    pub entry_id: Option<entry::Id>,
    /// Optional log entry id for metadata; presumably the metadata
    /// counterpart of `entry_id`.
    pub metadata_entry_id: Option<entry::Id>,
    /// Optional location id.
    pub location_id: Option<u64>,
    /// Optional replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
1441
/// Request to bulk insert a record batch into a region.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Region that receives the data.
    pub region_id: RegionId,
    /// The record batch; `estimated_size` measures this field.
    pub payload: DfRecordBatch,
    /// Arrow IPC message.
    /// NOTE(review): presumably the encoded form `payload` was decoded
    /// from — confirm against the code that builds this request.
    pub raw_data: ArrowIpc,
}
1448
1449impl RegionBulkInsertsRequest {
1450 pub fn estimated_size(&self) -> usize {
1451 self.payload.get_array_memory_size()
1452 }
1453}
1454
/// Request to make a region enter staging.
#[derive(Debug, Clone)]
pub struct EnterStagingRequest {
    /// Partition expression as a string.
    /// NOTE(review): the expected encoding (e.g. JSON) is not visible
    /// here — confirm against the producer of this request.
    pub partition_expr: String,
}
1465
/// Request to apply a staging manifest to a region.
///
/// NOTE(review): the field notes below are inferred from names and types
/// only; confirm them against the engine that handles this request.
#[derive(Debug, Clone)]
pub struct ApplyStagingManifestRequest {
    /// Partition expression as a string.
    pub partition_expr: String,
    /// Id of the central region.
    pub central_region_id: RegionId,
    /// Path of the manifest to apply.
    pub manifest_path: String,
}
1494
1495impl fmt::Display for RegionRequest {
1496 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1497 match self {
1498 RegionRequest::Put(_) => write!(f, "Put"),
1499 RegionRequest::Delete(_) => write!(f, "Delete"),
1500 RegionRequest::Create(_) => write!(f, "Create"),
1501 RegionRequest::Drop(_) => write!(f, "Drop"),
1502 RegionRequest::Open(_) => write!(f, "Open"),
1503 RegionRequest::Close(_) => write!(f, "Close"),
1504 RegionRequest::Alter(_) => write!(f, "Alter"),
1505 RegionRequest::Flush(_) => write!(f, "Flush"),
1506 RegionRequest::Compact(_) => write!(f, "Compact"),
1507 RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1508 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1509 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1510 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1511 RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1512 RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
1513 }
1514 }
1515}
1516
1517#[cfg(test)]
1518mod tests {
1519
1520 use api::v1::region::RegionColumnDef;
1521 use api::v1::{ColumnDataType, ColumnDef};
1522 use datatypes::prelude::ConcreteDataType;
1523 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1524
1525 use super::*;
1526 use crate::metadata::RegionMetadataBuilder;
1527
1528 #[test]
1529 fn test_from_proto_location() {
1530 let proto_location = v1::AddColumnLocation {
1531 location_type: LocationType::First as i32,
1532 after_column_name: String::default(),
1533 };
1534 let location = AddColumnLocation::try_from(proto_location).unwrap();
1535 assert_eq!(location, AddColumnLocation::First);
1536
1537 let proto_location = v1::AddColumnLocation {
1538 location_type: 10,
1539 after_column_name: String::default(),
1540 };
1541 AddColumnLocation::try_from(proto_location).unwrap_err();
1542
1543 let proto_location = v1::AddColumnLocation {
1544 location_type: LocationType::After as i32,
1545 after_column_name: "a".to_string(),
1546 };
1547 let location = AddColumnLocation::try_from(proto_location).unwrap();
1548 assert_eq!(
1549 location,
1550 AddColumnLocation::After {
1551 column_name: "a".to_string()
1552 }
1553 );
1554 }
1555
1556 #[test]
1557 fn test_from_none_proto_add_column() {
1558 AddColumn::try_from(v1::region::AddColumn {
1559 column_def: None,
1560 location: None,
1561 })
1562 .unwrap_err();
1563 }
1564
1565 #[test]
1566 fn test_from_proto_alter_request() {
1567 RegionAlterRequest::try_from(AlterRequest {
1568 region_id: 0,
1569 schema_version: 1,
1570 kind: None,
1571 })
1572 .unwrap_err();
1573
1574 let request = RegionAlterRequest::try_from(AlterRequest {
1575 region_id: 0,
1576 schema_version: 1,
1577 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1578 add_columns: vec![v1::region::AddColumn {
1579 column_def: Some(RegionColumnDef {
1580 column_def: Some(ColumnDef {
1581 name: "a".to_string(),
1582 data_type: ColumnDataType::String as i32,
1583 is_nullable: true,
1584 default_constraint: vec![],
1585 semantic_type: SemanticType::Field as i32,
1586 comment: String::new(),
1587 ..Default::default()
1588 }),
1589 column_id: 1,
1590 }),
1591 location: Some(v1::AddColumnLocation {
1592 location_type: LocationType::First as i32,
1593 after_column_name: String::default(),
1594 }),
1595 }],
1596 })),
1597 })
1598 .unwrap();
1599
1600 assert_eq!(
1601 request,
1602 RegionAlterRequest {
1603 kind: AlterKind::AddColumns {
1604 columns: vec![AddColumn {
1605 column_metadata: ColumnMetadata {
1606 column_schema: ColumnSchema::new(
1607 "a",
1608 ConcreteDataType::string_datatype(),
1609 true,
1610 ),
1611 semantic_type: SemanticType::Field,
1612 column_id: 1,
1613 },
1614 location: Some(AddColumnLocation::First),
1615 }]
1616 },
1617 }
1618 );
1619 }
1620
1621 fn new_metadata() -> RegionMetadata {
1624 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1625 builder
1626 .push_column_metadata(ColumnMetadata {
1627 column_schema: ColumnSchema::new(
1628 "ts",
1629 ConcreteDataType::timestamp_millisecond_datatype(),
1630 false,
1631 ),
1632 semantic_type: SemanticType::Timestamp,
1633 column_id: 1,
1634 })
1635 .push_column_metadata(ColumnMetadata {
1636 column_schema: ColumnSchema::new(
1637 "tag_0",
1638 ConcreteDataType::string_datatype(),
1639 true,
1640 ),
1641 semantic_type: SemanticType::Tag,
1642 column_id: 2,
1643 })
1644 .push_column_metadata(ColumnMetadata {
1645 column_schema: ColumnSchema::new(
1646 "field_0",
1647 ConcreteDataType::string_datatype(),
1648 true,
1649 ),
1650 semantic_type: SemanticType::Field,
1651 column_id: 3,
1652 })
1653 .push_column_metadata(ColumnMetadata {
1654 column_schema: ColumnSchema::new(
1655 "field_1",
1656 ConcreteDataType::boolean_datatype(),
1657 true,
1658 ),
1659 semantic_type: SemanticType::Field,
1660 column_id: 4,
1661 })
1662 .primary_key(vec![2]);
1663 builder.build().unwrap()
1664 }
1665
1666 #[test]
1667 fn test_add_column_validate() {
1668 let metadata = new_metadata();
1669 let add_column = AddColumn {
1670 column_metadata: ColumnMetadata {
1671 column_schema: ColumnSchema::new(
1672 "tag_1",
1673 ConcreteDataType::string_datatype(),
1674 true,
1675 ),
1676 semantic_type: SemanticType::Tag,
1677 column_id: 5,
1678 },
1679 location: None,
1680 };
1681 add_column.validate(&metadata).unwrap();
1682 assert!(add_column.need_alter(&metadata));
1683
1684 AddColumn {
1686 column_metadata: ColumnMetadata {
1687 column_schema: ColumnSchema::new(
1688 "tag_1",
1689 ConcreteDataType::string_datatype(),
1690 false,
1691 ),
1692 semantic_type: SemanticType::Tag,
1693 column_id: 5,
1694 },
1695 location: None,
1696 }
1697 .validate(&metadata)
1698 .unwrap_err();
1699
1700 let add_column = AddColumn {
1702 column_metadata: ColumnMetadata {
1703 column_schema: ColumnSchema::new(
1704 "tag_0",
1705 ConcreteDataType::string_datatype(),
1706 true,
1707 ),
1708 semantic_type: SemanticType::Tag,
1709 column_id: 2,
1710 },
1711 location: None,
1712 };
1713 add_column.validate(&metadata).unwrap();
1714 assert!(!add_column.need_alter(&metadata));
1715 }
1716
1717 #[test]
1718 fn test_add_duplicate_columns() {
1719 let kind = AlterKind::AddColumns {
1720 columns: vec![
1721 AddColumn {
1722 column_metadata: ColumnMetadata {
1723 column_schema: ColumnSchema::new(
1724 "tag_1",
1725 ConcreteDataType::string_datatype(),
1726 true,
1727 ),
1728 semantic_type: SemanticType::Tag,
1729 column_id: 5,
1730 },
1731 location: None,
1732 },
1733 AddColumn {
1734 column_metadata: ColumnMetadata {
1735 column_schema: ColumnSchema::new(
1736 "tag_1",
1737 ConcreteDataType::string_datatype(),
1738 true,
1739 ),
1740 semantic_type: SemanticType::Field,
1741 column_id: 6,
1742 },
1743 location: None,
1744 },
1745 ],
1746 };
1747 let metadata = new_metadata();
1748 kind.validate(&metadata).unwrap();
1749 assert!(kind.need_alter(&metadata));
1750 }
1751
1752 #[test]
1753 fn test_add_existing_column_different_metadata() {
1754 let metadata = new_metadata();
1755
1756 let kind = AlterKind::AddColumns {
1758 columns: vec![AddColumn {
1759 column_metadata: ColumnMetadata {
1760 column_schema: ColumnSchema::new(
1761 "tag_0",
1762 ConcreteDataType::string_datatype(),
1763 true,
1764 ),
1765 semantic_type: SemanticType::Tag,
1766 column_id: 4,
1767 },
1768 location: None,
1769 }],
1770 };
1771 kind.validate(&metadata).unwrap_err();
1772
1773 let kind = AlterKind::AddColumns {
1775 columns: vec![AddColumn {
1776 column_metadata: ColumnMetadata {
1777 column_schema: ColumnSchema::new(
1778 "tag_0",
1779 ConcreteDataType::int64_datatype(),
1780 true,
1781 ),
1782 semantic_type: SemanticType::Tag,
1783 column_id: 2,
1784 },
1785 location: None,
1786 }],
1787 };
1788 kind.validate(&metadata).unwrap_err();
1789
1790 let kind = AlterKind::AddColumns {
1792 columns: vec![AddColumn {
1793 column_metadata: ColumnMetadata {
1794 column_schema: ColumnSchema::new(
1795 "tag_1",
1796 ConcreteDataType::string_datatype(),
1797 true,
1798 ),
1799 semantic_type: SemanticType::Tag,
1800 column_id: 2,
1801 },
1802 location: None,
1803 }],
1804 };
1805 kind.validate(&metadata).unwrap_err();
1806 }
1807
1808 #[test]
1809 fn test_add_existing_column_with_location() {
1810 let metadata = new_metadata();
1811 let kind = AlterKind::AddColumns {
1812 columns: vec![AddColumn {
1813 column_metadata: ColumnMetadata {
1814 column_schema: ColumnSchema::new(
1815 "tag_0",
1816 ConcreteDataType::string_datatype(),
1817 true,
1818 ),
1819 semantic_type: SemanticType::Tag,
1820 column_id: 2,
1821 },
1822 location: Some(AddColumnLocation::First),
1823 }],
1824 };
1825 kind.validate(&metadata).unwrap_err();
1826 }
1827
1828 #[test]
1829 fn test_validate_drop_column() {
1830 let metadata = new_metadata();
1831 let kind = AlterKind::DropColumns {
1832 names: vec!["xxxx".to_string()],
1833 };
1834 kind.validate(&metadata).unwrap();
1835 assert!(!kind.need_alter(&metadata));
1836
1837 AlterKind::DropColumns {
1838 names: vec!["tag_0".to_string()],
1839 }
1840 .validate(&metadata)
1841 .unwrap_err();
1842
1843 let kind = AlterKind::DropColumns {
1844 names: vec!["field_0".to_string()],
1845 };
1846 kind.validate(&metadata).unwrap();
1847 assert!(kind.need_alter(&metadata));
1848 }
1849
1850 #[test]
1851 fn test_validate_modify_column_type() {
1852 let metadata = new_metadata();
1853 AlterKind::ModifyColumnTypes {
1854 columns: vec![ModifyColumnType {
1855 column_name: "xxxx".to_string(),
1856 target_type: ConcreteDataType::string_datatype(),
1857 }],
1858 }
1859 .validate(&metadata)
1860 .unwrap_err();
1861
1862 AlterKind::ModifyColumnTypes {
1863 columns: vec![ModifyColumnType {
1864 column_name: "field_1".to_string(),
1865 target_type: ConcreteDataType::date_datatype(),
1866 }],
1867 }
1868 .validate(&metadata)
1869 .unwrap_err();
1870
1871 AlterKind::ModifyColumnTypes {
1872 columns: vec![ModifyColumnType {
1873 column_name: "ts".to_string(),
1874 target_type: ConcreteDataType::date_datatype(),
1875 }],
1876 }
1877 .validate(&metadata)
1878 .unwrap_err();
1879
1880 AlterKind::ModifyColumnTypes {
1881 columns: vec![ModifyColumnType {
1882 column_name: "tag_0".to_string(),
1883 target_type: ConcreteDataType::date_datatype(),
1884 }],
1885 }
1886 .validate(&metadata)
1887 .unwrap_err();
1888
1889 let kind = AlterKind::ModifyColumnTypes {
1890 columns: vec![ModifyColumnType {
1891 column_name: "field_0".to_string(),
1892 target_type: ConcreteDataType::int32_datatype(),
1893 }],
1894 };
1895 kind.validate(&metadata).unwrap();
1896 assert!(kind.need_alter(&metadata));
1897 }
1898
1899 #[test]
1900 fn test_validate_add_columns() {
1901 let kind = AlterKind::AddColumns {
1902 columns: vec![
1903 AddColumn {
1904 column_metadata: ColumnMetadata {
1905 column_schema: ColumnSchema::new(
1906 "tag_1",
1907 ConcreteDataType::string_datatype(),
1908 true,
1909 ),
1910 semantic_type: SemanticType::Tag,
1911 column_id: 5,
1912 },
1913 location: None,
1914 },
1915 AddColumn {
1916 column_metadata: ColumnMetadata {
1917 column_schema: ColumnSchema::new(
1918 "field_2",
1919 ConcreteDataType::string_datatype(),
1920 true,
1921 ),
1922 semantic_type: SemanticType::Field,
1923 column_id: 6,
1924 },
1925 location: None,
1926 },
1927 ],
1928 };
1929 let request = RegionAlterRequest { kind };
1930 let mut metadata = new_metadata();
1931 metadata.schema_version = 1;
1932 request.validate(&metadata).unwrap();
1933 }
1934
1935 #[test]
1936 fn test_validate_create_region() {
1937 let column_metadatas = vec![
1938 ColumnMetadata {
1939 column_schema: ColumnSchema::new(
1940 "ts",
1941 ConcreteDataType::timestamp_millisecond_datatype(),
1942 false,
1943 ),
1944 semantic_type: SemanticType::Timestamp,
1945 column_id: 1,
1946 },
1947 ColumnMetadata {
1948 column_schema: ColumnSchema::new(
1949 "tag_0",
1950 ConcreteDataType::string_datatype(),
1951 true,
1952 ),
1953 semantic_type: SemanticType::Tag,
1954 column_id: 2,
1955 },
1956 ColumnMetadata {
1957 column_schema: ColumnSchema::new(
1958 "field_0",
1959 ConcreteDataType::string_datatype(),
1960 true,
1961 ),
1962 semantic_type: SemanticType::Field,
1963 column_id: 3,
1964 },
1965 ];
1966 let create = RegionCreateRequest {
1967 engine: "mito".to_string(),
1968 column_metadatas,
1969 primary_key: vec![3, 4],
1970 options: HashMap::new(),
1971 table_dir: "path".to_string(),
1972 path_type: PathType::Bare,
1973 partition_expr_json: Some("".to_string()),
1974 };
1975
1976 assert!(create.validate().is_err());
1977 }
1978
1979 #[test]
1980 fn test_validate_modify_column_fulltext_options() {
1981 let kind = AlterKind::SetIndexes {
1982 options: vec![SetIndexOption::Fulltext {
1983 column_name: "tag_0".to_string(),
1984 options: FulltextOptions::new_unchecked(
1985 true,
1986 FulltextAnalyzer::Chinese,
1987 false,
1988 FulltextBackend::Bloom,
1989 1000,
1990 0.01,
1991 ),
1992 }],
1993 };
1994 let request = RegionAlterRequest { kind };
1995 let mut metadata = new_metadata();
1996 metadata.schema_version = 1;
1997 request.validate(&metadata).unwrap();
1998
1999 let kind = AlterKind::UnsetIndexes {
2000 options: vec![UnsetIndexOption::Fulltext {
2001 column_name: "tag_0".to_string(),
2002 }],
2003 };
2004 let request = RegionAlterRequest { kind };
2005 let mut metadata = new_metadata();
2006 metadata.schema_version = 1;
2007 request.validate(&metadata).unwrap();
2008 }
2009
2010 #[test]
2011 fn test_validate_sync_columns() {
2012 let metadata = new_metadata();
2013 let kind = AlterKind::SyncColumns {
2014 column_metadatas: vec![
2015 ColumnMetadata {
2016 column_schema: ColumnSchema::new(
2017 "tag_1",
2018 ConcreteDataType::string_datatype(),
2019 true,
2020 ),
2021 semantic_type: SemanticType::Tag,
2022 column_id: 5,
2023 },
2024 ColumnMetadata {
2025 column_schema: ColumnSchema::new(
2026 "field_2",
2027 ConcreteDataType::string_datatype(),
2028 true,
2029 ),
2030 semantic_type: SemanticType::Field,
2031 column_id: 6,
2032 },
2033 ],
2034 };
2035 let err = kind.validate(&metadata).unwrap_err();
2036 assert!(err.to_string().contains("not a primary key"));
2037
2038 let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
2040 let ts_column = column_metadatas_with_different_ts_column
2041 .iter_mut()
2042 .find(|c| c.semantic_type == SemanticType::Timestamp)
2043 .unwrap();
2044 ts_column.column_schema.name = "ts1".to_string();
2045
2046 let kind = AlterKind::SyncColumns {
2047 column_metadatas: column_metadatas_with_different_ts_column,
2048 };
2049 let err = kind.validate(&metadata).unwrap_err();
2050 assert!(
2051 err.to_string()
2052 .contains("timestamp column ts has different id")
2053 );
2054
2055 let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
2057 let pk_column = column_metadatas_with_different_pk_column
2058 .iter_mut()
2059 .find(|c| c.column_schema.name == "tag_0")
2060 .unwrap();
2061 pk_column.column_id = 100;
2062 let kind = AlterKind::SyncColumns {
2063 column_metadatas: column_metadatas_with_different_pk_column,
2064 };
2065 let err = kind.validate(&metadata).unwrap_err();
2066 assert!(
2067 err.to_string()
2068 .contains("column with same name tag_0 has different id")
2069 );
2070
2071 let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2073 column_metadatas_with_new_field_column.push(ColumnMetadata {
2074 column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2075 semantic_type: SemanticType::Field,
2076 column_id: 4,
2077 });
2078 let kind = AlterKind::SyncColumns {
2079 column_metadatas: column_metadatas_with_new_field_column,
2080 };
2081 kind.validate(&metadata).unwrap();
2082 }
2083
2084 #[test]
2085 fn test_cast_path_type_to_primitive() {
2086 assert_eq!(PathType::Bare as u8, 0);
2087 assert_eq!(PathType::Data as u8, 1);
2088 assert_eq!(PathType::Metadata as u8, 2);
2089 assert_eq!(
2090 PathType::try_from(PathType::Bare as u8).unwrap(),
2091 PathType::Bare
2092 );
2093 assert_eq!(
2094 PathType::try_from(PathType::Data as u8).unwrap(),
2095 PathType::Data
2096 );
2097 assert_eq!(
2098 PathType::try_from(PathType::Metadata as u8).unwrap(),
2099 PathType::Metadata
2100 );
2101 }
2102}