1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26 CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27 FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28 region_request, truncate_request,
29};
30use api::v1::{
31 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50 RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55 SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
56};
57use crate::path_utils::table_dir;
58use crate::storage::{ColumnId, RegionId, ScanRequest};
59
60#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
62#[repr(u8)]
63pub enum PathType {
64 Bare,
68 Data,
72 Metadata,
76}
77
78#[derive(Debug, IntoStaticStr)]
79pub enum BatchRegionDdlRequest {
80 Create(Vec<(RegionId, RegionCreateRequest)>),
81 Drop(Vec<(RegionId, RegionDropRequest)>),
82 Alter(Vec<(RegionId, RegionAlterRequest)>),
83}
84
85impl BatchRegionDdlRequest {
86 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
88 match body {
89 region_request::Body::Creates(creates) => {
90 let requests = creates
91 .requests
92 .into_iter()
93 .map(parse_region_create)
94 .collect::<Result<Vec<_>>>()?;
95 Ok(Some(Self::Create(requests)))
96 }
97 region_request::Body::Drops(drops) => {
98 let requests = drops
99 .requests
100 .into_iter()
101 .map(parse_region_drop)
102 .collect::<Result<Vec<_>>>()?;
103 Ok(Some(Self::Drop(requests)))
104 }
105 region_request::Body::Alters(alters) => {
106 let requests = alters
107 .requests
108 .into_iter()
109 .map(parse_region_alter)
110 .collect::<Result<Vec<_>>>()?;
111 Ok(Some(Self::Alter(requests)))
112 }
113 _ => Ok(None),
114 }
115 }
116
117 pub fn request_type(&self) -> &'static str {
118 self.into()
119 }
120
121 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
122 match self {
123 Self::Create(requests) => requests
124 .into_iter()
125 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
126 .collect(),
127 Self::Drop(requests) => requests
128 .into_iter()
129 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
130 .collect(),
131 Self::Alter(requests) => requests
132 .into_iter()
133 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
134 .collect(),
135 }
136 }
137}
138
139#[derive(Debug, IntoStaticStr)]
140pub enum RegionRequest {
141 Put(RegionPutRequest),
142 Delete(RegionDeleteRequest),
143 Create(RegionCreateRequest),
144 Drop(RegionDropRequest),
145 Open(RegionOpenRequest),
146 Close(RegionCloseRequest),
147 Alter(RegionAlterRequest),
148 Flush(RegionFlushRequest),
149 Compact(RegionCompactRequest),
150 BuildIndex(RegionBuildIndexRequest),
151 Truncate(RegionTruncateRequest),
152 Catchup(RegionCatchupRequest),
153 BulkInserts(RegionBulkInsertsRequest),
154 EnterStaging(EnterStagingRequest),
155 ApplyStagingManifest(ApplyStagingManifestRequest),
156}
157
158impl RegionRequest {
159 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
162 match body {
163 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
164 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
165 region_request::Body::Create(create) => make_region_create(create),
166 region_request::Body::Drop(drop) => make_region_drop(drop),
167 region_request::Body::Open(open) => make_region_open(open),
168 region_request::Body::Close(close) => make_region_close(close),
169 region_request::Body::Alter(alter) => make_region_alter(alter),
170 region_request::Body::Flush(flush) => make_region_flush(flush),
171 region_request::Body::Compact(compact) => make_region_compact(compact),
172 region_request::Body::BuildIndex(index) => make_region_build_index(index),
173 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
174 region_request::Body::Creates(creates) => make_region_creates(creates),
175 region_request::Body::Drops(drops) => make_region_drops(drops),
176 region_request::Body::Alters(alters) => make_region_alters(alters),
177 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
178 region_request::Body::Sync(_) => UnexpectedSnafu {
179 reason: "Sync request should be handled separately by RegionServer",
180 }
181 .fail(),
182 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
183 reason: "ListMetadata request should be handled separately by RegionServer",
184 }
185 .fail(),
186 region_request::Body::ApplyStagingManifest(apply) => {
187 make_region_apply_staging_manifest(apply)
188 }
189 }
190 }
191
192 pub fn request_type(&self) -> &'static str {
194 self.into()
195 }
196}
197
198fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
199 let requests = inserts
200 .requests
201 .into_iter()
202 .filter_map(|r| {
203 let region_id = r.region_id.into();
204 r.rows.map(|rows| {
205 (
206 region_id,
207 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
208 )
209 })
210 })
211 .collect();
212 Ok(requests)
213}
214
215fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
216 let requests = deletes
217 .requests
218 .into_iter()
219 .filter_map(|r| {
220 let region_id = r.region_id.into();
221 r.rows.map(|rows| {
222 (
223 region_id,
224 RegionRequest::Delete(RegionDeleteRequest { rows, hint: None }),
225 )
226 })
227 })
228 .collect();
229 Ok(requests)
230}
231
232fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
233 let column_metadatas = create
234 .column_defs
235 .into_iter()
236 .map(ColumnMetadata::try_from_column_def)
237 .collect::<Result<Vec<_>>>()?;
238 let region_id = RegionId::from(create.region_id);
239 let table_dir = table_dir(&create.path, region_id.table_id());
240 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
241 Ok((
242 region_id,
243 RegionCreateRequest {
244 engine: create.engine,
245 column_metadatas,
246 primary_key: create.primary_key,
247 options: create.options,
248 table_dir,
249 path_type: PathType::Bare,
250 partition_expr_json,
251 },
252 ))
253}
254
255fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
256 let (region_id, request) = parse_region_create(create)?;
257 Ok(vec![(region_id, RegionRequest::Create(request))])
258}
259
260fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
261 let mut requests = Vec::with_capacity(creates.requests.len());
262 for create in creates.requests {
263 requests.extend(make_region_create(create)?);
264 }
265 Ok(requests)
266}
267
268fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
269 let region_id = drop.region_id.into();
270 Ok((
271 region_id,
272 RegionDropRequest {
273 fast_path: drop.fast_path,
274 },
275 ))
276}
277
278fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
279 let (region_id, request) = parse_region_drop(drop)?;
280 Ok(vec![(region_id, RegionRequest::Drop(request))])
281}
282
283fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
284 let mut requests = Vec::with_capacity(drops.requests.len());
285 for drop in drops.requests {
286 requests.extend(make_region_drop(drop)?);
287 }
288 Ok(requests)
289}
290
291fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
292 let region_id = RegionId::from(open.region_id);
293 let table_dir = table_dir(&open.path, region_id.table_id());
294 Ok(vec![(
295 region_id,
296 RegionRequest::Open(RegionOpenRequest {
297 engine: open.engine,
298 table_dir,
299 path_type: PathType::Bare,
300 options: open.options,
301 skip_wal_replay: false,
302 checkpoint: None,
303 }),
304 )])
305}
306
307fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
308 let region_id = close.region_id.into();
309 Ok(vec![(
310 region_id,
311 RegionRequest::Close(RegionCloseRequest {}),
312 )])
313}
314
315fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
316 let region_id = alter.region_id.into();
317 let request = RegionAlterRequest::try_from(alter)?;
318 Ok((region_id, request))
319}
320
321fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
322 let (region_id, request) = parse_region_alter(alter)?;
323 Ok(vec![(region_id, RegionRequest::Alter(request))])
324}
325
326fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
327 let mut requests = Vec::with_capacity(alters.requests.len());
328 for alter in alters.requests {
329 requests.extend(make_region_alter(alter)?);
330 }
331 Ok(requests)
332}
333
334fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
335 let region_id = flush.region_id.into();
336 Ok(vec![(
337 region_id,
338 RegionRequest::Flush(RegionFlushRequest {
339 row_group_size: None,
340 }),
341 )])
342}
343
344fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
345 let region_id = compact.region_id.into();
346 let options = compact
347 .options
348 .unwrap_or(compact_request::Options::Regular(Default::default()));
349 let parallelism = if compact.parallelism == 0 {
351 None
352 } else {
353 Some(compact.parallelism)
354 };
355 Ok(vec![(
356 region_id,
357 RegionRequest::Compact(RegionCompactRequest {
358 options,
359 parallelism,
360 }),
361 )])
362}
363
364fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
365 let region_id = index.region_id.into();
366 Ok(vec![(
367 region_id,
368 RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
369 )])
370}
371
372fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
373 let region_id = truncate.region_id.into();
374 match truncate.kind {
375 None => InvalidRawRegionRequestSnafu {
376 err: "missing kind in TruncateRequest".to_string(),
377 }
378 .fail(),
379 Some(truncate_request::Kind::All(_)) => Ok(vec![(
380 region_id,
381 RegionRequest::Truncate(RegionTruncateRequest::All),
382 )]),
383 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
384 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
385
386 Ok(vec![(
387 region_id,
388 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
389 )])
390 }
391 }
392}
393
394fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
396 let region_id = request.region_id.into();
397 let Some(Body::ArrowIpc(request)) = request.body else {
398 return Ok(vec![]);
399 };
400
401 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
402 .with_label_values(&["decode"])
403 .start_timer();
404 let mut decoder =
405 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
406 let payload = decoder
407 .try_decode_record_batch(&request.data_header, &request.payload)
408 .context(FlightCodecSnafu)?;
409 decoder_timer.observe_duration();
410 Ok(vec![(
411 region_id,
412 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
413 region_id,
414 payload,
415 raw_data: request,
416 }),
417 )])
418}
419
420fn make_region_apply_staging_manifest(
421 api::v1::region::ApplyStagingManifestRequest {
422 region_id,
423 partition_expr,
424 central_region_id,
425 manifest_path,
426 }: api::v1::region::ApplyStagingManifestRequest,
427) -> Result<Vec<(RegionId, RegionRequest)>> {
428 let region_id = region_id.into();
429 Ok(vec![(
430 region_id,
431 RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
432 partition_expr,
433 central_region_id: central_region_id.into(),
434 manifest_path,
435 }),
436 )])
437}
438
439#[derive(Debug)]
441pub struct RegionPutRequest {
442 pub rows: Rows,
444 pub hint: Option<WriteHint>,
446}
447
448#[derive(Debug)]
449pub struct RegionReadRequest {
450 pub request: ScanRequest,
451}
452
453#[derive(Debug)]
455pub struct RegionDeleteRequest {
456 pub rows: Rows,
460 pub hint: Option<WriteHint>,
462}
463
464#[derive(Debug, Clone)]
465pub struct RegionCreateRequest {
466 pub engine: String,
468 pub column_metadatas: Vec<ColumnMetadata>,
470 pub primary_key: Vec<ColumnId>,
472 pub options: HashMap<String, String>,
474 pub table_dir: String,
476 pub path_type: PathType,
478 pub partition_expr_json: Option<String>,
481}
482
483impl RegionCreateRequest {
484 pub fn validate(&self) -> Result<()> {
486 ensure!(
488 self.column_metadatas
489 .iter()
490 .any(|x| x.semantic_type == SemanticType::Timestamp),
491 InvalidRegionRequestSnafu {
492 region_id: RegionId::new(0, 0),
493 err: "missing timestamp column in create region request".to_string(),
494 }
495 );
496
497 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
499 for (i, c) in self.column_metadatas.iter().enumerate() {
500 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
501 return InvalidRegionRequestSnafu {
502 region_id: RegionId::new(0, 0),
503 err: format!(
504 "duplicate column id {} (at position {} and {}) in create region request",
505 c.column_id, previous, i
506 ),
507 }
508 .fail();
509 }
510 }
511
512 for column_id in &self.primary_key {
514 ensure!(
515 column_id_to_indices.contains_key(column_id),
516 InvalidRegionRequestSnafu {
517 region_id: RegionId::new(0, 0),
518 err: format!(
519 "missing primary key column {} in create region request",
520 column_id
521 ),
522 }
523 );
524 }
525
526 Ok(())
527 }
528
529 pub fn is_physical_table(&self) -> bool {
531 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
532 }
533}
534
535#[derive(Debug, Clone)]
536pub struct RegionDropRequest {
537 pub fast_path: bool,
538}
539
540#[derive(Debug, Clone)]
542pub struct RegionOpenRequest {
543 pub engine: String,
545 pub table_dir: String,
547 pub path_type: PathType,
549 pub options: HashMap<String, String>,
551 pub skip_wal_replay: bool,
553 pub checkpoint: Option<ReplayCheckpoint>,
555}
556
557#[derive(Debug, Clone, Copy, PartialEq, Eq)]
558pub struct ReplayCheckpoint {
559 pub entry_id: u64,
560 pub metadata_entry_id: Option<u64>,
561}
562
563impl RegionOpenRequest {
564 pub fn is_physical_table(&self) -> bool {
566 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
567 }
568}
569
570#[derive(Debug)]
572pub struct RegionCloseRequest {}
573
574#[derive(Debug, PartialEq, Eq, Clone)]
576pub struct RegionAlterRequest {
577 pub kind: AlterKind,
579}
580
581impl RegionAlterRequest {
582 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
584 self.kind.validate(metadata)?;
585
586 Ok(())
587 }
588
589 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
593 debug_assert!(self.validate(metadata).is_ok());
594 self.kind.need_alter(metadata)
595 }
596}
597
598impl TryFrom<AlterRequest> for RegionAlterRequest {
599 type Error = MetadataError;
600
601 fn try_from(value: AlterRequest) -> Result<Self> {
602 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
603 err: "missing kind in AlterRequest",
604 })?;
605
606 let kind = AlterKind::try_from(kind)?;
607 Ok(RegionAlterRequest { kind })
608 }
609}
610
611#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
613pub enum AlterKind {
614 AddColumns {
616 columns: Vec<AddColumn>,
618 },
619 DropColumns {
621 names: Vec<String>,
623 },
624 ModifyColumnTypes {
626 columns: Vec<ModifyColumnType>,
628 },
629 SetRegionOptions { options: Vec<SetRegionOption> },
631 UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
633 SetIndexes { options: Vec<SetIndexOption> },
635 UnsetIndexes { options: Vec<UnsetIndexOption> },
637 DropDefaults {
639 names: Vec<String>,
641 },
642 SetDefaults {
644 columns: Vec<SetDefault>,
646 },
647 SyncColumns {
649 column_metadatas: Vec<ColumnMetadata>,
650 },
651}
652#[derive(Debug, PartialEq, Eq, Clone)]
653pub struct SetDefault {
654 pub name: String,
655 pub default_constraint: Vec<u8>,
656}
657
658#[derive(Debug, PartialEq, Eq, Clone)]
659pub enum SetIndexOption {
660 Fulltext {
661 column_name: String,
662 options: FulltextOptions,
663 },
664 Inverted {
665 column_name: String,
666 },
667 Skipping {
668 column_name: String,
669 options: SkippingIndexOptions,
670 },
671}
672
673impl SetIndexOption {
674 pub fn column_name(&self) -> &String {
676 match self {
677 SetIndexOption::Fulltext { column_name, .. } => column_name,
678 SetIndexOption::Inverted { column_name } => column_name,
679 SetIndexOption::Skipping { column_name, .. } => column_name,
680 }
681 }
682
683 pub fn is_fulltext(&self) -> bool {
685 match self {
686 SetIndexOption::Fulltext { .. } => true,
687 SetIndexOption::Inverted { .. } => false,
688 SetIndexOption::Skipping { .. } => false,
689 }
690 }
691}
692
693impl TryFrom<v1::SetIndex> for SetIndexOption {
694 type Error = MetadataError;
695
696 fn try_from(value: v1::SetIndex) -> Result<Self> {
697 let option = value.options.context(InvalidRawRegionRequestSnafu {
698 err: "missing options in SetIndex",
699 })?;
700
701 let opt = match option {
702 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
703 column_name: x.column_name.clone(),
704 options: FulltextOptions::new(
705 x.enable,
706 as_fulltext_option_analyzer(
707 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
708 ),
709 x.case_sensitive,
710 as_fulltext_option_backend(
711 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
712 ),
713 x.granularity as u32,
714 x.false_positive_rate,
715 )
716 .context(InvalidIndexOptionSnafu)?,
717 },
718 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
719 column_name: i.column_name,
720 },
721 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
722 column_name: s.column_name,
723 options: SkippingIndexOptions::new(
724 s.granularity as u32,
725 s.false_positive_rate,
726 as_skipping_index_type(
727 PbSkippingIndexType::try_from(s.skipping_index_type)
728 .context(DecodeProtoSnafu)?,
729 ),
730 )
731 .context(InvalidIndexOptionSnafu)?,
732 },
733 };
734
735 Ok(opt)
736 }
737}
738
739#[derive(Debug, PartialEq, Eq, Clone)]
740pub enum UnsetIndexOption {
741 Fulltext { column_name: String },
742 Inverted { column_name: String },
743 Skipping { column_name: String },
744}
745
746impl UnsetIndexOption {
747 pub fn column_name(&self) -> &String {
748 match self {
749 UnsetIndexOption::Fulltext { column_name } => column_name,
750 UnsetIndexOption::Inverted { column_name } => column_name,
751 UnsetIndexOption::Skipping { column_name } => column_name,
752 }
753 }
754
755 pub fn is_fulltext(&self) -> bool {
756 match self {
757 UnsetIndexOption::Fulltext { .. } => true,
758 UnsetIndexOption::Inverted { .. } => false,
759 UnsetIndexOption::Skipping { .. } => false,
760 }
761 }
762}
763
764impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
765 type Error = MetadataError;
766
767 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
768 let option = value.options.context(InvalidRawRegionRequestSnafu {
769 err: "missing options in UnsetIndex",
770 })?;
771
772 let opt = match option {
773 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
774 column_name: f.column_name,
775 },
776 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
777 column_name: i.column_name,
778 },
779 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
780 column_name: s.column_name,
781 },
782 };
783
784 Ok(opt)
785 }
786}
787
788impl AlterKind {
789 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
793 match self {
794 AlterKind::AddColumns { columns } => {
795 for col_to_add in columns {
796 col_to_add.validate(metadata)?;
797 }
798 }
799 AlterKind::DropColumns { names } => {
800 for name in names {
801 Self::validate_column_to_drop(name, metadata)?;
802 }
803 }
804 AlterKind::ModifyColumnTypes { columns } => {
805 for col_to_change in columns {
806 col_to_change.validate(metadata)?;
807 }
808 }
809 AlterKind::SetRegionOptions { .. } => {}
810 AlterKind::UnsetRegionOptions { .. } => {}
811 AlterKind::SetIndexes { options } => {
812 for option in options {
813 Self::validate_column_alter_index_option(
814 option.column_name(),
815 metadata,
816 option.is_fulltext(),
817 )?;
818 }
819 }
820 AlterKind::UnsetIndexes { options } => {
821 for option in options {
822 Self::validate_column_alter_index_option(
823 option.column_name(),
824 metadata,
825 option.is_fulltext(),
826 )?;
827 }
828 }
829 AlterKind::DropDefaults { names } => {
830 names
831 .iter()
832 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
833 }
834 AlterKind::SetDefaults { columns } => {
835 columns
836 .iter()
837 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
838 }
839 AlterKind::SyncColumns { column_metadatas } => {
840 let new_primary_keys = column_metadatas
841 .iter()
842 .filter(|c| c.semantic_type == SemanticType::Tag)
843 .map(|c| (c.column_schema.name.as_str(), c.column_id))
844 .collect::<HashMap<_, _>>();
845
846 let old_primary_keys = metadata
847 .column_metadatas
848 .iter()
849 .filter(|c| c.semantic_type == SemanticType::Tag)
850 .map(|c| (c.column_schema.name.as_str(), c.column_id));
851
852 for (name, id) in old_primary_keys {
853 let primary_key =
854 new_primary_keys
855 .get(name)
856 .with_context(|| InvalidRegionRequestSnafu {
857 region_id: metadata.region_id,
858 err: format!("column {} is not a primary key", name),
859 })?;
860
861 ensure!(
862 *primary_key == id,
863 InvalidRegionRequestSnafu {
864 region_id: metadata.region_id,
865 err: format!(
866 "column with same name {} has different id, existing: {}, got: {}",
867 name, id, primary_key
868 ),
869 }
870 );
871 }
872
873 let new_ts_column = column_metadatas
874 .iter()
875 .find(|c| c.semantic_type == SemanticType::Timestamp)
876 .map(|c| (c.column_schema.name.as_str(), c.column_id))
877 .context(InvalidRegionRequestSnafu {
878 region_id: metadata.region_id,
879 err: "timestamp column not found",
880 })?;
881
882 let old_ts_column = metadata
884 .column_metadatas
885 .iter()
886 .find(|c| c.semantic_type == SemanticType::Timestamp)
887 .map(|c| (c.column_schema.name.as_str(), c.column_id))
888 .unwrap();
889
890 ensure!(
891 new_ts_column == old_ts_column,
892 InvalidRegionRequestSnafu {
893 region_id: metadata.region_id,
894 err: format!(
895 "timestamp column {} has different id, existing: {}, got: {}",
896 old_ts_column.0, old_ts_column.1, new_ts_column.1
897 ),
898 }
899 );
900 }
901 }
902 Ok(())
903 }
904
905 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
907 debug_assert!(self.validate(metadata).is_ok());
908 match self {
909 AlterKind::AddColumns { columns } => columns
910 .iter()
911 .any(|col_to_add| col_to_add.need_alter(metadata)),
912 AlterKind::DropColumns { names } => names
913 .iter()
914 .any(|name| metadata.column_by_name(name).is_some()),
915 AlterKind::ModifyColumnTypes { columns } => columns
916 .iter()
917 .any(|col_to_change| col_to_change.need_alter(metadata)),
918 AlterKind::SetRegionOptions { .. } => true,
919 AlterKind::UnsetRegionOptions { .. } => true,
920 AlterKind::SetIndexes { options, .. } => options
921 .iter()
922 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
923 AlterKind::UnsetIndexes { options } => options
924 .iter()
925 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
926 AlterKind::DropDefaults { names } => names
927 .iter()
928 .any(|name| metadata.column_by_name(name).is_some()),
929
930 AlterKind::SetDefaults { columns } => columns
931 .iter()
932 .any(|x| metadata.column_by_name(&x.name).is_some()),
933 AlterKind::SyncColumns { column_metadatas } => {
934 metadata.column_metadatas != *column_metadatas
935 }
936 }
937 }
938
939 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
941 let Some(column) = metadata.column_by_name(name) else {
942 return Ok(());
943 };
944 ensure!(
945 column.semantic_type == SemanticType::Field,
946 InvalidRegionRequestSnafu {
947 region_id: metadata.region_id,
948 err: format!("column {} is not a field and could not be dropped", name),
949 }
950 );
951 Ok(())
952 }
953
954 fn validate_column_alter_index_option(
956 column_name: &String,
957 metadata: &RegionMetadata,
958 is_fulltext: bool,
959 ) -> Result<()> {
960 let column = metadata
961 .column_by_name(column_name)
962 .context(InvalidRegionRequestSnafu {
963 region_id: metadata.region_id,
964 err: format!("column {} not found", column_name),
965 })?;
966
967 if is_fulltext {
968 ensure!(
969 column.column_schema.data_type.is_string(),
970 InvalidRegionRequestSnafu {
971 region_id: metadata.region_id,
972 err: format!(
973 "cannot change alter index options for non-string column {}",
974 column_name
975 ),
976 }
977 );
978 }
979
980 Ok(())
981 }
982
983 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
985 metadata
986 .column_by_name(column_name)
987 .context(InvalidRegionRequestSnafu {
988 region_id: metadata.region_id,
989 err: format!("column {} not found", column_name),
990 })?;
991
992 Ok(())
993 }
994}
995
996impl TryFrom<alter_request::Kind> for AlterKind {
997 type Error = MetadataError;
998
999 fn try_from(kind: alter_request::Kind) -> Result<Self> {
1000 let alter_kind = match kind {
1001 alter_request::Kind::AddColumns(x) => {
1002 let columns = x
1003 .add_columns
1004 .into_iter()
1005 .map(|x| x.try_into())
1006 .collect::<Result<Vec<_>>>()?;
1007 AlterKind::AddColumns { columns }
1008 }
1009 alter_request::Kind::ModifyColumnTypes(x) => {
1010 let columns = x
1011 .modify_column_types
1012 .into_iter()
1013 .map(|x| x.into())
1014 .collect::<Vec<_>>();
1015 AlterKind::ModifyColumnTypes { columns }
1016 }
1017 alter_request::Kind::DropColumns(x) => {
1018 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
1019 AlterKind::DropColumns { names }
1020 }
1021 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
1022 options: options
1023 .table_options
1024 .iter()
1025 .map(TryFrom::try_from)
1026 .collect::<Result<Vec<_>>>()?,
1027 },
1028 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1029 keys: options
1030 .keys
1031 .iter()
1032 .map(|key| UnsetRegionOption::try_from(key.as_str()))
1033 .collect::<Result<Vec<_>>>()?,
1034 },
1035 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1036 options: vec![SetIndexOption::try_from(o)?],
1037 },
1038 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1039 options: vec![UnsetIndexOption::try_from(o)?],
1040 },
1041 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1042 options: o
1043 .set_indexes
1044 .into_iter()
1045 .map(SetIndexOption::try_from)
1046 .collect::<Result<Vec<_>>>()?,
1047 },
1048 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1049 options: o
1050 .unset_indexes
1051 .into_iter()
1052 .map(UnsetIndexOption::try_from)
1053 .collect::<Result<Vec<_>>>()?,
1054 },
1055 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1056 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1057 },
1058 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1059 columns: x
1060 .set_defaults
1061 .into_iter()
1062 .map(|x| {
1063 Ok(SetDefault {
1064 name: x.column_name,
1065 default_constraint: x.default_constraint.clone(),
1066 })
1067 })
1068 .collect::<Result<Vec<_>>>()?,
1069 },
1070 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1071 column_metadatas: x
1072 .column_defs
1073 .into_iter()
1074 .map(ColumnMetadata::try_from_column_def)
1075 .collect::<Result<Vec<_>>>()?,
1076 },
1077 };
1078
1079 Ok(alter_kind)
1080 }
1081}
1082
1083#[derive(Debug, PartialEq, Eq, Clone)]
1085pub struct AddColumn {
1086 pub column_metadata: ColumnMetadata,
1088 pub location: Option<AddColumnLocation>,
1091}
1092
1093impl AddColumn {
1094 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1099 ensure!(
1100 self.column_metadata.column_schema.is_nullable()
1101 || self
1102 .column_metadata
1103 .column_schema
1104 .default_constraint()
1105 .is_some(),
1106 InvalidRegionRequestSnafu {
1107 region_id: metadata.region_id,
1108 err: format!(
1109 "no default value for column {}",
1110 self.column_metadata.column_schema.name
1111 ),
1112 }
1113 );
1114
1115 if let Some(existing_column) =
1116 metadata.column_by_name(&self.column_metadata.column_schema.name)
1117 {
1118 ensure!(
1120 *existing_column == self.column_metadata,
1121 InvalidRegionRequestSnafu {
1122 region_id: metadata.region_id,
1123 err: format!(
1124 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1125 self.column_metadata.column_schema.name,
1126 existing_column,
1127 self.column_metadata,
1128 ),
1129 }
1130 );
1131 ensure!(
1132 self.location.is_none(),
1133 InvalidRegionRequestSnafu {
1134 region_id: metadata.region_id,
1135 err: format!(
1136 "column {} already exists, but location is specified",
1137 self.column_metadata.column_schema.name
1138 ),
1139 }
1140 );
1141 }
1142
1143 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1144 ensure!(
1146 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1147 InvalidRegionRequestSnafu {
1148 region_id: metadata.region_id,
1149 err: format!(
1150 "column id {} already exists with different name {}",
1151 self.column_metadata.column_id, existing_column.column_schema.name
1152 ),
1153 }
1154 );
1155 }
1156
1157 Ok(())
1158 }
1159
1160 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1162 debug_assert!(self.validate(metadata).is_ok());
1163 metadata
1164 .column_by_name(&self.column_metadata.column_schema.name)
1165 .is_none()
1166 }
1167}
1168
1169impl TryFrom<v1::region::AddColumn> for AddColumn {
1170 type Error = MetadataError;
1171
1172 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1173 let column_def = add_column
1174 .column_def
1175 .context(InvalidRawRegionRequestSnafu {
1176 err: "missing column_def in AddColumn",
1177 })?;
1178
1179 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1180 let location = add_column
1181 .location
1182 .map(AddColumnLocation::try_from)
1183 .transpose()?;
1184
1185 Ok(AddColumn {
1186 column_metadata,
1187 location,
1188 })
1189 }
1190}
1191
1192#[derive(Debug, PartialEq, Eq, Clone)]
1194pub enum AddColumnLocation {
1195 First,
1197 After {
1199 column_name: String,
1201 },
1202}
1203
1204impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1205 type Error = MetadataError;
1206
1207 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1208 let location_type = LocationType::try_from(location.location_type)
1209 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1210 let add_column_location = match location_type {
1211 LocationType::First => AddColumnLocation::First,
1212 LocationType::After => AddColumnLocation::After {
1213 column_name: location.after_column_name,
1214 },
1215 };
1216
1217 Ok(add_column_location)
1218 }
1219}
1220
1221#[derive(Debug, PartialEq, Eq, Clone)]
1223pub struct ModifyColumnType {
1224 pub column_name: String,
1226 pub target_type: ConcreteDataType,
1228}
1229
1230impl ModifyColumnType {
1231 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1233 let column_meta = metadata
1234 .column_by_name(&self.column_name)
1235 .with_context(|| InvalidRegionRequestSnafu {
1236 region_id: metadata.region_id,
1237 err: format!("column {} not found", self.column_name),
1238 })?;
1239
1240 ensure!(
1241 matches!(column_meta.semantic_type, SemanticType::Field),
1242 InvalidRegionRequestSnafu {
1243 region_id: metadata.region_id,
1244 err: "'timestamp' or 'tag' column cannot change type".to_string()
1245 }
1246 );
1247 ensure!(
1248 column_meta
1249 .column_schema
1250 .data_type
1251 .can_arrow_type_cast_to(&self.target_type),
1252 InvalidRegionRequestSnafu {
1253 region_id: metadata.region_id,
1254 err: format!(
1255 "column '{}' cannot be cast automatically to type '{}'",
1256 self.column_name, self.target_type
1257 ),
1258 }
1259 );
1260
1261 Ok(())
1262 }
1263
1264 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1266 debug_assert!(self.validate(metadata).is_ok());
1267 metadata.column_by_name(&self.column_name).is_some()
1268 }
1269}
1270
1271impl From<v1::ModifyColumnType> for ModifyColumnType {
1272 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1273 let target_type = ColumnDataTypeWrapper::new(
1274 modify_column_type.target_type(),
1275 modify_column_type.target_type_extension,
1276 )
1277 .into();
1278
1279 ModifyColumnType {
1280 column_name: modify_column_type.column_name,
1281 target_type,
1282 }
1283 }
1284}
1285
1286#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1287pub enum SetRegionOption {
1288 Ttl(Option<TimeToLive>),
1289 Twsc(String, String),
1291 Format(String),
1293}
1294
1295impl TryFrom<&PbOption> for SetRegionOption {
1296 type Error = MetadataError;
1297
1298 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1299 let PbOption { key, value } = value;
1300 match key.as_str() {
1301 TTL_KEY => {
1302 let ttl = TimeToLive::from_humantime_or_str(value)
1303 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1304
1305 Ok(Self::Ttl(Some(ttl)))
1306 }
1307 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1308 Ok(Self::Twsc(key.clone(), value.clone()))
1309 }
1310 SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1311 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1312 }
1313 }
1314}
1315
1316impl From<&UnsetRegionOption> for SetRegionOption {
1317 fn from(unset_option: &UnsetRegionOption) -> Self {
1318 match unset_option {
1319 UnsetRegionOption::TwcsTriggerFileNum => {
1320 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1321 }
1322 UnsetRegionOption::TwcsMaxOutputFileSize => {
1323 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1324 }
1325 UnsetRegionOption::TwcsTimeWindow => {
1326 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1327 }
1328 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1329 }
1330 }
1331}
1332
1333impl TryFrom<&str> for UnsetRegionOption {
1334 type Error = MetadataError;
1335
1336 fn try_from(key: &str) -> Result<Self> {
1337 match key.to_ascii_lowercase().as_str() {
1338 TTL_KEY => Ok(Self::Ttl),
1339 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1340 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1341 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1342 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1343 }
1344 }
1345}
1346
1347#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1348pub enum UnsetRegionOption {
1349 TwcsTriggerFileNum,
1350 TwcsMaxOutputFileSize,
1351 TwcsTimeWindow,
1352 Ttl,
1353}
1354
1355impl UnsetRegionOption {
1356 pub fn as_str(&self) -> &str {
1357 match self {
1358 Self::Ttl => TTL_KEY,
1359 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1360 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1361 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1362 }
1363 }
1364}
1365
1366impl Display for UnsetRegionOption {
1367 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1368 write!(f, "{}", self.as_str())
1369 }
1370}
1371
1372#[derive(Debug, Clone, Default)]
1373pub struct RegionFlushRequest {
1374 pub row_group_size: Option<usize>,
1375}
1376
1377#[derive(Debug)]
1378pub struct RegionCompactRequest {
1379 pub options: compact_request::Options,
1380 pub parallelism: Option<u32>,
1381}
1382
1383impl Default for RegionCompactRequest {
1384 fn default() -> Self {
1385 Self {
1386 options: compact_request::Options::Regular(Default::default()),
1388 parallelism: None,
1389 }
1390 }
1391}
1392
1393#[derive(Debug, Clone, Default)]
1394pub struct RegionBuildIndexRequest {}
1395
1396#[derive(Debug)]
1398pub enum RegionTruncateRequest {
1399 All,
1401 ByTimeRanges {
1402 time_ranges: Vec<(Timestamp, Timestamp)>,
1406 },
1407}
1408
1409#[derive(Debug, Clone, Copy, Default)]
1414pub struct RegionCatchupRequest {
1415 pub set_writable: bool,
1417 pub entry_id: Option<entry::Id>,
1420 pub metadata_entry_id: Option<entry::Id>,
1424 pub location_id: Option<u64>,
1426 pub checkpoint: Option<ReplayCheckpoint>,
1428}
1429
1430#[derive(Debug, Clone)]
1431pub struct RegionBulkInsertsRequest {
1432 pub region_id: RegionId,
1433 pub payload: DfRecordBatch,
1434 pub raw_data: ArrowIpc,
1435}
1436
1437impl RegionBulkInsertsRequest {
1438 pub fn estimated_size(&self) -> usize {
1439 self.payload.get_array_memory_size()
1440 }
1441}
1442
1443#[derive(Debug, Clone)]
1449pub struct EnterStagingRequest {
1450 pub partition_expr: String,
1452}
1453
1454#[derive(Debug, Clone)]
1473pub struct ApplyStagingManifestRequest {
1474 pub partition_expr: String,
1476 pub central_region_id: RegionId,
1478 pub manifest_path: String,
1481}
1482
1483impl fmt::Display for RegionRequest {
1484 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1485 match self {
1486 RegionRequest::Put(_) => write!(f, "Put"),
1487 RegionRequest::Delete(_) => write!(f, "Delete"),
1488 RegionRequest::Create(_) => write!(f, "Create"),
1489 RegionRequest::Drop(_) => write!(f, "Drop"),
1490 RegionRequest::Open(_) => write!(f, "Open"),
1491 RegionRequest::Close(_) => write!(f, "Close"),
1492 RegionRequest::Alter(_) => write!(f, "Alter"),
1493 RegionRequest::Flush(_) => write!(f, "Flush"),
1494 RegionRequest::Compact(_) => write!(f, "Compact"),
1495 RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1496 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1497 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1498 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1499 RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1500 RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
1501 }
1502 }
1503}
1504
1505#[cfg(test)]
1506mod tests {
1507
1508 use api::v1::region::RegionColumnDef;
1509 use api::v1::{ColumnDataType, ColumnDef};
1510 use datatypes::prelude::ConcreteDataType;
1511 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1512
1513 use super::*;
1514 use crate::metadata::RegionMetadataBuilder;
1515
1516 #[test]
1517 fn test_from_proto_location() {
1518 let proto_location = v1::AddColumnLocation {
1519 location_type: LocationType::First as i32,
1520 after_column_name: String::default(),
1521 };
1522 let location = AddColumnLocation::try_from(proto_location).unwrap();
1523 assert_eq!(location, AddColumnLocation::First);
1524
1525 let proto_location = v1::AddColumnLocation {
1526 location_type: 10,
1527 after_column_name: String::default(),
1528 };
1529 AddColumnLocation::try_from(proto_location).unwrap_err();
1530
1531 let proto_location = v1::AddColumnLocation {
1532 location_type: LocationType::After as i32,
1533 after_column_name: "a".to_string(),
1534 };
1535 let location = AddColumnLocation::try_from(proto_location).unwrap();
1536 assert_eq!(
1537 location,
1538 AddColumnLocation::After {
1539 column_name: "a".to_string()
1540 }
1541 );
1542 }
1543
1544 #[test]
1545 fn test_from_none_proto_add_column() {
1546 AddColumn::try_from(v1::region::AddColumn {
1547 column_def: None,
1548 location: None,
1549 })
1550 .unwrap_err();
1551 }
1552
1553 #[test]
1554 fn test_from_proto_alter_request() {
1555 RegionAlterRequest::try_from(AlterRequest {
1556 region_id: 0,
1557 schema_version: 1,
1558 kind: None,
1559 })
1560 .unwrap_err();
1561
1562 let request = RegionAlterRequest::try_from(AlterRequest {
1563 region_id: 0,
1564 schema_version: 1,
1565 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1566 add_columns: vec![v1::region::AddColumn {
1567 column_def: Some(RegionColumnDef {
1568 column_def: Some(ColumnDef {
1569 name: "a".to_string(),
1570 data_type: ColumnDataType::String as i32,
1571 is_nullable: true,
1572 default_constraint: vec![],
1573 semantic_type: SemanticType::Field as i32,
1574 comment: String::new(),
1575 ..Default::default()
1576 }),
1577 column_id: 1,
1578 }),
1579 location: Some(v1::AddColumnLocation {
1580 location_type: LocationType::First as i32,
1581 after_column_name: String::default(),
1582 }),
1583 }],
1584 })),
1585 })
1586 .unwrap();
1587
1588 assert_eq!(
1589 request,
1590 RegionAlterRequest {
1591 kind: AlterKind::AddColumns {
1592 columns: vec![AddColumn {
1593 column_metadata: ColumnMetadata {
1594 column_schema: ColumnSchema::new(
1595 "a",
1596 ConcreteDataType::string_datatype(),
1597 true,
1598 ),
1599 semantic_type: SemanticType::Field,
1600 column_id: 1,
1601 },
1602 location: Some(AddColumnLocation::First),
1603 }]
1604 },
1605 }
1606 );
1607 }
1608
1609 fn new_metadata() -> RegionMetadata {
1612 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1613 builder
1614 .push_column_metadata(ColumnMetadata {
1615 column_schema: ColumnSchema::new(
1616 "ts",
1617 ConcreteDataType::timestamp_millisecond_datatype(),
1618 false,
1619 ),
1620 semantic_type: SemanticType::Timestamp,
1621 column_id: 1,
1622 })
1623 .push_column_metadata(ColumnMetadata {
1624 column_schema: ColumnSchema::new(
1625 "tag_0",
1626 ConcreteDataType::string_datatype(),
1627 true,
1628 ),
1629 semantic_type: SemanticType::Tag,
1630 column_id: 2,
1631 })
1632 .push_column_metadata(ColumnMetadata {
1633 column_schema: ColumnSchema::new(
1634 "field_0",
1635 ConcreteDataType::string_datatype(),
1636 true,
1637 ),
1638 semantic_type: SemanticType::Field,
1639 column_id: 3,
1640 })
1641 .push_column_metadata(ColumnMetadata {
1642 column_schema: ColumnSchema::new(
1643 "field_1",
1644 ConcreteDataType::boolean_datatype(),
1645 true,
1646 ),
1647 semantic_type: SemanticType::Field,
1648 column_id: 4,
1649 })
1650 .primary_key(vec![2]);
1651 builder.build().unwrap()
1652 }
1653
1654 #[test]
1655 fn test_add_column_validate() {
1656 let metadata = new_metadata();
1657 let add_column = AddColumn {
1658 column_metadata: ColumnMetadata {
1659 column_schema: ColumnSchema::new(
1660 "tag_1",
1661 ConcreteDataType::string_datatype(),
1662 true,
1663 ),
1664 semantic_type: SemanticType::Tag,
1665 column_id: 5,
1666 },
1667 location: None,
1668 };
1669 add_column.validate(&metadata).unwrap();
1670 assert!(add_column.need_alter(&metadata));
1671
1672 AddColumn {
1674 column_metadata: ColumnMetadata {
1675 column_schema: ColumnSchema::new(
1676 "tag_1",
1677 ConcreteDataType::string_datatype(),
1678 false,
1679 ),
1680 semantic_type: SemanticType::Tag,
1681 column_id: 5,
1682 },
1683 location: None,
1684 }
1685 .validate(&metadata)
1686 .unwrap_err();
1687
1688 let add_column = AddColumn {
1690 column_metadata: ColumnMetadata {
1691 column_schema: ColumnSchema::new(
1692 "tag_0",
1693 ConcreteDataType::string_datatype(),
1694 true,
1695 ),
1696 semantic_type: SemanticType::Tag,
1697 column_id: 2,
1698 },
1699 location: None,
1700 };
1701 add_column.validate(&metadata).unwrap();
1702 assert!(!add_column.need_alter(&metadata));
1703 }
1704
1705 #[test]
1706 fn test_add_duplicate_columns() {
1707 let kind = AlterKind::AddColumns {
1708 columns: vec![
1709 AddColumn {
1710 column_metadata: ColumnMetadata {
1711 column_schema: ColumnSchema::new(
1712 "tag_1",
1713 ConcreteDataType::string_datatype(),
1714 true,
1715 ),
1716 semantic_type: SemanticType::Tag,
1717 column_id: 5,
1718 },
1719 location: None,
1720 },
1721 AddColumn {
1722 column_metadata: ColumnMetadata {
1723 column_schema: ColumnSchema::new(
1724 "tag_1",
1725 ConcreteDataType::string_datatype(),
1726 true,
1727 ),
1728 semantic_type: SemanticType::Field,
1729 column_id: 6,
1730 },
1731 location: None,
1732 },
1733 ],
1734 };
1735 let metadata = new_metadata();
1736 kind.validate(&metadata).unwrap();
1737 assert!(kind.need_alter(&metadata));
1738 }
1739
1740 #[test]
1741 fn test_add_existing_column_different_metadata() {
1742 let metadata = new_metadata();
1743
1744 let kind = AlterKind::AddColumns {
1746 columns: vec![AddColumn {
1747 column_metadata: ColumnMetadata {
1748 column_schema: ColumnSchema::new(
1749 "tag_0",
1750 ConcreteDataType::string_datatype(),
1751 true,
1752 ),
1753 semantic_type: SemanticType::Tag,
1754 column_id: 4,
1755 },
1756 location: None,
1757 }],
1758 };
1759 kind.validate(&metadata).unwrap_err();
1760
1761 let kind = AlterKind::AddColumns {
1763 columns: vec![AddColumn {
1764 column_metadata: ColumnMetadata {
1765 column_schema: ColumnSchema::new(
1766 "tag_0",
1767 ConcreteDataType::int64_datatype(),
1768 true,
1769 ),
1770 semantic_type: SemanticType::Tag,
1771 column_id: 2,
1772 },
1773 location: None,
1774 }],
1775 };
1776 kind.validate(&metadata).unwrap_err();
1777
1778 let kind = AlterKind::AddColumns {
1780 columns: vec![AddColumn {
1781 column_metadata: ColumnMetadata {
1782 column_schema: ColumnSchema::new(
1783 "tag_1",
1784 ConcreteDataType::string_datatype(),
1785 true,
1786 ),
1787 semantic_type: SemanticType::Tag,
1788 column_id: 2,
1789 },
1790 location: None,
1791 }],
1792 };
1793 kind.validate(&metadata).unwrap_err();
1794 }
1795
1796 #[test]
1797 fn test_add_existing_column_with_location() {
1798 let metadata = new_metadata();
1799 let kind = AlterKind::AddColumns {
1800 columns: vec![AddColumn {
1801 column_metadata: ColumnMetadata {
1802 column_schema: ColumnSchema::new(
1803 "tag_0",
1804 ConcreteDataType::string_datatype(),
1805 true,
1806 ),
1807 semantic_type: SemanticType::Tag,
1808 column_id: 2,
1809 },
1810 location: Some(AddColumnLocation::First),
1811 }],
1812 };
1813 kind.validate(&metadata).unwrap_err();
1814 }
1815
1816 #[test]
1817 fn test_validate_drop_column() {
1818 let metadata = new_metadata();
1819 let kind = AlterKind::DropColumns {
1820 names: vec!["xxxx".to_string()],
1821 };
1822 kind.validate(&metadata).unwrap();
1823 assert!(!kind.need_alter(&metadata));
1824
1825 AlterKind::DropColumns {
1826 names: vec!["tag_0".to_string()],
1827 }
1828 .validate(&metadata)
1829 .unwrap_err();
1830
1831 let kind = AlterKind::DropColumns {
1832 names: vec!["field_0".to_string()],
1833 };
1834 kind.validate(&metadata).unwrap();
1835 assert!(kind.need_alter(&metadata));
1836 }
1837
1838 #[test]
1839 fn test_validate_modify_column_type() {
1840 let metadata = new_metadata();
1841 AlterKind::ModifyColumnTypes {
1842 columns: vec![ModifyColumnType {
1843 column_name: "xxxx".to_string(),
1844 target_type: ConcreteDataType::string_datatype(),
1845 }],
1846 }
1847 .validate(&metadata)
1848 .unwrap_err();
1849
1850 AlterKind::ModifyColumnTypes {
1851 columns: vec![ModifyColumnType {
1852 column_name: "field_1".to_string(),
1853 target_type: ConcreteDataType::date_datatype(),
1854 }],
1855 }
1856 .validate(&metadata)
1857 .unwrap_err();
1858
1859 AlterKind::ModifyColumnTypes {
1860 columns: vec![ModifyColumnType {
1861 column_name: "ts".to_string(),
1862 target_type: ConcreteDataType::date_datatype(),
1863 }],
1864 }
1865 .validate(&metadata)
1866 .unwrap_err();
1867
1868 AlterKind::ModifyColumnTypes {
1869 columns: vec![ModifyColumnType {
1870 column_name: "tag_0".to_string(),
1871 target_type: ConcreteDataType::date_datatype(),
1872 }],
1873 }
1874 .validate(&metadata)
1875 .unwrap_err();
1876
1877 let kind = AlterKind::ModifyColumnTypes {
1878 columns: vec![ModifyColumnType {
1879 column_name: "field_0".to_string(),
1880 target_type: ConcreteDataType::int32_datatype(),
1881 }],
1882 };
1883 kind.validate(&metadata).unwrap();
1884 assert!(kind.need_alter(&metadata));
1885 }
1886
1887 #[test]
1888 fn test_validate_add_columns() {
1889 let kind = AlterKind::AddColumns {
1890 columns: vec![
1891 AddColumn {
1892 column_metadata: ColumnMetadata {
1893 column_schema: ColumnSchema::new(
1894 "tag_1",
1895 ConcreteDataType::string_datatype(),
1896 true,
1897 ),
1898 semantic_type: SemanticType::Tag,
1899 column_id: 5,
1900 },
1901 location: None,
1902 },
1903 AddColumn {
1904 column_metadata: ColumnMetadata {
1905 column_schema: ColumnSchema::new(
1906 "field_2",
1907 ConcreteDataType::string_datatype(),
1908 true,
1909 ),
1910 semantic_type: SemanticType::Field,
1911 column_id: 6,
1912 },
1913 location: None,
1914 },
1915 ],
1916 };
1917 let request = RegionAlterRequest { kind };
1918 let mut metadata = new_metadata();
1919 metadata.schema_version = 1;
1920 request.validate(&metadata).unwrap();
1921 }
1922
1923 #[test]
1924 fn test_validate_create_region() {
1925 let column_metadatas = vec![
1926 ColumnMetadata {
1927 column_schema: ColumnSchema::new(
1928 "ts",
1929 ConcreteDataType::timestamp_millisecond_datatype(),
1930 false,
1931 ),
1932 semantic_type: SemanticType::Timestamp,
1933 column_id: 1,
1934 },
1935 ColumnMetadata {
1936 column_schema: ColumnSchema::new(
1937 "tag_0",
1938 ConcreteDataType::string_datatype(),
1939 true,
1940 ),
1941 semantic_type: SemanticType::Tag,
1942 column_id: 2,
1943 },
1944 ColumnMetadata {
1945 column_schema: ColumnSchema::new(
1946 "field_0",
1947 ConcreteDataType::string_datatype(),
1948 true,
1949 ),
1950 semantic_type: SemanticType::Field,
1951 column_id: 3,
1952 },
1953 ];
1954 let create = RegionCreateRequest {
1955 engine: "mito".to_string(),
1956 column_metadatas,
1957 primary_key: vec![3, 4],
1958 options: HashMap::new(),
1959 table_dir: "path".to_string(),
1960 path_type: PathType::Bare,
1961 partition_expr_json: Some("".to_string()),
1962 };
1963
1964 assert!(create.validate().is_err());
1965 }
1966
1967 #[test]
1968 fn test_validate_modify_column_fulltext_options() {
1969 let kind = AlterKind::SetIndexes {
1970 options: vec![SetIndexOption::Fulltext {
1971 column_name: "tag_0".to_string(),
1972 options: FulltextOptions::new_unchecked(
1973 true,
1974 FulltextAnalyzer::Chinese,
1975 false,
1976 FulltextBackend::Bloom,
1977 1000,
1978 0.01,
1979 ),
1980 }],
1981 };
1982 let request = RegionAlterRequest { kind };
1983 let mut metadata = new_metadata();
1984 metadata.schema_version = 1;
1985 request.validate(&metadata).unwrap();
1986
1987 let kind = AlterKind::UnsetIndexes {
1988 options: vec![UnsetIndexOption::Fulltext {
1989 column_name: "tag_0".to_string(),
1990 }],
1991 };
1992 let request = RegionAlterRequest { kind };
1993 let mut metadata = new_metadata();
1994 metadata.schema_version = 1;
1995 request.validate(&metadata).unwrap();
1996 }
1997
1998 #[test]
1999 fn test_validate_sync_columns() {
2000 let metadata = new_metadata();
2001 let kind = AlterKind::SyncColumns {
2002 column_metadatas: vec![
2003 ColumnMetadata {
2004 column_schema: ColumnSchema::new(
2005 "tag_1",
2006 ConcreteDataType::string_datatype(),
2007 true,
2008 ),
2009 semantic_type: SemanticType::Tag,
2010 column_id: 5,
2011 },
2012 ColumnMetadata {
2013 column_schema: ColumnSchema::new(
2014 "field_2",
2015 ConcreteDataType::string_datatype(),
2016 true,
2017 ),
2018 semantic_type: SemanticType::Field,
2019 column_id: 6,
2020 },
2021 ],
2022 };
2023 let err = kind.validate(&metadata).unwrap_err();
2024 assert!(err.to_string().contains("not a primary key"));
2025
2026 let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
2028 let ts_column = column_metadatas_with_different_ts_column
2029 .iter_mut()
2030 .find(|c| c.semantic_type == SemanticType::Timestamp)
2031 .unwrap();
2032 ts_column.column_schema.name = "ts1".to_string();
2033
2034 let kind = AlterKind::SyncColumns {
2035 column_metadatas: column_metadatas_with_different_ts_column,
2036 };
2037 let err = kind.validate(&metadata).unwrap_err();
2038 assert!(
2039 err.to_string()
2040 .contains("timestamp column ts has different id")
2041 );
2042
2043 let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
2045 let pk_column = column_metadatas_with_different_pk_column
2046 .iter_mut()
2047 .find(|c| c.column_schema.name == "tag_0")
2048 .unwrap();
2049 pk_column.column_id = 100;
2050 let kind = AlterKind::SyncColumns {
2051 column_metadatas: column_metadatas_with_different_pk_column,
2052 };
2053 let err = kind.validate(&metadata).unwrap_err();
2054 assert!(
2055 err.to_string()
2056 .contains("column with same name tag_0 has different id")
2057 );
2058
2059 let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2061 column_metadatas_with_new_field_column.push(ColumnMetadata {
2062 column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2063 semantic_type: SemanticType::Field,
2064 column_id: 4,
2065 });
2066 let kind = AlterKind::SyncColumns {
2067 column_metadatas: column_metadatas_with_new_field_column,
2068 };
2069 kind.validate(&metadata).unwrap();
2070 }
2071
2072 #[test]
2073 fn test_cast_path_type_to_primitive() {
2074 assert_eq!(PathType::Bare as u8, 0);
2075 assert_eq!(PathType::Data as u8, 1);
2076 assert_eq!(PathType::Metadata as u8, 2);
2077 assert_eq!(
2078 PathType::try_from(PathType::Bare as u8).unwrap(),
2079 PathType::Bare
2080 );
2081 assert_eq!(
2082 PathType::try_from(PathType::Data as u8).unwrap(),
2083 PathType::Data
2084 );
2085 assert_eq!(
2086 PathType::try_from(PathType::Metadata as u8).unwrap(),
2087 PathType::Metadata
2088 );
2089 }
2090}