1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26 CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27 FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28 region_request, truncate_request,
29};
30use api::v1::{
31 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50 RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55 APPEND_MODE_KEY, SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
56 TWCS_TRIGGER_FILE_NUM,
57};
58use crate::path_utils::table_dir;
59use crate::storage::{ColumnId, RegionId, ScanRequest};
60
/// Kind of path carried by region create/open requests (`path_type` field).
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`, so a raw `u8` can be
/// converted back into a variant. No explicit discriminants are given, so the variants
/// map to `0`, `1` and `2` in declaration order; reordering them changes that mapping.
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// Discriminant `0`. This is the value the protobuf parsers in this file assign to
    /// create and open requests.
    Bare,
    /// Discriminant `1`.
    /// NOTE(review): presumably selects a data-specific sub-path; confirm against the
    /// path helpers.
    Data,
    /// Discriminant `2`.
    /// NOTE(review): presumably selects a metadata-specific sub-path; confirm against the
    /// path helpers.
    Metadata,
}
78
/// A batch of DDL requests of a single kind, each paired with its target region.
///
/// `IntoStaticStr` is derived so the variant name can be obtained as a `&'static str`
/// (see `request_type`).
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Create requests, one entry per region.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Drop requests, one entry per region.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Alter requests, one entry per region.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
85
86impl BatchRegionDdlRequest {
87 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
89 match body {
90 region_request::Body::Creates(creates) => {
91 let requests = creates
92 .requests
93 .into_iter()
94 .map(parse_region_create)
95 .collect::<Result<Vec<_>>>()?;
96 Ok(Some(Self::Create(requests)))
97 }
98 region_request::Body::Drops(drops) => {
99 let requests = drops
100 .requests
101 .into_iter()
102 .map(parse_region_drop)
103 .collect::<Result<Vec<_>>>()?;
104 Ok(Some(Self::Drop(requests)))
105 }
106 region_request::Body::Alters(alters) => {
107 let requests = alters
108 .requests
109 .into_iter()
110 .map(parse_region_alter)
111 .collect::<Result<Vec<_>>>()?;
112 Ok(Some(Self::Alter(requests)))
113 }
114 _ => Ok(None),
115 }
116 }
117
118 pub fn request_type(&self) -> &'static str {
119 self.into()
120 }
121
122 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
123 match self {
124 Self::Create(requests) => requests
125 .into_iter()
126 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
127 .collect(),
128 Self::Drop(requests) => requests
129 .into_iter()
130 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
131 .collect(),
132 Self::Alter(requests) => requests
133 .into_iter()
134 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
135 .collect(),
136 }
137 }
138}
139
/// A single request addressed to one region.
///
/// `IntoStaticStr` is derived so the variant name can be obtained as a `&'static str`
/// (see `request_type`); renaming a variant therefore changes that string.
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Row write carrying a `RegionPutRequest`.
    Put(RegionPutRequest),
    /// Row deletion carrying a `RegionDeleteRequest`.
    Delete(RegionDeleteRequest),
    /// Region creation.
    Create(RegionCreateRequest),
    /// Region drop.
    Drop(RegionDropRequest),
    /// Region open.
    Open(RegionOpenRequest),
    /// Region close.
    Close(RegionCloseRequest),
    /// Region alteration (schema, options, indexes, defaults).
    Alter(RegionAlterRequest),
    /// Region flush.
    Flush(RegionFlushRequest),
    /// Region compaction.
    Compact(RegionCompactRequest),
    /// Index build.
    BuildIndex(RegionBuildIndexRequest),
    /// Truncation, either of everything or of given time ranges.
    Truncate(RegionTruncateRequest),
    /// Catch-up request. It is never produced by `try_from_request_body` in this file.
    Catchup(RegionCatchupRequest),
    /// Bulk insert carrying a decoded record batch together with the raw Arrow IPC data.
    BulkInserts(RegionBulkInsertsRequest),
    /// Enter-staging request. It is never produced by `try_from_request_body` in this file.
    EnterStaging(EnterStagingRequest),
    /// Request to apply a staging manifest.
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
158
159impl RegionRequest {
160 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
163 match body {
164 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
165 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
166 region_request::Body::Create(create) => make_region_create(create),
167 region_request::Body::Drop(drop) => make_region_drop(drop),
168 region_request::Body::Open(open) => make_region_open(open),
169 region_request::Body::Close(close) => make_region_close(close),
170 region_request::Body::Alter(alter) => make_region_alter(alter),
171 region_request::Body::Flush(flush) => make_region_flush(flush),
172 region_request::Body::Compact(compact) => make_region_compact(compact),
173 region_request::Body::BuildIndex(index) => make_region_build_index(index),
174 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
175 region_request::Body::Creates(creates) => make_region_creates(creates),
176 region_request::Body::Drops(drops) => make_region_drops(drops),
177 region_request::Body::Alters(alters) => make_region_alters(alters),
178 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
179 region_request::Body::Sync(_) => UnexpectedSnafu {
180 reason: "Sync request should be handled separately by RegionServer",
181 }
182 .fail(),
183 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
184 reason: "ListMetadata request should be handled separately by RegionServer",
185 }
186 .fail(),
187 region_request::Body::ApplyStagingManifest(apply) => {
188 make_region_apply_staging_manifest(apply)
189 }
190 }
191 }
192
193 pub fn request_type(&self) -> &'static str {
195 self.into()
196 }
197}
198
199fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
200 let requests = inserts
201 .requests
202 .into_iter()
203 .filter_map(|r| {
204 let region_id = r.region_id.into();
205 r.rows.map(|rows| {
206 (
207 region_id,
208 RegionRequest::Put(RegionPutRequest {
209 rows,
210 hint: None,
211 partition_expr_version: r.partition_expr_version.map(|v| v.value),
212 }),
213 )
214 })
215 })
216 .collect();
217 Ok(requests)
218}
219
220fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
221 let requests = deletes
222 .requests
223 .into_iter()
224 .filter_map(|r| {
225 let region_id = r.region_id.into();
226 r.rows.map(|rows| {
227 (
228 region_id,
229 RegionRequest::Delete(RegionDeleteRequest {
230 rows,
231 hint: None,
232 partition_expr_version: r.partition_expr_version.map(|v| v.value),
233 }),
234 )
235 })
236 })
237 .collect();
238 Ok(requests)
239}
240
241fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
242 let column_metadatas = create
243 .column_defs
244 .into_iter()
245 .map(ColumnMetadata::try_from_column_def)
246 .collect::<Result<Vec<_>>>()?;
247 let region_id = RegionId::from(create.region_id);
248 let table_dir = table_dir(&create.path, region_id.table_id());
249 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
250 Ok((
251 region_id,
252 RegionCreateRequest {
253 engine: create.engine,
254 column_metadatas,
255 primary_key: create.primary_key,
256 options: create.options,
257 table_dir,
258 path_type: PathType::Bare,
259 partition_expr_json,
260 },
261 ))
262}
263
264fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
265 let (region_id, request) = parse_region_create(create)?;
266 Ok(vec![(region_id, RegionRequest::Create(request))])
267}
268
269fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
270 let mut requests = Vec::with_capacity(creates.requests.len());
271 for create in creates.requests {
272 requests.extend(make_region_create(create)?);
273 }
274 Ok(requests)
275}
276
277fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
278 let region_id = drop.region_id.into();
279 Ok((
280 region_id,
281 RegionDropRequest {
282 fast_path: drop.fast_path,
283 force: drop.force,
284 partial_drop: drop.partial_drop,
285 },
286 ))
287}
288
289fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
290 let (region_id, request) = parse_region_drop(drop)?;
291 Ok(vec![(region_id, RegionRequest::Drop(request))])
292}
293
294fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
295 let mut requests = Vec::with_capacity(drops.requests.len());
296 for drop in drops.requests {
297 requests.extend(make_region_drop(drop)?);
298 }
299 Ok(requests)
300}
301
302fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
303 let region_id = RegionId::from(open.region_id);
304 let table_dir = table_dir(&open.path, region_id.table_id());
305 Ok(vec![(
306 region_id,
307 RegionRequest::Open(RegionOpenRequest {
308 engine: open.engine,
309 table_dir,
310 path_type: PathType::Bare,
311 options: open.options,
312 skip_wal_replay: false,
313 checkpoint: None,
314 }),
315 )])
316}
317
318fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
319 let region_id = close.region_id.into();
320 Ok(vec![(
321 region_id,
322 RegionRequest::Close(RegionCloseRequest {}),
323 )])
324}
325
326fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
327 let region_id = alter.region_id.into();
328 let request = RegionAlterRequest::try_from(alter)?;
329 Ok((region_id, request))
330}
331
332fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
333 let (region_id, request) = parse_region_alter(alter)?;
334 Ok(vec![(region_id, RegionRequest::Alter(request))])
335}
336
337fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
338 let mut requests = Vec::with_capacity(alters.requests.len());
339 for alter in alters.requests {
340 requests.extend(make_region_alter(alter)?);
341 }
342 Ok(requests)
343}
344
345fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
346 let region_id = flush.region_id.into();
347 Ok(vec![(
348 region_id,
349 RegionRequest::Flush(RegionFlushRequest {
350 row_group_size: None,
351 }),
352 )])
353}
354
355fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
356 let region_id = compact.region_id.into();
357 let options = compact
358 .options
359 .unwrap_or(compact_request::Options::Regular(Default::default()));
360 let parallelism = if compact.parallelism == 0 {
362 None
363 } else {
364 Some(compact.parallelism)
365 };
366 Ok(vec![(
367 region_id,
368 RegionRequest::Compact(RegionCompactRequest {
369 options,
370 parallelism,
371 }),
372 )])
373}
374
375fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
376 let region_id = index.region_id.into();
377 Ok(vec![(
378 region_id,
379 RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
380 )])
381}
382
383fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
384 let region_id = truncate.region_id.into();
385 match truncate.kind {
386 None => InvalidRawRegionRequestSnafu {
387 err: "missing kind in TruncateRequest".to_string(),
388 }
389 .fail(),
390 Some(truncate_request::Kind::All(_)) => Ok(vec![(
391 region_id,
392 RegionRequest::Truncate(RegionTruncateRequest::All),
393 )]),
394 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
395 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
396
397 Ok(vec![(
398 region_id,
399 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
400 )])
401 }
402 }
403}
404
405fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
407 let region_id = request.region_id.into();
408 let partition_expr_version = request.partition_expr_version.map(|v| v.value);
409 let Some(Body::ArrowIpc(request)) = request.body else {
410 return Ok(vec![]);
411 };
412
413 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
414 .with_label_values(&["decode"])
415 .start_timer();
416 let mut decoder =
417 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
418 let payload = decoder
419 .try_decode_record_batch(&request.data_header, &request.payload)
420 .context(FlightCodecSnafu)?;
421 decoder_timer.observe_duration();
422 Ok(vec![(
423 region_id,
424 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
425 region_id,
426 payload,
427 raw_data: request,
428 partition_expr_version,
429 }),
430 )])
431}
432
433fn make_region_apply_staging_manifest(
434 api::v1::region::ApplyStagingManifestRequest {
435 region_id,
436 partition_expr,
437 central_region_id,
438 manifest_path,
439 }: api::v1::region::ApplyStagingManifestRequest,
440) -> Result<Vec<(RegionId, RegionRequest)>> {
441 let region_id = region_id.into();
442 Ok(vec![(
443 region_id,
444 RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
445 partition_expr,
446 central_region_id: central_region_id.into(),
447 manifest_path,
448 }),
449 )])
450}
451
/// Request to write rows into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to write.
    pub rows: Rows,
    /// Optional write hint. The protobuf conversion in this file always leaves it `None`.
    pub hint: Option<WriteHint>,
    /// Partition expression version attached to the request, if any.
    pub partition_expr_version: Option<u64>,
}
462
/// Request to read from a region; a thin wrapper around a `ScanRequest`.
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
467
/// Request to delete rows from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Rows describing what to delete.
    /// NOTE(review): presumably only the key columns are required; confirm with the
    /// engine implementation.
    pub rows: Rows,
    /// Optional write hint. The protobuf conversion in this file always leaves it `None`.
    pub hint: Option<WriteHint>,
    /// Partition expression version attached to the request, if any.
    pub partition_expr_version: Option<u64>,
}
480
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Name of the engine that should own the region.
    pub engine: String,
    /// Metadata of every column in the region. `validate` requires a timestamp column and
    /// unique column ids.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Column ids forming the primary key. `validate` requires each id to appear in
    /// `column_metadatas`.
    pub primary_key: Vec<ColumnId>,
    /// Free-form region options.
    pub options: HashMap<String, String>,
    /// Directory of the table that the region belongs to.
    pub table_dir: String,
    /// Kind of path to use for the region.
    pub path_type: PathType,
    /// Partition expression taken from the protobuf request's `partition.expression`, if
    /// a partition was supplied.
    /// NOTE(review): the field name implies JSON encoding; confirm with the producer.
    pub partition_expr_json: Option<String>,
}
499
500impl RegionCreateRequest {
501 pub fn validate(&self) -> Result<()> {
503 ensure!(
505 self.column_metadatas
506 .iter()
507 .any(|x| x.semantic_type == SemanticType::Timestamp),
508 InvalidRegionRequestSnafu {
509 region_id: RegionId::new(0, 0),
510 err: "missing timestamp column in create region request".to_string(),
511 }
512 );
513
514 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
516 for (i, c) in self.column_metadatas.iter().enumerate() {
517 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
518 return InvalidRegionRequestSnafu {
519 region_id: RegionId::new(0, 0),
520 err: format!(
521 "duplicate column id {} (at position {} and {}) in create region request",
522 c.column_id, previous, i
523 ),
524 }
525 .fail();
526 }
527 }
528
529 for column_id in &self.primary_key {
531 ensure!(
532 column_id_to_indices.contains_key(column_id),
533 InvalidRegionRequestSnafu {
534 region_id: RegionId::new(0, 0),
535 err: format!(
536 "missing primary key column {} in create region request",
537 column_id
538 ),
539 }
540 );
541 }
542
543 Ok(())
544 }
545
546 pub fn is_physical_table(&self) -> bool {
548 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
549 }
550}
551
/// Request to drop a region.
///
/// All three flags are copied verbatim from the protobuf `DropRequest`.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// NOTE(review): presumably requests a faster drop path; confirm with the engine.
    pub fast_path: bool,

    /// NOTE(review): presumably forces the drop even when it would normally be refused;
    /// confirm with the engine.
    pub force: bool,

    /// NOTE(review): presumably drops only part of the region's data; confirm with the
    /// engine.
    pub partial_drop: bool,
}
566
/// Request to open a region.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Name of the engine that owns the region.
    pub engine: String,
    /// Directory of the table that the region belongs to.
    pub table_dir: String,
    /// Kind of path to use for the region.
    pub path_type: PathType,
    /// Free-form region options.
    pub options: HashMap<String, String>,
    /// Whether WAL replay should be skipped. The protobuf conversion in this file sets it
    /// to `false`.
    pub skip_wal_replay: bool,
    /// Optional replay checkpoint. The protobuf conversion in this file sets it to `None`.
    pub checkpoint: Option<ReplayCheckpoint>,
}
583
/// Checkpoint that can accompany a region open request.
///
/// NOTE(review): presumably identifies the WAL positions from which replay should start;
/// confirm with the WAL replay code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint.
    pub entry_id: u64,
    /// Optional entry id for metadata.
    /// NOTE(review): presumably only relevant for regions with a separate metadata
    /// region; confirm.
    pub metadata_entry_id: Option<u64>,
}
589
590impl RegionOpenRequest {
591 pub fn is_physical_table(&self) -> bool {
593 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
594 }
595}
596
/// Request to close a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
600
/// Request to alter a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// The alteration to perform.
    pub kind: AlterKind,
}
607
608impl RegionAlterRequest {
609 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
611 self.kind.validate(metadata)?;
612
613 Ok(())
614 }
615
616 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
620 debug_assert!(self.validate(metadata).is_ok());
621 self.kind.need_alter(metadata)
622 }
623}
624
625impl TryFrom<AlterRequest> for RegionAlterRequest {
626 type Error = MetadataError;
627
628 fn try_from(value: AlterRequest) -> Result<Self> {
629 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
630 err: "missing kind in AlterRequest",
631 })?;
632
633 let kind = AlterKind::try_from(kind)?;
634 Ok(RegionAlterRequest { kind })
635 }
636}
637
/// The kinds of alteration that can be applied to a region.
///
/// `AsRefStr` is derived, so each variant name is also available as a string.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add, each with an optional location.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region. Validation rejects any existing column that is not a
    /// field; names that do not exist are accepted.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data type of columns.
    ModifyColumnTypes {
        /// Columns to change, each with its target type.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options. Not checked against the region metadata.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options. Not checked against the region metadata.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options on columns. Validation requires every column to exist.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options on columns. Validation requires every column to exist.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop the default constraint of columns. Validation requires every column to exist.
    DropDefaults {
        /// Names of the columns whose default is dropped.
        names: Vec<String>,
    },
    /// Set the default constraint of columns. Validation requires every column to exist.
    SetDefaults {
        /// Columns and their new default constraints.
        columns: Vec<SetDefault>,
    },
    /// Replace the column list with the given one. Validation requires the existing tag
    /// columns and the timestamp column to keep their names and ids.
    SyncColumns {
        /// The complete new list of column metadata.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A new default constraint for one column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column.
    pub name: String,
    /// The default constraint as raw bytes, copied unchanged from the protobuf request.
    /// NOTE(review): encoding is not visible here; confirm with the producer.
    pub default_constraint: Vec<u8>,
}
684
/// Index options to set on a single column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Set fulltext index options.
    Fulltext {
        /// Name of the column.
        column_name: String,
        /// Fulltext options to apply.
        options: FulltextOptions,
    },
    /// Set an inverted index. It has no options.
    Inverted {
        /// Name of the column.
        column_name: String,
    },
    /// Set skipping index options.
    Skipping {
        /// Name of the column.
        column_name: String,
        /// Skipping index options to apply.
        options: SkippingIndexOptions,
    },
}
699
700impl SetIndexOption {
701 pub fn column_name(&self) -> &String {
703 match self {
704 SetIndexOption::Fulltext { column_name, .. } => column_name,
705 SetIndexOption::Inverted { column_name } => column_name,
706 SetIndexOption::Skipping { column_name, .. } => column_name,
707 }
708 }
709
710 pub fn is_fulltext(&self) -> bool {
712 match self {
713 SetIndexOption::Fulltext { .. } => true,
714 SetIndexOption::Inverted { .. } => false,
715 SetIndexOption::Skipping { .. } => false,
716 }
717 }
718}
719
720impl TryFrom<v1::SetIndex> for SetIndexOption {
721 type Error = MetadataError;
722
723 fn try_from(value: v1::SetIndex) -> Result<Self> {
724 let option = value.options.context(InvalidRawRegionRequestSnafu {
725 err: "missing options in SetIndex",
726 })?;
727
728 let opt = match option {
729 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
730 column_name: x.column_name.clone(),
731 options: FulltextOptions::new(
732 x.enable,
733 as_fulltext_option_analyzer(
734 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
735 ),
736 x.case_sensitive,
737 as_fulltext_option_backend(
738 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
739 ),
740 x.granularity as u32,
741 x.false_positive_rate,
742 )
743 .context(InvalidIndexOptionSnafu)?,
744 },
745 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
746 column_name: i.column_name,
747 },
748 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
749 column_name: s.column_name,
750 options: SkippingIndexOptions::new(
751 s.granularity as u32,
752 s.false_positive_rate,
753 as_skipping_index_type(
754 PbSkippingIndexType::try_from(s.skipping_index_type)
755 .context(DecodeProtoSnafu)?,
756 ),
757 )
758 .context(InvalidIndexOptionSnafu)?,
759 },
760 };
761
762 Ok(opt)
763 }
764}
765
/// Index to unset on a single column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Unset the fulltext index of `column_name`.
    Fulltext { column_name: String },
    /// Unset the inverted index of `column_name`.
    Inverted { column_name: String },
    /// Unset the skipping index of `column_name`.
    Skipping { column_name: String },
}
772
773impl UnsetIndexOption {
774 pub fn column_name(&self) -> &String {
775 match self {
776 UnsetIndexOption::Fulltext { column_name } => column_name,
777 UnsetIndexOption::Inverted { column_name } => column_name,
778 UnsetIndexOption::Skipping { column_name } => column_name,
779 }
780 }
781
782 pub fn is_fulltext(&self) -> bool {
783 match self {
784 UnsetIndexOption::Fulltext { .. } => true,
785 UnsetIndexOption::Inverted { .. } => false,
786 UnsetIndexOption::Skipping { .. } => false,
787 }
788 }
789}
790
791impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
792 type Error = MetadataError;
793
794 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
795 let option = value.options.context(InvalidRawRegionRequestSnafu {
796 err: "missing options in UnsetIndex",
797 })?;
798
799 let opt = match option {
800 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
801 column_name: f.column_name,
802 },
803 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
804 column_name: i.column_name,
805 },
806 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
807 column_name: s.column_name,
808 },
809 };
810
811 Ok(opt)
812 }
813}
814
815impl AlterKind {
816 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
820 match self {
821 AlterKind::AddColumns { columns } => {
822 for col_to_add in columns {
823 col_to_add.validate(metadata)?;
824 }
825 }
826 AlterKind::DropColumns { names } => {
827 for name in names {
828 Self::validate_column_to_drop(name, metadata)?;
829 }
830 }
831 AlterKind::ModifyColumnTypes { columns } => {
832 for col_to_change in columns {
833 col_to_change.validate(metadata)?;
834 }
835 }
836 AlterKind::SetRegionOptions { .. } => {}
837 AlterKind::UnsetRegionOptions { .. } => {}
838 AlterKind::SetIndexes { options } => {
839 for option in options {
840 Self::validate_column_alter_index_option(
841 option.column_name(),
842 metadata,
843 option.is_fulltext(),
844 )?;
845 }
846 }
847 AlterKind::UnsetIndexes { options } => {
848 for option in options {
849 Self::validate_column_alter_index_option(
850 option.column_name(),
851 metadata,
852 option.is_fulltext(),
853 )?;
854 }
855 }
856 AlterKind::DropDefaults { names } => {
857 names
858 .iter()
859 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
860 }
861 AlterKind::SetDefaults { columns } => {
862 columns
863 .iter()
864 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
865 }
866 AlterKind::SyncColumns { column_metadatas } => {
867 let new_primary_keys = column_metadatas
868 .iter()
869 .filter(|c| c.semantic_type == SemanticType::Tag)
870 .map(|c| (c.column_schema.name.as_str(), c.column_id))
871 .collect::<HashMap<_, _>>();
872
873 let old_primary_keys = metadata
874 .column_metadatas
875 .iter()
876 .filter(|c| c.semantic_type == SemanticType::Tag)
877 .map(|c| (c.column_schema.name.as_str(), c.column_id));
878
879 for (name, id) in old_primary_keys {
880 let primary_key =
881 new_primary_keys
882 .get(name)
883 .with_context(|| InvalidRegionRequestSnafu {
884 region_id: metadata.region_id,
885 err: format!("column {} is not a primary key", name),
886 })?;
887
888 ensure!(
889 *primary_key == id,
890 InvalidRegionRequestSnafu {
891 region_id: metadata.region_id,
892 err: format!(
893 "column with same name {} has different id, existing: {}, got: {}",
894 name, id, primary_key
895 ),
896 }
897 );
898 }
899
900 let new_ts_column = column_metadatas
901 .iter()
902 .find(|c| c.semantic_type == SemanticType::Timestamp)
903 .map(|c| (c.column_schema.name.as_str(), c.column_id))
904 .context(InvalidRegionRequestSnafu {
905 region_id: metadata.region_id,
906 err: "timestamp column not found",
907 })?;
908
909 let old_ts_column = metadata
911 .column_metadatas
912 .iter()
913 .find(|c| c.semantic_type == SemanticType::Timestamp)
914 .map(|c| (c.column_schema.name.as_str(), c.column_id))
915 .unwrap();
916
917 ensure!(
918 new_ts_column == old_ts_column,
919 InvalidRegionRequestSnafu {
920 region_id: metadata.region_id,
921 err: format!(
922 "timestamp column {} has different id, existing: {}, got: {}",
923 old_ts_column.0, old_ts_column.1, new_ts_column.1
924 ),
925 }
926 );
927 }
928 }
929 Ok(())
930 }
931
932 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
934 debug_assert!(self.validate(metadata).is_ok());
935 match self {
936 AlterKind::AddColumns { columns } => columns
937 .iter()
938 .any(|col_to_add| col_to_add.need_alter(metadata)),
939 AlterKind::DropColumns { names } => names
940 .iter()
941 .any(|name| metadata.column_by_name(name).is_some()),
942 AlterKind::ModifyColumnTypes { columns } => columns
943 .iter()
944 .any(|col_to_change| col_to_change.need_alter(metadata)),
945 AlterKind::SetRegionOptions { .. } => true,
946 AlterKind::UnsetRegionOptions { .. } => true,
947 AlterKind::SetIndexes { options, .. } => options
948 .iter()
949 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
950 AlterKind::UnsetIndexes { options } => options
951 .iter()
952 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
953 AlterKind::DropDefaults { names } => names
954 .iter()
955 .any(|name| metadata.column_by_name(name).is_some()),
956
957 AlterKind::SetDefaults { columns } => columns
958 .iter()
959 .any(|x| metadata.column_by_name(&x.name).is_some()),
960 AlterKind::SyncColumns { column_metadatas } => {
961 metadata.column_metadatas != *column_metadatas
962 }
963 }
964 }
965
966 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
968 let Some(column) = metadata.column_by_name(name) else {
969 return Ok(());
970 };
971 ensure!(
972 column.semantic_type == SemanticType::Field,
973 InvalidRegionRequestSnafu {
974 region_id: metadata.region_id,
975 err: format!("column {} is not a field and could not be dropped", name),
976 }
977 );
978 Ok(())
979 }
980
981 fn validate_column_alter_index_option(
983 column_name: &String,
984 metadata: &RegionMetadata,
985 is_fulltext: bool,
986 ) -> Result<()> {
987 let column = metadata
988 .column_by_name(column_name)
989 .context(InvalidRegionRequestSnafu {
990 region_id: metadata.region_id,
991 err: format!("column {} not found", column_name),
992 })?;
993
994 if is_fulltext {
995 ensure!(
996 column.column_schema.data_type.is_string(),
997 InvalidRegionRequestSnafu {
998 region_id: metadata.region_id,
999 err: format!(
1000 "cannot change alter index options for non-string column {}",
1001 column_name
1002 ),
1003 }
1004 );
1005 }
1006
1007 Ok(())
1008 }
1009
1010 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
1012 metadata
1013 .column_by_name(column_name)
1014 .context(InvalidRegionRequestSnafu {
1015 region_id: metadata.region_id,
1016 err: format!("column {} not found", column_name),
1017 })?;
1018
1019 Ok(())
1020 }
1021}
1022
1023impl TryFrom<alter_request::Kind> for AlterKind {
1024 type Error = MetadataError;
1025
1026 fn try_from(kind: alter_request::Kind) -> Result<Self> {
1027 let alter_kind = match kind {
1028 alter_request::Kind::AddColumns(x) => {
1029 let columns = x
1030 .add_columns
1031 .into_iter()
1032 .map(|x| x.try_into())
1033 .collect::<Result<Vec<_>>>()?;
1034 AlterKind::AddColumns { columns }
1035 }
1036 alter_request::Kind::ModifyColumnTypes(x) => {
1037 let columns = x
1038 .modify_column_types
1039 .into_iter()
1040 .map(|x| x.into())
1041 .collect::<Vec<_>>();
1042 AlterKind::ModifyColumnTypes { columns }
1043 }
1044 alter_request::Kind::DropColumns(x) => {
1045 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
1046 AlterKind::DropColumns { names }
1047 }
1048 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
1049 options: options
1050 .table_options
1051 .iter()
1052 .map(TryFrom::try_from)
1053 .collect::<Result<Vec<_>>>()?,
1054 },
1055 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1056 keys: options
1057 .keys
1058 .iter()
1059 .map(|key| UnsetRegionOption::try_from(key.as_str()))
1060 .collect::<Result<Vec<_>>>()?,
1061 },
1062 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1063 options: vec![SetIndexOption::try_from(o)?],
1064 },
1065 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1066 options: vec![UnsetIndexOption::try_from(o)?],
1067 },
1068 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1069 options: o
1070 .set_indexes
1071 .into_iter()
1072 .map(SetIndexOption::try_from)
1073 .collect::<Result<Vec<_>>>()?,
1074 },
1075 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1076 options: o
1077 .unset_indexes
1078 .into_iter()
1079 .map(UnsetIndexOption::try_from)
1080 .collect::<Result<Vec<_>>>()?,
1081 },
1082 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1083 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1084 },
1085 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1086 columns: x
1087 .set_defaults
1088 .into_iter()
1089 .map(|x| {
1090 Ok(SetDefault {
1091 name: x.column_name,
1092 default_constraint: x.default_constraint.clone(),
1093 })
1094 })
1095 .collect::<Result<Vec<_>>>()?,
1096 },
1097 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1098 column_metadatas: x
1099 .column_defs
1100 .into_iter()
1101 .map(ColumnMetadata::try_from_column_def)
1102 .collect::<Result<Vec<_>>>()?,
1103 },
1104 };
1105
1106 Ok(alter_kind)
1107 }
1108}
1109
/// A column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to place the column, if specified. Validation rejects a location when a
    /// column with the same name already exists.
    pub location: Option<AddColumnLocation>,
}
1119
1120impl AddColumn {
1121 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1126 ensure!(
1127 self.column_metadata.column_schema.is_nullable()
1128 || self
1129 .column_metadata
1130 .column_schema
1131 .default_constraint()
1132 .is_some(),
1133 InvalidRegionRequestSnafu {
1134 region_id: metadata.region_id,
1135 err: format!(
1136 "no default value for column {}",
1137 self.column_metadata.column_schema.name
1138 ),
1139 }
1140 );
1141
1142 if let Some(existing_column) =
1143 metadata.column_by_name(&self.column_metadata.column_schema.name)
1144 {
1145 ensure!(
1147 *existing_column == self.column_metadata,
1148 InvalidRegionRequestSnafu {
1149 region_id: metadata.region_id,
1150 err: format!(
1151 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1152 self.column_metadata.column_schema.name,
1153 existing_column,
1154 self.column_metadata,
1155 ),
1156 }
1157 );
1158 ensure!(
1159 self.location.is_none(),
1160 InvalidRegionRequestSnafu {
1161 region_id: metadata.region_id,
1162 err: format!(
1163 "column {} already exists, but location is specified",
1164 self.column_metadata.column_schema.name
1165 ),
1166 }
1167 );
1168 }
1169
1170 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1171 ensure!(
1173 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1174 InvalidRegionRequestSnafu {
1175 region_id: metadata.region_id,
1176 err: format!(
1177 "column id {} already exists with different name {}",
1178 self.column_metadata.column_id, existing_column.column_schema.name
1179 ),
1180 }
1181 );
1182 }
1183
1184 Ok(())
1185 }
1186
1187 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1189 debug_assert!(self.validate(metadata).is_ok());
1190 metadata
1191 .column_by_name(&self.column_metadata.column_schema.name)
1192 .is_none()
1193 }
1194}
1195
1196impl TryFrom<v1::region::AddColumn> for AddColumn {
1197 type Error = MetadataError;
1198
1199 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1200 let column_def = add_column
1201 .column_def
1202 .context(InvalidRawRegionRequestSnafu {
1203 err: "missing column_def in AddColumn",
1204 })?;
1205
1206 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1207 let location = add_column
1208 .location
1209 .map(AddColumnLocation::try_from)
1210 .transpose()?;
1211
1212 Ok(AddColumn {
1213 column_metadata,
1214 location,
1215 })
1216 }
1217}
1218
/// Location requested for a column that is being added.
///
/// Built from the protobuf `v1::AddColumnLocation` message, whose
/// `LocationType` selects the variant.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Corresponds to `LocationType::First`.
    First,
    /// Corresponds to `LocationType::After`.
    After {
        /// Name of the referenced column (taken from `after_column_name`).
        column_name: String,
    },
}
1230
1231impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1232 type Error = MetadataError;
1233
1234 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1235 let location_type = LocationType::try_from(location.location_type)
1236 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1237 let add_column_location = match location_type {
1238 LocationType::First => AddColumnLocation::First,
1239 LocationType::After => AddColumnLocation::After {
1240 column_name: location.after_column_name,
1241 },
1242 };
1243
1244 Ok(add_column_location)
1245 }
1246}
1247
/// Request to change the data type of a single column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column whose type is changed.
    pub column_name: String,
    /// Data type the column is changed to.
    pub target_type: ConcreteDataType,
}
1256
1257impl ModifyColumnType {
1258 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1260 let column_meta = metadata
1261 .column_by_name(&self.column_name)
1262 .with_context(|| InvalidRegionRequestSnafu {
1263 region_id: metadata.region_id,
1264 err: format!("column {} not found", self.column_name),
1265 })?;
1266
1267 ensure!(
1268 matches!(column_meta.semantic_type, SemanticType::Field),
1269 InvalidRegionRequestSnafu {
1270 region_id: metadata.region_id,
1271 err: "'timestamp' or 'tag' column cannot change type".to_string()
1272 }
1273 );
1274 ensure!(
1275 column_meta
1276 .column_schema
1277 .data_type
1278 .can_arrow_type_cast_to(&self.target_type),
1279 InvalidRegionRequestSnafu {
1280 region_id: metadata.region_id,
1281 err: format!(
1282 "column '{}' cannot be cast automatically to type '{}'",
1283 self.column_name, self.target_type
1284 ),
1285 }
1286 );
1287
1288 Ok(())
1289 }
1290
1291 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1293 debug_assert!(self.validate(metadata).is_ok());
1294 metadata.column_by_name(&self.column_name).is_some()
1295 }
1296}
1297
1298impl From<v1::ModifyColumnType> for ModifyColumnType {
1299 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1300 let target_type = ColumnDataTypeWrapper::new(
1301 modify_column_type.target_type(),
1302 modify_column_type.target_type_extension,
1303 )
1304 .into();
1305
1306 ModifyColumnType {
1307 column_name: modify_column_type.column_name,
1308 target_type,
1309 }
1310 }
1311}
1312
/// A region option to set, parsed from a protobuf key/value `Option`.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Time-to-live parsed from the `TTL_KEY` option; `None` is produced when
    /// converting from `UnsetRegionOption::Ttl`.
    Ttl(Option<TimeToLive>),
    /// A TWCS compaction option as `(key, value)`; the value is an empty string
    /// when converted from an unset option.
    Twsc(String, String),
    /// Raw value of the `SST_FORMAT_KEY` option.
    Format(String),
    /// Boolean parsed from the `APPEND_MODE_KEY` option.
    AppendMode(bool),
}
1323
1324impl TryFrom<&PbOption> for SetRegionOption {
1325 type Error = MetadataError;
1326
1327 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1328 let PbOption { key, value } = value;
1329 match key.as_str() {
1330 TTL_KEY => {
1331 let ttl = TimeToLive::from_humantime_or_str(value)
1332 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1333
1334 Ok(Self::Ttl(Some(ttl)))
1335 }
1336 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1337 Ok(Self::Twsc(key.clone(), value.clone()))
1338 }
1339 SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1340 APPEND_MODE_KEY => {
1341 let append_mode = value
1342 .parse::<bool>()
1343 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1344 Ok(Self::AppendMode(append_mode))
1345 }
1346 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1347 }
1348 }
1349}
1350
1351impl From<&UnsetRegionOption> for SetRegionOption {
1352 fn from(unset_option: &UnsetRegionOption) -> Self {
1353 match unset_option {
1354 UnsetRegionOption::TwcsTriggerFileNum => {
1355 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1356 }
1357 UnsetRegionOption::TwcsMaxOutputFileSize => {
1358 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1359 }
1360 UnsetRegionOption::TwcsTimeWindow => {
1361 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1362 }
1363 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1364 }
1365 }
1366}
1367
1368impl TryFrom<&str> for UnsetRegionOption {
1369 type Error = MetadataError;
1370
1371 fn try_from(key: &str) -> Result<Self> {
1372 match key.to_ascii_lowercase().as_str() {
1373 TTL_KEY => Ok(Self::Ttl),
1374 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1375 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1376 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1377 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1378 }
1379 }
1380}
1381
/// A region option that can be unset; each variant maps to one option key
/// (see `as_str`).
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// The option stored under `TWCS_TRIGGER_FILE_NUM`.
    TwcsTriggerFileNum,
    /// The option stored under `TWCS_MAX_OUTPUT_FILE_SIZE`.
    TwcsMaxOutputFileSize,
    /// The option stored under `TWCS_TIME_WINDOW`.
    TwcsTimeWindow,
    /// The option stored under `TTL_KEY`.
    Ttl,
}
1389
1390impl UnsetRegionOption {
1391 pub fn as_str(&self) -> &str {
1392 match self {
1393 Self::Ttl => TTL_KEY,
1394 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1395 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1396 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1397 }
1398 }
1399}
1400
1401impl Display for UnsetRegionOption {
1402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1403 write!(f, "{}", self.as_str())
1404 }
1405}
1406
/// Payload of a region flush request.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; `None` by default.
    // NOTE(review): presumably overrides the row group size used for the
    // flushed files — confirm against the engine.
    pub row_group_size: Option<usize>,
}
1411
/// Payload of a region compaction request.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options, using the protobuf `compact_request::Options` type.
    pub options: compact_request::Options,
    /// Optional parallelism setting.
    // NOTE(review): presumably caps concurrent compaction work — confirm
    // against the engine.
    pub parallelism: Option<u32>,
}
1417
1418impl Default for RegionCompactRequest {
1419 fn default() -> Self {
1420 Self {
1421 options: compact_request::Options::Regular(Default::default()),
1423 parallelism: None,
1424 }
1425 }
1426}
1427
/// Payload of a region build-index request; it currently carries no fields.
#[derive(Debug, Clone, Default)]
pub struct RegionBuildIndexRequest {}
1430
/// Payload of a region truncate request.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// Truncate without restricting to time ranges.
    All,
    /// Truncate by the given time ranges.
    ByTimeRanges {
        /// `(start, end)` timestamp pairs.
        // NOTE(review): bound inclusivity is not visible here — confirm
        // against the engine.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1443
/// Payload of a region catchup request.
///
/// All fields default to `false` / `None`.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// Whether the region should be set writable.
    // NOTE(review): presumably applied after catchup completes — confirm.
    pub set_writable: bool,
    /// Optional log entry id.
    // NOTE(review): presumably the entry id to replay up to — confirm.
    pub entry_id: Option<entry::Id>,
    /// Optional log entry id for metadata.
    // NOTE(review): presumably the replay target of the metadata region — confirm.
    pub metadata_entry_id: Option<entry::Id>,
    /// Optional location id.
    // NOTE(review): meaning is not visible in this file section — confirm.
    pub location_id: Option<u64>,
    /// Optional replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
1464
/// Payload of a bulk insert request for one region.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Region the request targets.
    pub region_id: RegionId,
    /// Record batch payload; its array memory size is what `estimated_size` reports.
    pub payload: DfRecordBatch,
    /// The request data as an `ArrowIpc` message.
    // NOTE(review): presumably the encoded form of `payload` — confirm.
    pub raw_data: ArrowIpc,
    /// Optional partition expression version.
    pub partition_expr_version: Option<u64>,
}
1472
1473impl RegionBulkInsertsRequest {
1474 pub fn estimated_size(&self) -> usize {
1475 self.payload.get_array_memory_size()
1476 }
1477}
1478
/// Partition directive carried by a request to enter staging.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StagingPartitionDirective {
    /// Carries a partition expression string.
    UpdatePartitionExpr(String),
    /// Carries no partition expression.
    RejectAllWrites,
}

