1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 AlterRequest, AlterRequests, BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest,
26 CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests,
27 OpenRequest, TruncateRequest, alter_request, compact_request, region_request, truncate_request,
28};
29use api::v1::{
30 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::{TimeToLive, Timestamp};
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use num_enum::TryFromPrimitive;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use strum::{AsRefStr, IntoStaticStr};
43
44use crate::logstore::entry;
45use crate::metadata::{
46 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
47 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
48 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
49 RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::metrics;
53use crate::mito_engine_options::{
54 TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
55};
56use crate::path_utils::table_dir;
57use crate::storage::{ColumnId, RegionId, ScanRequest};
58
59#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
61#[repr(u8)]
62pub enum PathType {
63 Bare,
67 Data,
71 Metadata,
75}
76
77#[derive(Debug, IntoStaticStr)]
78pub enum BatchRegionDdlRequest {
79 Create(Vec<(RegionId, RegionCreateRequest)>),
80 Drop(Vec<(RegionId, RegionDropRequest)>),
81 Alter(Vec<(RegionId, RegionAlterRequest)>),
82}
83
84impl BatchRegionDdlRequest {
85 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
87 match body {
88 region_request::Body::Creates(creates) => {
89 let requests = creates
90 .requests
91 .into_iter()
92 .map(parse_region_create)
93 .collect::<Result<Vec<_>>>()?;
94 Ok(Some(Self::Create(requests)))
95 }
96 region_request::Body::Drops(drops) => {
97 let requests = drops
98 .requests
99 .into_iter()
100 .map(parse_region_drop)
101 .collect::<Result<Vec<_>>>()?;
102 Ok(Some(Self::Drop(requests)))
103 }
104 region_request::Body::Alters(alters) => {
105 let requests = alters
106 .requests
107 .into_iter()
108 .map(parse_region_alter)
109 .collect::<Result<Vec<_>>>()?;
110 Ok(Some(Self::Alter(requests)))
111 }
112 _ => Ok(None),
113 }
114 }
115
116 pub fn request_type(&self) -> &'static str {
117 self.into()
118 }
119
120 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
121 match self {
122 Self::Create(requests) => requests
123 .into_iter()
124 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
125 .collect(),
126 Self::Drop(requests) => requests
127 .into_iter()
128 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
129 .collect(),
130 Self::Alter(requests) => requests
131 .into_iter()
132 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
133 .collect(),
134 }
135 }
136}
137
138#[derive(Debug, IntoStaticStr)]
139pub enum RegionRequest {
140 Put(RegionPutRequest),
141 Delete(RegionDeleteRequest),
142 Create(RegionCreateRequest),
143 Drop(RegionDropRequest),
144 Open(RegionOpenRequest),
145 Close(RegionCloseRequest),
146 Alter(RegionAlterRequest),
147 Flush(RegionFlushRequest),
148 Compact(RegionCompactRequest),
149 BuildIndex(RegionBuildIndexRequest),
150 Truncate(RegionTruncateRequest),
151 Catchup(RegionCatchupRequest),
152 BulkInserts(RegionBulkInsertsRequest),
153}
154
155impl RegionRequest {
156 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
159 match body {
160 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
161 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
162 region_request::Body::Create(create) => make_region_create(create),
163 region_request::Body::Drop(drop) => make_region_drop(drop),
164 region_request::Body::Open(open) => make_region_open(open),
165 region_request::Body::Close(close) => make_region_close(close),
166 region_request::Body::Alter(alter) => make_region_alter(alter),
167 region_request::Body::Flush(flush) => make_region_flush(flush),
168 region_request::Body::Compact(compact) => make_region_compact(compact),
169 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
170 region_request::Body::Creates(creates) => make_region_creates(creates),
171 region_request::Body::Drops(drops) => make_region_drops(drops),
172 region_request::Body::Alters(alters) => make_region_alters(alters),
173 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
174 region_request::Body::Sync(_) => UnexpectedSnafu {
175 reason: "Sync request should be handled separately by RegionServer",
176 }
177 .fail(),
178 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
179 reason: "ListMetadata request should be handled separately by RegionServer",
180 }
181 .fail(),
182 }
183 }
184
185 pub fn request_type(&self) -> &'static str {
187 self.into()
188 }
189}
190
191fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
192 let requests = inserts
193 .requests
194 .into_iter()
195 .filter_map(|r| {
196 let region_id = r.region_id.into();
197 r.rows.map(|rows| {
198 (
199 region_id,
200 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
201 )
202 })
203 })
204 .collect();
205 Ok(requests)
206}
207
208fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
209 let requests = deletes
210 .requests
211 .into_iter()
212 .filter_map(|r| {
213 let region_id = r.region_id.into();
214 r.rows.map(|rows| {
215 (
216 region_id,
217 RegionRequest::Delete(RegionDeleteRequest { rows }),
218 )
219 })
220 })
221 .collect();
222 Ok(requests)
223}
224
225fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
226 let column_metadatas = create
227 .column_defs
228 .into_iter()
229 .map(ColumnMetadata::try_from_column_def)
230 .collect::<Result<Vec<_>>>()?;
231 let region_id = RegionId::from(create.region_id);
232 let table_dir = table_dir(&create.path, region_id.table_id());
233 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
234 Ok((
235 region_id,
236 RegionCreateRequest {
237 engine: create.engine,
238 column_metadatas,
239 primary_key: create.primary_key,
240 options: create.options,
241 table_dir,
242 path_type: PathType::Bare,
243 partition_expr_json,
244 },
245 ))
246}
247
248fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
249 let (region_id, request) = parse_region_create(create)?;
250 Ok(vec![(region_id, RegionRequest::Create(request))])
251}
252
253fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
254 let mut requests = Vec::with_capacity(creates.requests.len());
255 for create in creates.requests {
256 requests.extend(make_region_create(create)?);
257 }
258 Ok(requests)
259}
260
261fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
262 let region_id = drop.region_id.into();
263 Ok((
264 region_id,
265 RegionDropRequest {
266 fast_path: drop.fast_path,
267 },
268 ))
269}
270
271fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
272 let (region_id, request) = parse_region_drop(drop)?;
273 Ok(vec![(region_id, RegionRequest::Drop(request))])
274}
275
276fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
277 let mut requests = Vec::with_capacity(drops.requests.len());
278 for drop in drops.requests {
279 requests.extend(make_region_drop(drop)?);
280 }
281 Ok(requests)
282}
283
284fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
285 let region_id = RegionId::from(open.region_id);
286 let table_dir = table_dir(&open.path, region_id.table_id());
287 Ok(vec![(
288 region_id,
289 RegionRequest::Open(RegionOpenRequest {
290 engine: open.engine,
291 table_dir,
292 path_type: PathType::Bare,
293 options: open.options,
294 skip_wal_replay: false,
295 checkpoint: None,
296 }),
297 )])
298}
299
300fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
301 let region_id = close.region_id.into();
302 Ok(vec![(
303 region_id,
304 RegionRequest::Close(RegionCloseRequest {}),
305 )])
306}
307
308fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
309 let region_id = alter.region_id.into();
310 let request = RegionAlterRequest::try_from(alter)?;
311 Ok((region_id, request))
312}
313
314fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
315 let (region_id, request) = parse_region_alter(alter)?;
316 Ok(vec![(region_id, RegionRequest::Alter(request))])
317}
318
319fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
320 let mut requests = Vec::with_capacity(alters.requests.len());
321 for alter in alters.requests {
322 requests.extend(make_region_alter(alter)?);
323 }
324 Ok(requests)
325}
326
327fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
328 let region_id = flush.region_id.into();
329 Ok(vec![(
330 region_id,
331 RegionRequest::Flush(RegionFlushRequest {
332 row_group_size: None,
333 }),
334 )])
335}
336
337fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
338 let region_id = compact.region_id.into();
339 let options = compact
340 .options
341 .unwrap_or(compact_request::Options::Regular(Default::default()));
342 let parallelism = if compact.parallelism == 0 {
344 None
345 } else {
346 Some(compact.parallelism)
347 };
348 Ok(vec![(
349 region_id,
350 RegionRequest::Compact(RegionCompactRequest {
351 options,
352 parallelism,
353 }),
354 )])
355}
356
357fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
358 let region_id = truncate.region_id.into();
359 match truncate.kind {
360 None => InvalidRawRegionRequestSnafu {
361 err: "missing kind in TruncateRequest".to_string(),
362 }
363 .fail(),
364 Some(truncate_request::Kind::All(_)) => Ok(vec![(
365 region_id,
366 RegionRequest::Truncate(RegionTruncateRequest::All),
367 )]),
368 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
369 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
370
371 Ok(vec![(
372 region_id,
373 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
374 )])
375 }
376 }
377}
378
379fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
381 let region_id = request.region_id.into();
382 let Some(Body::ArrowIpc(request)) = request.body else {
383 return Ok(vec![]);
384 };
385
386 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
387 .with_label_values(&["decode"])
388 .start_timer();
389 let mut decoder =
390 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
391 let payload = decoder
392 .try_decode_record_batch(&request.data_header, &request.payload)
393 .context(FlightCodecSnafu)?;
394 decoder_timer.observe_duration();
395 Ok(vec![(
396 region_id,
397 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
398 region_id,
399 payload,
400 raw_data: request,
401 }),
402 )])
403}
404
405#[derive(Debug)]
407pub struct RegionPutRequest {
408 pub rows: Rows,
410 pub hint: Option<WriteHint>,
412}
413
414#[derive(Debug)]
415pub struct RegionReadRequest {
416 pub request: ScanRequest,
417}
418
419#[derive(Debug)]
421pub struct RegionDeleteRequest {
422 pub rows: Rows,
426}
427
428#[derive(Debug, Clone)]
429pub struct RegionCreateRequest {
430 pub engine: String,
432 pub column_metadatas: Vec<ColumnMetadata>,
434 pub primary_key: Vec<ColumnId>,
436 pub options: HashMap<String, String>,
438 pub table_dir: String,
440 pub path_type: PathType,
442 pub partition_expr_json: Option<String>,
445}
446
447impl RegionCreateRequest {
448 pub fn validate(&self) -> Result<()> {
450 ensure!(
452 self.column_metadatas
453 .iter()
454 .any(|x| x.semantic_type == SemanticType::Timestamp),
455 InvalidRegionRequestSnafu {
456 region_id: RegionId::new(0, 0),
457 err: "missing timestamp column in create region request".to_string(),
458 }
459 );
460
461 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
463 for (i, c) in self.column_metadatas.iter().enumerate() {
464 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
465 return InvalidRegionRequestSnafu {
466 region_id: RegionId::new(0, 0),
467 err: format!(
468 "duplicate column id {} (at position {} and {}) in create region request",
469 c.column_id, previous, i
470 ),
471 }
472 .fail();
473 }
474 }
475
476 for column_id in &self.primary_key {
478 ensure!(
479 column_id_to_indices.contains_key(column_id),
480 InvalidRegionRequestSnafu {
481 region_id: RegionId::new(0, 0),
482 err: format!(
483 "missing primary key column {} in create region request",
484 column_id
485 ),
486 }
487 );
488 }
489
490 Ok(())
491 }
492
493 pub fn is_physical_table(&self) -> bool {
495 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
496 }
497}
498
499#[derive(Debug, Clone)]
500pub struct RegionDropRequest {
501 pub fast_path: bool,
502}
503
504#[derive(Debug, Clone)]
506pub struct RegionOpenRequest {
507 pub engine: String,
509 pub table_dir: String,
511 pub path_type: PathType,
513 pub options: HashMap<String, String>,
515 pub skip_wal_replay: bool,
517 pub checkpoint: Option<ReplayCheckpoint>,
519}
520
521#[derive(Debug, Clone, Copy, PartialEq, Eq)]
522pub struct ReplayCheckpoint {
523 pub entry_id: u64,
524 pub metadata_entry_id: Option<u64>,
525}
526
527impl RegionOpenRequest {
528 pub fn is_physical_table(&self) -> bool {
530 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
531 }
532}
533
534#[derive(Debug)]
536pub struct RegionCloseRequest {}
537
538#[derive(Debug, PartialEq, Eq, Clone)]
540pub struct RegionAlterRequest {
541 pub kind: AlterKind,
543}
544
545impl RegionAlterRequest {
546 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
548 self.kind.validate(metadata)?;
549
550 Ok(())
551 }
552
553 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
557 debug_assert!(self.validate(metadata).is_ok());
558 self.kind.need_alter(metadata)
559 }
560}
561
562impl TryFrom<AlterRequest> for RegionAlterRequest {
563 type Error = MetadataError;
564
565 fn try_from(value: AlterRequest) -> Result<Self> {
566 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
567 err: "missing kind in AlterRequest",
568 })?;
569
570 let kind = AlterKind::try_from(kind)?;
571 Ok(RegionAlterRequest { kind })
572 }
573}
574
575#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
577pub enum AlterKind {
578 AddColumns {
580 columns: Vec<AddColumn>,
582 },
583 DropColumns {
585 names: Vec<String>,
587 },
588 ModifyColumnTypes {
590 columns: Vec<ModifyColumnType>,
592 },
593 SetRegionOptions { options: Vec<SetRegionOption> },
595 UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
597 SetIndexes { options: Vec<SetIndexOption> },
599 UnsetIndexes { options: Vec<UnsetIndexOption> },
601 DropDefaults {
603 names: Vec<String>,
605 },
606 SetDefaults {
608 columns: Vec<SetDefault>,
610 },
611 SyncColumns {
613 column_metadatas: Vec<ColumnMetadata>,
614 },
615}
616#[derive(Debug, PartialEq, Eq, Clone)]
617pub struct SetDefault {
618 pub name: String,
619 pub default_constraint: Vec<u8>,
620}
621
622#[derive(Debug, PartialEq, Eq, Clone)]
623pub enum SetIndexOption {
624 Fulltext {
625 column_name: String,
626 options: FulltextOptions,
627 },
628 Inverted {
629 column_name: String,
630 },
631 Skipping {
632 column_name: String,
633 options: SkippingIndexOptions,
634 },
635}
636
637impl SetIndexOption {
638 pub fn column_name(&self) -> &String {
640 match self {
641 SetIndexOption::Fulltext { column_name, .. } => column_name,
642 SetIndexOption::Inverted { column_name } => column_name,
643 SetIndexOption::Skipping { column_name, .. } => column_name,
644 }
645 }
646
647 pub fn is_fulltext(&self) -> bool {
649 match self {
650 SetIndexOption::Fulltext { .. } => true,
651 SetIndexOption::Inverted { .. } => false,
652 SetIndexOption::Skipping { .. } => false,
653 }
654 }
655}
656
657impl TryFrom<v1::SetIndex> for SetIndexOption {
658 type Error = MetadataError;
659
660 fn try_from(value: v1::SetIndex) -> Result<Self> {
661 let option = value.options.context(InvalidRawRegionRequestSnafu {
662 err: "missing options in SetIndex",
663 })?;
664
665 let opt = match option {
666 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
667 column_name: x.column_name.clone(),
668 options: FulltextOptions::new(
669 x.enable,
670 as_fulltext_option_analyzer(
671 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
672 ),
673 x.case_sensitive,
674 as_fulltext_option_backend(
675 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
676 ),
677 x.granularity as u32,
678 x.false_positive_rate,
679 )
680 .context(InvalidIndexOptionSnafu)?,
681 },
682 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
683 column_name: i.column_name,
684 },
685 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
686 column_name: s.column_name,
687 options: SkippingIndexOptions::new(
688 s.granularity as u32,
689 s.false_positive_rate,
690 as_skipping_index_type(
691 PbSkippingIndexType::try_from(s.skipping_index_type)
692 .context(DecodeProtoSnafu)?,
693 ),
694 )
695 .context(InvalidIndexOptionSnafu)?,
696 },
697 };
698
699 Ok(opt)
700 }
701}
702
703#[derive(Debug, PartialEq, Eq, Clone)]
704pub enum UnsetIndexOption {
705 Fulltext { column_name: String },
706 Inverted { column_name: String },
707 Skipping { column_name: String },
708}
709
710impl UnsetIndexOption {
711 pub fn column_name(&self) -> &String {
712 match self {
713 UnsetIndexOption::Fulltext { column_name } => column_name,
714 UnsetIndexOption::Inverted { column_name } => column_name,
715 UnsetIndexOption::Skipping { column_name } => column_name,
716 }
717 }
718
719 pub fn is_fulltext(&self) -> bool {
720 match self {
721 UnsetIndexOption::Fulltext { .. } => true,
722 UnsetIndexOption::Inverted { .. } => false,
723 UnsetIndexOption::Skipping { .. } => false,
724 }
725 }
726}
727
728impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
729 type Error = MetadataError;
730
731 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
732 let option = value.options.context(InvalidRawRegionRequestSnafu {
733 err: "missing options in UnsetIndex",
734 })?;
735
736 let opt = match option {
737 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
738 column_name: f.column_name,
739 },
740 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
741 column_name: i.column_name,
742 },
743 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
744 column_name: s.column_name,
745 },
746 };
747
748 Ok(opt)
749 }
750}
751
752impl AlterKind {
753 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
757 match self {
758 AlterKind::AddColumns { columns } => {
759 for col_to_add in columns {
760 col_to_add.validate(metadata)?;
761 }
762 }
763 AlterKind::DropColumns { names } => {
764 for name in names {
765 Self::validate_column_to_drop(name, metadata)?;
766 }
767 }
768 AlterKind::ModifyColumnTypes { columns } => {
769 for col_to_change in columns {
770 col_to_change.validate(metadata)?;
771 }
772 }
773 AlterKind::SetRegionOptions { .. } => {}
774 AlterKind::UnsetRegionOptions { .. } => {}
775 AlterKind::SetIndexes { options } => {
776 for option in options {
777 Self::validate_column_alter_index_option(
778 option.column_name(),
779 metadata,
780 option.is_fulltext(),
781 )?;
782 }
783 }
784 AlterKind::UnsetIndexes { options } => {
785 for option in options {
786 Self::validate_column_alter_index_option(
787 option.column_name(),
788 metadata,
789 option.is_fulltext(),
790 )?;
791 }
792 }
793 AlterKind::DropDefaults { names } => {
794 names
795 .iter()
796 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
797 }
798 AlterKind::SetDefaults { columns } => {
799 columns
800 .iter()
801 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
802 }
803 AlterKind::SyncColumns { column_metadatas } => {
804 let new_primary_keys = column_metadatas
805 .iter()
806 .filter(|c| c.semantic_type == SemanticType::Tag)
807 .map(|c| (c.column_schema.name.as_str(), c.column_id))
808 .collect::<HashMap<_, _>>();
809
810 let old_primary_keys = metadata
811 .column_metadatas
812 .iter()
813 .filter(|c| c.semantic_type == SemanticType::Tag)
814 .map(|c| (c.column_schema.name.as_str(), c.column_id));
815
816 for (name, id) in old_primary_keys {
817 let primary_key =
818 new_primary_keys
819 .get(name)
820 .with_context(|| InvalidRegionRequestSnafu {
821 region_id: metadata.region_id,
822 err: format!("column {} is not a primary key", name),
823 })?;
824
825 ensure!(
826 *primary_key == id,
827 InvalidRegionRequestSnafu {
828 region_id: metadata.region_id,
829 err: format!(
830 "column with same name {} has different id, existing: {}, got: {}",
831 name, id, primary_key
832 ),
833 }
834 );
835 }
836
837 let new_ts_column = column_metadatas
838 .iter()
839 .find(|c| c.semantic_type == SemanticType::Timestamp)
840 .map(|c| (c.column_schema.name.as_str(), c.column_id))
841 .context(InvalidRegionRequestSnafu {
842 region_id: metadata.region_id,
843 err: "timestamp column not found",
844 })?;
845
846 let old_ts_column = metadata
848 .column_metadatas
849 .iter()
850 .find(|c| c.semantic_type == SemanticType::Timestamp)
851 .map(|c| (c.column_schema.name.as_str(), c.column_id))
852 .unwrap();
853
854 ensure!(
855 new_ts_column == old_ts_column,
856 InvalidRegionRequestSnafu {
857 region_id: metadata.region_id,
858 err: format!(
859 "timestamp column {} has different id, existing: {}, got: {}",
860 old_ts_column.0, old_ts_column.1, new_ts_column.1
861 ),
862 }
863 );
864 }
865 }
866 Ok(())
867 }
868
869 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
871 debug_assert!(self.validate(metadata).is_ok());
872 match self {
873 AlterKind::AddColumns { columns } => columns
874 .iter()
875 .any(|col_to_add| col_to_add.need_alter(metadata)),
876 AlterKind::DropColumns { names } => names
877 .iter()
878 .any(|name| metadata.column_by_name(name).is_some()),
879 AlterKind::ModifyColumnTypes { columns } => columns
880 .iter()
881 .any(|col_to_change| col_to_change.need_alter(metadata)),
882 AlterKind::SetRegionOptions { .. } => {
883 true
886 }
887 AlterKind::UnsetRegionOptions { .. } => true,
888 AlterKind::SetIndexes { options, .. } => options
889 .iter()
890 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
891 AlterKind::UnsetIndexes { options } => options
892 .iter()
893 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
894 AlterKind::DropDefaults { names } => names
895 .iter()
896 .any(|name| metadata.column_by_name(name).is_some()),
897
898 AlterKind::SetDefaults { columns } => columns
899 .iter()
900 .any(|x| metadata.column_by_name(&x.name).is_some()),
901 AlterKind::SyncColumns { column_metadatas } => {
902 metadata.column_metadatas != *column_metadatas
903 }
904 }
905 }
906
907 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
909 let Some(column) = metadata.column_by_name(name) else {
910 return Ok(());
911 };
912 ensure!(
913 column.semantic_type == SemanticType::Field,
914 InvalidRegionRequestSnafu {
915 region_id: metadata.region_id,
916 err: format!("column {} is not a field and could not be dropped", name),
917 }
918 );
919 Ok(())
920 }
921
922 fn validate_column_alter_index_option(
924 column_name: &String,
925 metadata: &RegionMetadata,
926 is_fulltext: bool,
927 ) -> Result<()> {
928 let column = metadata
929 .column_by_name(column_name)
930 .context(InvalidRegionRequestSnafu {
931 region_id: metadata.region_id,
932 err: format!("column {} not found", column_name),
933 })?;
934
935 if is_fulltext {
936 ensure!(
937 column.column_schema.data_type.is_string(),
938 InvalidRegionRequestSnafu {
939 region_id: metadata.region_id,
940 err: format!(
941 "cannot change alter index options for non-string column {}",
942 column_name
943 ),
944 }
945 );
946 }
947
948 Ok(())
949 }
950
951 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
953 metadata
954 .column_by_name(column_name)
955 .context(InvalidRegionRequestSnafu {
956 region_id: metadata.region_id,
957 err: format!("column {} not found", column_name),
958 })?;
959
960 Ok(())
961 }
962}
963
964impl TryFrom<alter_request::Kind> for AlterKind {
965 type Error = MetadataError;
966
967 fn try_from(kind: alter_request::Kind) -> Result<Self> {
968 let alter_kind = match kind {
969 alter_request::Kind::AddColumns(x) => {
970 let columns = x
971 .add_columns
972 .into_iter()
973 .map(|x| x.try_into())
974 .collect::<Result<Vec<_>>>()?;
975 AlterKind::AddColumns { columns }
976 }
977 alter_request::Kind::ModifyColumnTypes(x) => {
978 let columns = x
979 .modify_column_types
980 .into_iter()
981 .map(|x| x.into())
982 .collect::<Vec<_>>();
983 AlterKind::ModifyColumnTypes { columns }
984 }
985 alter_request::Kind::DropColumns(x) => {
986 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
987 AlterKind::DropColumns { names }
988 }
989 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
990 options: options
991 .table_options
992 .iter()
993 .map(TryFrom::try_from)
994 .collect::<Result<Vec<_>>>()?,
995 },
996 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
997 keys: options
998 .keys
999 .iter()
1000 .map(|key| UnsetRegionOption::try_from(key.as_str()))
1001 .collect::<Result<Vec<_>>>()?,
1002 },
1003 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1004 options: vec![SetIndexOption::try_from(o)?],
1005 },
1006 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1007 options: vec![UnsetIndexOption::try_from(o)?],
1008 },
1009 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1010 options: o
1011 .set_indexes
1012 .into_iter()
1013 .map(SetIndexOption::try_from)
1014 .collect::<Result<Vec<_>>>()?,
1015 },
1016 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1017 options: o
1018 .unset_indexes
1019 .into_iter()
1020 .map(UnsetIndexOption::try_from)
1021 .collect::<Result<Vec<_>>>()?,
1022 },
1023 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1024 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1025 },
1026 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1027 columns: x
1028 .set_defaults
1029 .into_iter()
1030 .map(|x| {
1031 Ok(SetDefault {
1032 name: x.column_name,
1033 default_constraint: x.default_constraint.clone(),
1034 })
1035 })
1036 .collect::<Result<Vec<_>>>()?,
1037 },
1038 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1039 column_metadatas: x
1040 .column_defs
1041 .into_iter()
1042 .map(ColumnMetadata::try_from_column_def)
1043 .collect::<Result<Vec<_>>>()?,
1044 },
1045 };
1046
1047 Ok(alter_kind)
1048 }
1049}
1050
1051#[derive(Debug, PartialEq, Eq, Clone)]
1053pub struct AddColumn {
1054 pub column_metadata: ColumnMetadata,
1056 pub location: Option<AddColumnLocation>,
1059}
1060
1061impl AddColumn {
1062 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1067 ensure!(
1068 self.column_metadata.column_schema.is_nullable()
1069 || self
1070 .column_metadata
1071 .column_schema
1072 .default_constraint()
1073 .is_some(),
1074 InvalidRegionRequestSnafu {
1075 region_id: metadata.region_id,
1076 err: format!(
1077 "no default value for column {}",
1078 self.column_metadata.column_schema.name
1079 ),
1080 }
1081 );
1082
1083 if let Some(existing_column) =
1084 metadata.column_by_name(&self.column_metadata.column_schema.name)
1085 {
1086 ensure!(
1088 *existing_column == self.column_metadata,
1089 InvalidRegionRequestSnafu {
1090 region_id: metadata.region_id,
1091 err: format!(
1092 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1093 self.column_metadata.column_schema.name,
1094 existing_column,
1095 self.column_metadata,
1096 ),
1097 }
1098 );
1099 ensure!(
1100 self.location.is_none(),
1101 InvalidRegionRequestSnafu {
1102 region_id: metadata.region_id,
1103 err: format!(
1104 "column {} already exists, but location is specified",
1105 self.column_metadata.column_schema.name
1106 ),
1107 }
1108 );
1109 }
1110
1111 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1112 ensure!(
1114 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1115 InvalidRegionRequestSnafu {
1116 region_id: metadata.region_id,
1117 err: format!(
1118 "column id {} already exists with different name {}",
1119 self.column_metadata.column_id, existing_column.column_schema.name
1120 ),
1121 }
1122 );
1123 }
1124
1125 Ok(())
1126 }
1127
1128 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1130 debug_assert!(self.validate(metadata).is_ok());
1131 metadata
1132 .column_by_name(&self.column_metadata.column_schema.name)
1133 .is_none()
1134 }
1135}
1136
1137impl TryFrom<v1::region::AddColumn> for AddColumn {
1138 type Error = MetadataError;
1139
1140 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1141 let column_def = add_column
1142 .column_def
1143 .context(InvalidRawRegionRequestSnafu {
1144 err: "missing column_def in AddColumn",
1145 })?;
1146
1147 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1148 let location = add_column
1149 .location
1150 .map(AddColumnLocation::try_from)
1151 .transpose()?;
1152
1153 Ok(AddColumn {
1154 column_metadata,
1155 location,
1156 })
1157 }
1158}
1159
1160#[derive(Debug, PartialEq, Eq, Clone)]
1162pub enum AddColumnLocation {
1163 First,
1165 After {
1167 column_name: String,
1169 },
1170}
1171
1172impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1173 type Error = MetadataError;
1174
1175 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1176 let location_type = LocationType::try_from(location.location_type)
1177 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1178 let add_column_location = match location_type {
1179 LocationType::First => AddColumnLocation::First,
1180 LocationType::After => AddColumnLocation::After {
1181 column_name: location.after_column_name,
1182 },
1183 };
1184
1185 Ok(add_column_location)
1186 }
1187}
1188
1189#[derive(Debug, PartialEq, Eq, Clone)]
1191pub struct ModifyColumnType {
1192 pub column_name: String,
1194 pub target_type: ConcreteDataType,
1196}
1197
1198impl ModifyColumnType {
1199 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1201 let column_meta = metadata
1202 .column_by_name(&self.column_name)
1203 .with_context(|| InvalidRegionRequestSnafu {
1204 region_id: metadata.region_id,
1205 err: format!("column {} not found", self.column_name),
1206 })?;
1207
1208 ensure!(
1209 matches!(column_meta.semantic_type, SemanticType::Field),
1210 InvalidRegionRequestSnafu {
1211 region_id: metadata.region_id,
1212 err: "'timestamp' or 'tag' column cannot change type".to_string()
1213 }
1214 );
1215 ensure!(
1216 column_meta
1217 .column_schema
1218 .data_type
1219 .can_arrow_type_cast_to(&self.target_type),
1220 InvalidRegionRequestSnafu {
1221 region_id: metadata.region_id,
1222 err: format!(
1223 "column '{}' cannot be cast automatically to type '{}'",
1224 self.column_name, self.target_type
1225 ),
1226 }
1227 );
1228
1229 Ok(())
1230 }
1231
1232 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1234 debug_assert!(self.validate(metadata).is_ok());
1235 metadata.column_by_name(&self.column_name).is_some()
1236 }
1237}
1238
1239impl From<v1::ModifyColumnType> for ModifyColumnType {
1240 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1241 let target_type = ColumnDataTypeWrapper::new(
1242 modify_column_type.target_type(),
1243 modify_column_type.target_type_extension,
1244 )
1245 .into();
1246
1247 ModifyColumnType {
1248 column_name: modify_column_type.column_name,
1249 target_type,
1250 }
1251 }
1252}
1253
1254#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1255pub enum SetRegionOption {
1256 Ttl(Option<TimeToLive>),
1257 Twsc(String, String),
1259}
1260
1261impl TryFrom<&PbOption> for SetRegionOption {
1262 type Error = MetadataError;
1263
1264 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1265 let PbOption { key, value } = value;
1266 match key.as_str() {
1267 TTL_KEY => {
1268 let ttl = TimeToLive::from_humantime_or_str(value)
1269 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1270
1271 Ok(Self::Ttl(Some(ttl)))
1272 }
1273 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1274 Ok(Self::Twsc(key.clone(), value.clone()))
1275 }
1276 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1277 }
1278 }
1279}
1280
1281impl From<&UnsetRegionOption> for SetRegionOption {
1282 fn from(unset_option: &UnsetRegionOption) -> Self {
1283 match unset_option {
1284 UnsetRegionOption::TwcsTriggerFileNum => {
1285 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1286 }
1287 UnsetRegionOption::TwcsMaxOutputFileSize => {
1288 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1289 }
1290 UnsetRegionOption::TwcsTimeWindow => {
1291 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1292 }
1293 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1294 }
1295 }
1296}
1297
1298impl TryFrom<&str> for UnsetRegionOption {
1299 type Error = MetadataError;
1300
1301 fn try_from(key: &str) -> Result<Self> {
1302 match key.to_ascii_lowercase().as_str() {
1303 TTL_KEY => Ok(Self::Ttl),
1304 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1305 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1306 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1307 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1308 }
1309 }
1310}
1311
1312#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1313pub enum UnsetRegionOption {
1314 TwcsTriggerFileNum,
1315 TwcsMaxOutputFileSize,
1316 TwcsTimeWindow,
1317 Ttl,
1318}
1319
1320impl UnsetRegionOption {
1321 pub fn as_str(&self) -> &str {
1322 match self {
1323 Self::Ttl => TTL_KEY,
1324 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1325 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1326 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1327 }
1328 }
1329}
1330
1331impl Display for UnsetRegionOption {
1332 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1333 write!(f, "{}", self.as_str())
1334 }
1335}
1336
1337#[derive(Debug, Clone, Default)]
1338pub struct RegionFlushRequest {
1339 pub row_group_size: Option<usize>,
1340}
1341
1342#[derive(Debug)]
1343pub struct RegionCompactRequest {
1344 pub options: compact_request::Options,
1345 pub parallelism: Option<u32>,
1346}
1347
1348impl Default for RegionCompactRequest {
1349 fn default() -> Self {
1350 Self {
1351 options: compact_request::Options::Regular(Default::default()),
1353 parallelism: None,
1354 }
1355 }
1356}
1357
1358#[derive(Debug, Clone, Default)]
1359pub struct RegionBuildIndexRequest {}
1360
1361#[derive(Debug)]
1363pub enum RegionTruncateRequest {
1364 All,
1366 ByTimeRanges {
1367 time_ranges: Vec<(Timestamp, Timestamp)>,
1371 },
1372}
1373
1374#[derive(Debug, Clone, Copy, Default)]
1379pub struct RegionCatchupRequest {
1380 pub set_writable: bool,
1382 pub entry_id: Option<entry::Id>,
1385 pub metadata_entry_id: Option<entry::Id>,
1389 pub location_id: Option<u64>,
1391 pub checkpoint: Option<ReplayCheckpoint>,
1393}
1394
1395#[derive(Debug, Clone)]
1396pub struct RegionBulkInsertsRequest {
1397 pub region_id: RegionId,
1398 pub payload: DfRecordBatch,
1399 pub raw_data: ArrowIpc,
1400}
1401
1402impl RegionBulkInsertsRequest {
1403 pub fn estimated_size(&self) -> usize {
1404 self.payload.get_array_memory_size()
1405 }
1406}
1407
1408impl fmt::Display for RegionRequest {
1409 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1410 match self {
1411 RegionRequest::Put(_) => write!(f, "Put"),
1412 RegionRequest::Delete(_) => write!(f, "Delete"),
1413 RegionRequest::Create(_) => write!(f, "Create"),
1414 RegionRequest::Drop(_) => write!(f, "Drop"),
1415 RegionRequest::Open(_) => write!(f, "Open"),
1416 RegionRequest::Close(_) => write!(f, "Close"),
1417 RegionRequest::Alter(_) => write!(f, "Alter"),
1418 RegionRequest::Flush(_) => write!(f, "Flush"),
1419 RegionRequest::Compact(_) => write!(f, "Compact"),
1420 RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1421 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1422 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1423 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1424 }
1425 }
1426}
1427
1428#[cfg(test)]
1429mod tests {
1430
1431 use api::v1::region::RegionColumnDef;
1432 use api::v1::{ColumnDataType, ColumnDef};
1433 use datatypes::prelude::ConcreteDataType;
1434 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1435
1436 use super::*;
1437 use crate::metadata::RegionMetadataBuilder;
1438
1439 #[test]
1440 fn test_from_proto_location() {
1441 let proto_location = v1::AddColumnLocation {
1442 location_type: LocationType::First as i32,
1443 after_column_name: String::default(),
1444 };
1445 let location = AddColumnLocation::try_from(proto_location).unwrap();
1446 assert_eq!(location, AddColumnLocation::First);
1447
1448 let proto_location = v1::AddColumnLocation {
1449 location_type: 10,
1450 after_column_name: String::default(),
1451 };
1452 AddColumnLocation::try_from(proto_location).unwrap_err();
1453
1454 let proto_location = v1::AddColumnLocation {
1455 location_type: LocationType::After as i32,
1456 after_column_name: "a".to_string(),
1457 };
1458 let location = AddColumnLocation::try_from(proto_location).unwrap();
1459 assert_eq!(
1460 location,
1461 AddColumnLocation::After {
1462 column_name: "a".to_string()
1463 }
1464 );
1465 }
1466
1467 #[test]
1468 fn test_from_none_proto_add_column() {
1469 AddColumn::try_from(v1::region::AddColumn {
1470 column_def: None,
1471 location: None,
1472 })
1473 .unwrap_err();
1474 }
1475
1476 #[test]
1477 fn test_from_proto_alter_request() {
1478 RegionAlterRequest::try_from(AlterRequest {
1479 region_id: 0,
1480 schema_version: 1,
1481 kind: None,
1482 })
1483 .unwrap_err();
1484
1485 let request = RegionAlterRequest::try_from(AlterRequest {
1486 region_id: 0,
1487 schema_version: 1,
1488 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1489 add_columns: vec![v1::region::AddColumn {
1490 column_def: Some(RegionColumnDef {
1491 column_def: Some(ColumnDef {
1492 name: "a".to_string(),
1493 data_type: ColumnDataType::String as i32,
1494 is_nullable: true,
1495 default_constraint: vec![],
1496 semantic_type: SemanticType::Field as i32,
1497 comment: String::new(),
1498 ..Default::default()
1499 }),
1500 column_id: 1,
1501 }),
1502 location: Some(v1::AddColumnLocation {
1503 location_type: LocationType::First as i32,
1504 after_column_name: String::default(),
1505 }),
1506 }],
1507 })),
1508 })
1509 .unwrap();
1510
1511 assert_eq!(
1512 request,
1513 RegionAlterRequest {
1514 kind: AlterKind::AddColumns {
1515 columns: vec![AddColumn {
1516 column_metadata: ColumnMetadata {
1517 column_schema: ColumnSchema::new(
1518 "a",
1519 ConcreteDataType::string_datatype(),
1520 true,
1521 ),
1522 semantic_type: SemanticType::Field,
1523 column_id: 1,
1524 },
1525 location: Some(AddColumnLocation::First),
1526 }]
1527 },
1528 }
1529 );
1530 }
1531
1532 fn new_metadata() -> RegionMetadata {
1535 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1536 builder
1537 .push_column_metadata(ColumnMetadata {
1538 column_schema: ColumnSchema::new(
1539 "ts",
1540 ConcreteDataType::timestamp_millisecond_datatype(),
1541 false,
1542 ),
1543 semantic_type: SemanticType::Timestamp,
1544 column_id: 1,
1545 })
1546 .push_column_metadata(ColumnMetadata {
1547 column_schema: ColumnSchema::new(
1548 "tag_0",
1549 ConcreteDataType::string_datatype(),
1550 true,
1551 ),
1552 semantic_type: SemanticType::Tag,
1553 column_id: 2,
1554 })
1555 .push_column_metadata(ColumnMetadata {
1556 column_schema: ColumnSchema::new(
1557 "field_0",
1558 ConcreteDataType::string_datatype(),
1559 true,
1560 ),
1561 semantic_type: SemanticType::Field,
1562 column_id: 3,
1563 })
1564 .push_column_metadata(ColumnMetadata {
1565 column_schema: ColumnSchema::new(
1566 "field_1",
1567 ConcreteDataType::boolean_datatype(),
1568 true,
1569 ),
1570 semantic_type: SemanticType::Field,
1571 column_id: 4,
1572 })
1573 .primary_key(vec![2]);
1574 builder.build().unwrap()
1575 }
1576
1577 #[test]
1578 fn test_add_column_validate() {
1579 let metadata = new_metadata();
1580 let add_column = AddColumn {
1581 column_metadata: ColumnMetadata {
1582 column_schema: ColumnSchema::new(
1583 "tag_1",
1584 ConcreteDataType::string_datatype(),
1585 true,
1586 ),
1587 semantic_type: SemanticType::Tag,
1588 column_id: 5,
1589 },
1590 location: None,
1591 };
1592 add_column.validate(&metadata).unwrap();
1593 assert!(add_column.need_alter(&metadata));
1594
1595 AddColumn {
1597 column_metadata: ColumnMetadata {
1598 column_schema: ColumnSchema::new(
1599 "tag_1",
1600 ConcreteDataType::string_datatype(),
1601 false,
1602 ),
1603 semantic_type: SemanticType::Tag,
1604 column_id: 5,
1605 },
1606 location: None,
1607 }
1608 .validate(&metadata)
1609 .unwrap_err();
1610
1611 let add_column = AddColumn {
1613 column_metadata: ColumnMetadata {
1614 column_schema: ColumnSchema::new(
1615 "tag_0",
1616 ConcreteDataType::string_datatype(),
1617 true,
1618 ),
1619 semantic_type: SemanticType::Tag,
1620 column_id: 2,
1621 },
1622 location: None,
1623 };
1624 add_column.validate(&metadata).unwrap();
1625 assert!(!add_column.need_alter(&metadata));
1626 }
1627
1628 #[test]
1629 fn test_add_duplicate_columns() {
1630 let kind = AlterKind::AddColumns {
1631 columns: vec![
1632 AddColumn {
1633 column_metadata: ColumnMetadata {
1634 column_schema: ColumnSchema::new(
1635 "tag_1",
1636 ConcreteDataType::string_datatype(),
1637 true,
1638 ),
1639 semantic_type: SemanticType::Tag,
1640 column_id: 5,
1641 },
1642 location: None,
1643 },
1644 AddColumn {
1645 column_metadata: ColumnMetadata {
1646 column_schema: ColumnSchema::new(
1647 "tag_1",
1648 ConcreteDataType::string_datatype(),
1649 true,
1650 ),
1651 semantic_type: SemanticType::Field,
1652 column_id: 6,
1653 },
1654 location: None,
1655 },
1656 ],
1657 };
1658 let metadata = new_metadata();
1659 kind.validate(&metadata).unwrap();
1660 assert!(kind.need_alter(&metadata));
1661 }
1662
1663 #[test]
1664 fn test_add_existing_column_different_metadata() {
1665 let metadata = new_metadata();
1666
1667 let kind = AlterKind::AddColumns {
1669 columns: vec![AddColumn {
1670 column_metadata: ColumnMetadata {
1671 column_schema: ColumnSchema::new(
1672 "tag_0",
1673 ConcreteDataType::string_datatype(),
1674 true,
1675 ),
1676 semantic_type: SemanticType::Tag,
1677 column_id: 4,
1678 },
1679 location: None,
1680 }],
1681 };
1682 kind.validate(&metadata).unwrap_err();
1683
1684 let kind = AlterKind::AddColumns {
1686 columns: vec![AddColumn {
1687 column_metadata: ColumnMetadata {
1688 column_schema: ColumnSchema::new(
1689 "tag_0",
1690 ConcreteDataType::int64_datatype(),
1691 true,
1692 ),
1693 semantic_type: SemanticType::Tag,
1694 column_id: 2,
1695 },
1696 location: None,
1697 }],
1698 };
1699 kind.validate(&metadata).unwrap_err();
1700
1701 let kind = AlterKind::AddColumns {
1703 columns: vec![AddColumn {
1704 column_metadata: ColumnMetadata {
1705 column_schema: ColumnSchema::new(
1706 "tag_1",
1707 ConcreteDataType::string_datatype(),
1708 true,
1709 ),
1710 semantic_type: SemanticType::Tag,
1711 column_id: 2,
1712 },
1713 location: None,
1714 }],
1715 };
1716 kind.validate(&metadata).unwrap_err();
1717 }
1718
1719 #[test]
1720 fn test_add_existing_column_with_location() {
1721 let metadata = new_metadata();
1722 let kind = AlterKind::AddColumns {
1723 columns: vec![AddColumn {
1724 column_metadata: ColumnMetadata {
1725 column_schema: ColumnSchema::new(
1726 "tag_0",
1727 ConcreteDataType::string_datatype(),
1728 true,
1729 ),
1730 semantic_type: SemanticType::Tag,
1731 column_id: 2,
1732 },
1733 location: Some(AddColumnLocation::First),
1734 }],
1735 };
1736 kind.validate(&metadata).unwrap_err();
1737 }
1738
1739 #[test]
1740 fn test_validate_drop_column() {
1741 let metadata = new_metadata();
1742 let kind = AlterKind::DropColumns {
1743 names: vec!["xxxx".to_string()],
1744 };
1745 kind.validate(&metadata).unwrap();
1746 assert!(!kind.need_alter(&metadata));
1747
1748 AlterKind::DropColumns {
1749 names: vec!["tag_0".to_string()],
1750 }
1751 .validate(&metadata)
1752 .unwrap_err();
1753
1754 let kind = AlterKind::DropColumns {
1755 names: vec!["field_0".to_string()],
1756 };
1757 kind.validate(&metadata).unwrap();
1758 assert!(kind.need_alter(&metadata));
1759 }
1760
1761 #[test]
1762 fn test_validate_modify_column_type() {
1763 let metadata = new_metadata();
1764 AlterKind::ModifyColumnTypes {
1765 columns: vec![ModifyColumnType {
1766 column_name: "xxxx".to_string(),
1767 target_type: ConcreteDataType::string_datatype(),
1768 }],
1769 }
1770 .validate(&metadata)
1771 .unwrap_err();
1772
1773 AlterKind::ModifyColumnTypes {
1774 columns: vec![ModifyColumnType {
1775 column_name: "field_1".to_string(),
1776 target_type: ConcreteDataType::date_datatype(),
1777 }],
1778 }
1779 .validate(&metadata)
1780 .unwrap_err();
1781
1782 AlterKind::ModifyColumnTypes {
1783 columns: vec![ModifyColumnType {
1784 column_name: "ts".to_string(),
1785 target_type: ConcreteDataType::date_datatype(),
1786 }],
1787 }
1788 .validate(&metadata)
1789 .unwrap_err();
1790
1791 AlterKind::ModifyColumnTypes {
1792 columns: vec![ModifyColumnType {
1793 column_name: "tag_0".to_string(),
1794 target_type: ConcreteDataType::date_datatype(),
1795 }],
1796 }
1797 .validate(&metadata)
1798 .unwrap_err();
1799
1800 let kind = AlterKind::ModifyColumnTypes {
1801 columns: vec![ModifyColumnType {
1802 column_name: "field_0".to_string(),
1803 target_type: ConcreteDataType::int32_datatype(),
1804 }],
1805 };
1806 kind.validate(&metadata).unwrap();
1807 assert!(kind.need_alter(&metadata));
1808 }
1809
1810 #[test]
1811 fn test_validate_add_columns() {
1812 let kind = AlterKind::AddColumns {
1813 columns: vec![
1814 AddColumn {
1815 column_metadata: ColumnMetadata {
1816 column_schema: ColumnSchema::new(
1817 "tag_1",
1818 ConcreteDataType::string_datatype(),
1819 true,
1820 ),
1821 semantic_type: SemanticType::Tag,
1822 column_id: 5,
1823 },
1824 location: None,
1825 },
1826 AddColumn {
1827 column_metadata: ColumnMetadata {
1828 column_schema: ColumnSchema::new(
1829 "field_2",
1830 ConcreteDataType::string_datatype(),
1831 true,
1832 ),
1833 semantic_type: SemanticType::Field,
1834 column_id: 6,
1835 },
1836 location: None,
1837 },
1838 ],
1839 };
1840 let request = RegionAlterRequest { kind };
1841 let mut metadata = new_metadata();
1842 metadata.schema_version = 1;
1843 request.validate(&metadata).unwrap();
1844 }
1845
1846 #[test]
1847 fn test_validate_create_region() {
1848 let column_metadatas = vec![
1849 ColumnMetadata {
1850 column_schema: ColumnSchema::new(
1851 "ts",
1852 ConcreteDataType::timestamp_millisecond_datatype(),
1853 false,
1854 ),
1855 semantic_type: SemanticType::Timestamp,
1856 column_id: 1,
1857 },
1858 ColumnMetadata {
1859 column_schema: ColumnSchema::new(
1860 "tag_0",
1861 ConcreteDataType::string_datatype(),
1862 true,
1863 ),
1864 semantic_type: SemanticType::Tag,
1865 column_id: 2,
1866 },
1867 ColumnMetadata {
1868 column_schema: ColumnSchema::new(
1869 "field_0",
1870 ConcreteDataType::string_datatype(),
1871 true,
1872 ),
1873 semantic_type: SemanticType::Field,
1874 column_id: 3,
1875 },
1876 ];
1877 let create = RegionCreateRequest {
1878 engine: "mito".to_string(),
1879 column_metadatas,
1880 primary_key: vec![3, 4],
1881 options: HashMap::new(),
1882 table_dir: "path".to_string(),
1883 path_type: PathType::Bare,
1884 partition_expr_json: Some("".to_string()),
1885 };
1886
1887 assert!(create.validate().is_err());
1888 }
1889
1890 #[test]
1891 fn test_validate_modify_column_fulltext_options() {
1892 let kind = AlterKind::SetIndexes {
1893 options: vec![SetIndexOption::Fulltext {
1894 column_name: "tag_0".to_string(),
1895 options: FulltextOptions::new_unchecked(
1896 true,
1897 FulltextAnalyzer::Chinese,
1898 false,
1899 FulltextBackend::Bloom,
1900 1000,
1901 0.01,
1902 ),
1903 }],
1904 };
1905 let request = RegionAlterRequest { kind };
1906 let mut metadata = new_metadata();
1907 metadata.schema_version = 1;
1908 request.validate(&metadata).unwrap();
1909
1910 let kind = AlterKind::UnsetIndexes {
1911 options: vec![UnsetIndexOption::Fulltext {
1912 column_name: "tag_0".to_string(),
1913 }],
1914 };
1915 let request = RegionAlterRequest { kind };
1916 let mut metadata = new_metadata();
1917 metadata.schema_version = 1;
1918 request.validate(&metadata).unwrap();
1919 }
1920
1921 #[test]
1922 fn test_validate_sync_columns() {
1923 let metadata = new_metadata();
1924 let kind = AlterKind::SyncColumns {
1925 column_metadatas: vec![
1926 ColumnMetadata {
1927 column_schema: ColumnSchema::new(
1928 "tag_1",
1929 ConcreteDataType::string_datatype(),
1930 true,
1931 ),
1932 semantic_type: SemanticType::Tag,
1933 column_id: 5,
1934 },
1935 ColumnMetadata {
1936 column_schema: ColumnSchema::new(
1937 "field_2",
1938 ConcreteDataType::string_datatype(),
1939 true,
1940 ),
1941 semantic_type: SemanticType::Field,
1942 column_id: 6,
1943 },
1944 ],
1945 };
1946 let err = kind.validate(&metadata).unwrap_err();
1947 assert!(err.to_string().contains("not a primary key"));
1948
1949 let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
1951 let ts_column = column_metadatas_with_different_ts_column
1952 .iter_mut()
1953 .find(|c| c.semantic_type == SemanticType::Timestamp)
1954 .unwrap();
1955 ts_column.column_schema.name = "ts1".to_string();
1956
1957 let kind = AlterKind::SyncColumns {
1958 column_metadatas: column_metadatas_with_different_ts_column,
1959 };
1960 let err = kind.validate(&metadata).unwrap_err();
1961 assert!(
1962 err.to_string()
1963 .contains("timestamp column ts has different id")
1964 );
1965
1966 let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
1968 let pk_column = column_metadatas_with_different_pk_column
1969 .iter_mut()
1970 .find(|c| c.column_schema.name == "tag_0")
1971 .unwrap();
1972 pk_column.column_id = 100;
1973 let kind = AlterKind::SyncColumns {
1974 column_metadatas: column_metadatas_with_different_pk_column,
1975 };
1976 let err = kind.validate(&metadata).unwrap_err();
1977 assert!(
1978 err.to_string()
1979 .contains("column with same name tag_0 has different id")
1980 );
1981
1982 let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
1984 column_metadatas_with_new_field_column.push(ColumnMetadata {
1985 column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
1986 semantic_type: SemanticType::Field,
1987 column_id: 4,
1988 });
1989 let kind = AlterKind::SyncColumns {
1990 column_metadatas: column_metadatas_with_new_field_column,
1991 };
1992 kind.validate(&metadata).unwrap();
1993 }
1994
1995 #[test]
1996 fn test_cast_path_type_to_primitive() {
1997 assert_eq!(PathType::Bare as u8, 0);
1998 assert_eq!(PathType::Data as u8, 1);
1999 assert_eq!(PathType::Metadata as u8, 2);
2000 assert_eq!(
2001 PathType::try_from(PathType::Bare as u8).unwrap(),
2002 PathType::Bare
2003 );
2004 assert_eq!(
2005 PathType::try_from(PathType::Data as u8).unwrap(),
2006 PathType::Data
2007 );
2008 assert_eq!(
2009 PathType::try_from(PathType::Metadata as u8).unwrap(),
2010 PathType::Metadata
2011 );
2012 }
2013}