store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26    CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27    FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28    region_request, truncate_request,
29};
30use api::v1::{
31    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50    RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55    SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
56};
57use crate::path_utils::table_dir;
58use crate::storage::{ColumnId, RegionId, ScanRequest};
59
/// The type of path to generate.
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`, so a variant can be
/// rebuilt from its `u8` discriminant (`Bare` = 0, `Data` = 1, `Metadata` = 2).
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// A bare path - the original path of an engine.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
    Bare,
    /// A path for the data region of a metric engine table.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
    Data,
    /// A path for the metadata region of a metric engine table.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
    Metadata,
}
77
/// A batch of DDL requests of one kind, each paired with the region it targets.
///
/// `IntoStaticStr` exposes the variant name, which
/// [`BatchRegionDdlRequest::request_type`] returns.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Create requests, one entry per region.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Drop requests, one entry per region.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Alter requests, one entry per region.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
84
85impl BatchRegionDdlRequest {
86    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
87    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
88        match body {
89            region_request::Body::Creates(creates) => {
90                let requests = creates
91                    .requests
92                    .into_iter()
93                    .map(parse_region_create)
94                    .collect::<Result<Vec<_>>>()?;
95                Ok(Some(Self::Create(requests)))
96            }
97            region_request::Body::Drops(drops) => {
98                let requests = drops
99                    .requests
100                    .into_iter()
101                    .map(parse_region_drop)
102                    .collect::<Result<Vec<_>>>()?;
103                Ok(Some(Self::Drop(requests)))
104            }
105            region_request::Body::Alters(alters) => {
106                let requests = alters
107                    .requests
108                    .into_iter()
109                    .map(parse_region_alter)
110                    .collect::<Result<Vec<_>>>()?;
111                Ok(Some(Self::Alter(requests)))
112            }
113            _ => Ok(None),
114        }
115    }
116
117    pub fn request_type(&self) -> &'static str {
118        self.into()
119    }
120
121    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
122        match self {
123            Self::Create(requests) => requests
124                .into_iter()
125                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
126                .collect(),
127            Self::Drop(requests) => requests
128                .into_iter()
129                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
130                .collect(),
131            Self::Alter(requests) => requests
132                .into_iter()
133                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
134                .collect(),
135        }
136    }
137}
138
/// A request addressed to a single region.
///
/// `IntoStaticStr` exposes the variant name, which [`RegionRequest::request_type`]
/// returns.
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Put rows into the region.
    Put(RegionPutRequest),
    /// Delete rows from the region.
    Delete(RegionDeleteRequest),
    /// Create the region.
    Create(RegionCreateRequest),
    /// Drop the region.
    Drop(RegionDropRequest),
    /// Open the region.
    Open(RegionOpenRequest),
    /// Close the region.
    Close(RegionCloseRequest),
    /// Alter the metadata of the region.
    Alter(RegionAlterRequest),
    /// Flush the region.
    Flush(RegionFlushRequest),
    /// Compact the region.
    Compact(RegionCompactRequest),
    /// Build indexes for the region.
    BuildIndex(RegionBuildIndexRequest),
    /// Truncate the region, entirely or by time ranges.
    Truncate(RegionTruncateRequest),
    /// Catch up the region.
    Catchup(RegionCatchupRequest),
    /// Bulk insert an already decoded record batch.
    BulkInserts(RegionBulkInsertsRequest),
    /// Enter staging mode.
    // NOTE(review): payload type is declared outside this view — confirm semantics there.
    EnterStaging(EnterStagingRequest),
    /// Apply a staging manifest to the region.
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
157
158impl RegionRequest {
159    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
160    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
161    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
162        match body {
163            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
164            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
165            region_request::Body::Create(create) => make_region_create(create),
166            region_request::Body::Drop(drop) => make_region_drop(drop),
167            region_request::Body::Open(open) => make_region_open(open),
168            region_request::Body::Close(close) => make_region_close(close),
169            region_request::Body::Alter(alter) => make_region_alter(alter),
170            region_request::Body::Flush(flush) => make_region_flush(flush),
171            region_request::Body::Compact(compact) => make_region_compact(compact),
172            region_request::Body::BuildIndex(index) => make_region_build_index(index),
173            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
174            region_request::Body::Creates(creates) => make_region_creates(creates),
175            region_request::Body::Drops(drops) => make_region_drops(drops),
176            region_request::Body::Alters(alters) => make_region_alters(alters),
177            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
178            region_request::Body::Sync(_) => UnexpectedSnafu {
179                reason: "Sync request should be handled separately by RegionServer",
180            }
181            .fail(),
182            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
183                reason: "ListMetadata request should be handled separately by RegionServer",
184            }
185            .fail(),
186            region_request::Body::ApplyStagingManifest(apply) => {
187                make_region_apply_staging_manifest(apply)
188            }
189        }
190    }
191
192    /// Returns the type name of the request.
193    pub fn request_type(&self) -> &'static str {
194        self.into()
195    }
196}
197
198fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
199    let requests = inserts
200        .requests
201        .into_iter()
202        .filter_map(|r| {
203            let region_id = r.region_id.into();
204            r.rows.map(|rows| {
205                (
206                    region_id,
207                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
208                )
209            })
210        })
211        .collect();
212    Ok(requests)
213}
214
215fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
216    let requests = deletes
217        .requests
218        .into_iter()
219        .filter_map(|r| {
220            let region_id = r.region_id.into();
221            r.rows.map(|rows| {
222                (
223                    region_id,
224                    RegionRequest::Delete(RegionDeleteRequest { rows, hint: None }),
225                )
226            })
227        })
228        .collect();
229    Ok(requests)
230}
231
232fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
233    let column_metadatas = create
234        .column_defs
235        .into_iter()
236        .map(ColumnMetadata::try_from_column_def)
237        .collect::<Result<Vec<_>>>()?;
238    let region_id = RegionId::from(create.region_id);
239    let table_dir = table_dir(&create.path, region_id.table_id());
240    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
241    Ok((
242        region_id,
243        RegionCreateRequest {
244            engine: create.engine,
245            column_metadatas,
246            primary_key: create.primary_key,
247            options: create.options,
248            table_dir,
249            path_type: PathType::Bare,
250            partition_expr_json,
251        },
252    ))
253}
254
255fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
256    let (region_id, request) = parse_region_create(create)?;
257    Ok(vec![(region_id, RegionRequest::Create(request))])
258}
259
260fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
261    let mut requests = Vec::with_capacity(creates.requests.len());
262    for create in creates.requests {
263        requests.extend(make_region_create(create)?);
264    }
265    Ok(requests)
266}
267
268fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
269    let region_id = drop.region_id.into();
270    Ok((
271        region_id,
272        RegionDropRequest {
273            fast_path: drop.fast_path,
274        },
275    ))
276}
277
278fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
279    let (region_id, request) = parse_region_drop(drop)?;
280    Ok(vec![(region_id, RegionRequest::Drop(request))])
281}
282
283fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
284    let mut requests = Vec::with_capacity(drops.requests.len());
285    for drop in drops.requests {
286        requests.extend(make_region_drop(drop)?);
287    }
288    Ok(requests)
289}
290
291fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
292    let region_id = RegionId::from(open.region_id);
293    let table_dir = table_dir(&open.path, region_id.table_id());
294    Ok(vec![(
295        region_id,
296        RegionRequest::Open(RegionOpenRequest {
297            engine: open.engine,
298            table_dir,
299            path_type: PathType::Bare,
300            options: open.options,
301            skip_wal_replay: false,
302            checkpoint: None,
303        }),
304    )])
305}
306
307fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
308    let region_id = close.region_id.into();
309    Ok(vec![(
310        region_id,
311        RegionRequest::Close(RegionCloseRequest {}),
312    )])
313}
314
315fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
316    let region_id = alter.region_id.into();
317    let request = RegionAlterRequest::try_from(alter)?;
318    Ok((region_id, request))
319}
320
321fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
322    let (region_id, request) = parse_region_alter(alter)?;
323    Ok(vec![(region_id, RegionRequest::Alter(request))])
324}
325
326fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
327    let mut requests = Vec::with_capacity(alters.requests.len());
328    for alter in alters.requests {
329        requests.extend(make_region_alter(alter)?);
330    }
331    Ok(requests)
332}
333
334fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
335    let region_id = flush.region_id.into();
336    Ok(vec![(
337        region_id,
338        RegionRequest::Flush(RegionFlushRequest {
339            row_group_size: None,
340        }),
341    )])
342}
343
344fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
345    let region_id = compact.region_id.into();
346    let options = compact
347        .options
348        .unwrap_or(compact_request::Options::Regular(Default::default()));
349    // Convert parallelism: a value of 0 indicates no specific parallelism requested (None)
350    let parallelism = if compact.parallelism == 0 {
351        None
352    } else {
353        Some(compact.parallelism)
354    };
355    Ok(vec![(
356        region_id,
357        RegionRequest::Compact(RegionCompactRequest {
358            options,
359            parallelism,
360        }),
361    )])
362}
363
364fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
365    let region_id = index.region_id.into();
366    Ok(vec![(
367        region_id,
368        RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
369    )])
370}
371
372fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
373    let region_id = truncate.region_id.into();
374    match truncate.kind {
375        None => InvalidRawRegionRequestSnafu {
376            err: "missing kind in TruncateRequest".to_string(),
377        }
378        .fail(),
379        Some(truncate_request::Kind::All(_)) => Ok(vec![(
380            region_id,
381            RegionRequest::Truncate(RegionTruncateRequest::All),
382        )]),
383        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
384            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
385
386            Ok(vec![(
387                region_id,
388                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
389            )])
390        }
391    }
392}
393
394/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
395fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
396    let region_id = request.region_id.into();
397    let Some(Body::ArrowIpc(request)) = request.body else {
398        return Ok(vec![]);
399    };
400
401    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
402        .with_label_values(&["decode"])
403        .start_timer();
404    let mut decoder =
405        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
406    let payload = decoder
407        .try_decode_record_batch(&request.data_header, &request.payload)
408        .context(FlightCodecSnafu)?;
409    decoder_timer.observe_duration();
410    Ok(vec![(
411        region_id,
412        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
413            region_id,
414            payload,
415            raw_data: request,
416        }),
417    )])
418}
419
420fn make_region_apply_staging_manifest(
421    api::v1::region::ApplyStagingManifestRequest {
422        region_id,
423        partition_expr,
424        central_region_id,
425        manifest_path,
426    }: api::v1::region::ApplyStagingManifestRequest,
427) -> Result<Vec<(RegionId, RegionRequest)>> {
428    let region_id = region_id.into();
429    Ok(vec![(
430        region_id,
431        RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
432            partition_expr,
433            central_region_id: central_region_id.into(),
434            manifest_path,
435        }),
436    )])
437}
438
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Write hint.
    ///
    /// Left as `None` when the request is built from a gRPC insert body in this module.
    pub hint: Option<WriteHint>,
}
447
/// Request to read from a region; a thin wrapper around a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
452
/// Request to delete data from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Keys to rows to delete.
    ///
    /// Each row only contains primary key columns and a time index column.
    pub rows: Rows,
    /// Write hint.
    ///
    /// Left as `None` when the request is built from a gRPC delete body in this module.
    pub hint: Option<WriteHint>,
}
463
/// Request to create a region.
///
/// Use [`RegionCreateRequest::validate`] to check the column and primary key
/// definitions for consistency.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Region engine name
    pub engine: String,
    /// Columns in this region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Columns in the primary key.
    pub primary_key: Vec<ColumnId>,
    /// Options of the created region.
    pub options: HashMap<String, String>,
    /// Directory for table's data home. Usually is composed by catalog and table id
    pub table_dir: String,
    /// Path type for generating paths
    pub path_type: PathType,
    /// Partition expression JSON from table metadata. Set to empty string for a region without partition.
    /// `Option` to keep compatibility with old clients.
    pub partition_expr_json: Option<String>,
}
482
483impl RegionCreateRequest {
484    /// Checks whether the request is valid, returns an error if it is invalid.
485    pub fn validate(&self) -> Result<()> {
486        // time index must exist
487        ensure!(
488            self.column_metadatas
489                .iter()
490                .any(|x| x.semantic_type == SemanticType::Timestamp),
491            InvalidRegionRequestSnafu {
492                region_id: RegionId::new(0, 0),
493                err: "missing timestamp column in create region request".to_string(),
494            }
495        );
496
497        // build column id to indices
498        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
499        for (i, c) in self.column_metadatas.iter().enumerate() {
500            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
501                return InvalidRegionRequestSnafu {
502                    region_id: RegionId::new(0, 0),
503                    err: format!(
504                        "duplicate column id {} (at position {} and {}) in create region request",
505                        c.column_id, previous, i
506                    ),
507                }
508                .fail();
509            }
510        }
511
512        // primary key must exist
513        for column_id in &self.primary_key {
514            ensure!(
515                column_id_to_indices.contains_key(column_id),
516                InvalidRegionRequestSnafu {
517                    region_id: RegionId::new(0, 0),
518                    err: format!(
519                        "missing primary key column {} in create region request",
520                        column_id
521                    ),
522                }
523            );
524        }
525
526        Ok(())
527    }
528
529    /// Returns true when the region belongs to the metric engine's physical table.
530    pub fn is_physical_table(&self) -> bool {
531        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
532    }
533}
534
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Copied verbatim from the protobuf `DropRequest::fast_path` flag.
    // NOTE(review): the meaning of the fast path is defined by the engine handling
    // the request, not in this file — confirm there.
    pub fast_path: bool,
}
539
/// Open region request.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Region engine name
    pub engine: String,
    /// Directory for table's data home. Usually is composed by catalog and table id
    pub table_dir: String,
    /// Path type for generating paths
    pub path_type: PathType,
    /// Options of the opened region.
    pub options: HashMap<String, String>,
    /// To skip replaying the WAL.
    ///
    /// `false` when the request is built from a gRPC `OpenRequest` in this module.
    pub skip_wal_replay: bool,
    /// Replay checkpoint.
    ///
    /// `None` when the request is built from a gRPC `OpenRequest` in this module.
    pub checkpoint: Option<ReplayCheckpoint>,
}
556
/// Replay checkpoint optionally attached to a [`RegionOpenRequest`].
// NOTE(review): the fields look like WAL entry ids that bound replay — confirm
// against the engine that consumes the checkpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint.
    pub entry_id: u64,
    /// Entry id for the metadata region, if any.
    pub metadata_entry_id: Option<u64>,
}
562
563impl RegionOpenRequest {
564    /// Returns true when the region belongs to the metric engine's physical table.
565    pub fn is_physical_table(&self) -> bool {
566        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
567    }
568}
569
/// Close region request.
///
/// Carries no parameters; the target region is identified by the region id that
/// accompanies the request.
#[derive(Debug)]
pub struct RegionCloseRequest {}
573
/// Alter metadata of a region.
///
/// Built from a protobuf `AlterRequest` via `TryFrom`, which fails when the
/// request has no `kind`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// Kind of alteration to do.
    pub kind: AlterKind,
}
580
581impl RegionAlterRequest {
582    /// Checks whether the request is valid, returns an error if it is invalid.
583    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
584        self.kind.validate(metadata)?;
585
586        Ok(())
587    }
588
589    /// Returns true if we need to apply the request to the region.
590    ///
591    /// The `request` should be valid.
592    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
593        debug_assert!(self.validate(metadata).is_ok());
594        self.kind.need_alter(metadata)
595    }
596}
597
598impl TryFrom<AlterRequest> for RegionAlterRequest {
599    type Error = MetadataError;
600
601    fn try_from(value: AlterRequest) -> Result<Self> {
602        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
603            err: "missing kind in AlterRequest",
604        })?;
605
606        let kind = AlterKind::try_from(kind)?;
607        Ok(RegionAlterRequest { kind })
608    }
609}
610
/// Kind of the alteration.
///
/// `AsRefStr` exposes the variant name as a string slice.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region, only fields are allowed to drop.
    DropColumns {
        /// Name of columns to drop.
        names: Vec<String>,
    },
    /// Change the data type of columns in the region, only fields are allowed to change.
    ModifyColumnTypes {
        /// Columns to change.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop column default value.
    DropDefaults {
        /// Names of the columns whose default value is dropped.
        names: Vec<String>,
    },
    /// Set column default value.
    SetDefaults {
        /// Columns to change.
        columns: Vec<SetDefault>,
    },
    /// Sync column metadatas.
    SyncColumns {
        /// The column metadatas to sync the region to.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A column and the default constraint to set on it, used by `AlterKind::SetDefaults`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column to change.
    pub name: String,
    /// The default constraint as raw bytes.
    // NOTE(review): presumably a serialized column default constraint — confirm the
    // encoding against the code that produces and consumes it.
    pub default_constraint: Vec<u8>,
}
657
/// An index to set on a column, together with the options of that index.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Set a fulltext index on the column.
    Fulltext {
        /// Name of the indexed column.
        column_name: String,
        /// Fulltext index options.
        options: FulltextOptions,
    },
    /// Set an inverted index on the column; it takes no options.
    Inverted {
        /// Name of the indexed column.
        column_name: String,
    },
    /// Set a skipping index on the column.
    Skipping {
        /// Name of the indexed column.
        column_name: String,
        /// Skipping index options.
        options: SkippingIndexOptions,
    },
}
672
673impl SetIndexOption {
674    /// Returns the column name of the index option.
675    pub fn column_name(&self) -> &String {
676        match self {
677            SetIndexOption::Fulltext { column_name, .. } => column_name,
678            SetIndexOption::Inverted { column_name } => column_name,
679            SetIndexOption::Skipping { column_name, .. } => column_name,
680        }
681    }
682
683    /// Returns true if the index option is fulltext.
684    pub fn is_fulltext(&self) -> bool {
685        match self {
686            SetIndexOption::Fulltext { .. } => true,
687            SetIndexOption::Inverted { .. } => false,
688            SetIndexOption::Skipping { .. } => false,
689        }
690    }
691}
692
693impl TryFrom<v1::SetIndex> for SetIndexOption {
694    type Error = MetadataError;
695
696    fn try_from(value: v1::SetIndex) -> Result<Self> {
697        let option = value.options.context(InvalidRawRegionRequestSnafu {
698            err: "missing options in SetIndex",
699        })?;
700
701        let opt = match option {
702            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
703                column_name: x.column_name.clone(),
704                options: FulltextOptions::new(
705                    x.enable,
706                    as_fulltext_option_analyzer(
707                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
708                    ),
709                    x.case_sensitive,
710                    as_fulltext_option_backend(
711                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
712                    ),
713                    x.granularity as u32,
714                    x.false_positive_rate,
715                )
716                .context(InvalidIndexOptionSnafu)?,
717            },
718            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
719                column_name: i.column_name,
720            },
721            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
722                column_name: s.column_name,
723                options: SkippingIndexOptions::new(
724                    s.granularity as u32,
725                    s.false_positive_rate,
726                    as_skipping_index_type(
727                        PbSkippingIndexType::try_from(s.skipping_index_type)
728                            .context(DecodeProtoSnafu)?,
729                    ),
730                )
731                .context(InvalidIndexOptionSnafu)?,
732            },
733        };
734
735        Ok(opt)
736    }
737}
738
/// An index to remove from a column; every variant names the affected column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Unset the fulltext index of the column.
    Fulltext { column_name: String },
    /// Unset the inverted index of the column.
    Inverted { column_name: String },
    /// Unset the skipping index of the column.
    Skipping { column_name: String },
}
745
746impl UnsetIndexOption {
747    pub fn column_name(&self) -> &String {
748        match self {
749            UnsetIndexOption::Fulltext { column_name } => column_name,
750            UnsetIndexOption::Inverted { column_name } => column_name,
751            UnsetIndexOption::Skipping { column_name } => column_name,
752        }
753    }
754
755    pub fn is_fulltext(&self) -> bool {
756        match self {
757            UnsetIndexOption::Fulltext { .. } => true,
758            UnsetIndexOption::Inverted { .. } => false,
759            UnsetIndexOption::Skipping { .. } => false,
760        }
761    }
762}
763
764impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
765    type Error = MetadataError;
766
767    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
768        let option = value.options.context(InvalidRawRegionRequestSnafu {
769            err: "missing options in UnsetIndex",
770        })?;
771
772        let opt = match option {
773            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
774                column_name: f.column_name,
775            },
776            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
777                column_name: i.column_name,
778            },
779            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
780                column_name: s.column_name,
781            },
782        };
783
784        Ok(opt)
785    }
786}
787
788impl AlterKind {
789    /// Returns an error if the alter kind is invalid.
790    ///
791    /// It allows adding column if not exists and dropping column if exists.
792    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
793        match self {
794            AlterKind::AddColumns { columns } => {
795                for col_to_add in columns {
796                    col_to_add.validate(metadata)?;
797                }
798            }
799            AlterKind::DropColumns { names } => {
800                for name in names {
801                    Self::validate_column_to_drop(name, metadata)?;
802                }
803            }
804            AlterKind::ModifyColumnTypes { columns } => {
805                for col_to_change in columns {
806                    col_to_change.validate(metadata)?;
807                }
808            }
809            AlterKind::SetRegionOptions { .. } => {}
810            AlterKind::UnsetRegionOptions { .. } => {}
811            AlterKind::SetIndexes { options } => {
812                for option in options {
813                    Self::validate_column_alter_index_option(
814                        option.column_name(),
815                        metadata,
816                        option.is_fulltext(),
817                    )?;
818                }
819            }
820            AlterKind::UnsetIndexes { options } => {
821                for option in options {
822                    Self::validate_column_alter_index_option(
823                        option.column_name(),
824                        metadata,
825                        option.is_fulltext(),
826                    )?;
827                }
828            }
829            AlterKind::DropDefaults { names } => {
830                names
831                    .iter()
832                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
833            }
834            AlterKind::SetDefaults { columns } => {
835                columns
836                    .iter()
837                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
838            }
839            AlterKind::SyncColumns { column_metadatas } => {
840                let new_primary_keys = column_metadatas
841                    .iter()
842                    .filter(|c| c.semantic_type == SemanticType::Tag)
843                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
844                    .collect::<HashMap<_, _>>();
845
846                let old_primary_keys = metadata
847                    .column_metadatas
848                    .iter()
849                    .filter(|c| c.semantic_type == SemanticType::Tag)
850                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
851
852                for (name, id) in old_primary_keys {
853                    let primary_key =
854                        new_primary_keys
855                            .get(name)
856                            .with_context(|| InvalidRegionRequestSnafu {
857                                region_id: metadata.region_id,
858                                err: format!("column {} is not a primary key", name),
859                            })?;
860
861                    ensure!(
862                        *primary_key == id,
863                        InvalidRegionRequestSnafu {
864                            region_id: metadata.region_id,
865                            err: format!(
866                                "column with same name {} has different id, existing: {}, got: {}",
867                                name, id, primary_key
868                            ),
869                        }
870                    );
871                }
872
873                let new_ts_column = column_metadatas
874                    .iter()
875                    .find(|c| c.semantic_type == SemanticType::Timestamp)
876                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
877                    .context(InvalidRegionRequestSnafu {
878                        region_id: metadata.region_id,
879                        err: "timestamp column not found",
880                    })?;
881
882                // Safety: timestamp column must exist.
883                let old_ts_column = metadata
884                    .column_metadatas
885                    .iter()
886                    .find(|c| c.semantic_type == SemanticType::Timestamp)
887                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
888                    .unwrap();
889
890                ensure!(
891                    new_ts_column == old_ts_column,
892                    InvalidRegionRequestSnafu {
893                        region_id: metadata.region_id,
894                        err: format!(
895                            "timestamp column {} has different id, existing: {}, got: {}",
896                            old_ts_column.0, old_ts_column.1, new_ts_column.1
897                        ),
898                    }
899                );
900            }
901        }
902        Ok(())
903    }
904
905    /// Returns true if we need to apply the alteration to the region.
906    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
907        debug_assert!(self.validate(metadata).is_ok());
908        match self {
909            AlterKind::AddColumns { columns } => columns
910                .iter()
911                .any(|col_to_add| col_to_add.need_alter(metadata)),
912            AlterKind::DropColumns { names } => names
913                .iter()
914                .any(|name| metadata.column_by_name(name).is_some()),
915            AlterKind::ModifyColumnTypes { columns } => columns
916                .iter()
917                .any(|col_to_change| col_to_change.need_alter(metadata)),
918            AlterKind::SetRegionOptions { .. } => true,
919            AlterKind::UnsetRegionOptions { .. } => true,
920            AlterKind::SetIndexes { options, .. } => options
921                .iter()
922                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
923            AlterKind::UnsetIndexes { options } => options
924                .iter()
925                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
926            AlterKind::DropDefaults { names } => names
927                .iter()
928                .any(|name| metadata.column_by_name(name).is_some()),
929
930            AlterKind::SetDefaults { columns } => columns
931                .iter()
932                .any(|x| metadata.column_by_name(&x.name).is_some()),
933            AlterKind::SyncColumns { column_metadatas } => {
934                metadata.column_metadatas != *column_metadatas
935            }
936        }
937    }
938
939    /// Returns an error if the column to drop is invalid.
940    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
941        let Some(column) = metadata.column_by_name(name) else {
942            return Ok(());
943        };
944        ensure!(
945            column.semantic_type == SemanticType::Field,
946            InvalidRegionRequestSnafu {
947                region_id: metadata.region_id,
948                err: format!("column {} is not a field and could not be dropped", name),
949            }
950        );
951        Ok(())
952    }
953
954    /// Returns an error if the column's alter index option is invalid.
955    fn validate_column_alter_index_option(
956        column_name: &String,
957        metadata: &RegionMetadata,
958        is_fulltext: bool,
959    ) -> Result<()> {
960        let column = metadata
961            .column_by_name(column_name)
962            .context(InvalidRegionRequestSnafu {
963                region_id: metadata.region_id,
964                err: format!("column {} not found", column_name),
965            })?;
966
967        if is_fulltext {
968            ensure!(
969                column.column_schema.data_type.is_string(),
970                InvalidRegionRequestSnafu {
971                    region_id: metadata.region_id,
972                    err: format!(
973                        "cannot change alter index options for non-string column {}",
974                        column_name
975                    ),
976                }
977            );
978        }
979
980        Ok(())
981    }
982
983    /// Returns an error if the column isn't exist.
984    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
985        metadata
986            .column_by_name(column_name)
987            .context(InvalidRegionRequestSnafu {
988                region_id: metadata.region_id,
989                err: format!("column {} not found", column_name),
990            })?;
991
992        Ok(())
993    }
994}
995
996impl TryFrom<alter_request::Kind> for AlterKind {
997    type Error = MetadataError;
998
999    fn try_from(kind: alter_request::Kind) -> Result<Self> {
1000        let alter_kind = match kind {
1001            alter_request::Kind::AddColumns(x) => {
1002                let columns = x
1003                    .add_columns
1004                    .into_iter()
1005                    .map(|x| x.try_into())
1006                    .collect::<Result<Vec<_>>>()?;
1007                AlterKind::AddColumns { columns }
1008            }
1009            alter_request::Kind::ModifyColumnTypes(x) => {
1010                let columns = x
1011                    .modify_column_types
1012                    .into_iter()
1013                    .map(|x| x.into())
1014                    .collect::<Vec<_>>();
1015                AlterKind::ModifyColumnTypes { columns }
1016            }
1017            alter_request::Kind::DropColumns(x) => {
1018                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
1019                AlterKind::DropColumns { names }
1020            }
1021            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
1022                options: options
1023                    .table_options
1024                    .iter()
1025                    .map(TryFrom::try_from)
1026                    .collect::<Result<Vec<_>>>()?,
1027            },
1028            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1029                keys: options
1030                    .keys
1031                    .iter()
1032                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
1033                    .collect::<Result<Vec<_>>>()?,
1034            },
1035            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1036                options: vec![SetIndexOption::try_from(o)?],
1037            },
1038            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1039                options: vec![UnsetIndexOption::try_from(o)?],
1040            },
1041            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1042                options: o
1043                    .set_indexes
1044                    .into_iter()
1045                    .map(SetIndexOption::try_from)
1046                    .collect::<Result<Vec<_>>>()?,
1047            },
1048            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1049                options: o
1050                    .unset_indexes
1051                    .into_iter()
1052                    .map(UnsetIndexOption::try_from)
1053                    .collect::<Result<Vec<_>>>()?,
1054            },
1055            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1056                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1057            },
1058            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1059                columns: x
1060                    .set_defaults
1061                    .into_iter()
1062                    .map(|x| {
1063                        Ok(SetDefault {
1064                            name: x.column_name,
1065                            default_constraint: x.default_constraint.clone(),
1066                        })
1067                    })
1068                    .collect::<Result<Vec<_>>>()?,
1069            },
1070            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1071                column_metadatas: x
1072                    .column_defs
1073                    .into_iter()
1074                    .map(ColumnMetadata::try_from_column_def)
1075                    .collect::<Result<Vec<_>>>()?,
1076            },
1077        };
1078
1079        Ok(alter_kind)
1080    }
1081}
1082
/// Adds a column.
///
/// Describes a single column to add to a region, together with an optional
/// position for the new column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Location to add the column. If the location is `None`, the region
    /// appends the column after the existing columns.
    pub location: Option<AddColumnLocation>,
}
1092
1093impl AddColumn {
1094    /// Returns an error if the column to add is invalid.
1095    ///
1096    /// It allows adding existing columns. However, the existing column must have the same metadata
1097    /// and the location must be None.
1098    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1099        ensure!(
1100            self.column_metadata.column_schema.is_nullable()
1101                || self
1102                    .column_metadata
1103                    .column_schema
1104                    .default_constraint()
1105                    .is_some(),
1106            InvalidRegionRequestSnafu {
1107                region_id: metadata.region_id,
1108                err: format!(
1109                    "no default value for column {}",
1110                    self.column_metadata.column_schema.name
1111                ),
1112            }
1113        );
1114
1115        if let Some(existing_column) =
1116            metadata.column_by_name(&self.column_metadata.column_schema.name)
1117        {
1118            // If the column already exists.
1119            ensure!(
1120                *existing_column == self.column_metadata,
1121                InvalidRegionRequestSnafu {
1122                    region_id: metadata.region_id,
1123                    err: format!(
1124                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1125                        self.column_metadata.column_schema.name,
1126                        existing_column,
1127                        self.column_metadata,
1128                    ),
1129                }
1130            );
1131            ensure!(
1132                self.location.is_none(),
1133                InvalidRegionRequestSnafu {
1134                    region_id: metadata.region_id,
1135                    err: format!(
1136                        "column {} already exists, but location is specified",
1137                        self.column_metadata.column_schema.name
1138                    ),
1139                }
1140            );
1141        }
1142
1143        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1144            // Ensures the existing column has the same name.
1145            ensure!(
1146                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1147                InvalidRegionRequestSnafu {
1148                    region_id: metadata.region_id,
1149                    err: format!(
1150                        "column id {} already exists with different name {}",
1151                        self.column_metadata.column_id, existing_column.column_schema.name
1152                    ),
1153                }
1154            );
1155        }
1156
1157        Ok(())
1158    }
1159
1160    /// Returns true if no column to add to the region.
1161    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1162        debug_assert!(self.validate(metadata).is_ok());
1163        metadata
1164            .column_by_name(&self.column_metadata.column_schema.name)
1165            .is_none()
1166    }
1167}
1168
1169impl TryFrom<v1::region::AddColumn> for AddColumn {
1170    type Error = MetadataError;
1171
1172    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1173        let column_def = add_column
1174            .column_def
1175            .context(InvalidRawRegionRequestSnafu {
1176                err: "missing column_def in AddColumn",
1177            })?;
1178
1179        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1180        let location = add_column
1181            .location
1182            .map(AddColumnLocation::try_from)
1183            .transpose()?;
1184
1185        Ok(AddColumn {
1186            column_metadata,
1187            location,
1188        })
1189    }
1190}
1191
/// Location to add a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Add the column to the first position of columns.
    First,
    /// Add the column after a specific column.
    After {
        /// Name of the existing column that the new column is placed after.
        column_name: String,
    },
}
1203
1204impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1205    type Error = MetadataError;
1206
1207    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1208        let location_type = LocationType::try_from(location.location_type)
1209            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1210        let add_column_location = match location_type {
1211            LocationType::First => AddColumnLocation::First,
1212            LocationType::After => AddColumnLocation::After {
1213                column_name: location.after_column_name,
1214            },
1215        };
1216
1217        Ok(add_column_location)
1218    }
1219}
1220
/// Change a column's datatype.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to modify.
    pub column_name: String,
    /// Column will be changed to this type.
    pub target_type: ConcreteDataType,
}
1229
1230impl ModifyColumnType {
1231    /// Returns an error if the column's datatype to change is invalid.
1232    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1233        let column_meta = metadata
1234            .column_by_name(&self.column_name)
1235            .with_context(|| InvalidRegionRequestSnafu {
1236                region_id: metadata.region_id,
1237                err: format!("column {} not found", self.column_name),
1238            })?;
1239
1240        ensure!(
1241            matches!(column_meta.semantic_type, SemanticType::Field),
1242            InvalidRegionRequestSnafu {
1243                region_id: metadata.region_id,
1244                err: "'timestamp' or 'tag' column cannot change type".to_string()
1245            }
1246        );
1247        ensure!(
1248            column_meta
1249                .column_schema
1250                .data_type
1251                .can_arrow_type_cast_to(&self.target_type),
1252            InvalidRegionRequestSnafu {
1253                region_id: metadata.region_id,
1254                err: format!(
1255                    "column '{}' cannot be cast automatically to type '{}'",
1256                    self.column_name, self.target_type
1257                ),
1258            }
1259        );
1260
1261        Ok(())
1262    }
1263
1264    /// Returns true if no column's datatype to change to the region.
1265    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1266        debug_assert!(self.validate(metadata).is_ok());
1267        metadata.column_by_name(&self.column_name).is_some()
1268    }
1269}
1270
1271impl From<v1::ModifyColumnType> for ModifyColumnType {
1272    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1273        let target_type = ColumnDataTypeWrapper::new(
1274            modify_column_type.target_type(),
1275            modify_column_type.target_type_extension,
1276        )
1277        .into();
1278
1279        ModifyColumnType {
1280            column_name: modify_column_type.column_name,
1281            target_type,
1282        }
1283    }
1284}
1285
/// A region option to set.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Sets the time-to-live option. Converting an unset-TTL request into a
    /// `SetRegionOption` yields `Ttl` with the default value of the inner `Option`.
    Ttl(Option<TimeToLive>),
    /// Modifies a TWCS option, with values as (option name, new value).
    /// Converting an unset request yields an empty string as the new value.
    Twsc(String, String),
    /// Modifies the SST format.
    Format(String),
}
1294
1295impl TryFrom<&PbOption> for SetRegionOption {
1296    type Error = MetadataError;
1297
1298    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1299        let PbOption { key, value } = value;
1300        match key.as_str() {
1301            TTL_KEY => {
1302                let ttl = TimeToLive::from_humantime_or_str(value)
1303                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1304
1305                Ok(Self::Ttl(Some(ttl)))
1306            }
1307            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1308                Ok(Self::Twsc(key.clone(), value.clone()))
1309            }
1310            SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1311            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1312        }
1313    }
1314}
1315
1316impl From<&UnsetRegionOption> for SetRegionOption {
1317    fn from(unset_option: &UnsetRegionOption) -> Self {
1318        match unset_option {
1319            UnsetRegionOption::TwcsTriggerFileNum => {
1320                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1321            }
1322            UnsetRegionOption::TwcsMaxOutputFileSize => {
1323                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1324            }
1325            UnsetRegionOption::TwcsTimeWindow => {
1326                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1327            }
1328            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1329        }
1330    }
1331}
1332
1333impl TryFrom<&str> for UnsetRegionOption {
1334    type Error = MetadataError;
1335
1336    fn try_from(key: &str) -> Result<Self> {
1337        match key.to_ascii_lowercase().as_str() {
1338            TTL_KEY => Ok(Self::Ttl),
1339            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1340            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1341            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1342            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1343        }
1344    }
1345}
1346
/// A region option to unset. Each variant corresponds to one option key,
/// which is returned by `as_str`.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// Unsets the option keyed by `TWCS_TRIGGER_FILE_NUM`.
    TwcsTriggerFileNum,
    /// Unsets the option keyed by `TWCS_MAX_OUTPUT_FILE_SIZE`.
    TwcsMaxOutputFileSize,
    /// Unsets the option keyed by `TWCS_TIME_WINDOW`.
    TwcsTimeWindow,
    /// Unsets the option keyed by `TTL_KEY`.
    Ttl,
}
1354
1355impl UnsetRegionOption {
1356    pub fn as_str(&self) -> &str {
1357        match self {
1358            Self::Ttl => TTL_KEY,
1359            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1360            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1361            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1362        }
1363    }
1364}
1365
1366impl Display for UnsetRegionOption {
1367    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1368        write!(f, "{}", self.as_str())
1369    }
1370}
1371
/// Flush region request.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size for the flush; defaults to `None`.
    // NOTE(review): presumably the row group size of the files written by the
    // flush; confirm against the engine that consumes this request.
    pub row_group_size: Option<usize>,
}
1376
/// Compact region request.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options; the `Default` impl uses regular compaction.
    pub options: compact_request::Options,
    /// Optional parallelism of the compaction; `None` by default.
    // NOTE(review): how `None` is interpreted is decided by the consumer; confirm.
    pub parallelism: Option<u32>,
}
1382
1383impl Default for RegionCompactRequest {
1384    fn default() -> Self {
1385        Self {
1386            // Default to regular compaction.
1387            options: compact_request::Options::Regular(Default::default()),
1388            parallelism: None,
1389        }
1390    }
1391}
1392
/// Build index region request. It currently carries no parameters.
#[derive(Debug, Clone, Default)]
pub struct RegionBuildIndexRequest {}
1395
/// Truncate region request.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// Truncate all data in the region.
    All,
    /// Truncate data by time ranges.
    ByTimeRanges {
        /// Time ranges to truncate. Both bounds are inclusive.
        /// Only files that are fully contained in a time range will be truncated,
        /// so there is no guarantee that all data in the time range is truncated.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1408
/// Catchup region request.
///
/// Makes a readonly region catch up with the leader region's changes.
/// It has no effect when operating on a leader region.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// Sets it to writable if it's available after it has caught up with all changes.
    pub set_writable: bool,
    /// The `entry_id` that is expected to be replayed to.
    /// `None` stands for replaying to the latest entry.
    pub entry_id: Option<entry::Id>,
    /// Used for the metrics metadata region.
    /// The `entry_id` that is expected to be replayed to.
    /// `None` stands for replaying to the latest entry.
    pub metadata_entry_id: Option<entry::Id>,
    /// The hint for replaying memtable.
    pub location_id: Option<u64>,
    /// Replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
1429
/// Bulk inserts region request.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Id of the region the request targets.
    pub region_id: RegionId,
    /// The record batch to insert; `estimated_size` is computed from it.
    pub payload: DfRecordBatch,
    /// The Arrow IPC message of the request.
    // NOTE(review): presumably the raw encoding that `payload` was decoded from; confirm.
    pub raw_data: ArrowIpc,
}
1436
1437impl RegionBulkInsertsRequest {
1438    pub fn estimated_size(&self) -> usize {
1439        self.payload.get_array_memory_size()
1440    }
1441}
1442
/// Request to stage a region with a new region rule (partition expression).
///
/// This request transitions a region into the staging mode.
/// It first flushes the memtable for the old region rule if it is not empty,
/// then enters the staging mode with the new region rule.
#[derive(Debug, Clone)]
pub struct EnterStagingRequest {
    /// The partition expression of the staging region.
    pub partition_expr: String,
}
1453
/// Request to apply a generated staging manifest to a region.
///
/// This request is used as part of the region repartition.
///
/// After a region has entered staging mode with a new region rule (partition
/// expression) and a separate process (for example, `remap_manifests`) has
/// generated the new file assignments for the staging region, this request
/// applies that generated manifest to the region.
///
/// In practice, this means:
/// - The `partition_expr` identifies the staging region rule that the manifest
///   was generated for.
/// - `central_region_id` specifies which region holds the staging blob storage
///   where the manifest was written during the `remap_manifests` operation.
/// - `manifest_path` is the relative path within the central region's staging
///   blob storage to fetch the generated manifest.
///
/// It should typically be called **after** the staging region has been
/// initialized by [`EnterStagingRequest`] and the new file layout has been
/// computed, to finalize the repartition operation.
#[derive(Debug, Clone)]
pub struct ApplyStagingManifestRequest {
    /// The partition expression of the staging region.
    pub partition_expr: String,
    /// The region that stores the staging manifests in its staging blob storage.
    pub central_region_id: RegionId,
    /// The relative path to the staging manifest within the central region's
    /// staging blob storage.
    pub manifest_path: String,
}
1482
1483impl fmt::Display for RegionRequest {
1484    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1485        match self {
1486            RegionRequest::Put(_) => write!(f, "Put"),
1487            RegionRequest::Delete(_) => write!(f, "Delete"),
1488            RegionRequest::Create(_) => write!(f, "Create"),
1489            RegionRequest::Drop(_) => write!(f, "Drop"),
1490            RegionRequest::Open(_) => write!(f, "Open"),
1491            RegionRequest::Close(_) => write!(f, "Close"),
1492            RegionRequest::Alter(_) => write!(f, "Alter"),
1493            RegionRequest::Flush(_) => write!(f, "Flush"),
1494            RegionRequest::Compact(_) => write!(f, "Compact"),
1495            RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1496            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1497            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1498            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1499            RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1500            RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
1501        }
1502    }
1503}
1504
1505#[cfg(test)]
1506mod tests {
1507
1508    use api::v1::region::RegionColumnDef;
1509    use api::v1::{ColumnDataType, ColumnDef};
1510    use datatypes::prelude::ConcreteDataType;
1511    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1512
1513    use super::*;
1514    use crate::metadata::RegionMetadataBuilder;
1515
1516    #[test]
1517    fn test_from_proto_location() {
1518        let proto_location = v1::AddColumnLocation {
1519            location_type: LocationType::First as i32,
1520            after_column_name: String::default(),
1521        };
1522        let location = AddColumnLocation::try_from(proto_location).unwrap();
1523        assert_eq!(location, AddColumnLocation::First);
1524
1525        let proto_location = v1::AddColumnLocation {
1526            location_type: 10,
1527            after_column_name: String::default(),
1528        };
1529        AddColumnLocation::try_from(proto_location).unwrap_err();
1530
1531        let proto_location = v1::AddColumnLocation {
1532            location_type: LocationType::After as i32,
1533            after_column_name: "a".to_string(),
1534        };
1535        let location = AddColumnLocation::try_from(proto_location).unwrap();
1536        assert_eq!(
1537            location,
1538            AddColumnLocation::After {
1539                column_name: "a".to_string()
1540            }
1541        );
1542    }
1543
1544    #[test]
1545    fn test_from_none_proto_add_column() {
1546        AddColumn::try_from(v1::region::AddColumn {
1547            column_def: None,
1548            location: None,
1549        })
1550        .unwrap_err();
1551    }
1552
    #[test]
    fn test_from_proto_alter_request() {
        // A request without an alter kind is rejected.
        RegionAlterRequest::try_from(AlterRequest {
            region_id: 0,
            schema_version: 1,
            kind: None,
        })
        .unwrap_err();

        // Adds a nullable string field column `a` (id 1) at the first position.
        let request = RegionAlterRequest::try_from(AlterRequest {
            region_id: 0,
            schema_version: 1,
            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
                add_columns: vec![v1::region::AddColumn {
                    column_def: Some(RegionColumnDef {
                        column_def: Some(ColumnDef {
                            name: "a".to_string(),
                            data_type: ColumnDataType::String as i32,
                            is_nullable: true,
                            default_constraint: vec![],
                            semantic_type: SemanticType::Field as i32,
                            comment: String::new(),
                            ..Default::default()
                        }),
                        column_id: 1,
                    }),
                    location: Some(v1::AddColumnLocation {
                        location_type: LocationType::First as i32,
                        after_column_name: String::default(),
                    }),
                }],
            })),
        })
        .unwrap();

        // The converted request carries the same column and location.
        assert_eq!(
            request,
            RegionAlterRequest {
                kind: AlterKind::AddColumns {
                    columns: vec![AddColumn {
                        column_metadata: ColumnMetadata {
                            column_schema: ColumnSchema::new(
                                "a",
                                ConcreteDataType::string_datatype(),
                                true,
                            ),
                            semantic_type: SemanticType::Field,
                            column_id: 1,
                        },
                        location: Some(AddColumnLocation::First),
                    }]
                },
            }
        );
    }
1608
1609    /// Returns a new region metadata for testing. Metadata:
1610    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1611    fn new_metadata() -> RegionMetadata {
1612        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1613        builder
1614            .push_column_metadata(ColumnMetadata {
1615                column_schema: ColumnSchema::new(
1616                    "ts",
1617                    ConcreteDataType::timestamp_millisecond_datatype(),
1618                    false,
1619                ),
1620                semantic_type: SemanticType::Timestamp,
1621                column_id: 1,
1622            })
1623            .push_column_metadata(ColumnMetadata {
1624                column_schema: ColumnSchema::new(
1625                    "tag_0",
1626                    ConcreteDataType::string_datatype(),
1627                    true,
1628                ),
1629                semantic_type: SemanticType::Tag,
1630                column_id: 2,
1631            })
1632            .push_column_metadata(ColumnMetadata {
1633                column_schema: ColumnSchema::new(
1634                    "field_0",
1635                    ConcreteDataType::string_datatype(),
1636                    true,
1637                ),
1638                semantic_type: SemanticType::Field,
1639                column_id: 3,
1640            })
1641            .push_column_metadata(ColumnMetadata {
1642                column_schema: ColumnSchema::new(
1643                    "field_1",
1644                    ConcreteDataType::boolean_datatype(),
1645                    true,
1646                ),
1647                semantic_type: SemanticType::Field,
1648                column_id: 4,
1649            })
1650            .primary_key(vec![2]);
1651        builder.build().unwrap()
1652    }
1653
1654    #[test]
1655    fn test_add_column_validate() {
1656        let metadata = new_metadata();
1657        let add_column = AddColumn {
1658            column_metadata: ColumnMetadata {
1659                column_schema: ColumnSchema::new(
1660                    "tag_1",
1661                    ConcreteDataType::string_datatype(),
1662                    true,
1663                ),
1664                semantic_type: SemanticType::Tag,
1665                column_id: 5,
1666            },
1667            location: None,
1668        };
1669        add_column.validate(&metadata).unwrap();
1670        assert!(add_column.need_alter(&metadata));
1671
1672        // Add not null column.
1673        AddColumn {
1674            column_metadata: ColumnMetadata {
1675                column_schema: ColumnSchema::new(
1676                    "tag_1",
1677                    ConcreteDataType::string_datatype(),
1678                    false,
1679                ),
1680                semantic_type: SemanticType::Tag,
1681                column_id: 5,
1682            },
1683            location: None,
1684        }
1685        .validate(&metadata)
1686        .unwrap_err();
1687
1688        // Add existing column.
1689        let add_column = AddColumn {
1690            column_metadata: ColumnMetadata {
1691                column_schema: ColumnSchema::new(
1692                    "tag_0",
1693                    ConcreteDataType::string_datatype(),
1694                    true,
1695                ),
1696                semantic_type: SemanticType::Tag,
1697                column_id: 2,
1698            },
1699            location: None,
1700        };
1701        add_column.validate(&metadata).unwrap();
1702        assert!(!add_column.need_alter(&metadata));
1703    }
1704
1705    #[test]
1706    fn test_add_duplicate_columns() {
1707        let kind = AlterKind::AddColumns {
1708            columns: vec![
1709                AddColumn {
1710                    column_metadata: ColumnMetadata {
1711                        column_schema: ColumnSchema::new(
1712                            "tag_1",
1713                            ConcreteDataType::string_datatype(),
1714                            true,
1715                        ),
1716                        semantic_type: SemanticType::Tag,
1717                        column_id: 5,
1718                    },
1719                    location: None,
1720                },
1721                AddColumn {
1722                    column_metadata: ColumnMetadata {
1723                        column_schema: ColumnSchema::new(
1724                            "tag_1",
1725                            ConcreteDataType::string_datatype(),
1726                            true,
1727                        ),
1728                        semantic_type: SemanticType::Field,
1729                        column_id: 6,
1730                    },
1731                    location: None,
1732                },
1733            ],
1734        };
1735        let metadata = new_metadata();
1736        kind.validate(&metadata).unwrap();
1737        assert!(kind.need_alter(&metadata));
1738    }
1739
1740    #[test]
1741    fn test_add_existing_column_different_metadata() {
1742        let metadata = new_metadata();
1743
1744        // Add existing column with different id.
1745        let kind = AlterKind::AddColumns {
1746            columns: vec![AddColumn {
1747                column_metadata: ColumnMetadata {
1748                    column_schema: ColumnSchema::new(
1749                        "tag_0",
1750                        ConcreteDataType::string_datatype(),
1751                        true,
1752                    ),
1753                    semantic_type: SemanticType::Tag,
1754                    column_id: 4,
1755                },
1756                location: None,
1757            }],
1758        };
1759        kind.validate(&metadata).unwrap_err();
1760
1761        // Add existing column with different type.
1762        let kind = AlterKind::AddColumns {
1763            columns: vec![AddColumn {
1764                column_metadata: ColumnMetadata {
1765                    column_schema: ColumnSchema::new(
1766                        "tag_0",
1767                        ConcreteDataType::int64_datatype(),
1768                        true,
1769                    ),
1770                    semantic_type: SemanticType::Tag,
1771                    column_id: 2,
1772                },
1773                location: None,
1774            }],
1775        };
1776        kind.validate(&metadata).unwrap_err();
1777
1778        // Add existing column with different name.
1779        let kind = AlterKind::AddColumns {
1780            columns: vec![AddColumn {
1781                column_metadata: ColumnMetadata {
1782                    column_schema: ColumnSchema::new(
1783                        "tag_1",
1784                        ConcreteDataType::string_datatype(),
1785                        true,
1786                    ),
1787                    semantic_type: SemanticType::Tag,
1788                    column_id: 2,
1789                },
1790                location: None,
1791            }],
1792        };
1793        kind.validate(&metadata).unwrap_err();
1794    }
1795
1796    #[test]
1797    fn test_add_existing_column_with_location() {
1798        let metadata = new_metadata();
1799        let kind = AlterKind::AddColumns {
1800            columns: vec![AddColumn {
1801                column_metadata: ColumnMetadata {
1802                    column_schema: ColumnSchema::new(
1803                        "tag_0",
1804                        ConcreteDataType::string_datatype(),
1805                        true,
1806                    ),
1807                    semantic_type: SemanticType::Tag,
1808                    column_id: 2,
1809                },
1810                location: Some(AddColumnLocation::First),
1811            }],
1812        };
1813        kind.validate(&metadata).unwrap_err();
1814    }
1815
1816    #[test]
1817    fn test_validate_drop_column() {
1818        let metadata = new_metadata();
1819        let kind = AlterKind::DropColumns {
1820            names: vec!["xxxx".to_string()],
1821        };
1822        kind.validate(&metadata).unwrap();
1823        assert!(!kind.need_alter(&metadata));
1824
1825        AlterKind::DropColumns {
1826            names: vec!["tag_0".to_string()],
1827        }
1828        .validate(&metadata)
1829        .unwrap_err();
1830
1831        let kind = AlterKind::DropColumns {
1832            names: vec!["field_0".to_string()],
1833        };
1834        kind.validate(&metadata).unwrap();
1835        assert!(kind.need_alter(&metadata));
1836    }
1837
1838    #[test]
1839    fn test_validate_modify_column_type() {
1840        let metadata = new_metadata();
1841        AlterKind::ModifyColumnTypes {
1842            columns: vec![ModifyColumnType {
1843                column_name: "xxxx".to_string(),
1844                target_type: ConcreteDataType::string_datatype(),
1845            }],
1846        }
1847        .validate(&metadata)
1848        .unwrap_err();
1849
1850        AlterKind::ModifyColumnTypes {
1851            columns: vec![ModifyColumnType {
1852                column_name: "field_1".to_string(),
1853                target_type: ConcreteDataType::date_datatype(),
1854            }],
1855        }
1856        .validate(&metadata)
1857        .unwrap_err();
1858
1859        AlterKind::ModifyColumnTypes {
1860            columns: vec![ModifyColumnType {
1861                column_name: "ts".to_string(),
1862                target_type: ConcreteDataType::date_datatype(),
1863            }],
1864        }
1865        .validate(&metadata)
1866        .unwrap_err();
1867
1868        AlterKind::ModifyColumnTypes {
1869            columns: vec![ModifyColumnType {
1870                column_name: "tag_0".to_string(),
1871                target_type: ConcreteDataType::date_datatype(),
1872            }],
1873        }
1874        .validate(&metadata)
1875        .unwrap_err();
1876
1877        let kind = AlterKind::ModifyColumnTypes {
1878            columns: vec![ModifyColumnType {
1879                column_name: "field_0".to_string(),
1880                target_type: ConcreteDataType::int32_datatype(),
1881            }],
1882        };
1883        kind.validate(&metadata).unwrap();
1884        assert!(kind.need_alter(&metadata));
1885    }
1886
1887    #[test]
1888    fn test_validate_add_columns() {
1889        let kind = AlterKind::AddColumns {
1890            columns: vec![
1891                AddColumn {
1892                    column_metadata: ColumnMetadata {
1893                        column_schema: ColumnSchema::new(
1894                            "tag_1",
1895                            ConcreteDataType::string_datatype(),
1896                            true,
1897                        ),
1898                        semantic_type: SemanticType::Tag,
1899                        column_id: 5,
1900                    },
1901                    location: None,
1902                },
1903                AddColumn {
1904                    column_metadata: ColumnMetadata {
1905                        column_schema: ColumnSchema::new(
1906                            "field_2",
1907                            ConcreteDataType::string_datatype(),
1908                            true,
1909                        ),
1910                        semantic_type: SemanticType::Field,
1911                        column_id: 6,
1912                    },
1913                    location: None,
1914                },
1915            ],
1916        };
1917        let request = RegionAlterRequest { kind };
1918        let mut metadata = new_metadata();
1919        metadata.schema_version = 1;
1920        request.validate(&metadata).unwrap();
1921    }
1922
1923    #[test]
1924    fn test_validate_create_region() {
1925        let column_metadatas = vec![
1926            ColumnMetadata {
1927                column_schema: ColumnSchema::new(
1928                    "ts",
1929                    ConcreteDataType::timestamp_millisecond_datatype(),
1930                    false,
1931                ),
1932                semantic_type: SemanticType::Timestamp,
1933                column_id: 1,
1934            },
1935            ColumnMetadata {
1936                column_schema: ColumnSchema::new(
1937                    "tag_0",
1938                    ConcreteDataType::string_datatype(),
1939                    true,
1940                ),
1941                semantic_type: SemanticType::Tag,
1942                column_id: 2,
1943            },
1944            ColumnMetadata {
1945                column_schema: ColumnSchema::new(
1946                    "field_0",
1947                    ConcreteDataType::string_datatype(),
1948                    true,
1949                ),
1950                semantic_type: SemanticType::Field,
1951                column_id: 3,
1952            },
1953        ];
1954        let create = RegionCreateRequest {
1955            engine: "mito".to_string(),
1956            column_metadatas,
1957            primary_key: vec![3, 4],
1958            options: HashMap::new(),
1959            table_dir: "path".to_string(),
1960            path_type: PathType::Bare,
1961            partition_expr_json: Some("".to_string()),
1962        };
1963
1964        assert!(create.validate().is_err());
1965    }
1966
1967    #[test]
1968    fn test_validate_modify_column_fulltext_options() {
1969        let kind = AlterKind::SetIndexes {
1970            options: vec![SetIndexOption::Fulltext {
1971                column_name: "tag_0".to_string(),
1972                options: FulltextOptions::new_unchecked(
1973                    true,
1974                    FulltextAnalyzer::Chinese,
1975                    false,
1976                    FulltextBackend::Bloom,
1977                    1000,
1978                    0.01,
1979                ),
1980            }],
1981        };
1982        let request = RegionAlterRequest { kind };
1983        let mut metadata = new_metadata();
1984        metadata.schema_version = 1;
1985        request.validate(&metadata).unwrap();
1986
1987        let kind = AlterKind::UnsetIndexes {
1988            options: vec![UnsetIndexOption::Fulltext {
1989                column_name: "tag_0".to_string(),
1990            }],
1991        };
1992        let request = RegionAlterRequest { kind };
1993        let mut metadata = new_metadata();
1994        metadata.schema_version = 1;
1995        request.validate(&metadata).unwrap();
1996    }
1997
1998    #[test]
1999    fn test_validate_sync_columns() {
2000        let metadata = new_metadata();
2001        let kind = AlterKind::SyncColumns {
2002            column_metadatas: vec![
2003                ColumnMetadata {
2004                    column_schema: ColumnSchema::new(
2005                        "tag_1",
2006                        ConcreteDataType::string_datatype(),
2007                        true,
2008                    ),
2009                    semantic_type: SemanticType::Tag,
2010                    column_id: 5,
2011                },
2012                ColumnMetadata {
2013                    column_schema: ColumnSchema::new(
2014                        "field_2",
2015                        ConcreteDataType::string_datatype(),
2016                        true,
2017                    ),
2018                    semantic_type: SemanticType::Field,
2019                    column_id: 6,
2020                },
2021            ],
2022        };
2023        let err = kind.validate(&metadata).unwrap_err();
2024        assert!(err.to_string().contains("not a primary key"));
2025
2026        // Change the timestamp column name.
2027        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
2028        let ts_column = column_metadatas_with_different_ts_column
2029            .iter_mut()
2030            .find(|c| c.semantic_type == SemanticType::Timestamp)
2031            .unwrap();
2032        ts_column.column_schema.name = "ts1".to_string();
2033
2034        let kind = AlterKind::SyncColumns {
2035            column_metadatas: column_metadatas_with_different_ts_column,
2036        };
2037        let err = kind.validate(&metadata).unwrap_err();
2038        assert!(
2039            err.to_string()
2040                .contains("timestamp column ts has different id")
2041        );
2042
2043        // Change the primary key column name.
2044        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
2045        let pk_column = column_metadatas_with_different_pk_column
2046            .iter_mut()
2047            .find(|c| c.column_schema.name == "tag_0")
2048            .unwrap();
2049        pk_column.column_id = 100;
2050        let kind = AlterKind::SyncColumns {
2051            column_metadatas: column_metadatas_with_different_pk_column,
2052        };
2053        let err = kind.validate(&metadata).unwrap_err();
2054        assert!(
2055            err.to_string()
2056                .contains("column with same name tag_0 has different id")
2057        );
2058
2059        // Add a new field column.
2060        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2061        column_metadatas_with_new_field_column.push(ColumnMetadata {
2062            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2063            semantic_type: SemanticType::Field,
2064            column_id: 4,
2065        });
2066        let kind = AlterKind::SyncColumns {
2067            column_metadatas: column_metadatas_with_new_field_column,
2068        };
2069        kind.validate(&metadata).unwrap();
2070    }
2071
2072    #[test]
2073    fn test_cast_path_type_to_primitive() {
2074        assert_eq!(PathType::Bare as u8, 0);
2075        assert_eq!(PathType::Data as u8, 1);
2076        assert_eq!(PathType::Metadata as u8, 2);
2077        assert_eq!(
2078            PathType::try_from(PathType::Bare as u8).unwrap(),
2079            PathType::Bare
2080        );
2081        assert_eq!(
2082            PathType::try_from(PathType::Data as u8).unwrap(),
2083            PathType::Data
2084        );
2085        assert_eq!(
2086            PathType::try_from(PathType::Metadata as u8).unwrap(),
2087            PathType::Metadata
2088        );
2089    }
2090}