store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26    CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27    FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28    region_request, truncate_request,
29};
30use api::v1::{
31    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50    RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55    SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
56};
57use crate::path_utils::table_dir;
58use crate::storage::{ColumnId, RegionId, ScanRequest};
59
/// The type of path to generate.
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`, so every variant has a `u8`
/// discriminant (assigned implicitly in declaration order, starting at 0) and can be
/// recovered fallibly from a raw `u8`. Reordering or inserting variants changes those values.
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// A bare path - the original path of an engine.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
    Bare,
    /// A path for the data region of a metric engine table.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
    Data,
    /// A path for the metadata region of a metric engine table.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
    Metadata,
}
77
/// A batch of DDL requests that all share the same kind.
///
/// Each entry pairs the target [`RegionId`] with the request to apply to that region.
/// `IntoStaticStr` is derived so the variant can be turned into a `&'static str`
/// (see `request_type`).
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Create several regions.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Drop several regions.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Alter several regions.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
84
85impl BatchRegionDdlRequest {
86    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
87    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
88        match body {
89            region_request::Body::Creates(creates) => {
90                let requests = creates
91                    .requests
92                    .into_iter()
93                    .map(parse_region_create)
94                    .collect::<Result<Vec<_>>>()?;
95                Ok(Some(Self::Create(requests)))
96            }
97            region_request::Body::Drops(drops) => {
98                let requests = drops
99                    .requests
100                    .into_iter()
101                    .map(parse_region_drop)
102                    .collect::<Result<Vec<_>>>()?;
103                Ok(Some(Self::Drop(requests)))
104            }
105            region_request::Body::Alters(alters) => {
106                let requests = alters
107                    .requests
108                    .into_iter()
109                    .map(parse_region_alter)
110                    .collect::<Result<Vec<_>>>()?;
111                Ok(Some(Self::Alter(requests)))
112            }
113            _ => Ok(None),
114        }
115    }
116
117    pub fn request_type(&self) -> &'static str {
118        self.into()
119    }
120
121    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
122        match self {
123            Self::Create(requests) => requests
124                .into_iter()
125                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
126                .collect(),
127            Self::Drop(requests) => requests
128                .into_iter()
129                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
130                .collect(),
131            Self::Alter(requests) => requests
132                .into_iter()
133                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
134                .collect(),
135        }
136    }
137}
138
/// A request addressed to a single region.
///
/// Each variant wraps the payload struct for one kind of operation. `IntoStaticStr` is
/// derived so the variant can be turned into a `&'static str` (see `request_type`).
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Put rows into the region.
    Put(RegionPutRequest),
    /// Delete rows from the region.
    Delete(RegionDeleteRequest),
    /// Create the region.
    Create(RegionCreateRequest),
    /// Drop the region.
    Drop(RegionDropRequest),
    /// Open the region.
    Open(RegionOpenRequest),
    /// Close the region.
    Close(RegionCloseRequest),
    /// Alter the region's metadata.
    Alter(RegionAlterRequest),
    Flush(RegionFlushRequest),
    Compact(RegionCompactRequest),
    BuildIndex(RegionBuildIndexRequest),
    Truncate(RegionTruncateRequest),
    Catchup(RegionCatchupRequest),
    BulkInserts(RegionBulkInsertsRequest),
    EnterStaging(EnterStagingRequest),
}
156
157impl RegionRequest {
158    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
159    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
160    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
161        match body {
162            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
163            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
164            region_request::Body::Create(create) => make_region_create(create),
165            region_request::Body::Drop(drop) => make_region_drop(drop),
166            region_request::Body::Open(open) => make_region_open(open),
167            region_request::Body::Close(close) => make_region_close(close),
168            region_request::Body::Alter(alter) => make_region_alter(alter),
169            region_request::Body::Flush(flush) => make_region_flush(flush),
170            region_request::Body::Compact(compact) => make_region_compact(compact),
171            region_request::Body::BuildIndex(index) => make_region_build_index(index),
172            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
173            region_request::Body::Creates(creates) => make_region_creates(creates),
174            region_request::Body::Drops(drops) => make_region_drops(drops),
175            region_request::Body::Alters(alters) => make_region_alters(alters),
176            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
177            region_request::Body::Sync(_) => UnexpectedSnafu {
178                reason: "Sync request should be handled separately by RegionServer",
179            }
180            .fail(),
181            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
182                reason: "ListMetadata request should be handled separately by RegionServer",
183            }
184            .fail(),
185        }
186    }
187
188    /// Returns the type name of the request.
189    pub fn request_type(&self) -> &'static str {
190        self.into()
191    }
192}
193
194fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
195    let requests = inserts
196        .requests
197        .into_iter()
198        .filter_map(|r| {
199            let region_id = r.region_id.into();
200            r.rows.map(|rows| {
201                (
202                    region_id,
203                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
204                )
205            })
206        })
207        .collect();
208    Ok(requests)
209}
210
211fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
212    let requests = deletes
213        .requests
214        .into_iter()
215        .filter_map(|r| {
216            let region_id = r.region_id.into();
217            r.rows.map(|rows| {
218                (
219                    region_id,
220                    RegionRequest::Delete(RegionDeleteRequest { rows, hint: None }),
221                )
222            })
223        })
224        .collect();
225    Ok(requests)
226}
227
228fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
229    let column_metadatas = create
230        .column_defs
231        .into_iter()
232        .map(ColumnMetadata::try_from_column_def)
233        .collect::<Result<Vec<_>>>()?;
234    let region_id = RegionId::from(create.region_id);
235    let table_dir = table_dir(&create.path, region_id.table_id());
236    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
237    Ok((
238        region_id,
239        RegionCreateRequest {
240            engine: create.engine,
241            column_metadatas,
242            primary_key: create.primary_key,
243            options: create.options,
244            table_dir,
245            path_type: PathType::Bare,
246            partition_expr_json,
247        },
248    ))
249}
250
251fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
252    let (region_id, request) = parse_region_create(create)?;
253    Ok(vec![(region_id, RegionRequest::Create(request))])
254}
255
256fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
257    let mut requests = Vec::with_capacity(creates.requests.len());
258    for create in creates.requests {
259        requests.extend(make_region_create(create)?);
260    }
261    Ok(requests)
262}
263
264fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
265    let region_id = drop.region_id.into();
266    Ok((
267        region_id,
268        RegionDropRequest {
269            fast_path: drop.fast_path,
270        },
271    ))
272}
273
274fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
275    let (region_id, request) = parse_region_drop(drop)?;
276    Ok(vec![(region_id, RegionRequest::Drop(request))])
277}
278
279fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
280    let mut requests = Vec::with_capacity(drops.requests.len());
281    for drop in drops.requests {
282        requests.extend(make_region_drop(drop)?);
283    }
284    Ok(requests)
285}
286
287fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
288    let region_id = RegionId::from(open.region_id);
289    let table_dir = table_dir(&open.path, region_id.table_id());
290    Ok(vec![(
291        region_id,
292        RegionRequest::Open(RegionOpenRequest {
293            engine: open.engine,
294            table_dir,
295            path_type: PathType::Bare,
296            options: open.options,
297            skip_wal_replay: false,
298            checkpoint: None,
299        }),
300    )])
301}
302
303fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
304    let region_id = close.region_id.into();
305    Ok(vec![(
306        region_id,
307        RegionRequest::Close(RegionCloseRequest {}),
308    )])
309}
310
311fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
312    let region_id = alter.region_id.into();
313    let request = RegionAlterRequest::try_from(alter)?;
314    Ok((region_id, request))
315}
316
317fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
318    let (region_id, request) = parse_region_alter(alter)?;
319    Ok(vec![(region_id, RegionRequest::Alter(request))])
320}
321
322fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
323    let mut requests = Vec::with_capacity(alters.requests.len());
324    for alter in alters.requests {
325        requests.extend(make_region_alter(alter)?);
326    }
327    Ok(requests)
328}
329
330fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
331    let region_id = flush.region_id.into();
332    Ok(vec![(
333        region_id,
334        RegionRequest::Flush(RegionFlushRequest {
335            row_group_size: None,
336        }),
337    )])
338}
339
340fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
341    let region_id = compact.region_id.into();
342    let options = compact
343        .options
344        .unwrap_or(compact_request::Options::Regular(Default::default()));
345    // Convert parallelism: a value of 0 indicates no specific parallelism requested (None)
346    let parallelism = if compact.parallelism == 0 {
347        None
348    } else {
349        Some(compact.parallelism)
350    };
351    Ok(vec![(
352        region_id,
353        RegionRequest::Compact(RegionCompactRequest {
354            options,
355            parallelism,
356        }),
357    )])
358}
359
360fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
361    let region_id = index.region_id.into();
362    Ok(vec![(
363        region_id,
364        RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
365    )])
366}
367
368fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
369    let region_id = truncate.region_id.into();
370    match truncate.kind {
371        None => InvalidRawRegionRequestSnafu {
372            err: "missing kind in TruncateRequest".to_string(),
373        }
374        .fail(),
375        Some(truncate_request::Kind::All(_)) => Ok(vec![(
376            region_id,
377            RegionRequest::Truncate(RegionTruncateRequest::All),
378        )]),
379        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
380            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
381
382            Ok(vec![(
383                region_id,
384                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
385            )])
386        }
387    }
388}
389
390/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
391fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
392    let region_id = request.region_id.into();
393    let Some(Body::ArrowIpc(request)) = request.body else {
394        return Ok(vec![]);
395    };
396
397    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
398        .with_label_values(&["decode"])
399        .start_timer();
400    let mut decoder =
401        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
402    let payload = decoder
403        .try_decode_record_batch(&request.data_header, &request.payload)
404        .context(FlightCodecSnafu)?;
405    decoder_timer.observe_duration();
406    Ok(vec![(
407        region_id,
408        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
409            region_id,
410            payload,
411            raw_data: request,
412        }),
413    )])
414}
415
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint. The protobuf conversion in this file always leaves it `None`.
    pub hint: Option<WriteHint>,
}
424
/// Request to read from a region; a thin wrapper around a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
429
/// Request to delete data from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Keys of the rows to delete.
    ///
    /// Each row only contains primary key columns and a time index column.
    pub rows: Rows,
    /// Optional write hint. The protobuf conversion in this file always leaves it `None`.
    pub hint: Option<WriteHint>,
}
440
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Region engine name.
    pub engine: String,
    /// Columns in this region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the columns in the primary key.
    pub primary_key: Vec<ColumnId>,
    /// Options of the created region.
    pub options: HashMap<String, String>,
    /// Directory for the table's data home. Usually composed of the catalog and table id.
    pub table_dir: String,
    /// Path type for generating paths.
    pub path_type: PathType,
    /// Partition expression JSON from table metadata. Set to an empty string for a region
    /// without partition.
    /// `Option` to keep compatibility with old clients.
    pub partition_expr_json: Option<String>,
}
459
460impl RegionCreateRequest {
461    /// Checks whether the request is valid, returns an error if it is invalid.
462    pub fn validate(&self) -> Result<()> {
463        // time index must exist
464        ensure!(
465            self.column_metadatas
466                .iter()
467                .any(|x| x.semantic_type == SemanticType::Timestamp),
468            InvalidRegionRequestSnafu {
469                region_id: RegionId::new(0, 0),
470                err: "missing timestamp column in create region request".to_string(),
471            }
472        );
473
474        // build column id to indices
475        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
476        for (i, c) in self.column_metadatas.iter().enumerate() {
477            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
478                return InvalidRegionRequestSnafu {
479                    region_id: RegionId::new(0, 0),
480                    err: format!(
481                        "duplicate column id {} (at position {} and {}) in create region request",
482                        c.column_id, previous, i
483                    ),
484                }
485                .fail();
486            }
487        }
488
489        // primary key must exist
490        for column_id in &self.primary_key {
491            ensure!(
492                column_id_to_indices.contains_key(column_id),
493                InvalidRegionRequestSnafu {
494                    region_id: RegionId::new(0, 0),
495                    err: format!(
496                        "missing primary key column {} in create region request",
497                        column_id
498                    ),
499                }
500            );
501        }
502
503        Ok(())
504    }
505
506    /// Returns true when the region belongs to the metric engine's physical table.
507    pub fn is_physical_table(&self) -> bool {
508        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
509    }
510}
511
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Copied verbatim from the protobuf `DropRequest::fast_path`.
    // NOTE(review): the exact semantics of the fast path are decided by the engine that
    // handles the drop and are not visible in this file.
    pub fast_path: bool,
}
516
/// Open region request.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Region engine name.
    pub engine: String,
    /// Directory for the table's data home. Usually composed of the catalog and table id.
    pub table_dir: String,
    /// Path type for generating paths.
    pub path_type: PathType,
    /// Options of the opened region.
    pub options: HashMap<String, String>,
    /// Whether to skip replaying the WAL.
    pub skip_wal_replay: bool,
    /// Optional replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
