1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 AlterRequest, AlterRequests, BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest,
26 CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests,
27 OpenRequest, TruncateRequest, alter_request, compact_request, region_request, truncate_request,
28};
29use api::v1::{
30 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::{TimeToLive, Timestamp};
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use num_enum::TryFromPrimitive;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use strum::{AsRefStr, IntoStaticStr};
43
44use crate::logstore::entry;
45use crate::metadata::{
46 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
47 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
48 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
49 RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::metrics;
53use crate::mito_engine_options::{
54 TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
55};
56use crate::path_utils::table_dir;
57use crate::storage::{ColumnId, RegionId, ScanRequest};
58
/// Path flavor carried in the `path_type` field of region create/open requests.
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`, so it can be
/// rebuilt from its raw `u8` discriminant. Variant order defines those
/// discriminants and must not be changed.
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// Discriminant 0. This is the value used by every request parsed in this file.
    Bare,
    /// Discriminant 1. NOTE(review): presumably a data-specific sub path — confirm.
    Data,
    /// Discriminant 2. NOTE(review): presumably a metadata-specific sub path — confirm.
    Metadata,
}
76
/// A batch of DDL requests of one kind, each paired with the region it targets.
///
/// The derived `IntoStaticStr` provides the variant name returned by
/// `request_type`.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Batch of region create requests.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Batch of region drop requests.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Batch of region alter requests.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
83
84impl BatchRegionDdlRequest {
85 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
87 match body {
88 region_request::Body::Creates(creates) => {
89 let requests = creates
90 .requests
91 .into_iter()
92 .map(parse_region_create)
93 .collect::<Result<Vec<_>>>()?;
94 Ok(Some(Self::Create(requests)))
95 }
96 region_request::Body::Drops(drops) => {
97 let requests = drops
98 .requests
99 .into_iter()
100 .map(parse_region_drop)
101 .collect::<Result<Vec<_>>>()?;
102 Ok(Some(Self::Drop(requests)))
103 }
104 region_request::Body::Alters(alters) => {
105 let requests = alters
106 .requests
107 .into_iter()
108 .map(parse_region_alter)
109 .collect::<Result<Vec<_>>>()?;
110 Ok(Some(Self::Alter(requests)))
111 }
112 _ => Ok(None),
113 }
114 }
115
116 pub fn request_type(&self) -> &'static str {
117 self.into()
118 }
119
120 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
121 match self {
122 Self::Create(requests) => requests
123 .into_iter()
124 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
125 .collect(),
126 Self::Drop(requests) => requests
127 .into_iter()
128 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
129 .collect(),
130 Self::Alter(requests) => requests
131 .into_iter()
132 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
133 .collect(),
134 }
135 }
136}
137
/// A request addressed to a single region.
///
/// The derived `IntoStaticStr` provides the variant name returned by
/// `request_type`.
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Write rows into the region.
    Put(RegionPutRequest),
    /// Delete rows from the region.
    Delete(RegionDeleteRequest),
    /// Create the region.
    Create(RegionCreateRequest),
    /// Drop the region.
    Drop(RegionDropRequest),
    /// Open the region.
    Open(RegionOpenRequest),
    /// Close the region.
    Close(RegionCloseRequest),
    /// Alter the region.
    Alter(RegionAlterRequest),
    /// Flush the region.
    Flush(RegionFlushRequest),
    /// Compact the region.
    Compact(RegionCompactRequest),
    /// Truncate the region.
    Truncate(RegionTruncateRequest),
    /// Catch-up request. It is never produced by `try_from_request_body` in this file.
    Catchup(RegionCatchupRequest),
    /// Bulk insert carrying a decoded record batch.
    BulkInserts(RegionBulkInsertsRequest),
}
153
154impl RegionRequest {
155 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
158 match body {
159 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
160 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
161 region_request::Body::Create(create) => make_region_create(create),
162 region_request::Body::Drop(drop) => make_region_drop(drop),
163 region_request::Body::Open(open) => make_region_open(open),
164 region_request::Body::Close(close) => make_region_close(close),
165 region_request::Body::Alter(alter) => make_region_alter(alter),
166 region_request::Body::Flush(flush) => make_region_flush(flush),
167 region_request::Body::Compact(compact) => make_region_compact(compact),
168 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
169 region_request::Body::Creates(creates) => make_region_creates(creates),
170 region_request::Body::Drops(drops) => make_region_drops(drops),
171 region_request::Body::Alters(alters) => make_region_alters(alters),
172 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
173 region_request::Body::Sync(_) => UnexpectedSnafu {
174 reason: "Sync request should be handled separately by RegionServer",
175 }
176 .fail(),
177 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
178 reason: "ListMetadata request should be handled separately by RegionServer",
179 }
180 .fail(),
181 }
182 }
183
184 pub fn request_type(&self) -> &'static str {
186 self.into()
187 }
188}
189
190fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
191 let requests = inserts
192 .requests
193 .into_iter()
194 .filter_map(|r| {
195 let region_id = r.region_id.into();
196 r.rows.map(|rows| {
197 (
198 region_id,
199 RegionRequest::Put(RegionPutRequest { rows, hint: None }),
200 )
201 })
202 })
203 .collect();
204 Ok(requests)
205}
206
207fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
208 let requests = deletes
209 .requests
210 .into_iter()
211 .filter_map(|r| {
212 let region_id = r.region_id.into();
213 r.rows.map(|rows| {
214 (
215 region_id,
216 RegionRequest::Delete(RegionDeleteRequest { rows }),
217 )
218 })
219 })
220 .collect();
221 Ok(requests)
222}
223
224fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
225 let column_metadatas = create
226 .column_defs
227 .into_iter()
228 .map(ColumnMetadata::try_from_column_def)
229 .collect::<Result<Vec<_>>>()?;
230 let region_id = RegionId::from(create.region_id);
231 let table_dir = table_dir(&create.path, region_id.table_id());
232 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
233 Ok((
234 region_id,
235 RegionCreateRequest {
236 engine: create.engine,
237 column_metadatas,
238 primary_key: create.primary_key,
239 options: create.options,
240 table_dir,
241 path_type: PathType::Bare,
242 partition_expr_json,
243 },
244 ))
245}
246
247fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
248 let (region_id, request) = parse_region_create(create)?;
249 Ok(vec![(region_id, RegionRequest::Create(request))])
250}
251
252fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
253 let mut requests = Vec::with_capacity(creates.requests.len());
254 for create in creates.requests {
255 requests.extend(make_region_create(create)?);
256 }
257 Ok(requests)
258}
259
260fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
261 let region_id = drop.region_id.into();
262 Ok((
263 region_id,
264 RegionDropRequest {
265 fast_path: drop.fast_path,
266 },
267 ))
268}
269
270fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
271 let (region_id, request) = parse_region_drop(drop)?;
272 Ok(vec![(region_id, RegionRequest::Drop(request))])
273}
274
275fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
276 let mut requests = Vec::with_capacity(drops.requests.len());
277 for drop in drops.requests {
278 requests.extend(make_region_drop(drop)?);
279 }
280 Ok(requests)
281}
282
283fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
284 let region_id = RegionId::from(open.region_id);
285 let table_dir = table_dir(&open.path, region_id.table_id());
286 Ok(vec![(
287 region_id,
288 RegionRequest::Open(RegionOpenRequest {
289 engine: open.engine,
290 table_dir,
291 path_type: PathType::Bare,
292 options: open.options,
293 skip_wal_replay: false,
294 checkpoint: None,
295 }),
296 )])
297}
298
299fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
300 let region_id = close.region_id.into();
301 Ok(vec![(
302 region_id,
303 RegionRequest::Close(RegionCloseRequest {}),
304 )])
305}
306
307fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
308 let region_id = alter.region_id.into();
309 let request = RegionAlterRequest::try_from(alter)?;
310 Ok((region_id, request))
311}
312
313fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
314 let (region_id, request) = parse_region_alter(alter)?;
315 Ok(vec![(region_id, RegionRequest::Alter(request))])
316}
317
318fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
319 let mut requests = Vec::with_capacity(alters.requests.len());
320 for alter in alters.requests {
321 requests.extend(make_region_alter(alter)?);
322 }
323 Ok(requests)
324}
325
326fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
327 let region_id = flush.region_id.into();
328 Ok(vec![(
329 region_id,
330 RegionRequest::Flush(RegionFlushRequest {
331 row_group_size: None,
332 }),
333 )])
334}
335
336fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
337 let region_id = compact.region_id.into();
338 let options = compact
339 .options
340 .unwrap_or(compact_request::Options::Regular(Default::default()));
341 Ok(vec![(
342 region_id,
343 RegionRequest::Compact(RegionCompactRequest { options }),
344 )])
345}
346
347fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
348 let region_id = truncate.region_id.into();
349 match truncate.kind {
350 None => InvalidRawRegionRequestSnafu {
351 err: "missing kind in TruncateRequest".to_string(),
352 }
353 .fail(),
354 Some(truncate_request::Kind::All(_)) => Ok(vec![(
355 region_id,
356 RegionRequest::Truncate(RegionTruncateRequest::All),
357 )]),
358 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
359 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
360
361 Ok(vec![(
362 region_id,
363 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
364 )])
365 }
366 }
367}
368
369fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
371 let region_id = request.region_id.into();
372 let Some(Body::ArrowIpc(request)) = request.body else {
373 return Ok(vec![]);
374 };
375
376 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
377 .with_label_values(&["decode"])
378 .start_timer();
379 let mut decoder =
380 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
381 let payload = decoder
382 .try_decode_record_batch(&request.data_header, &request.payload)
383 .context(FlightCodecSnafu)?;
384 decoder_timer.observe_duration();
385 Ok(vec![(
386 region_id,
387 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
388 region_id,
389 payload,
390 raw_data: request,
391 }),
392 )])
393}
394
/// Request to put rows into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint. Requests built from `InsertRequests` in this file
    /// always set it to `None`.
    pub hint: Option<WriteHint>,
}
403
/// Request to read from a region, wrapping a `ScanRequest`.
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
408
/// Request to delete rows from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Rows to delete.
    // NOTE(review): which columns each row must carry is not visible here — confirm.
    pub rows: Rows,
}
417
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Name of the engine, copied from the protobuf request.
    pub engine: String,
    /// Metadata of every column of the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Column ids forming the primary key; `validate` requires each of them
    /// to appear in `column_metadatas`.
    pub primary_key: Vec<ColumnId>,
    /// Free-form key/value options of the region.
    pub options: HashMap<String, String>,
    /// Table directory, computed by `table_dir` from the request path and table id.
    pub table_dir: String,
    /// Path type of the region; `PathType::Bare` for requests parsed in this file.
    pub path_type: PathType,
    /// Partition expression taken from the request's `partition.expression`,
    /// if a partition was given.
    // NOTE(review): the field name suggests a JSON encoding — confirm.
    pub partition_expr_json: Option<String>,
}
436
437impl RegionCreateRequest {
438 pub fn validate(&self) -> Result<()> {
440 ensure!(
442 self.column_metadatas
443 .iter()
444 .any(|x| x.semantic_type == SemanticType::Timestamp),
445 InvalidRegionRequestSnafu {
446 region_id: RegionId::new(0, 0),
447 err: "missing timestamp column in create region request".to_string(),
448 }
449 );
450
451 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
453 for (i, c) in self.column_metadatas.iter().enumerate() {
454 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
455 return InvalidRegionRequestSnafu {
456 region_id: RegionId::new(0, 0),
457 err: format!(
458 "duplicate column id {} (at position {} and {}) in create region request",
459 c.column_id, previous, i
460 ),
461 }
462 .fail();
463 }
464 }
465
466 for column_id in &self.primary_key {
468 ensure!(
469 column_id_to_indices.contains_key(column_id),
470 InvalidRegionRequestSnafu {
471 region_id: RegionId::new(0, 0),
472 err: format!(
473 "missing primary key column {} in create region request",
474 column_id
475 ),
476 }
477 );
478 }
479
480 Ok(())
481 }
482
483 pub fn is_physical_table(&self) -> bool {
485 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
486 }
487}
488
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Copied from the protobuf `fast_path` flag.
    // NOTE(review): the exact effect of the fast path is not visible here — confirm.
    pub fast_path: bool,
}
493
/// Request to open a region.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Name of the engine, copied from the protobuf request.
    pub engine: String,
    /// Table directory, computed by `table_dir` from the request path and table id.
    pub table_dir: String,
    /// Path type of the region; `PathType::Bare` for requests parsed in this file.
    pub path_type: PathType,
    /// Free-form key/value options of the region.
    pub options: HashMap<String, String>,
    /// Whether to skip WAL replay; `false` for requests parsed in this file.
    pub skip_wal_replay: bool,
    /// Optional replay checkpoint; `None` for requests parsed in this file.
    pub checkpoint: Option<ReplayCheckpoint>,
}
510
/// Checkpoint attached to a `RegionOpenRequest`.
// NOTE(review): presumably the entry ids from which WAL replay resumes — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint.
    pub entry_id: u64,
    /// Optional entry id for metadata.
    pub metadata_entry_id: Option<u64>,
}
516
517impl RegionOpenRequest {
518 pub fn is_physical_table(&self) -> bool {
520 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
521 }
522}
523
/// Request to close a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
527
/// Request to alter a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// The alteration to apply.
    pub kind: AlterKind,
}
534
535impl RegionAlterRequest {
536 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
538 self.kind.validate(metadata)?;
539
540 Ok(())
541 }
542
543 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
547 debug_assert!(self.validate(metadata).is_ok());
548 self.kind.need_alter(metadata)
549 }
550}
551
552impl TryFrom<AlterRequest> for RegionAlterRequest {
553 type Error = MetadataError;
554
555 fn try_from(value: AlterRequest) -> Result<Self> {
556 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
557 err: "missing kind in AlterRequest",
558 })?;
559
560 let kind = AlterKind::try_from(kind)?;
561 Ok(RegionAlterRequest { kind })
562 }
563}
564
/// The kind of alteration carried by a `RegionAlterRequest`.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region. Validation only allows field columns.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data type of columns.
    ModifyColumnTypes {
        /// Columns whose type changes, with their target types.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options on columns.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options on columns.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop the default value of columns.
    DropDefaults {
        /// Names of the columns whose default is dropped.
        names: Vec<String>,
    },
    /// Set the default value of columns.
    SetDefaults {
        /// Columns and their new default constraints.
        columns: Vec<SetDefault>,
    },
    /// Synchronize the column metadata of the region with the given list.
    SyncColumns {
        /// The column metadata to synchronize to.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A column name paired with the default constraint to set on it.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column.
    pub name: String,
    /// Raw bytes of the default constraint, copied from the protobuf request.
    // NOTE(review): the encoding of these bytes is not visible here — confirm.
    pub default_constraint: Vec<u8>,
}
611
/// An index option to set on a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Set a fulltext index with the given options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Set an inverted index.
    Inverted {
        column_name: String,
    },
    /// Set a skipping index with the given options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
626
627impl SetIndexOption {
628 pub fn column_name(&self) -> &String {
630 match self {
631 SetIndexOption::Fulltext { column_name, .. } => column_name,
632 SetIndexOption::Inverted { column_name } => column_name,
633 SetIndexOption::Skipping { column_name, .. } => column_name,
634 }
635 }
636
637 pub fn is_fulltext(&self) -> bool {
639 match self {
640 SetIndexOption::Fulltext { .. } => true,
641 SetIndexOption::Inverted { .. } => false,
642 SetIndexOption::Skipping { .. } => false,
643 }
644 }
645}
646
647impl TryFrom<v1::SetIndex> for SetIndexOption {
648 type Error = MetadataError;
649
650 fn try_from(value: v1::SetIndex) -> Result<Self> {
651 let option = value.options.context(InvalidRawRegionRequestSnafu {
652 err: "missing options in SetIndex",
653 })?;
654
655 let opt = match option {
656 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
657 column_name: x.column_name.clone(),
658 options: FulltextOptions::new(
659 x.enable,
660 as_fulltext_option_analyzer(
661 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
662 ),
663 x.case_sensitive,
664 as_fulltext_option_backend(
665 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
666 ),
667 x.granularity as u32,
668 x.false_positive_rate,
669 )
670 .context(InvalidIndexOptionSnafu)?,
671 },
672 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
673 column_name: i.column_name,
674 },
675 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
676 column_name: s.column_name,
677 options: SkippingIndexOptions::new(
678 s.granularity as u32,
679 s.false_positive_rate,
680 as_skipping_index_type(
681 PbSkippingIndexType::try_from(s.skipping_index_type)
682 .context(DecodeProtoSnafu)?,
683 ),
684 )
685 .context(InvalidIndexOptionSnafu)?,
686 },
687 };
688
689 Ok(opt)
690 }
691}
692
/// An index option to remove from a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Remove the fulltext index of the column.
    Fulltext { column_name: String },
    /// Remove the inverted index of the column.
    Inverted { column_name: String },
    /// Remove the skipping index of the column.
    Skipping { column_name: String },
}
699
700impl UnsetIndexOption {
701 pub fn column_name(&self) -> &String {
702 match self {
703 UnsetIndexOption::Fulltext { column_name } => column_name,
704 UnsetIndexOption::Inverted { column_name } => column_name,
705 UnsetIndexOption::Skipping { column_name } => column_name,
706 }
707 }
708
709 pub fn is_fulltext(&self) -> bool {
710 match self {
711 UnsetIndexOption::Fulltext { .. } => true,
712 UnsetIndexOption::Inverted { .. } => false,
713 UnsetIndexOption::Skipping { .. } => false,
714 }
715 }
716}
717
718impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
719 type Error = MetadataError;
720
721 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
722 let option = value.options.context(InvalidRawRegionRequestSnafu {
723 err: "missing options in UnsetIndex",
724 })?;
725
726 let opt = match option {
727 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
728 column_name: f.column_name,
729 },
730 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
731 column_name: i.column_name,
732 },
733 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
734 column_name: s.column_name,
735 },
736 };
737
738 Ok(opt)
739 }
740}
741
742impl AlterKind {
743 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
747 match self {
748 AlterKind::AddColumns { columns } => {
749 for col_to_add in columns {
750 col_to_add.validate(metadata)?;
751 }
752 }
753 AlterKind::DropColumns { names } => {
754 for name in names {
755 Self::validate_column_to_drop(name, metadata)?;
756 }
757 }
758 AlterKind::ModifyColumnTypes { columns } => {
759 for col_to_change in columns {
760 col_to_change.validate(metadata)?;
761 }
762 }
763 AlterKind::SetRegionOptions { .. } => {}
764 AlterKind::UnsetRegionOptions { .. } => {}
765 AlterKind::SetIndexes { options } => {
766 for option in options {
767 Self::validate_column_alter_index_option(
768 option.column_name(),
769 metadata,
770 option.is_fulltext(),
771 )?;
772 }
773 }
774 AlterKind::UnsetIndexes { options } => {
775 for option in options {
776 Self::validate_column_alter_index_option(
777 option.column_name(),
778 metadata,
779 option.is_fulltext(),
780 )?;
781 }
782 }
783 AlterKind::DropDefaults { names } => {
784 names
785 .iter()
786 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
787 }
788 AlterKind::SetDefaults { columns } => {
789 columns
790 .iter()
791 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
792 }
793 AlterKind::SyncColumns { column_metadatas } => {
794 let new_primary_keys = column_metadatas
795 .iter()
796 .filter(|c| c.semantic_type == SemanticType::Tag)
797 .map(|c| (c.column_schema.name.as_str(), c.column_id))
798 .collect::<HashMap<_, _>>();
799
800 let old_primary_keys = metadata
801 .column_metadatas
802 .iter()
803 .filter(|c| c.semantic_type == SemanticType::Tag)
804 .map(|c| (c.column_schema.name.as_str(), c.column_id));
805
806 for (name, id) in old_primary_keys {
807 let primary_key =
808 new_primary_keys
809 .get(name)
810 .with_context(|| InvalidRegionRequestSnafu {
811 region_id: metadata.region_id,
812 err: format!("column {} is not a primary key", name),
813 })?;
814
815 ensure!(
816 *primary_key == id,
817 InvalidRegionRequestSnafu {
818 region_id: metadata.region_id,
819 err: format!(
820 "column with same name {} has different id, existing: {}, got: {}",
821 name, id, primary_key
822 ),
823 }
824 );
825 }
826
827 let new_ts_column = column_metadatas
828 .iter()
829 .find(|c| c.semantic_type == SemanticType::Timestamp)
830 .map(|c| (c.column_schema.name.as_str(), c.column_id))
831 .context(InvalidRegionRequestSnafu {
832 region_id: metadata.region_id,
833 err: "timestamp column not found",
834 })?;
835
836 let old_ts_column = metadata
838 .column_metadatas
839 .iter()
840 .find(|c| c.semantic_type == SemanticType::Timestamp)
841 .map(|c| (c.column_schema.name.as_str(), c.column_id))
842 .unwrap();
843
844 ensure!(
845 new_ts_column == old_ts_column,
846 InvalidRegionRequestSnafu {
847 region_id: metadata.region_id,
848 err: format!(
849 "timestamp column {} has different id, existing: {}, got: {}",
850 old_ts_column.0, old_ts_column.1, new_ts_column.1
851 ),
852 }
853 );
854 }
855 }
856 Ok(())
857 }
858
859 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
861 debug_assert!(self.validate(metadata).is_ok());
862 match self {
863 AlterKind::AddColumns { columns } => columns
864 .iter()
865 .any(|col_to_add| col_to_add.need_alter(metadata)),
866 AlterKind::DropColumns { names } => names
867 .iter()
868 .any(|name| metadata.column_by_name(name).is_some()),
869 AlterKind::ModifyColumnTypes { columns } => columns
870 .iter()
871 .any(|col_to_change| col_to_change.need_alter(metadata)),
872 AlterKind::SetRegionOptions { .. } => {
873 true
876 }
877 AlterKind::UnsetRegionOptions { .. } => true,
878 AlterKind::SetIndexes { options, .. } => options
879 .iter()
880 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
881 AlterKind::UnsetIndexes { options } => options
882 .iter()
883 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
884 AlterKind::DropDefaults { names } => names
885 .iter()
886 .any(|name| metadata.column_by_name(name).is_some()),
887
888 AlterKind::SetDefaults { columns } => columns
889 .iter()
890 .any(|x| metadata.column_by_name(&x.name).is_some()),
891 AlterKind::SyncColumns { column_metadatas } => {
892 metadata.column_metadatas != *column_metadatas
893 }
894 }
895 }
896
897 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
899 let Some(column) = metadata.column_by_name(name) else {
900 return Ok(());
901 };
902 ensure!(
903 column.semantic_type == SemanticType::Field,
904 InvalidRegionRequestSnafu {
905 region_id: metadata.region_id,
906 err: format!("column {} is not a field and could not be dropped", name),
907 }
908 );
909 Ok(())
910 }
911
912 fn validate_column_alter_index_option(
914 column_name: &String,
915 metadata: &RegionMetadata,
916 is_fulltext: bool,
917 ) -> Result<()> {
918 let column = metadata
919 .column_by_name(column_name)
920 .context(InvalidRegionRequestSnafu {
921 region_id: metadata.region_id,
922 err: format!("column {} not found", column_name),
923 })?;
924
925 if is_fulltext {
926 ensure!(
927 column.column_schema.data_type.is_string(),
928 InvalidRegionRequestSnafu {
929 region_id: metadata.region_id,
930 err: format!(
931 "cannot change alter index options for non-string column {}",
932 column_name
933 ),
934 }
935 );
936 }
937
938 Ok(())
939 }
940
941 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
943 metadata
944 .column_by_name(column_name)
945 .context(InvalidRegionRequestSnafu {
946 region_id: metadata.region_id,
947 err: format!("column {} not found", column_name),
948 })?;
949
950 Ok(())
951 }
952}
953
954impl TryFrom<alter_request::Kind> for AlterKind {
955 type Error = MetadataError;
956
957 fn try_from(kind: alter_request::Kind) -> Result<Self> {
958 let alter_kind = match kind {
959 alter_request::Kind::AddColumns(x) => {
960 let columns = x
961 .add_columns
962 .into_iter()
963 .map(|x| x.try_into())
964 .collect::<Result<Vec<_>>>()?;
965 AlterKind::AddColumns { columns }
966 }
967 alter_request::Kind::ModifyColumnTypes(x) => {
968 let columns = x
969 .modify_column_types
970 .into_iter()
971 .map(|x| x.into())
972 .collect::<Vec<_>>();
973 AlterKind::ModifyColumnTypes { columns }
974 }
975 alter_request::Kind::DropColumns(x) => {
976 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
977 AlterKind::DropColumns { names }
978 }
979 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
980 options: options
981 .table_options
982 .iter()
983 .map(TryFrom::try_from)
984 .collect::<Result<Vec<_>>>()?,
985 },
986 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
987 keys: options
988 .keys
989 .iter()
990 .map(|key| UnsetRegionOption::try_from(key.as_str()))
991 .collect::<Result<Vec<_>>>()?,
992 },
993 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
994 options: vec![SetIndexOption::try_from(o)?],
995 },
996 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
997 options: vec![UnsetIndexOption::try_from(o)?],
998 },
999 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1000 options: o
1001 .set_indexes
1002 .into_iter()
1003 .map(SetIndexOption::try_from)
1004 .collect::<Result<Vec<_>>>()?,
1005 },
1006 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1007 options: o
1008 .unset_indexes
1009 .into_iter()
1010 .map(UnsetIndexOption::try_from)
1011 .collect::<Result<Vec<_>>>()?,
1012 },
1013 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1014 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1015 },
1016 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1017 columns: x
1018 .set_defaults
1019 .into_iter()
1020 .map(|x| {
1021 Ok(SetDefault {
1022 name: x.column_name,
1023 default_constraint: x.default_constraint.clone(),
1024 })
1025 })
1026 .collect::<Result<Vec<_>>>()?,
1027 },
1028 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1029 column_metadatas: x
1030 .column_defs
1031 .into_iter()
1032 .map(ColumnMetadata::try_from_column_def)
1033 .collect::<Result<Vec<_>>>()?,
1034 },
1035 };
1036
1037 Ok(alter_kind)
1038 }
1039}
1040
/// A column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Optional position of the new column. Validation rejects a location
    /// when a column with the same name already exists.
    pub location: Option<AddColumnLocation>,
}
1050
1051impl AddColumn {
1052 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1057 ensure!(
1058 self.column_metadata.column_schema.is_nullable()
1059 || self
1060 .column_metadata
1061 .column_schema
1062 .default_constraint()
1063 .is_some(),
1064 InvalidRegionRequestSnafu {
1065 region_id: metadata.region_id,
1066 err: format!(
1067 "no default value for column {}",
1068 self.column_metadata.column_schema.name
1069 ),
1070 }
1071 );
1072
1073 if let Some(existing_column) =
1074 metadata.column_by_name(&self.column_metadata.column_schema.name)
1075 {
1076 ensure!(
1078 *existing_column == self.column_metadata,
1079 InvalidRegionRequestSnafu {
1080 region_id: metadata.region_id,
1081 err: format!(
1082 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1083 self.column_metadata.column_schema.name,
1084 existing_column,
1085 self.column_metadata,
1086 ),
1087 }
1088 );
1089 ensure!(
1090 self.location.is_none(),
1091 InvalidRegionRequestSnafu {
1092 region_id: metadata.region_id,
1093 err: format!(
1094 "column {} already exists, but location is specified",
1095 self.column_metadata.column_schema.name
1096 ),
1097 }
1098 );
1099 }
1100
1101 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1102 ensure!(
1104 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1105 InvalidRegionRequestSnafu {
1106 region_id: metadata.region_id,
1107 err: format!(
1108 "column id {} already exists with different name {}",
1109 self.column_metadata.column_id, existing_column.column_schema.name
1110 ),
1111 }
1112 );
1113 }
1114
1115 Ok(())
1116 }
1117
1118 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1120 debug_assert!(self.validate(metadata).is_ok());
1121 metadata
1122 .column_by_name(&self.column_metadata.column_schema.name)
1123 .is_none()
1124 }
1125}
1126
1127impl TryFrom<v1::region::AddColumn> for AddColumn {
1128 type Error = MetadataError;
1129
1130 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1131 let column_def = add_column
1132 .column_def
1133 .context(InvalidRawRegionRequestSnafu {
1134 err: "missing column_def in AddColumn",
1135 })?;
1136
1137 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1138 let location = add_column
1139 .location
1140 .map(AddColumnLocation::try_from)
1141 .transpose()?;
1142
1143 Ok(AddColumn {
1144 column_metadata,
1145 location,
1146 })
1147 }
1148}
1149
/// Position at which a new column is added.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Add the column as the first column.
    First,
    /// Add the column after an existing column.
    After {
        /// Name of the column after which the new column is added.
        column_name: String,
    },
}
1161
1162impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1163 type Error = MetadataError;
1164
1165 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1166 let location_type = LocationType::try_from(location.location_type)
1167 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1168 let add_column_location = match location_type {
1169 LocationType::First => AddColumnLocation::First,
1170 LocationType::After => AddColumnLocation::After {
1171 column_name: location.after_column_name,
1172 },
1173 };
1174
1175 Ok(add_column_location)
1176 }
1177}
1178
/// A column whose data type is to be changed.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to modify.
    pub column_name: String,
    /// Data type the column is changed to.
    pub target_type: ConcreteDataType,
}
1187
1188impl ModifyColumnType {
1189 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1191 let column_meta = metadata
1192 .column_by_name(&self.column_name)
1193 .with_context(|| InvalidRegionRequestSnafu {
1194 region_id: metadata.region_id,
1195 err: format!("column {} not found", self.column_name),
1196 })?;
1197
1198 ensure!(
1199 matches!(column_meta.semantic_type, SemanticType::Field),
1200 InvalidRegionRequestSnafu {
1201 region_id: metadata.region_id,
1202 err: "'timestamp' or 'tag' column cannot change type".to_string()
1203 }
1204 );
1205 ensure!(
1206 column_meta
1207 .column_schema
1208 .data_type
1209 .can_arrow_type_cast_to(&self.target_type),
1210 InvalidRegionRequestSnafu {
1211 region_id: metadata.region_id,
1212 err: format!(
1213 "column '{}' cannot be cast automatically to type '{}'",
1214 self.column_name, self.target_type
1215 ),
1216 }
1217 );
1218
1219 Ok(())
1220 }
1221
1222 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1224 debug_assert!(self.validate(metadata).is_ok());
1225 metadata.column_by_name(&self.column_name).is_some()
1226 }
1227}
1228
1229impl From<v1::ModifyColumnType> for ModifyColumnType {
1230 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1231 let target_type = ColumnDataTypeWrapper::new(
1232 modify_column_type.target_type(),
1233 modify_column_type.target_type_extension,
1234 )
1235 .into();
1236
1237 ModifyColumnType {
1238 column_name: modify_column_type.column_name,
1239 target_type,
1240 }
1241 }
1242}
1243
/// A region option to set.
///
/// Values are parsed from a protobuf key/value option or derived from an
/// [`UnsetRegionOption`].
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Time-to-live setting; an unset request converts to the default `None`.
    Ttl(Option<TimeToLive>),
    /// A TWCS option as a `(key, value)` pair; an unset request converts to an empty value.
    Twsc(String, String),
}
1250
1251impl TryFrom<&PbOption> for SetRegionOption {
1252 type Error = MetadataError;
1253
1254 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1255 let PbOption { key, value } = value;
1256 match key.as_str() {
1257 TTL_KEY => {
1258 let ttl = TimeToLive::from_humantime_or_str(value)
1259 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1260
1261 Ok(Self::Ttl(Some(ttl)))
1262 }
1263 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1264 Ok(Self::Twsc(key.to_string(), value.to_string()))
1265 }
1266 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1267 }
1268 }
1269}
1270
1271impl From<&UnsetRegionOption> for SetRegionOption {
1272 fn from(unset_option: &UnsetRegionOption) -> Self {
1273 match unset_option {
1274 UnsetRegionOption::TwcsTriggerFileNum => {
1275 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1276 }
1277 UnsetRegionOption::TwcsMaxOutputFileSize => {
1278 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1279 }
1280 UnsetRegionOption::TwcsTimeWindow => {
1281 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1282 }
1283 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1284 }
1285 }
1286}
1287
1288impl TryFrom<&str> for UnsetRegionOption {
1289 type Error = MetadataError;
1290
1291 fn try_from(key: &str) -> Result<Self> {
1292 match key.to_ascii_lowercase().as_str() {
1293 TTL_KEY => Ok(Self::Ttl),
1294 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1295 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1296 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1297 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1298 }
1299 }
1300}
1301
/// A region option to unset, identified by its option key.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// Unsets the option keyed by `TWCS_TRIGGER_FILE_NUM`.
    TwcsTriggerFileNum,
    /// Unsets the option keyed by `TWCS_MAX_OUTPUT_FILE_SIZE`.
    TwcsMaxOutputFileSize,
    /// Unsets the option keyed by `TWCS_TIME_WINDOW`.
    TwcsTimeWindow,
    /// Unsets the option keyed by `TTL_KEY`.
    Ttl,
}
1309
1310impl UnsetRegionOption {
1311 pub fn as_str(&self) -> &str {
1312 match self {
1313 Self::Ttl => TTL_KEY,
1314 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1315 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1316 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1317 }
1318 }
1319}
1320
1321impl Display for UnsetRegionOption {
1322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1323 write!(f, "{}", self.as_str())
1324 }
1325}
1326
/// Request to flush a region.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; presumably applied to the files written by the flush,
    /// with `None` leaving the choice to the engine — TODO confirm against the consumer.
    pub row_group_size: Option<usize>,
}
1331
/// Request to compact a region.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options taken from the protobuf `compact_request::Options`.
    pub options: compact_request::Options,
}
1336
1337impl Default for RegionCompactRequest {
1338 fn default() -> Self {
1339 Self {
1340 options: compact_request::Options::Regular(Default::default()),
1342 }
1343 }
1344}
1345
/// Request to truncate a region.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// Truncation that is not restricted to any time range.
    All,
    /// Truncation restricted to the listed time ranges.
    ByTimeRanges {
        /// Time ranges as pairs of timestamps.
        /// NOTE(review): presumably `(start, end)`; bound inclusivity is not visible here — confirm.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1358
/// Request asking a region to catch up.
///
/// NOTE(review): the field notes below are inferred from names and types only; the code
/// that consumes this request is not part of this chunk — confirm before relying on them.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// Presumably whether the region becomes writable after catching up.
    pub set_writable: bool,
    /// Optional log entry id; presumably the entry the region is expected to reach.
    pub entry_id: Option<entry::Id>,
    /// Optional log entry id; presumably the counterpart of `entry_id` for metadata.
    pub metadata_entry_id: Option<entry::Id>,
    /// Optional location id.
    pub location_id: Option<u64>,
    /// Optional replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
1379
/// Request to bulk insert a record batch into a region.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Id of the target region.
    pub region_id: RegionId,
    /// Record batch payload; `estimated_size` is computed from it.
    pub payload: DfRecordBatch,
    /// Arrow IPC message; presumably the undecoded form of `payload` — TODO confirm.
    pub raw_data: ArrowIpc,
}
1386
1387impl RegionBulkInsertsRequest {
1388 pub fn estimated_size(&self) -> usize {
1389 self.payload.get_array_memory_size()
1390 }
1391}
1392
1393impl fmt::Display for RegionRequest {
1394 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1395 match self {
1396 RegionRequest::Put(_) => write!(f, "Put"),
1397 RegionRequest::Delete(_) => write!(f, "Delete"),
1398 RegionRequest::Create(_) => write!(f, "Create"),
1399 RegionRequest::Drop(_) => write!(f, "Drop"),
1400 RegionRequest::Open(_) => write!(f, "Open"),
1401 RegionRequest::Close(_) => write!(f, "Close"),
1402 RegionRequest::Alter(_) => write!(f, "Alter"),
1403 RegionRequest::Flush(_) => write!(f, "Flush"),
1404 RegionRequest::Compact(_) => write!(f, "Compact"),
1405 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1406 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1407 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1408 }
1409 }
1410}
1411
1412#[cfg(test)]
1413mod tests {
1414
1415 use api::v1::region::RegionColumnDef;
1416 use api::v1::{ColumnDataType, ColumnDef};
1417 use datatypes::prelude::ConcreteDataType;
1418 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1419
1420 use super::*;
1421 use crate::metadata::RegionMetadataBuilder;
1422
1423 #[test]
1424 fn test_from_proto_location() {
1425 let proto_location = v1::AddColumnLocation {
1426 location_type: LocationType::First as i32,
1427 after_column_name: String::default(),
1428 };
1429 let location = AddColumnLocation::try_from(proto_location).unwrap();
1430 assert_eq!(location, AddColumnLocation::First);
1431
1432 let proto_location = v1::AddColumnLocation {
1433 location_type: 10,
1434 after_column_name: String::default(),
1435 };
1436 AddColumnLocation::try_from(proto_location).unwrap_err();
1437
1438 let proto_location = v1::AddColumnLocation {
1439 location_type: LocationType::After as i32,
1440 after_column_name: "a".to_string(),
1441 };
1442 let location = AddColumnLocation::try_from(proto_location).unwrap();
1443 assert_eq!(
1444 location,
1445 AddColumnLocation::After {
1446 column_name: "a".to_string()
1447 }
1448 );
1449 }
1450
1451 #[test]
1452 fn test_from_none_proto_add_column() {
1453 AddColumn::try_from(v1::region::AddColumn {
1454 column_def: None,
1455 location: None,
1456 })
1457 .unwrap_err();
1458 }
1459
1460 #[test]
1461 fn test_from_proto_alter_request() {
1462 RegionAlterRequest::try_from(AlterRequest {
1463 region_id: 0,
1464 schema_version: 1,
1465 kind: None,
1466 })
1467 .unwrap_err();
1468
1469 let request = RegionAlterRequest::try_from(AlterRequest {
1470 region_id: 0,
1471 schema_version: 1,
1472 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1473 add_columns: vec![v1::region::AddColumn {
1474 column_def: Some(RegionColumnDef {
1475 column_def: Some(ColumnDef {
1476 name: "a".to_string(),
1477 data_type: ColumnDataType::String as i32,
1478 is_nullable: true,
1479 default_constraint: vec![],
1480 semantic_type: SemanticType::Field as i32,
1481 comment: String::new(),
1482 ..Default::default()
1483 }),
1484 column_id: 1,
1485 }),
1486 location: Some(v1::AddColumnLocation {
1487 location_type: LocationType::First as i32,
1488 after_column_name: String::default(),
1489 }),
1490 }],
1491 })),
1492 })
1493 .unwrap();
1494
1495 assert_eq!(
1496 request,
1497 RegionAlterRequest {
1498 kind: AlterKind::AddColumns {
1499 columns: vec![AddColumn {
1500 column_metadata: ColumnMetadata {
1501 column_schema: ColumnSchema::new(
1502 "a",
1503 ConcreteDataType::string_datatype(),
1504 true,
1505 ),
1506 semantic_type: SemanticType::Field,
1507 column_id: 1,
1508 },
1509 location: Some(AddColumnLocation::First),
1510 }]
1511 },
1512 }
1513 );
1514 }
1515
1516 fn new_metadata() -> RegionMetadata {
1519 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1520 builder
1521 .push_column_metadata(ColumnMetadata {
1522 column_schema: ColumnSchema::new(
1523 "ts",
1524 ConcreteDataType::timestamp_millisecond_datatype(),
1525 false,
1526 ),
1527 semantic_type: SemanticType::Timestamp,
1528 column_id: 1,
1529 })
1530 .push_column_metadata(ColumnMetadata {
1531 column_schema: ColumnSchema::new(
1532 "tag_0",
1533 ConcreteDataType::string_datatype(),
1534 true,
1535 ),
1536 semantic_type: SemanticType::Tag,
1537 column_id: 2,
1538 })
1539 .push_column_metadata(ColumnMetadata {
1540 column_schema: ColumnSchema::new(
1541 "field_0",
1542 ConcreteDataType::string_datatype(),
1543 true,
1544 ),
1545 semantic_type: SemanticType::Field,
1546 column_id: 3,
1547 })
1548 .push_column_metadata(ColumnMetadata {
1549 column_schema: ColumnSchema::new(
1550 "field_1",
1551 ConcreteDataType::boolean_datatype(),
1552 true,
1553 ),
1554 semantic_type: SemanticType::Field,
1555 column_id: 4,
1556 })
1557 .primary_key(vec![2]);
1558 builder.build().unwrap()
1559 }
1560
1561 #[test]
1562 fn test_add_column_validate() {
1563 let metadata = new_metadata();
1564 let add_column = AddColumn {
1565 column_metadata: ColumnMetadata {
1566 column_schema: ColumnSchema::new(
1567 "tag_1",
1568 ConcreteDataType::string_datatype(),
1569 true,
1570 ),
1571 semantic_type: SemanticType::Tag,
1572 column_id: 5,
1573 },
1574 location: None,
1575 };
1576 add_column.validate(&metadata).unwrap();
1577 assert!(add_column.need_alter(&metadata));
1578
1579 AddColumn {
1581 column_metadata: ColumnMetadata {
1582 column_schema: ColumnSchema::new(
1583 "tag_1",
1584 ConcreteDataType::string_datatype(),
1585 false,
1586 ),
1587 semantic_type: SemanticType::Tag,
1588 column_id: 5,
1589 },
1590 location: None,
1591 }
1592 .validate(&metadata)
1593 .unwrap_err();
1594
1595 let add_column = AddColumn {
1597 column_metadata: ColumnMetadata {
1598 column_schema: ColumnSchema::new(
1599 "tag_0",
1600 ConcreteDataType::string_datatype(),
1601 true,
1602 ),
1603 semantic_type: SemanticType::Tag,
1604 column_id: 2,
1605 },
1606 location: None,
1607 };
1608 add_column.validate(&metadata).unwrap();
1609 assert!(!add_column.need_alter(&metadata));
1610 }
1611
1612 #[test]
1613 fn test_add_duplicate_columns() {
1614 let kind = AlterKind::AddColumns {
1615 columns: vec![
1616 AddColumn {
1617 column_metadata: ColumnMetadata {
1618 column_schema: ColumnSchema::new(
1619 "tag_1",
1620 ConcreteDataType::string_datatype(),
1621 true,
1622 ),
1623 semantic_type: SemanticType::Tag,
1624 column_id: 5,
1625 },
1626 location: None,
1627 },
1628 AddColumn {
1629 column_metadata: ColumnMetadata {
1630 column_schema: ColumnSchema::new(
1631 "tag_1",
1632 ConcreteDataType::string_datatype(),
1633 true,
1634 ),
1635 semantic_type: SemanticType::Field,
1636 column_id: 6,
1637 },
1638 location: None,
1639 },
1640 ],
1641 };
1642 let metadata = new_metadata();
1643 kind.validate(&metadata).unwrap();
1644 assert!(kind.need_alter(&metadata));
1645 }
1646
1647 #[test]
1648 fn test_add_existing_column_different_metadata() {
1649 let metadata = new_metadata();
1650
1651 let kind = AlterKind::AddColumns {
1653 columns: vec![AddColumn {
1654 column_metadata: ColumnMetadata {
1655 column_schema: ColumnSchema::new(
1656 "tag_0",
1657 ConcreteDataType::string_datatype(),
1658 true,
1659 ),
1660 semantic_type: SemanticType::Tag,
1661 column_id: 4,
1662 },
1663 location: None,
1664 }],
1665 };
1666 kind.validate(&metadata).unwrap_err();
1667
1668 let kind = AlterKind::AddColumns {
1670 columns: vec![AddColumn {
1671 column_metadata: ColumnMetadata {
1672 column_schema: ColumnSchema::new(
1673 "tag_0",
1674 ConcreteDataType::int64_datatype(),
1675 true,
1676 ),
1677 semantic_type: SemanticType::Tag,
1678 column_id: 2,
1679 },
1680 location: None,
1681 }],
1682 };
1683 kind.validate(&metadata).unwrap_err();
1684
1685 let kind = AlterKind::AddColumns {
1687 columns: vec![AddColumn {
1688 column_metadata: ColumnMetadata {
1689 column_schema: ColumnSchema::new(
1690 "tag_1",
1691 ConcreteDataType::string_datatype(),
1692 true,
1693 ),
1694 semantic_type: SemanticType::Tag,
1695 column_id: 2,
1696 },
1697 location: None,
1698 }],
1699 };
1700 kind.validate(&metadata).unwrap_err();
1701 }
1702
1703 #[test]
1704 fn test_add_existing_column_with_location() {
1705 let metadata = new_metadata();
1706 let kind = AlterKind::AddColumns {
1707 columns: vec![AddColumn {
1708 column_metadata: ColumnMetadata {
1709 column_schema: ColumnSchema::new(
1710 "tag_0",
1711 ConcreteDataType::string_datatype(),
1712 true,
1713 ),
1714 semantic_type: SemanticType::Tag,
1715 column_id: 2,
1716 },
1717 location: Some(AddColumnLocation::First),
1718 }],
1719 };
1720 kind.validate(&metadata).unwrap_err();
1721 }
1722
1723 #[test]
1724 fn test_validate_drop_column() {
1725 let metadata = new_metadata();
1726 let kind = AlterKind::DropColumns {
1727 names: vec!["xxxx".to_string()],
1728 };
1729 kind.validate(&metadata).unwrap();
1730 assert!(!kind.need_alter(&metadata));
1731
1732 AlterKind::DropColumns {
1733 names: vec!["tag_0".to_string()],
1734 }
1735 .validate(&metadata)
1736 .unwrap_err();
1737
1738 let kind = AlterKind::DropColumns {
1739 names: vec!["field_0".to_string()],
1740 };
1741 kind.validate(&metadata).unwrap();
1742 assert!(kind.need_alter(&metadata));
1743 }
1744
1745 #[test]
1746 fn test_validate_modify_column_type() {
1747 let metadata = new_metadata();
1748 AlterKind::ModifyColumnTypes {
1749 columns: vec![ModifyColumnType {
1750 column_name: "xxxx".to_string(),
1751 target_type: ConcreteDataType::string_datatype(),
1752 }],
1753 }
1754 .validate(&metadata)
1755 .unwrap_err();
1756
1757 AlterKind::ModifyColumnTypes {
1758 columns: vec![ModifyColumnType {
1759 column_name: "field_1".to_string(),
1760 target_type: ConcreteDataType::date_datatype(),
1761 }],
1762 }
1763 .validate(&metadata)
1764 .unwrap_err();
1765
1766 AlterKind::ModifyColumnTypes {
1767 columns: vec![ModifyColumnType {
1768 column_name: "ts".to_string(),
1769 target_type: ConcreteDataType::date_datatype(),
1770 }],
1771 }
1772 .validate(&metadata)
1773 .unwrap_err();
1774
1775 AlterKind::ModifyColumnTypes {
1776 columns: vec![ModifyColumnType {
1777 column_name: "tag_0".to_string(),
1778 target_type: ConcreteDataType::date_datatype(),
1779 }],
1780 }
1781 .validate(&metadata)
1782 .unwrap_err();
1783
1784 let kind = AlterKind::ModifyColumnTypes {
1785 columns: vec![ModifyColumnType {
1786 column_name: "field_0".to_string(),
1787 target_type: ConcreteDataType::int32_datatype(),
1788 }],
1789 };
1790 kind.validate(&metadata).unwrap();
1791 assert!(kind.need_alter(&metadata));
1792 }
1793
1794 #[test]
1795 fn test_validate_add_columns() {
1796 let kind = AlterKind::AddColumns {
1797 columns: vec![
1798 AddColumn {
1799 column_metadata: ColumnMetadata {
1800 column_schema: ColumnSchema::new(
1801 "tag_1",
1802 ConcreteDataType::string_datatype(),
1803 true,
1804 ),
1805 semantic_type: SemanticType::Tag,
1806 column_id: 5,
1807 },
1808 location: None,
1809 },
1810 AddColumn {
1811 column_metadata: ColumnMetadata {
1812 column_schema: ColumnSchema::new(
1813 "field_2",
1814 ConcreteDataType::string_datatype(),
1815 true,
1816 ),
1817 semantic_type: SemanticType::Field,
1818 column_id: 6,
1819 },
1820 location: None,
1821 },
1822 ],
1823 };
1824 let request = RegionAlterRequest { kind };
1825 let mut metadata = new_metadata();
1826 metadata.schema_version = 1;
1827 request.validate(&metadata).unwrap();
1828 }
1829
1830 #[test]
1831 fn test_validate_create_region() {
1832 let column_metadatas = vec![
1833 ColumnMetadata {
1834 column_schema: ColumnSchema::new(
1835 "ts",
1836 ConcreteDataType::timestamp_millisecond_datatype(),
1837 false,
1838 ),
1839 semantic_type: SemanticType::Timestamp,
1840 column_id: 1,
1841 },
1842 ColumnMetadata {
1843 column_schema: ColumnSchema::new(
1844 "tag_0",
1845 ConcreteDataType::string_datatype(),
1846 true,
1847 ),
1848 semantic_type: SemanticType::Tag,
1849 column_id: 2,
1850 },
1851 ColumnMetadata {
1852 column_schema: ColumnSchema::new(
1853 "field_0",
1854 ConcreteDataType::string_datatype(),
1855 true,
1856 ),
1857 semantic_type: SemanticType::Field,
1858 column_id: 3,
1859 },
1860 ];
1861 let create = RegionCreateRequest {
1862 engine: "mito".to_string(),
1863 column_metadatas,
1864 primary_key: vec![3, 4],
1865 options: HashMap::new(),
1866 table_dir: "path".to_string(),
1867 path_type: PathType::Bare,
1868 partition_expr_json: Some("".to_string()),
1869 };
1870
1871 assert!(create.validate().is_err());
1872 }
1873
1874 #[test]
1875 fn test_validate_modify_column_fulltext_options() {
1876 let kind = AlterKind::SetIndexes {
1877 options: vec![SetIndexOption::Fulltext {
1878 column_name: "tag_0".to_string(),
1879 options: FulltextOptions::new_unchecked(
1880 true,
1881 FulltextAnalyzer::Chinese,
1882 false,
1883 FulltextBackend::Bloom,
1884 1000,
1885 0.01,
1886 ),
1887 }],
1888 };
1889 let request = RegionAlterRequest { kind };
1890 let mut metadata = new_metadata();
1891 metadata.schema_version = 1;
1892 request.validate(&metadata).unwrap();
1893
1894 let kind = AlterKind::UnsetIndexes {
1895 options: vec![UnsetIndexOption::Fulltext {
1896 column_name: "tag_0".to_string(),
1897 }],
1898 };
1899 let request = RegionAlterRequest { kind };
1900 let mut metadata = new_metadata();
1901 metadata.schema_version = 1;
1902 request.validate(&metadata).unwrap();
1903 }
1904
1905 #[test]
1906 fn test_validate_sync_columns() {
1907 let metadata = new_metadata();
1908 let kind = AlterKind::SyncColumns {
1909 column_metadatas: vec![
1910 ColumnMetadata {
1911 column_schema: ColumnSchema::new(
1912 "tag_1",
1913 ConcreteDataType::string_datatype(),
1914 true,
1915 ),
1916 semantic_type: SemanticType::Tag,
1917 column_id: 5,
1918 },
1919 ColumnMetadata {
1920 column_schema: ColumnSchema::new(
1921 "field_2",
1922 ConcreteDataType::string_datatype(),
1923 true,
1924 ),
1925 semantic_type: SemanticType::Field,
1926 column_id: 6,
1927 },
1928 ],
1929 };
1930 let err = kind.validate(&metadata).unwrap_err();
1931 assert!(err.to_string().contains("not a primary key"));
1932
1933 let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
1935 let ts_column = column_metadatas_with_different_ts_column
1936 .iter_mut()
1937 .find(|c| c.semantic_type == SemanticType::Timestamp)
1938 .unwrap();
1939 ts_column.column_schema.name = "ts1".to_string();
1940
1941 let kind = AlterKind::SyncColumns {
1942 column_metadatas: column_metadatas_with_different_ts_column,
1943 };
1944 let err = kind.validate(&metadata).unwrap_err();
1945 assert!(
1946 err.to_string()
1947 .contains("timestamp column ts has different id")
1948 );
1949
1950 let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
1952 let pk_column = column_metadatas_with_different_pk_column
1953 .iter_mut()
1954 .find(|c| c.column_schema.name == "tag_0")
1955 .unwrap();
1956 pk_column.column_id = 100;
1957 let kind = AlterKind::SyncColumns {
1958 column_metadatas: column_metadatas_with_different_pk_column,
1959 };
1960 let err = kind.validate(&metadata).unwrap_err();
1961 assert!(
1962 err.to_string()
1963 .contains("column with same name tag_0 has different id")
1964 );
1965
1966 let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
1968 column_metadatas_with_new_field_column.push(ColumnMetadata {
1969 column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
1970 semantic_type: SemanticType::Field,
1971 column_id: 4,
1972 });
1973 let kind = AlterKind::SyncColumns {
1974 column_metadatas: column_metadatas_with_new_field_column,
1975 };
1976 kind.validate(&metadata).unwrap();
1977 }
1978
1979 #[test]
1980 fn test_cast_path_type_to_primitive() {
1981 assert_eq!(PathType::Bare as u8, 0);
1982 assert_eq!(PathType::Data as u8, 1);
1983 assert_eq!(PathType::Metadata as u8, 2);
1984 assert_eq!(
1985 PathType::try_from(PathType::Bare as u8).unwrap(),
1986 PathType::Bare
1987 );
1988 assert_eq!(
1989 PathType::try_from(PathType::Data as u8).unwrap(),
1990 PathType::Data
1991 );
1992 assert_eq!(
1993 PathType::try_from(PathType::Metadata as u8).unwrap(),
1994 PathType::Metadata
1995 );
1996 }
1997}