impl StagingPartitionDirective {
    /// Returns the partition expression for `UpdatePartitionExpr`, or `None`
    /// for `RejectAllWrites`.
    pub fn partition_expr(&self) -> Option<&str> {
        if let Self::UpdatePartitionExpr(expr) = self {
            Some(expr.as_str())
        } else {
            None
        }
    }
}
1499
/// Payload of a request to enter staging.
#[derive(Debug, Clone)]
pub struct EnterStagingRequest {
    /// Partition directive carried by the request.
    pub partition_directive: StagingPartitionDirective,
}
1505
1506impl EnterStagingRequest {
1507 pub fn with_partition_expr(partition_expr: String) -> Self {
1509 Self {
1510 partition_directive: StagingPartitionDirective::UpdatePartitionExpr(partition_expr),
1511 }
1512 }
1513}
1514
/// Payload of a request to apply a staging manifest.
#[derive(Debug, Clone)]
pub struct ApplyStagingManifestRequest {
    /// Partition expression string.
    // NOTE(review): presumably the expression of the staging region — confirm.
    pub partition_expr: String,
    /// Id of the central region.
    // NOTE(review): presumably the region that holds the staging manifest — confirm.
    pub central_region_id: RegionId,
    /// Path of the manifest.
    // NOTE(review): what the path is relative to is not visible here — confirm.
    pub manifest_path: String,
}
1543
1544impl fmt::Display for RegionRequest {
1545 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1546 match self {
1547 RegionRequest::Put(_) => write!(f, "Put"),
1548 RegionRequest::Delete(_) => write!(f, "Delete"),
1549 RegionRequest::Create(_) => write!(f, "Create"),
1550 RegionRequest::Drop(_) => write!(f, "Drop"),
1551 RegionRequest::Open(_) => write!(f, "Open"),
1552 RegionRequest::Close(_) => write!(f, "Close"),
1553 RegionRequest::Alter(_) => write!(f, "Alter"),
1554 RegionRequest::Flush(_) => write!(f, "Flush"),
1555 RegionRequest::Compact(_) => write!(f, "Compact"),
1556 RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1557 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1558 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1559 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1560 RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1561 RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
1562 }
1563 }
1564}
1565
1566#[cfg(test)]
1567mod tests {
1568
1569 use api::v1::region::RegionColumnDef;
1570 use api::v1::{ColumnDataType, ColumnDef};
1571 use datatypes::prelude::ConcreteDataType;
1572 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1573
1574 use super::*;
1575 use crate::metadata::RegionMetadataBuilder;
1576
1577 #[test]
1578 fn test_from_proto_location() {
1579 let proto_location = v1::AddColumnLocation {
1580 location_type: LocationType::First as i32,
1581 after_column_name: String::default(),
1582 };
1583 let location = AddColumnLocation::try_from(proto_location).unwrap();
1584 assert_eq!(location, AddColumnLocation::First);
1585
1586 let proto_location = v1::AddColumnLocation {
1587 location_type: 10,
1588 after_column_name: String::default(),
1589 };
1590 AddColumnLocation::try_from(proto_location).unwrap_err();
1591
1592 let proto_location = v1::AddColumnLocation {
1593 location_type: LocationType::After as i32,
1594 after_column_name: "a".to_string(),
1595 };
1596 let location = AddColumnLocation::try_from(proto_location).unwrap();
1597 assert_eq!(
1598 location,
1599 AddColumnLocation::After {
1600 column_name: "a".to_string()
1601 }
1602 );
1603 }
1604
1605 #[test]
1606 fn test_from_none_proto_add_column() {
1607 AddColumn::try_from(v1::region::AddColumn {
1608 column_def: None,
1609 location: None,
1610 })
1611 .unwrap_err();
1612 }
1613
1614 #[test]
1615 fn test_from_proto_alter_request() {
1616 RegionAlterRequest::try_from(AlterRequest {
1617 region_id: 0,
1618 schema_version: 1,
1619 kind: None,
1620 })
1621 .unwrap_err();
1622
1623 let request = RegionAlterRequest::try_from(AlterRequest {
1624 region_id: 0,
1625 schema_version: 1,
1626 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1627 add_columns: vec![v1::region::AddColumn {
1628 column_def: Some(RegionColumnDef {
1629 column_def: Some(ColumnDef {
1630 name: "a".to_string(),
1631 data_type: ColumnDataType::String as i32,
1632 is_nullable: true,
1633 default_constraint: vec![],
1634 semantic_type: SemanticType::Field as i32,
1635 comment: String::new(),
1636 ..Default::default()
1637 }),
1638 column_id: 1,
1639 }),
1640 location: Some(v1::AddColumnLocation {
1641 location_type: LocationType::First as i32,
1642 after_column_name: String::default(),
1643 }),
1644 }],
1645 })),
1646 })
1647 .unwrap();
1648
1649 assert_eq!(
1650 request,
1651 RegionAlterRequest {
1652 kind: AlterKind::AddColumns {
1653 columns: vec![AddColumn {
1654 column_metadata: ColumnMetadata {
1655 column_schema: ColumnSchema::new(
1656 "a",
1657 ConcreteDataType::string_datatype(),
1658 true,
1659 ),
1660 semantic_type: SemanticType::Field,
1661 column_id: 1,
1662 },
1663 location: Some(AddColumnLocation::First),
1664 }]
1665 },
1666 }
1667 );
1668 }
1669
1670 fn new_metadata() -> RegionMetadata {
1673 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1674 builder
1675 .push_column_metadata(ColumnMetadata {
1676 column_schema: ColumnSchema::new(
1677 "ts",
1678 ConcreteDataType::timestamp_millisecond_datatype(),
1679 false,
1680 ),
1681 semantic_type: SemanticType::Timestamp,
1682 column_id: 1,
1683 })
1684 .push_column_metadata(ColumnMetadata {
1685 column_schema: ColumnSchema::new(
1686 "tag_0",
1687 ConcreteDataType::string_datatype(),
1688 true,
1689 ),
1690 semantic_type: SemanticType::Tag,
1691 column_id: 2,
1692 })
1693 .push_column_metadata(ColumnMetadata {
1694 column_schema: ColumnSchema::new(
1695 "field_0",
1696 ConcreteDataType::string_datatype(),
1697 true,
1698 ),
1699 semantic_type: SemanticType::Field,
1700 column_id: 3,
1701 })
1702 .push_column_metadata(ColumnMetadata {
1703 column_schema: ColumnSchema::new(
1704 "field_1",
1705 ConcreteDataType::boolean_datatype(),
1706 true,
1707 ),
1708 semantic_type: SemanticType::Field,
1709 column_id: 4,
1710 })
1711 .primary_key(vec![2]);
1712 builder.build().unwrap()
1713 }
1714
1715 #[test]
1716 fn test_add_column_validate() {
1717 let metadata = new_metadata();
1718 let add_column = AddColumn {
1719 column_metadata: ColumnMetadata {
1720 column_schema: ColumnSchema::new(
1721 "tag_1",
1722 ConcreteDataType::string_datatype(),
1723 true,
1724 ),
1725 semantic_type: SemanticType::Tag,
1726 column_id: 5,
1727 },
1728 location: None,
1729 };
1730 add_column.validate(&metadata).unwrap();
1731 assert!(add_column.need_alter(&metadata));
1732
1733 AddColumn {
1735 column_metadata: ColumnMetadata {
1736 column_schema: ColumnSchema::new(
1737 "tag_1",
1738 ConcreteDataType::string_datatype(),
1739 false,
1740 ),
1741 semantic_type: SemanticType::Tag,
1742 column_id: 5,
1743 },
1744 location: None,
1745 }
1746 .validate(&metadata)
1747 .unwrap_err();
1748
1749 let add_column = AddColumn {
1751 column_metadata: ColumnMetadata {
1752 column_schema: ColumnSchema::new(
1753 "tag_0",
1754 ConcreteDataType::string_datatype(),
1755 true,
1756 ),
1757 semantic_type: SemanticType::Tag,
1758 column_id: 2,
1759 },
1760 location: None,
1761 };
1762 add_column.validate(&metadata).unwrap();
1763 assert!(!add_column.need_alter(&metadata));
1764 }
1765
1766 #[test]
1767 fn test_add_duplicate_columns() {
1768 let kind = AlterKind::AddColumns {
1769 columns: vec![
1770 AddColumn {
1771 column_metadata: ColumnMetadata {
1772 column_schema: ColumnSchema::new(
1773 "tag_1",
1774 ConcreteDataType::string_datatype(),
1775 true,
1776 ),
1777 semantic_type: SemanticType::Tag,
1778 column_id: 5,
1779 },
1780 location: None,
1781 },
1782 AddColumn {
1783 column_metadata: ColumnMetadata {
1784 column_schema: ColumnSchema::new(
1785 "tag_1",
1786 ConcreteDataType::string_datatype(),
1787 true,
1788 ),
1789 semantic_type: SemanticType::Field,
1790 column_id: 6,
1791 },
1792 location: None,
1793 },
1794 ],
1795 };
1796 let metadata = new_metadata();
1797 kind.validate(&metadata).unwrap();
1798 assert!(kind.need_alter(&metadata));
1799 }
1800
1801 #[test]
1802 fn test_add_existing_column_different_metadata() {
1803 let metadata = new_metadata();
1804
1805 let kind = AlterKind::AddColumns {
1807 columns: vec![AddColumn {
1808 column_metadata: ColumnMetadata {
1809 column_schema: ColumnSchema::new(
1810 "tag_0",
1811 ConcreteDataType::string_datatype(),
1812 true,
1813 ),
1814 semantic_type: SemanticType::Tag,
1815 column_id: 4,
1816 },
1817 location: None,
1818 }],
1819 };
1820 kind.validate(&metadata).unwrap_err();
1821
1822 let kind = AlterKind::AddColumns {
1824 columns: vec![AddColumn {
1825 column_metadata: ColumnMetadata {
1826 column_schema: ColumnSchema::new(
1827 "tag_0",
1828 ConcreteDataType::int64_datatype(),
1829 true,
1830 ),
1831 semantic_type: SemanticType::Tag,
1832 column_id: 2,
1833 },
1834 location: None,
1835 }],
1836 };
1837 kind.validate(&metadata).unwrap_err();
1838
1839 let kind = AlterKind::AddColumns {
1841 columns: vec![AddColumn {
1842 column_metadata: ColumnMetadata {
1843 column_schema: ColumnSchema::new(
1844 "tag_1",
1845 ConcreteDataType::string_datatype(),
1846 true,
1847 ),
1848 semantic_type: SemanticType::Tag,
1849 column_id: 2,
1850 },
1851 location: None,
1852 }],
1853 };
1854 kind.validate(&metadata).unwrap_err();
1855 }
1856
1857 #[test]
1858 fn test_add_existing_column_with_location() {
1859 let metadata = new_metadata();
1860 let kind = AlterKind::AddColumns {
1861 columns: vec![AddColumn {
1862 column_metadata: ColumnMetadata {
1863 column_schema: ColumnSchema::new(
1864 "tag_0",
1865 ConcreteDataType::string_datatype(),
1866 true,
1867 ),
1868 semantic_type: SemanticType::Tag,
1869 column_id: 2,
1870 },
1871 location: Some(AddColumnLocation::First),
1872 }],
1873 };
1874 kind.validate(&metadata).unwrap_err();
1875 }
1876
1877 #[test]
1878 fn test_validate_drop_column() {
1879 let metadata = new_metadata();
1880 let kind = AlterKind::DropColumns {
1881 names: vec!["xxxx".to_string()],
1882 };
1883 kind.validate(&metadata).unwrap();
1884 assert!(!kind.need_alter(&metadata));
1885
1886 AlterKind::DropColumns {
1887 names: vec!["tag_0".to_string()],
1888 }
1889 .validate(&metadata)
1890 .unwrap_err();
1891
1892 let kind = AlterKind::DropColumns {
1893 names: vec!["field_0".to_string()],
1894 };
1895 kind.validate(&metadata).unwrap();
1896 assert!(kind.need_alter(&metadata));
1897 }
1898
1899 #[test]
1900 fn test_validate_modify_column_type() {
1901 let metadata = new_metadata();
1902 AlterKind::ModifyColumnTypes {
1903 columns: vec![ModifyColumnType {
1904 column_name: "xxxx".to_string(),
1905 target_type: ConcreteDataType::string_datatype(),
1906 }],
1907 }
1908 .validate(&metadata)
1909 .unwrap_err();
1910
1911 AlterKind::ModifyColumnTypes {
1912 columns: vec![ModifyColumnType {
1913 column_name: "field_1".to_string(),
1914 target_type: ConcreteDataType::date_datatype(),
1915 }],
1916 }
1917 .validate(&metadata)
1918 .unwrap_err();
1919
1920 AlterKind::ModifyColumnTypes {
1921 columns: vec![ModifyColumnType {
1922 column_name: "ts".to_string(),
1923 target_type: ConcreteDataType::date_datatype(),
1924 }],
1925 }
1926 .validate(&metadata)
1927 .unwrap_err();
1928
1929 AlterKind::ModifyColumnTypes {
1930 columns: vec![ModifyColumnType {
1931 column_name: "tag_0".to_string(),
1932 target_type: ConcreteDataType::date_datatype(),
1933 }],
1934 }
1935 .validate(&metadata)
1936 .unwrap_err();
1937
1938 let kind = AlterKind::ModifyColumnTypes {
1939 columns: vec![ModifyColumnType {
1940 column_name: "field_0".to_string(),
1941 target_type: ConcreteDataType::int32_datatype(),
1942 }],
1943 };
1944 kind.validate(&metadata).unwrap();
1945 assert!(kind.need_alter(&metadata));
1946 }
1947
1948 #[test]
1949 fn test_validate_add_columns() {
1950 let kind = AlterKind::AddColumns {
1951 columns: vec![
1952 AddColumn {
1953 column_metadata: ColumnMetadata {
1954 column_schema: ColumnSchema::new(
1955 "tag_1",
1956 ConcreteDataType::string_datatype(),
1957 true,
1958 ),
1959 semantic_type: SemanticType::Tag,
1960 column_id: 5,
1961 },
1962 location: None,
1963 },
1964 AddColumn {
1965 column_metadata: ColumnMetadata {
1966 column_schema: ColumnSchema::new(
1967 "field_2",
1968 ConcreteDataType::string_datatype(),
1969 true,
1970 ),
1971 semantic_type: SemanticType::Field,
1972 column_id: 6,
1973 },
1974 location: None,
1975 },
1976 ],
1977 };
1978 let request = RegionAlterRequest { kind };
1979 let mut metadata = new_metadata();
1980 metadata.schema_version = 1;
1981 request.validate(&metadata).unwrap();
1982 }
1983
1984 #[test]
1985 fn test_validate_create_region() {
1986 let column_metadatas = vec![
1987 ColumnMetadata {
1988 column_schema: ColumnSchema::new(
1989 "ts",
1990 ConcreteDataType::timestamp_millisecond_datatype(),
1991 false,
1992 ),
1993 semantic_type: SemanticType::Timestamp,
1994 column_id: 1,
1995 },
1996 ColumnMetadata {
1997 column_schema: ColumnSchema::new(
1998 "tag_0",
1999 ConcreteDataType::string_datatype(),
2000 true,
2001 ),
2002 semantic_type: SemanticType::Tag,
2003 column_id: 2,
2004 },
2005 ColumnMetadata {
2006 column_schema: ColumnSchema::new(
2007 "field_0",
2008 ConcreteDataType::string_datatype(),
2009 true,
2010 ),
2011 semantic_type: SemanticType::Field,
2012 column_id: 3,
2013 },
2014 ];
2015 let create = RegionCreateRequest {
2016 engine: "mito".to_string(),
2017 column_metadatas,
2018 primary_key: vec![3, 4],
2019 options: HashMap::new(),
2020 table_dir: "path".to_string(),
2021 path_type: PathType::Bare,
2022 partition_expr_json: Some("".to_string()),
2023 };
2024
2025 assert!(create.validate().is_err());
2026 }
2027
2028 #[test]
2029 fn test_validate_modify_column_fulltext_options() {
2030 let kind = AlterKind::SetIndexes {
2031 options: vec![SetIndexOption::Fulltext {
2032 column_name: "tag_0".to_string(),
2033 options: FulltextOptions::new_unchecked(
2034 true,
2035 FulltextAnalyzer::Chinese,
2036 false,
2037 FulltextBackend::Bloom,
2038 1000,
2039 0.01,
2040 ),
2041 }],
2042 };
2043 let request = RegionAlterRequest { kind };
2044 let mut metadata = new_metadata();
2045 metadata.schema_version = 1;
2046 request.validate(&metadata).unwrap();
2047
2048 let kind = AlterKind::UnsetIndexes {
2049 options: vec![UnsetIndexOption::Fulltext {
2050 column_name: "tag_0".to_string(),
2051 }],
2052 };
2053 let request = RegionAlterRequest { kind };
2054 let mut metadata = new_metadata();
2055 metadata.schema_version = 1;
2056 request.validate(&metadata).unwrap();
2057 }
2058
    /// Exercises `AlterKind::SyncColumns` validation against the shared test
    /// metadata: three rejected column lists followed by one accepted list.
    #[test]
    fn test_validate_sync_columns() {
        let metadata = new_metadata();
        // A list holding only `tag_1` and `field_2`: rejected with a
        // "not a primary key" message.
        let kind = AlterKind::SyncColumns {
            column_metadatas: vec![
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "tag_1",
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "field_2",
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Field,
                    column_id: 6,
                },
            ],
        };
        let err = kind.validate(&metadata).unwrap_err();
        assert!(err.to_string().contains("not a primary key"));

        // The existing columns with the timestamp column renamed to `ts1`:
        // rejected with a timestamp column error.
        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
        let ts_column = column_metadatas_with_different_ts_column
            .iter_mut()
            .find(|c| c.semantic_type == SemanticType::Timestamp)
            .unwrap();
        ts_column.column_schema.name = "ts1".to_string();

        let kind = AlterKind::SyncColumns {
            column_metadatas: column_metadatas_with_different_ts_column,
        };
        let err = kind.validate(&metadata).unwrap_err();
        assert!(
            err.to_string()
                .contains("timestamp column ts has different id")
        );

        // The existing columns with the id of `tag_0` changed to 100:
        // rejected because the same name now has a different id.
        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
        let pk_column = column_metadatas_with_different_pk_column
            .iter_mut()
            .find(|c| c.column_schema.name == "tag_0")
            .unwrap();
        pk_column.column_id = 100;
        let kind = AlterKind::SyncColumns {
            column_metadatas: column_metadatas_with_different_pk_column,
        };
        let err = kind.validate(&metadata).unwrap_err();
        assert!(
            err.to_string()
                .contains("column with same name tag_0 has different id")
        );

        // The existing columns plus a new field `field_2`: accepted.
        // NOTE(review): the new column uses column_id 4, which `new_metadata`
        // also assigns to `field_1` — confirm this is intended.
        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
        column_metadatas_with_new_field_column.push(ColumnMetadata {
            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 4,
        });
        let kind = AlterKind::SyncColumns {
            column_metadatas: column_metadatas_with_new_field_column,
        };
        kind.validate(&metadata).unwrap();
    }
2132
2133 #[test]
2134 fn test_cast_path_type_to_primitive() {
2135 assert_eq!(PathType::Bare as u8, 0);
2136 assert_eq!(PathType::Data as u8, 1);
2137 assert_eq!(PathType::Metadata as u8, 2);
2138 assert_eq!(
2139 PathType::try_from(PathType::Bare as u8).unwrap(),
2140 PathType::Bare
2141 );
2142 assert_eq!(
2143 PathType::try_from(PathType::Data as u8).unwrap(),
2144 PathType::Data
2145 );
2146 assert_eq!(
2147 PathType::try_from(PathType::Metadata as u8).unwrap(),
2148 PathType::Metadata
2149 );
2150 }
2151}