533
/// Checkpoint carried by a [`RegionOpenRequest`] for WAL replay.
// NOTE(review): presumably replay starts from these entry ids — confirm against the engine
// that consumes the checkpoint; the semantics are not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint.
    pub entry_id: u64,
    /// Optional entry id for the metadata; `None` when there is none.
    pub metadata_entry_id: Option<u64>,
}
539
540impl RegionOpenRequest {
541    /// Returns true when the region belongs to the metric engine's physical table.
542    pub fn is_physical_table(&self) -> bool {
543        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
544    }
545}
546
/// Close region request.
///
/// Carries no data; the region to close is identified by the [`RegionId`] it is paired with.
#[derive(Debug)]
pub struct RegionCloseRequest {}
550
/// Alter metadata of a region.
///
/// Built from a protobuf `AlterRequest` through the `TryFrom` implementation.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// Kind of alteration to do.
    pub kind: AlterKind,
}
557
558impl RegionAlterRequest {
559    /// Checks whether the request is valid, returns an error if it is invalid.
560    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
561        self.kind.validate(metadata)?;
562
563        Ok(())
564    }
565
566    /// Returns true if we need to apply the request to the region.
567    ///
568    /// The `request` should be valid.
569    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
570        debug_assert!(self.validate(metadata).is_ok());
571        self.kind.need_alter(metadata)
572    }
573}
574
575impl TryFrom<AlterRequest> for RegionAlterRequest {
576    type Error = MetadataError;
577
578    fn try_from(value: AlterRequest) -> Result<Self> {
579        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
580            err: "missing kind in AlterRequest",
581        })?;
582
583        let kind = AlterKind::try_from(kind)?;
584        Ok(RegionAlterRequest { kind })
585    }
586}
587
/// Kind of the alteration.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region, only fields are allowed to drop.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data types of columns in the region, only fields are allowed to change.
    ModifyColumnTypes {
        /// Columns to change.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop column default value.
    DropDefaults {
        /// Names of the columns whose default value is dropped.
        names: Vec<String>,
    },
    /// Set column default value.
    SetDefaults {
        /// Columns to change.
        columns: Vec<SetDefault>,
    },
    /// Sync column metadatas.
    SyncColumns {
        /// The column metadatas to sync the region to.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A column and the default value to set on it (see `AlterKind::SetDefaults`).
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column to change.
    pub name: String,
    /// The new default constraint as raw bytes.
    // NOTE(review): presumably an encoded column default constraint — the encoding is not
    // visible in this file; confirm against the code that decodes it.
    pub default_constraint: Vec<u8>,
}
634
/// An index to set on a column (see `AlterKind::SetIndexes`).
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Set a fulltext index.
    Fulltext {
        /// Name of the column to index.
        column_name: String,
        /// Options of the fulltext index.
        options: FulltextOptions,
    },
    /// Set an inverted index.
    Inverted {
        /// Name of the column to index.
        column_name: String,
    },
    /// Set a skipping index.
    Skipping {
        /// Name of the column to index.
        column_name: String,
        /// Options of the skipping index.
        options: SkippingIndexOptions,
    },
}
649
650impl SetIndexOption {
651    /// Returns the column name of the index option.
652    pub fn column_name(&self) -> &String {
653        match self {
654            SetIndexOption::Fulltext { column_name, .. } => column_name,
655            SetIndexOption::Inverted { column_name } => column_name,
656            SetIndexOption::Skipping { column_name, .. } => column_name,
657        }
658    }
659
660    /// Returns true if the index option is fulltext.
661    pub fn is_fulltext(&self) -> bool {
662        match self {
663            SetIndexOption::Fulltext { .. } => true,
664            SetIndexOption::Inverted { .. } => false,
665            SetIndexOption::Skipping { .. } => false,
666        }
667    }
668}
669
670impl TryFrom<v1::SetIndex> for SetIndexOption {
671    type Error = MetadataError;
672
673    fn try_from(value: v1::SetIndex) -> Result<Self> {
674        let option = value.options.context(InvalidRawRegionRequestSnafu {
675            err: "missing options in SetIndex",
676        })?;
677
678        let opt = match option {
679            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
680                column_name: x.column_name.clone(),
681                options: FulltextOptions::new(
682                    x.enable,
683                    as_fulltext_option_analyzer(
684                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
685                    ),
686                    x.case_sensitive,
687                    as_fulltext_option_backend(
688                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
689                    ),
690                    x.granularity as u32,
691                    x.false_positive_rate,
692                )
693                .context(InvalidIndexOptionSnafu)?,
694            },
695            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
696                column_name: i.column_name,
697            },
698            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
699                column_name: s.column_name,
700                options: SkippingIndexOptions::new(
701                    s.granularity as u32,
702                    s.false_positive_rate,
703                    as_skipping_index_type(
704                        PbSkippingIndexType::try_from(s.skipping_index_type)
705                            .context(DecodeProtoSnafu)?,
706                    ),
707                )
708                .context(InvalidIndexOptionSnafu)?,
709            },
710        };
711
712        Ok(opt)
713    }
714}
715
/// An index to remove from a column (see `AlterKind::UnsetIndexes`).
///
/// Every variant only carries the name of the column whose index is unset.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Unset the fulltext index.
    Fulltext { column_name: String },
    /// Unset the inverted index.
    Inverted { column_name: String },
    /// Unset the skipping index.
    Skipping { column_name: String },
}
722
723impl UnsetIndexOption {
724    pub fn column_name(&self) -> &String {
725        match self {
726            UnsetIndexOption::Fulltext { column_name } => column_name,
727            UnsetIndexOption::Inverted { column_name } => column_name,
728            UnsetIndexOption::Skipping { column_name } => column_name,
729        }
730    }
731
732    pub fn is_fulltext(&self) -> bool {
733        match self {
734            UnsetIndexOption::Fulltext { .. } => true,
735            UnsetIndexOption::Inverted { .. } => false,
736            UnsetIndexOption::Skipping { .. } => false,
737        }
738    }
739}
740
741impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
742    type Error = MetadataError;
743
744    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
745        let option = value.options.context(InvalidRawRegionRequestSnafu {
746            err: "missing options in UnsetIndex",
747        })?;
748
749        let opt = match option {
750            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
751                column_name: f.column_name,
752            },
753            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
754                column_name: i.column_name,
755            },
756            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
757                column_name: s.column_name,
758            },
759        };
760
761        Ok(opt)
762    }
763}
764
765impl AlterKind {
766    /// Returns an error if the alter kind is invalid.
767    ///
768    /// It allows adding column if not exists and dropping column if exists.
769    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
770        match self {
771            AlterKind::AddColumns { columns } => {
772                for col_to_add in columns {
773                    col_to_add.validate(metadata)?;
774                }
775            }
776            AlterKind::DropColumns { names } => {
777                for name in names {
778                    Self::validate_column_to_drop(name, metadata)?;
779                }
780            }
781            AlterKind::ModifyColumnTypes { columns } => {
782                for col_to_change in columns {
783                    col_to_change.validate(metadata)?;
784                }
785            }
786            AlterKind::SetRegionOptions { .. } => {}
787            AlterKind::UnsetRegionOptions { .. } => {}
788            AlterKind::SetIndexes { options } => {
789                for option in options {
790                    Self::validate_column_alter_index_option(
791                        option.column_name(),
792                        metadata,
793                        option.is_fulltext(),
794                    )?;
795                }
796            }
797            AlterKind::UnsetIndexes { options } => {
798                for option in options {
799                    Self::validate_column_alter_index_option(
800                        option.column_name(),
801                        metadata,
802                        option.is_fulltext(),
803                    )?;
804                }
805            }
806            AlterKind::DropDefaults { names } => {
807                names
808                    .iter()
809                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
810            }
811            AlterKind::SetDefaults { columns } => {
812                columns
813                    .iter()
814                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
815            }
816            AlterKind::SyncColumns { column_metadatas } => {
817                let new_primary_keys = column_metadatas
818                    .iter()
819                    .filter(|c| c.semantic_type == SemanticType::Tag)
820                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
821                    .collect::<HashMap<_, _>>();
822
823                let old_primary_keys = metadata
824                    .column_metadatas
825                    .iter()
826                    .filter(|c| c.semantic_type == SemanticType::Tag)
827                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
828
829                for (name, id) in old_primary_keys {
830                    let primary_key =
831                        new_primary_keys
832                            .get(name)
833                            .with_context(|| InvalidRegionRequestSnafu {
834                                region_id: metadata.region_id,
835                                err: format!("column {} is not a primary key", name),
836                            })?;
837
838                    ensure!(
839                        *primary_key == id,
840                        InvalidRegionRequestSnafu {
841                            region_id: metadata.region_id,
842                            err: format!(
843                                "column with same name {} has different id, existing: {}, got: {}",
844                                name, id, primary_key
845                            ),
846                        }
847                    );
848                }
849
850                let new_ts_column = column_metadatas
851                    .iter()
852                    .find(|c| c.semantic_type == SemanticType::Timestamp)
853                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
854                    .context(InvalidRegionRequestSnafu {
855                        region_id: metadata.region_id,
856                        err: "timestamp column not found",
857                    })?;
858
859                // Safety: timestamp column must exist.
860                let old_ts_column = metadata
861                    .column_metadatas
862                    .iter()
863                    .find(|c| c.semantic_type == SemanticType::Timestamp)
864                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
865                    .unwrap();
866
867                ensure!(
868                    new_ts_column == old_ts_column,
869                    InvalidRegionRequestSnafu {
870                        region_id: metadata.region_id,
871                        err: format!(
872                            "timestamp column {} has different id, existing: {}, got: {}",
873                            old_ts_column.0, old_ts_column.1, new_ts_column.1
874                        ),
875                    }
876                );
877            }
878        }
879        Ok(())
880    }
881
882    /// Returns true if we need to apply the alteration to the region.
883    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
884        debug_assert!(self.validate(metadata).is_ok());
885        match self {
886            AlterKind::AddColumns { columns } => columns
887                .iter()
888                .any(|col_to_add| col_to_add.need_alter(metadata)),
889            AlterKind::DropColumns { names } => names
890                .iter()
891                .any(|name| metadata.column_by_name(name).is_some()),
892            AlterKind::ModifyColumnTypes { columns } => columns
893                .iter()
894                .any(|col_to_change| col_to_change.need_alter(metadata)),
895            AlterKind::SetRegionOptions { .. } => true,
896            AlterKind::UnsetRegionOptions { .. } => true,
897            AlterKind::SetIndexes { options, .. } => options
898                .iter()
899                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
900            AlterKind::UnsetIndexes { options } => options
901                .iter()
902                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
903            AlterKind::DropDefaults { names } => names
904                .iter()
905                .any(|name| metadata.column_by_name(name).is_some()),
906
907            AlterKind::SetDefaults { columns } => columns
908                .iter()
909                .any(|x| metadata.column_by_name(&x.name).is_some()),
910            AlterKind::SyncColumns { column_metadatas } => {
911                metadata.column_metadatas != *column_metadatas
912            }
913        }
914    }
915
916    /// Returns an error if the column to drop is invalid.
917    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
918        let Some(column) = metadata.column_by_name(name) else {
919            return Ok(());
920        };
921        ensure!(
922            column.semantic_type == SemanticType::Field,
923            InvalidRegionRequestSnafu {
924                region_id: metadata.region_id,
925                err: format!("column {} is not a field and could not be dropped", name),
926            }
927        );
928        Ok(())
929    }
930
931    /// Returns an error if the column's alter index option is invalid.
932    fn validate_column_alter_index_option(
933        column_name: &String,
934        metadata: &RegionMetadata,
935        is_fulltext: bool,
936    ) -> Result<()> {
937        let column = metadata
938            .column_by_name(column_name)
939            .context(InvalidRegionRequestSnafu {
940                region_id: metadata.region_id,
941                err: format!("column {} not found", column_name),
942            })?;
943
944        if is_fulltext {
945            ensure!(
946                column.column_schema.data_type.is_string(),
947                InvalidRegionRequestSnafu {
948                    region_id: metadata.region_id,
949                    err: format!(
950                        "cannot change alter index options for non-string column {}",
951                        column_name
952                    ),
953                }
954            );
955        }
956
957        Ok(())
958    }
959
960    /// Returns an error if the column isn't exist.
961    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
962        metadata
963            .column_by_name(column_name)
964            .context(InvalidRegionRequestSnafu {
965                region_id: metadata.region_id,
966                err: format!("column {} not found", column_name),
967            })?;
968
969        Ok(())
970    }
971}
972
973impl TryFrom<alter_request::Kind> for AlterKind {
974    type Error = MetadataError;
975
976    fn try_from(kind: alter_request::Kind) -> Result<Self> {
977        let alter_kind = match kind {
978            alter_request::Kind::AddColumns(x) => {
979                let columns = x
980                    .add_columns
981                    .into_iter()
982                    .map(|x| x.try_into())
983                    .collect::<Result<Vec<_>>>()?;
984                AlterKind::AddColumns { columns }
985            }
986            alter_request::Kind::ModifyColumnTypes(x) => {
987                let columns = x
988                    .modify_column_types
989                    .into_iter()
990                    .map(|x| x.into())
991                    .collect::<Vec<_>>();
992                AlterKind::ModifyColumnTypes { columns }
993            }
994            alter_request::Kind::DropColumns(x) => {
995                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
996                AlterKind::DropColumns { names }
997            }
998            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
999                options: options
1000                    .table_options
1001                    .iter()
1002                    .map(TryFrom::try_from)
1003                    .collect::<Result<Vec<_>>>()?,
1004            },
1005            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1006                keys: options
1007                    .keys
1008                    .iter()
1009                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
1010                    .collect::<Result<Vec<_>>>()?,
1011            },
1012            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1013                options: vec![SetIndexOption::try_from(o)?],
1014            },
1015            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1016                options: vec![UnsetIndexOption::try_from(o)?],
1017            },
1018            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1019                options: o
1020                    .set_indexes
1021                    .into_iter()
1022                    .map(SetIndexOption::try_from)
1023                    .collect::<Result<Vec<_>>>()?,
1024            },
1025            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1026                options: o
1027                    .unset_indexes
1028                    .into_iter()
1029                    .map(UnsetIndexOption::try_from)
1030                    .collect::<Result<Vec<_>>>()?,
1031            },
1032            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1033                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1034            },
1035            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1036                columns: x
1037                    .set_defaults
1038                    .into_iter()
1039                    .map(|x| {
1040                        Ok(SetDefault {
1041                            name: x.column_name,
1042                            default_constraint: x.default_constraint.clone(),
1043                        })
1044                    })
1045                    .collect::<Result<Vec<_>>>()?,
1046            },
1047            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1048                column_metadatas: x
1049                    .column_defs
1050                    .into_iter()
1051                    .map(ColumnMetadata::try_from_column_def)
1052                    .collect::<Result<Vec<_>>>()?,
1053            },
1054        };
1055
1056        Ok(alter_kind)
1057    }
1058}
1059
1060/// Adds a column.
1061#[derive(Debug, PartialEq, Eq, Clone)]
1062pub struct AddColumn {
1063    /// Metadata of the column to add.
1064    pub column_metadata: ColumnMetadata,
1065    /// Location to add the column. If location is None, the region adds
1066    /// the column to the last.
1067    pub location: Option<AddColumnLocation>,
1068}
1069
1070impl AddColumn {
1071    /// Returns an error if the column to add is invalid.
1072    ///
1073    /// It allows adding existing columns. However, the existing column must have the same metadata
1074    /// and the location must be None.
1075    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1076        ensure!(
1077            self.column_metadata.column_schema.is_nullable()
1078                || self
1079                    .column_metadata
1080                    .column_schema
1081                    .default_constraint()
1082                    .is_some(),
1083            InvalidRegionRequestSnafu {
1084                region_id: metadata.region_id,
1085                err: format!(
1086                    "no default value for column {}",
1087                    self.column_metadata.column_schema.name
1088                ),
1089            }
1090        );
1091
1092        if let Some(existing_column) =
1093            metadata.column_by_name(&self.column_metadata.column_schema.name)
1094        {
1095            // If the column already exists.
1096            ensure!(
1097                *existing_column == self.column_metadata,
1098                InvalidRegionRequestSnafu {
1099                    region_id: metadata.region_id,
1100                    err: format!(
1101                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1102                        self.column_metadata.column_schema.name,
1103                        existing_column,
1104                        self.column_metadata,
1105                    ),
1106                }
1107            );
1108            ensure!(
1109                self.location.is_none(),
1110                InvalidRegionRequestSnafu {
1111                    region_id: metadata.region_id,
1112                    err: format!(
1113                        "column {} already exists, but location is specified",
1114                        self.column_metadata.column_schema.name
1115                    ),
1116                }
1117            );
1118        }
1119
1120        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1121            // Ensures the existing column has the same name.
1122            ensure!(
1123                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1124                InvalidRegionRequestSnafu {
1125                    region_id: metadata.region_id,
1126                    err: format!(
1127                        "column id {} already exists with different name {}",
1128                        self.column_metadata.column_id, existing_column.column_schema.name
1129                    ),
1130                }
1131            );
1132        }
1133
1134        Ok(())
1135    }
1136
1137    /// Returns true if no column to add to the region.
1138    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1139        debug_assert!(self.validate(metadata).is_ok());
1140        metadata
1141            .column_by_name(&self.column_metadata.column_schema.name)
1142            .is_none()
1143    }
1144}
1145
1146impl TryFrom<v1::region::AddColumn> for AddColumn {
1147    type Error = MetadataError;
1148
1149    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1150        let column_def = add_column
1151            .column_def
1152            .context(InvalidRawRegionRequestSnafu {
1153                err: "missing column_def in AddColumn",
1154            })?;
1155
1156        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1157        let location = add_column
1158            .location
1159            .map(AddColumnLocation::try_from)
1160            .transpose()?;
1161
1162        Ok(AddColumn {
1163            column_metadata,
1164            location,
1165        })
1166    }
1167}
1168
/// Position at which a new column is inserted.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AddColumnLocation {
    /// Insert the column before all existing columns.
    First,
    /// Insert the column right after a named column.
    After {
        /// Name of the column the new column follows.
        column_name: String,
    },
}
1180
1181impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1182    type Error = MetadataError;
1183
1184    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1185        let location_type = LocationType::try_from(location.location_type)
1186            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1187        let add_column_location = match location_type {
1188            LocationType::First => AddColumnLocation::First,
1189            LocationType::After => AddColumnLocation::After {
1190                column_name: location.after_column_name,
1191            },
1192        };
1193
1194        Ok(add_column_location)
1195    }
1196}
1197
1198/// Change a column's datatype.
1199#[derive(Debug, PartialEq, Eq, Clone)]
1200pub struct ModifyColumnType {
1201    /// Schema of the column to modify.
1202    pub column_name: String,
1203    /// Column will be changed to this type.
1204    pub target_type: ConcreteDataType,
1205}
1206
1207impl ModifyColumnType {
1208    /// Returns an error if the column's datatype to change is invalid.
1209    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1210        let column_meta = metadata
1211            .column_by_name(&self.column_name)
1212            .with_context(|| InvalidRegionRequestSnafu {
1213                region_id: metadata.region_id,
1214                err: format!("column {} not found", self.column_name),
1215            })?;
1216
1217        ensure!(
1218            matches!(column_meta.semantic_type, SemanticType::Field),
1219            InvalidRegionRequestSnafu {
1220                region_id: metadata.region_id,
1221                err: "'timestamp' or 'tag' column cannot change type".to_string()
1222            }
1223        );
1224        ensure!(
1225            column_meta
1226                .column_schema
1227                .data_type
1228                .can_arrow_type_cast_to(&self.target_type),
1229            InvalidRegionRequestSnafu {
1230                region_id: metadata.region_id,
1231                err: format!(
1232                    "column '{}' cannot be cast automatically to type '{}'",
1233                    self.column_name, self.target_type
1234                ),
1235            }
1236        );
1237
1238        Ok(())
1239    }
1240
1241    /// Returns true if no column's datatype to change to the region.
1242    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1243        debug_assert!(self.validate(metadata).is_ok());
1244        metadata.column_by_name(&self.column_name).is_some()
1245    }
1246}
1247
1248impl From<v1::ModifyColumnType> for ModifyColumnType {
1249    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1250        let target_type = ColumnDataTypeWrapper::new(
1251            modify_column_type.target_type(),
1252            modify_column_type.target_type_extension,
1253        )
1254        .into();
1255
1256        ModifyColumnType {
1257            column_name: modify_column_type.column_name,
1258            target_type,
1259        }
1260    }
1261}
1262
1263#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1264pub enum SetRegionOption {
1265    Ttl(Option<TimeToLive>),
1266    // Modifying TwscOptions with values as (option name, new value).
1267    Twsc(String, String),
1268    // Modifying the SST format.
1269    Format(String),
1270}
1271
1272impl TryFrom<&PbOption> for SetRegionOption {
1273    type Error = MetadataError;
1274
1275    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1276        let PbOption { key, value } = value;
1277        match key.as_str() {
1278            TTL_KEY => {
1279                let ttl = TimeToLive::from_humantime_or_str(value)
1280                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1281
1282                Ok(Self::Ttl(Some(ttl)))
1283            }
1284            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1285                Ok(Self::Twsc(key.clone(), value.clone()))
1286            }
1287            SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1288            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1289        }
1290    }
1291}
1292
1293impl From<&UnsetRegionOption> for SetRegionOption {
1294    fn from(unset_option: &UnsetRegionOption) -> Self {
1295        match unset_option {
1296            UnsetRegionOption::TwcsTriggerFileNum => {
1297                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1298            }
1299            UnsetRegionOption::TwcsMaxOutputFileSize => {
1300                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1301            }
1302            UnsetRegionOption::TwcsTimeWindow => {
1303                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1304            }
1305            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1306        }
1307    }
1308}
1309
1310impl TryFrom<&str> for UnsetRegionOption {
1311    type Error = MetadataError;
1312
1313    fn try_from(key: &str) -> Result<Self> {
1314        match key.to_ascii_lowercase().as_str() {
1315            TTL_KEY => Ok(Self::Ttl),
1316            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1317            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1318            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1319            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1320        }
1321    }
1322}
1323
1324#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1325pub enum UnsetRegionOption {
1326    TwcsTriggerFileNum,
1327    TwcsMaxOutputFileSize,
1328    TwcsTimeWindow,
1329    Ttl,
1330}
1331
1332impl UnsetRegionOption {
1333    pub fn as_str(&self) -> &str {
1334        match self {
1335            Self::Ttl => TTL_KEY,
1336            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1337            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1338            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1339        }
1340    }
1341}
1342
1343impl Display for UnsetRegionOption {
1344    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1345        write!(f, "{}", self.as_str())
1346    }
1347}
1348
/// Request to flush a region.
#[derive(Clone, Debug, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; `None` by default.
    // NOTE(review): presumably overrides the row group size of the flushed files — confirm.
    pub row_group_size: Option<usize>,
}
1353
1354#[derive(Debug)]
1355pub struct RegionCompactRequest {
1356    pub options: compact_request::Options,
1357    pub parallelism: Option<u32>,
1358}
1359
1360impl Default for RegionCompactRequest {
1361    fn default() -> Self {
1362        Self {
1363            // Default to regular compaction.
1364            options: compact_request::Options::Regular(Default::default()),
1365            parallelism: None,
1366        }
1367    }
1368}
1369
/// Request to build indexes for a region. It carries no parameters.
#[derive(Clone, Debug, Default)]
pub struct RegionBuildIndexRequest {}
1372
1373/// Truncate region request.
1374#[derive(Debug)]
1375pub enum RegionTruncateRequest {
1376    /// Truncate all data in the region.
1377    All,
1378    ByTimeRanges {
1379        /// Time ranges to truncate. Both bound are inclusive.
1380        /// only files that are fully contained in the time range will be truncated.
1381        /// so no guarantee that all data in the time range will be truncated.
1382        time_ranges: Vec<(Timestamp, Timestamp)>,
1383    },
1384}
1385
1386/// Catchup region request.
1387///
1388/// Makes a readonly region to catch up to leader region changes.
1389/// There is no effect if it operating on a leader region.
1390#[derive(Debug, Clone, Copy, Default)]
1391pub struct RegionCatchupRequest {
1392    /// Sets it to writable if it's available after it has caught up with all changes.
1393    pub set_writable: bool,
1394    /// The `entry_id` that was expected to reply to.
1395    /// `None` stands replaying to latest.
1396    pub entry_id: Option<entry::Id>,
1397    /// Used for metrics metadata region.
1398    /// The `entry_id` that was expected to reply to.
1399    /// `None` stands replaying to latest.
1400    pub metadata_entry_id: Option<entry::Id>,
1401    /// The hint for replaying memtable.
1402    pub location_id: Option<u64>,
1403    /// Replay checkpoint.
1404    pub checkpoint: Option<ReplayCheckpoint>,
1405}
1406
1407#[derive(Debug, Clone)]
1408pub struct RegionBulkInsertsRequest {
1409    pub region_id: RegionId,
1410    pub payload: DfRecordBatch,
1411    pub raw_data: ArrowIpc,
1412}
1413
1414impl RegionBulkInsertsRequest {
1415    pub fn estimated_size(&self) -> usize {
1416        self.payload.get_array_memory_size()
1417    }
1418}
1419
/// Request to stage a region with a new region rule (partition expression).
///
/// The request moves a region into staging mode: the memtable for the old
/// region rule is flushed first if it is not empty, then the region enters
/// staging mode with the new region rule.
#[derive(Clone, Debug)]
pub struct EnterStagingRequest {
    /// Partition expression of the staging region.
    pub partition_expr: String,
}
1430
1431impl fmt::Display for RegionRequest {
1432    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1433        match self {
1434            RegionRequest::Put(_) => write!(f, "Put"),
1435            RegionRequest::Delete(_) => write!(f, "Delete"),
1436            RegionRequest::Create(_) => write!(f, "Create"),
1437            RegionRequest::Drop(_) => write!(f, "Drop"),
1438            RegionRequest::Open(_) => write!(f, "Open"),
1439            RegionRequest::Close(_) => write!(f, "Close"),
1440            RegionRequest::Alter(_) => write!(f, "Alter"),
1441            RegionRequest::Flush(_) => write!(f, "Flush"),
1442            RegionRequest::Compact(_) => write!(f, "Compact"),
1443            RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1444            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1445            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1446            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1447            RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1448        }
1449    }
1450}
1451
1452#[cfg(test)]
1453mod tests {
1454
1455    use api::v1::region::RegionColumnDef;
1456    use api::v1::{ColumnDataType, ColumnDef};
1457    use datatypes::prelude::ConcreteDataType;
1458    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1459
1460    use super::*;
1461    use crate::metadata::RegionMetadataBuilder;
1462
1463    #[test]
1464    fn test_from_proto_location() {
1465        let proto_location = v1::AddColumnLocation {
1466            location_type: LocationType::First as i32,
1467            after_column_name: String::default(),
1468        };
1469        let location = AddColumnLocation::try_from(proto_location).unwrap();
1470        assert_eq!(location, AddColumnLocation::First);
1471
1472        let proto_location = v1::AddColumnLocation {
1473            location_type: 10,
1474            after_column_name: String::default(),
1475        };
1476        AddColumnLocation::try_from(proto_location).unwrap_err();
1477
1478        let proto_location = v1::AddColumnLocation {
1479            location_type: LocationType::After as i32,
1480            after_column_name: "a".to_string(),
1481        };
1482        let location = AddColumnLocation::try_from(proto_location).unwrap();
1483        assert_eq!(
1484            location,
1485            AddColumnLocation::After {
1486                column_name: "a".to_string()
1487            }
1488        );
1489    }
1490
1491    #[test]
1492    fn test_from_none_proto_add_column() {
1493        AddColumn::try_from(v1::region::AddColumn {
1494            column_def: None,
1495            location: None,
1496        })
1497        .unwrap_err();
1498    }
1499
1500    #[test]
1501    fn test_from_proto_alter_request() {
1502        RegionAlterRequest::try_from(AlterRequest {
1503            region_id: 0,
1504            schema_version: 1,
1505            kind: None,
1506        })
1507        .unwrap_err();
1508
1509        let request = RegionAlterRequest::try_from(AlterRequest {
1510            region_id: 0,
1511            schema_version: 1,
1512            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1513                add_columns: vec![v1::region::AddColumn {
1514                    column_def: Some(RegionColumnDef {
1515                        column_def: Some(ColumnDef {
1516                            name: "a".to_string(),
1517                            data_type: ColumnDataType::String as i32,
1518                            is_nullable: true,
1519                            default_constraint: vec![],
1520                            semantic_type: SemanticType::Field as i32,
1521                            comment: String::new(),
1522                            ..Default::default()
1523                        }),
1524                        column_id: 1,
1525                    }),
1526                    location: Some(v1::AddColumnLocation {
1527                        location_type: LocationType::First as i32,
1528                        after_column_name: String::default(),
1529                    }),
1530                }],
1531            })),
1532        })
1533        .unwrap();
1534
1535        assert_eq!(
1536            request,
1537            RegionAlterRequest {
1538                kind: AlterKind::AddColumns {
1539                    columns: vec![AddColumn {
1540                        column_metadata: ColumnMetadata {
1541                            column_schema: ColumnSchema::new(
1542                                "a",
1543                                ConcreteDataType::string_datatype(),
1544                                true,
1545                            ),
1546                            semantic_type: SemanticType::Field,
1547                            column_id: 1,
1548                        },
1549                        location: Some(AddColumnLocation::First),
1550                    }]
1551                },
1552            }
1553        );
1554    }
1555
1556    /// Returns a new region metadata for testing. Metadata:
1557    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1558    fn new_metadata() -> RegionMetadata {
1559        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1560        builder
1561            .push_column_metadata(ColumnMetadata {
1562                column_schema: ColumnSchema::new(
1563                    "ts",
1564                    ConcreteDataType::timestamp_millisecond_datatype(),
1565                    false,
1566                ),
1567                semantic_type: SemanticType::Timestamp,
1568                column_id: 1,
1569            })
1570            .push_column_metadata(ColumnMetadata {
1571                column_schema: ColumnSchema::new(
1572                    "tag_0",
1573                    ConcreteDataType::string_datatype(),
1574                    true,
1575                ),
1576                semantic_type: SemanticType::Tag,
1577                column_id: 2,
1578            })
1579            .push_column_metadata(ColumnMetadata {
1580                column_schema: ColumnSchema::new(
1581                    "field_0",
1582                    ConcreteDataType::string_datatype(),
1583                    true,
1584                ),
1585                semantic_type: SemanticType::Field,
1586                column_id: 3,
1587            })
1588            .push_column_metadata(ColumnMetadata {
1589                column_schema: ColumnSchema::new(
1590                    "field_1",
1591                    ConcreteDataType::boolean_datatype(),
1592                    true,
1593                ),
1594                semantic_type: SemanticType::Field,
1595                column_id: 4,
1596            })
1597            .primary_key(vec![2]);
1598        builder.build().unwrap()
1599    }
1600
1601    #[test]
1602    fn test_add_column_validate() {
1603        let metadata = new_metadata();
1604        let add_column = AddColumn {
1605            column_metadata: ColumnMetadata {
1606                column_schema: ColumnSchema::new(
1607                    "tag_1",
1608                    ConcreteDataType::string_datatype(),
1609                    true,
1610                ),
1611                semantic_type: SemanticType::Tag,
1612                column_id: 5,
1613            },
1614            location: None,
1615        };
1616        add_column.validate(&metadata).unwrap();
1617        assert!(add_column.need_alter(&metadata));
1618
1619        // Add not null column.
1620        AddColumn {
1621            column_metadata: ColumnMetadata {
1622                column_schema: ColumnSchema::new(
1623                    "tag_1",
1624                    ConcreteDataType::string_datatype(),
1625                    false,
1626                ),
1627                semantic_type: SemanticType::Tag,
1628                column_id: 5,
1629            },
1630            location: None,
1631        }
1632        .validate(&metadata)
1633        .unwrap_err();
1634
1635        // Add existing column.
1636        let add_column = AddColumn {
1637            column_metadata: ColumnMetadata {
1638                column_schema: ColumnSchema::new(
1639                    "tag_0",
1640                    ConcreteDataType::string_datatype(),
1641                    true,
1642                ),
1643                semantic_type: SemanticType::Tag,
1644                column_id: 2,
1645            },
1646            location: None,
1647        };
1648        add_column.validate(&metadata).unwrap();
1649        assert!(!add_column.need_alter(&metadata));
1650    }
1651
1652    #[test]
1653    fn test_add_duplicate_columns() {
1654        let kind = AlterKind::AddColumns {
1655            columns: vec![
1656                AddColumn {
1657                    column_metadata: ColumnMetadata {
1658                        column_schema: ColumnSchema::new(
1659                            "tag_1",
1660                            ConcreteDataType::string_datatype(),
1661                            true,
1662                        ),
1663                        semantic_type: SemanticType::Tag,
1664                        column_id: 5,
1665                    },
1666                    location: None,
1667                },
1668                AddColumn {
1669                    column_metadata: ColumnMetadata {
1670                        column_schema: ColumnSchema::new(
1671                            "tag_1",
1672                            ConcreteDataType::string_datatype(),
1673                            true,
1674                        ),
1675                        semantic_type: SemanticType::Field,
1676                        column_id: 6,
1677                    },
1678                    location: None,
1679                },
1680            ],
1681        };
1682        let metadata = new_metadata();
1683        kind.validate(&metadata).unwrap();
1684        assert!(kind.need_alter(&metadata));
1685    }
1686
1687    #[test]
1688    fn test_add_existing_column_different_metadata() {
1689        let metadata = new_metadata();
1690
1691        // Add existing column with different id.
1692        let kind = AlterKind::AddColumns {
1693            columns: vec![AddColumn {
1694                column_metadata: ColumnMetadata {
1695                    column_schema: ColumnSchema::new(
1696                        "tag_0",
1697                        ConcreteDataType::string_datatype(),
1698                        true,
1699                    ),
1700                    semantic_type: SemanticType::Tag,
1701                    column_id: 4,
1702                },
1703                location: None,
1704            }],
1705        };
1706        kind.validate(&metadata).unwrap_err();
1707
1708        // Add existing column with different type.
1709        let kind = AlterKind::AddColumns {
1710            columns: vec![AddColumn {
1711                column_metadata: ColumnMetadata {
1712                    column_schema: ColumnSchema::new(
1713                        "tag_0",
1714                        ConcreteDataType::int64_datatype(),
1715                        true,
1716                    ),
1717                    semantic_type: SemanticType::Tag,
1718                    column_id: 2,
1719                },
1720                location: None,
1721            }],
1722        };
1723        kind.validate(&metadata).unwrap_err();
1724
1725        // Add existing column with different name.
1726        let kind = AlterKind::AddColumns {
1727            columns: vec![AddColumn {
1728                column_metadata: ColumnMetadata {
1729                    column_schema: ColumnSchema::new(
1730                        "tag_1",
1731                        ConcreteDataType::string_datatype(),
1732                        true,
1733                    ),
1734                    semantic_type: SemanticType::Tag,
1735                    column_id: 2,
1736                },
1737                location: None,
1738            }],
1739        };
1740        kind.validate(&metadata).unwrap_err();
1741    }
1742
1743    #[test]
1744    fn test_add_existing_column_with_location() {
1745        let metadata = new_metadata();
1746        let kind = AlterKind::AddColumns {
1747            columns: vec![AddColumn {
1748                column_metadata: ColumnMetadata {
1749                    column_schema: ColumnSchema::new(
1750                        "tag_0",
1751                        ConcreteDataType::string_datatype(),
1752                        true,
1753                    ),
1754                    semantic_type: SemanticType::Tag,
1755                    column_id: 2,
1756                },
1757                location: Some(AddColumnLocation::First),
1758            }],
1759        };
1760        kind.validate(&metadata).unwrap_err();
1761    }
1762
1763    #[test]
1764    fn test_validate_drop_column() {
1765        let metadata = new_metadata();
1766        let kind = AlterKind::DropColumns {
1767            names: vec!["xxxx".to_string()],
1768        };
1769        kind.validate(&metadata).unwrap();
1770        assert!(!kind.need_alter(&metadata));
1771
1772        AlterKind::DropColumns {
1773            names: vec!["tag_0".to_string()],
1774        }
1775        .validate(&metadata)
1776        .unwrap_err();
1777
1778        let kind = AlterKind::DropColumns {
1779            names: vec!["field_0".to_string()],
1780        };
1781        kind.validate(&metadata).unwrap();
1782        assert!(kind.need_alter(&metadata));
1783    }
1784
1785    #[test]
1786    fn test_validate_modify_column_type() {
1787        let metadata = new_metadata();
1788        AlterKind::ModifyColumnTypes {
1789            columns: vec![ModifyColumnType {
1790                column_name: "xxxx".to_string(),
1791                target_type: ConcreteDataType::string_datatype(),
1792            }],
1793        }
1794        .validate(&metadata)
1795        .unwrap_err();
1796
1797        AlterKind::ModifyColumnTypes {
1798            columns: vec![ModifyColumnType {
1799                column_name: "field_1".to_string(),
1800                target_type: ConcreteDataType::date_datatype(),
1801            }],
1802        }
1803        .validate(&metadata)
1804        .unwrap_err();
1805
1806        AlterKind::ModifyColumnTypes {
1807            columns: vec![ModifyColumnType {
1808                column_name: "ts".to_string(),
1809                target_type: ConcreteDataType::date_datatype(),
1810            }],
1811        }
1812        .validate(&metadata)
1813        .unwrap_err();
1814
1815        AlterKind::ModifyColumnTypes {
1816            columns: vec![ModifyColumnType {
1817                column_name: "tag_0".to_string(),
1818                target_type: ConcreteDataType::date_datatype(),
1819            }],
1820        }
1821        .validate(&metadata)
1822        .unwrap_err();
1823
1824        let kind = AlterKind::ModifyColumnTypes {
1825            columns: vec![ModifyColumnType {
1826                column_name: "field_0".to_string(),
1827                target_type: ConcreteDataType::int32_datatype(),
1828            }],
1829        };
1830        kind.validate(&metadata).unwrap();
1831        assert!(kind.need_alter(&metadata));
1832    }
1833
1834    #[test]
1835    fn test_validate_add_columns() {
1836        let kind = AlterKind::AddColumns {
1837            columns: vec![
1838                AddColumn {
1839                    column_metadata: ColumnMetadata {
1840                        column_schema: ColumnSchema::new(
1841                            "tag_1",
1842                            ConcreteDataType::string_datatype(),
1843                            true,
1844                        ),
1845                        semantic_type: SemanticType::Tag,
1846                        column_id: 5,
1847                    },
1848                    location: None,
1849                },
1850                AddColumn {
1851                    column_metadata: ColumnMetadata {
1852                        column_schema: ColumnSchema::new(
1853                            "field_2",
1854                            ConcreteDataType::string_datatype(),
1855                            true,
1856                        ),
1857                        semantic_type: SemanticType::Field,
1858                        column_id: 6,
1859                    },
1860                    location: None,
1861                },
1862            ],
1863        };
1864        let request = RegionAlterRequest { kind };
1865        let mut metadata = new_metadata();
1866        metadata.schema_version = 1;
1867        request.validate(&metadata).unwrap();
1868    }
1869
1870    #[test]
1871    fn test_validate_create_region() {
1872        let column_metadatas = vec![
1873            ColumnMetadata {
1874                column_schema: ColumnSchema::new(
1875                    "ts",
1876                    ConcreteDataType::timestamp_millisecond_datatype(),
1877                    false,
1878                ),
1879                semantic_type: SemanticType::Timestamp,
1880                column_id: 1,
1881            },
1882            ColumnMetadata {
1883                column_schema: ColumnSchema::new(
1884                    "tag_0",
1885                    ConcreteDataType::string_datatype(),
1886                    true,
1887                ),
1888                semantic_type: SemanticType::Tag,
1889                column_id: 2,
1890            },
1891            ColumnMetadata {
1892                column_schema: ColumnSchema::new(
1893                    "field_0",
1894                    ConcreteDataType::string_datatype(),
1895                    true,
1896                ),
1897                semantic_type: SemanticType::Field,
1898                column_id: 3,
1899            },
1900        ];
1901        let create = RegionCreateRequest {
1902            engine: "mito".to_string(),
1903            column_metadatas,
1904            primary_key: vec![3, 4],
1905            options: HashMap::new(),
1906            table_dir: "path".to_string(),
1907            path_type: PathType::Bare,
1908            partition_expr_json: Some("".to_string()),
1909        };
1910
1911        assert!(create.validate().is_err());
1912    }
1913
1914    #[test]
1915    fn test_validate_modify_column_fulltext_options() {
1916        let kind = AlterKind::SetIndexes {
1917            options: vec![SetIndexOption::Fulltext {
1918                column_name: "tag_0".to_string(),
1919                options: FulltextOptions::new_unchecked(
1920                    true,
1921                    FulltextAnalyzer::Chinese,
1922                    false,
1923                    FulltextBackend::Bloom,
1924                    1000,
1925                    0.01,
1926                ),
1927            }],
1928        };
1929        let request = RegionAlterRequest { kind };
1930        let mut metadata = new_metadata();
1931        metadata.schema_version = 1;
1932        request.validate(&metadata).unwrap();
1933
1934        let kind = AlterKind::UnsetIndexes {
1935            options: vec![UnsetIndexOption::Fulltext {
1936                column_name: "tag_0".to_string(),
1937            }],
1938        };
1939        let request = RegionAlterRequest { kind };
1940        let mut metadata = new_metadata();
1941        metadata.schema_version = 1;
1942        request.validate(&metadata).unwrap();
1943    }
1944
1945    #[test]
1946    fn test_validate_sync_columns() {
1947        let metadata = new_metadata();
1948        let kind = AlterKind::SyncColumns {
1949            column_metadatas: vec![
1950                ColumnMetadata {
1951                    column_schema: ColumnSchema::new(
1952                        "tag_1",
1953                        ConcreteDataType::string_datatype(),
1954                        true,
1955                    ),
1956                    semantic_type: SemanticType::Tag,
1957                    column_id: 5,
1958                },
1959                ColumnMetadata {
1960                    column_schema: ColumnSchema::new(
1961                        "field_2",
1962                        ConcreteDataType::string_datatype(),
1963                        true,
1964                    ),
1965                    semantic_type: SemanticType::Field,
1966                    column_id: 6,
1967                },
1968            ],
1969        };
1970        let err = kind.validate(&metadata).unwrap_err();
1971        assert!(err.to_string().contains("not a primary key"));
1972
1973        // Change the timestamp column name.
1974        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
1975        let ts_column = column_metadatas_with_different_ts_column
1976            .iter_mut()
1977            .find(|c| c.semantic_type == SemanticType::Timestamp)
1978            .unwrap();
1979        ts_column.column_schema.name = "ts1".to_string();
1980
1981        let kind = AlterKind::SyncColumns {
1982            column_metadatas: column_metadatas_with_different_ts_column,
1983        };
1984        let err = kind.validate(&metadata).unwrap_err();
1985        assert!(
1986            err.to_string()
1987                .contains("timestamp column ts has different id")
1988        );
1989
1990        // Change the primary key column name.
1991        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
1992        let pk_column = column_metadatas_with_different_pk_column
1993            .iter_mut()
1994            .find(|c| c.column_schema.name == "tag_0")
1995            .unwrap();
1996        pk_column.column_id = 100;
1997        let kind = AlterKind::SyncColumns {
1998            column_metadatas: column_metadatas_with_different_pk_column,
1999        };
2000        let err = kind.validate(&metadata).unwrap_err();
2001        assert!(
2002            err.to_string()
2003                .contains("column with same name tag_0 has different id")
2004        );
2005
2006        // Add a new field column.
2007        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2008        column_metadatas_with_new_field_column.push(ColumnMetadata {
2009            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2010            semantic_type: SemanticType::Field,
2011            column_id: 4,
2012        });
2013        let kind = AlterKind::SyncColumns {
2014            column_metadatas: column_metadatas_with_new_field_column,
2015        };
2016        kind.validate(&metadata).unwrap();
2017    }
2018
2019    #[test]
2020    fn test_cast_path_type_to_primitive() {
2021        assert_eq!(PathType::Bare as u8, 0);
2022        assert_eq!(PathType::Data as u8, 1);
2023        assert_eq!(PathType::Metadata as u8, 2);
2024        assert_eq!(
2025            PathType::try_from(PathType::Bare as u8).unwrap(),
2026            PathType::Bare
2027        );
2028        assert_eq!(
2029            PathType::try_from(PathType::Data as u8).unwrap(),
2030            PathType::Data
2031        );
2032        assert_eq!(
2033            PathType::try_from(PathType::Metadata as u8).unwrap(),
2034            PathType::Metadata
2035        );
2036    }
2037}