store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    AlterRequest, AlterRequests, BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest,
26    CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests,
27    OpenRequest, TruncateRequest, alter_request, compact_request, region_request, truncate_request,
28};
29use api::v1::{
30    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::{TimeToLive, Timestamp};
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use num_enum::TryFromPrimitive;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use strum::{AsRefStr, IntoStaticStr};
43
44use crate::logstore::entry;
45use crate::metadata::{
46    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
47    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
48    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
49    RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::metrics;
53use crate::mito_engine_options::{
54    SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
55};
56use crate::path_utils::table_dir;
57use crate::storage::{ColumnId, RegionId, ScanRequest};
58
59/// The type of path to generate.
60#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
61#[repr(u8)]
62pub enum PathType {
63    /// A bare path - the original path of an engine.
64    ///
65    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
66    Bare,
67    /// A path for the data region of a metric engine table.
68    ///
69    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
70    Data,
71    /// A path for the metadata region of a metric engine table.
72    ///
73    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
74    Metadata,
75}
76
77#[derive(Debug, IntoStaticStr)]
78pub enum BatchRegionDdlRequest {
79    Create(Vec<(RegionId, RegionCreateRequest)>),
80    Drop(Vec<(RegionId, RegionDropRequest)>),
81    Alter(Vec<(RegionId, RegionAlterRequest)>),
82}
83
84impl BatchRegionDdlRequest {
85    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
86    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
87        match body {
88            region_request::Body::Creates(creates) => {
89                let requests = creates
90                    .requests
91                    .into_iter()
92                    .map(parse_region_create)
93                    .collect::<Result<Vec<_>>>()?;
94                Ok(Some(Self::Create(requests)))
95            }
96            region_request::Body::Drops(drops) => {
97                let requests = drops
98                    .requests
99                    .into_iter()
100                    .map(parse_region_drop)
101                    .collect::<Result<Vec<_>>>()?;
102                Ok(Some(Self::Drop(requests)))
103            }
104            region_request::Body::Alters(alters) => {
105                let requests = alters
106                    .requests
107                    .into_iter()
108                    .map(parse_region_alter)
109                    .collect::<Result<Vec<_>>>()?;
110                Ok(Some(Self::Alter(requests)))
111            }
112            _ => Ok(None),
113        }
114    }
115
116    pub fn request_type(&self) -> &'static str {
117        self.into()
118    }
119
120    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
121        match self {
122            Self::Create(requests) => requests
123                .into_iter()
124                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
125                .collect(),
126            Self::Drop(requests) => requests
127                .into_iter()
128                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
129                .collect(),
130            Self::Alter(requests) => requests
131                .into_iter()
132                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
133                .collect(),
134        }
135    }
136}
137
138#[derive(Debug, IntoStaticStr)]
139pub enum RegionRequest {
140    Put(RegionPutRequest),
141    Delete(RegionDeleteRequest),
142    Create(RegionCreateRequest),
143    Drop(RegionDropRequest),
144    Open(RegionOpenRequest),
145    Close(RegionCloseRequest),
146    Alter(RegionAlterRequest),
147    Flush(RegionFlushRequest),
148    Compact(RegionCompactRequest),
149    BuildIndex(RegionBuildIndexRequest),
150    Truncate(RegionTruncateRequest),
151    Catchup(RegionCatchupRequest),
152    BulkInserts(RegionBulkInsertsRequest),
153}
154
155impl RegionRequest {
156    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
157    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
158    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
159        match body {
160            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
161            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
162            region_request::Body::Create(create) => make_region_create(create),
163            region_request::Body::Drop(drop) => make_region_drop(drop),
164            region_request::Body::Open(open) => make_region_open(open),
165            region_request::Body::Close(close) => make_region_close(close),
166            region_request::Body::Alter(alter) => make_region_alter(alter),
167            region_request::Body::Flush(flush) => make_region_flush(flush),
168            region_request::Body::Compact(compact) => make_region_compact(compact),
169            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
170            region_request::Body::Creates(creates) => make_region_creates(creates),
171            region_request::Body::Drops(drops) => make_region_drops(drops),
172            region_request::Body::Alters(alters) => make_region_alters(alters),
173            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
174            region_request::Body::Sync(_) => UnexpectedSnafu {
175                reason: "Sync request should be handled separately by RegionServer",
176            }
177            .fail(),
178            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
179                reason: "ListMetadata request should be handled separately by RegionServer",
180            }
181            .fail(),
182        }
183    }
184
185    /// Returns the type name of the request.
186    pub fn request_type(&self) -> &'static str {
187        self.into()
188    }
189}
190
191fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
192    let requests = inserts
193        .requests
194        .into_iter()
195        .filter_map(|r| {
196            let region_id = r.region_id.into();
197            r.rows.map(|rows| {
198                (
199                    region_id,
200                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
201                )
202            })
203        })
204        .collect();
205    Ok(requests)
206}
207
208fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
209    let requests = deletes
210        .requests
211        .into_iter()
212        .filter_map(|r| {
213            let region_id = r.region_id.into();
214            r.rows.map(|rows| {
215                (
216                    region_id,
217                    RegionRequest::Delete(RegionDeleteRequest { rows, hint: None }),
218                )
219            })
220        })
221        .collect();
222    Ok(requests)
223}
224
225fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
226    let column_metadatas = create
227        .column_defs
228        .into_iter()
229        .map(ColumnMetadata::try_from_column_def)
230        .collect::<Result<Vec<_>>>()?;
231    let region_id = RegionId::from(create.region_id);
232    let table_dir = table_dir(&create.path, region_id.table_id());
233    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
234    Ok((
235        region_id,
236        RegionCreateRequest {
237            engine: create.engine,
238            column_metadatas,
239            primary_key: create.primary_key,
240            options: create.options,
241            table_dir,
242            path_type: PathType::Bare,
243            partition_expr_json,
244        },
245    ))
246}
247
248fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
249    let (region_id, request) = parse_region_create(create)?;
250    Ok(vec![(region_id, RegionRequest::Create(request))])
251}
252
253fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
254    let mut requests = Vec::with_capacity(creates.requests.len());
255    for create in creates.requests {
256        requests.extend(make_region_create(create)?);
257    }
258    Ok(requests)
259}
260
261fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
262    let region_id = drop.region_id.into();
263    Ok((
264        region_id,
265        RegionDropRequest {
266            fast_path: drop.fast_path,
267        },
268    ))
269}
270
271fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
272    let (region_id, request) = parse_region_drop(drop)?;
273    Ok(vec![(region_id, RegionRequest::Drop(request))])
274}
275
276fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
277    let mut requests = Vec::with_capacity(drops.requests.len());
278    for drop in drops.requests {
279        requests.extend(make_region_drop(drop)?);
280    }
281    Ok(requests)
282}
283
284fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
285    let region_id = RegionId::from(open.region_id);
286    let table_dir = table_dir(&open.path, region_id.table_id());
287    Ok(vec![(
288        region_id,
289        RegionRequest::Open(RegionOpenRequest {
290            engine: open.engine,
291            table_dir,
292            path_type: PathType::Bare,
293            options: open.options,
294            skip_wal_replay: false,
295            checkpoint: None,
296        }),
297    )])
298}
299
300fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
301    let region_id = close.region_id.into();
302    Ok(vec![(
303        region_id,
304        RegionRequest::Close(RegionCloseRequest {}),
305    )])
306}
307
308fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
309    let region_id = alter.region_id.into();
310    let request = RegionAlterRequest::try_from(alter)?;
311    Ok((region_id, request))
312}
313
314fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
315    let (region_id, request) = parse_region_alter(alter)?;
316    Ok(vec![(region_id, RegionRequest::Alter(request))])
317}
318
319fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
320    let mut requests = Vec::with_capacity(alters.requests.len());
321    for alter in alters.requests {
322        requests.extend(make_region_alter(alter)?);
323    }
324    Ok(requests)
325}
326
327fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
328    let region_id = flush.region_id.into();
329    Ok(vec![(
330        region_id,
331        RegionRequest::Flush(RegionFlushRequest {
332            row_group_size: None,
333        }),
334    )])
335}
336
337fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
338    let region_id = compact.region_id.into();
339    let options = compact
340        .options
341        .unwrap_or(compact_request::Options::Regular(Default::default()));
342    // Convert parallelism: a value of 0 indicates no specific parallelism requested (None)
343    let parallelism = if compact.parallelism == 0 {
344        None
345    } else {
346        Some(compact.parallelism)
347    };
348    Ok(vec![(
349        region_id,
350        RegionRequest::Compact(RegionCompactRequest {
351            options,
352            parallelism,
353        }),
354    )])
355}
356
357fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
358    let region_id = truncate.region_id.into();
359    match truncate.kind {
360        None => InvalidRawRegionRequestSnafu {
361            err: "missing kind in TruncateRequest".to_string(),
362        }
363        .fail(),
364        Some(truncate_request::Kind::All(_)) => Ok(vec![(
365            region_id,
366            RegionRequest::Truncate(RegionTruncateRequest::All),
367        )]),
368        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
369            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
370
371            Ok(vec![(
372                region_id,
373                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
374            )])
375        }
376    }
377}
378
379/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
380fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
381    let region_id = request.region_id.into();
382    let Some(Body::ArrowIpc(request)) = request.body else {
383        return Ok(vec![]);
384    };
385
386    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
387        .with_label_values(&["decode"])
388        .start_timer();
389    let mut decoder =
390        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
391    let payload = decoder
392        .try_decode_record_batch(&request.data_header, &request.payload)
393        .context(FlightCodecSnafu)?;
394    decoder_timer.observe_duration();
395    Ok(vec![(
396        region_id,
397        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
398            region_id,
399            payload,
400            raw_data: request,
401        }),
402    )])
403}
404
405/// Request to put data into a region.
406#[derive(Debug)]
407pub struct RegionPutRequest {
408    /// Rows to put.
409    pub rows: Rows,
410    /// Write hint.
411    pub hint: Option<WriteHint>,
412}
413
414#[derive(Debug)]
415pub struct RegionReadRequest {
416    pub request: ScanRequest,
417}
418
419/// Request to delete data from a region.
420#[derive(Debug)]
421pub struct RegionDeleteRequest {
422    /// Keys to rows to delete.
423    ///
424    /// Each row only contains primary key columns and a time index column.
425    pub rows: Rows,
426    /// Write hint.
427    pub hint: Option<WriteHint>,
428}
429
430#[derive(Debug, Clone)]
431pub struct RegionCreateRequest {
432    /// Region engine name
433    pub engine: String,
434    /// Columns in this region.
435    pub column_metadatas: Vec<ColumnMetadata>,
436    /// Columns in the primary key.
437    pub primary_key: Vec<ColumnId>,
438    /// Options of the created region.
439    pub options: HashMap<String, String>,
440    /// Directory for table's data home. Usually is composed by catalog and table id
441    pub table_dir: String,
442    /// Path type for generating paths
443    pub path_type: PathType,
444    /// Partition expression JSON from table metadata. Set to empty string for a region without partition.
445    /// `Option` to keep compatibility with old clients.
446    pub partition_expr_json: Option<String>,
447}
448
449impl RegionCreateRequest {
450    /// Checks whether the request is valid, returns an error if it is invalid.
451    pub fn validate(&self) -> Result<()> {
452        // time index must exist
453        ensure!(
454            self.column_metadatas
455                .iter()
456                .any(|x| x.semantic_type == SemanticType::Timestamp),
457            InvalidRegionRequestSnafu {
458                region_id: RegionId::new(0, 0),
459                err: "missing timestamp column in create region request".to_string(),
460            }
461        );
462
463        // build column id to indices
464        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
465        for (i, c) in self.column_metadatas.iter().enumerate() {
466            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
467                return InvalidRegionRequestSnafu {
468                    region_id: RegionId::new(0, 0),
469                    err: format!(
470                        "duplicate column id {} (at position {} and {}) in create region request",
471                        c.column_id, previous, i
472                    ),
473                }
474                .fail();
475            }
476        }
477
478        // primary key must exist
479        for column_id in &self.primary_key {
480            ensure!(
481                column_id_to_indices.contains_key(column_id),
482                InvalidRegionRequestSnafu {
483                    region_id: RegionId::new(0, 0),
484                    err: format!(
485                        "missing primary key column {} in create region request",
486                        column_id
487                    ),
488                }
489            );
490        }
491
492        Ok(())
493    }
494
495    /// Returns true when the region belongs to the metric engine's physical table.
496    pub fn is_physical_table(&self) -> bool {
497        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
498    }
499}
500
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Whether the fast path should be used for the drop.
    pub fast_path: bool,
}
505
506/// Open region request.
507#[derive(Debug, Clone)]
508pub struct RegionOpenRequest {
509    /// Region engine name
510    pub engine: String,
511    /// Directory for table's data home. Usually is composed by catalog and table id
512    pub table_dir: String,
513    /// Path type for generating paths
514    pub path_type: PathType,
515    /// Options of the opened region.
516    pub options: HashMap<String, String>,
517    /// To skip replaying the WAL.
518    pub skip_wal_replay: bool,
519    /// Replay checkpoint.
520    pub checkpoint: Option<ReplayCheckpoint>,
521}
522
/// Position from which a region's WAL replay starts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint.
    pub entry_id: u64,
    /// Optional metadata entry id of the checkpoint.
    pub metadata_entry_id: Option<u64>,
}
528
529impl RegionOpenRequest {
530    /// Returns true when the region belongs to the metric engine's physical table.
531    pub fn is_physical_table(&self) -> bool {
532        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
533    }
534}
535
/// Request to close a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
539
540/// Alter metadata of a region.
541#[derive(Debug, PartialEq, Eq, Clone)]
542pub struct RegionAlterRequest {
543    /// Kind of alteration to do.
544    pub kind: AlterKind,
545}
546
547impl RegionAlterRequest {
548    /// Checks whether the request is valid, returns an error if it is invalid.
549    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
550        self.kind.validate(metadata)?;
551
552        Ok(())
553    }
554
555    /// Returns true if we need to apply the request to the region.
556    ///
557    /// The `request` should be valid.
558    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
559        debug_assert!(self.validate(metadata).is_ok());
560        self.kind.need_alter(metadata)
561    }
562}
563
564impl TryFrom<AlterRequest> for RegionAlterRequest {
565    type Error = MetadataError;
566
567    fn try_from(value: AlterRequest) -> Result<Self> {
568        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
569            err: "missing kind in AlterRequest",
570        })?;
571
572        let kind = AlterKind::try_from(kind)?;
573        Ok(RegionAlterRequest { kind })
574    }
575}
576
577/// Kind of the alteration.
578#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
579pub enum AlterKind {
580    /// Add columns to the region.
581    AddColumns {
582        /// Columns to add.
583        columns: Vec<AddColumn>,
584    },
585    /// Drop columns from the region, only fields are allowed to drop.
586    DropColumns {
587        /// Name of columns to drop.
588        names: Vec<String>,
589    },
590    /// Change columns datatype form the region, only fields are allowed to change.
591    ModifyColumnTypes {
592        /// Columns to change.
593        columns: Vec<ModifyColumnType>,
594    },
595    /// Set region options.
596    SetRegionOptions { options: Vec<SetRegionOption> },
597    /// Unset region options.
598    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
599    /// Set index options.
600    SetIndexes { options: Vec<SetIndexOption> },
601    /// Unset index options.
602    UnsetIndexes { options: Vec<UnsetIndexOption> },
603    /// Drop column default value.
604    DropDefaults {
605        /// Name of columns to drop.
606        names: Vec<String>,
607    },
608    /// Set column default value.
609    SetDefaults {
610        /// Columns to change.
611        columns: Vec<SetDefault>,
612    },
613    /// Sync column metadatas.
614    SyncColumns {
615        column_metadatas: Vec<ColumnMetadata>,
616    },
617}
/// Sets the default value of a single column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column to update.
    pub name: String,
    /// Raw bytes of the new default constraint.
    pub default_constraint: Vec<u8>,
}
623
624#[derive(Debug, PartialEq, Eq, Clone)]
625pub enum SetIndexOption {
626    Fulltext {
627        column_name: String,
628        options: FulltextOptions,
629    },
630    Inverted {
631        column_name: String,
632    },
633    Skipping {
634        column_name: String,
635        options: SkippingIndexOptions,
636    },
637}
638
639impl SetIndexOption {
640    /// Returns the column name of the index option.
641    pub fn column_name(&self) -> &String {
642        match self {
643            SetIndexOption::Fulltext { column_name, .. } => column_name,
644            SetIndexOption::Inverted { column_name } => column_name,
645            SetIndexOption::Skipping { column_name, .. } => column_name,
646        }
647    }
648
649    /// Returns true if the index option is fulltext.
650    pub fn is_fulltext(&self) -> bool {
651        match self {
652            SetIndexOption::Fulltext { .. } => true,
653            SetIndexOption::Inverted { .. } => false,
654            SetIndexOption::Skipping { .. } => false,
655        }
656    }
657}
658
659impl TryFrom<v1::SetIndex> for SetIndexOption {
660    type Error = MetadataError;
661
662    fn try_from(value: v1::SetIndex) -> Result<Self> {
663        let option = value.options.context(InvalidRawRegionRequestSnafu {
664            err: "missing options in SetIndex",
665        })?;
666
667        let opt = match option {
668            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
669                column_name: x.column_name.clone(),
670                options: FulltextOptions::new(
671                    x.enable,
672                    as_fulltext_option_analyzer(
673                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
674                    ),
675                    x.case_sensitive,
676                    as_fulltext_option_backend(
677                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
678                    ),
679                    x.granularity as u32,
680                    x.false_positive_rate,
681                )
682                .context(InvalidIndexOptionSnafu)?,
683            },
684            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
685                column_name: i.column_name,
686            },
687            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
688                column_name: s.column_name,
689                options: SkippingIndexOptions::new(
690                    s.granularity as u32,
691                    s.false_positive_rate,
692                    as_skipping_index_type(
693                        PbSkippingIndexType::try_from(s.skipping_index_type)
694                            .context(DecodeProtoSnafu)?,
695                    ),
696                )
697                .context(InvalidIndexOptionSnafu)?,
698            },
699        };
700
701        Ok(opt)
702    }
703}
704
/// An index to disable on a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    Fulltext { column_name: String },
    Inverted { column_name: String },
    Skipping { column_name: String },
}

impl UnsetIndexOption {
    /// Returns the name of the column this option targets.
    pub fn column_name(&self) -> &String {
        match self {
            Self::Fulltext { column_name }
            | Self::Inverted { column_name }
            | Self::Skipping { column_name } => column_name,
        }
    }

    /// Returns true only for the fulltext variant.
    pub fn is_fulltext(&self) -> bool {
        matches!(self, Self::Fulltext { .. })
    }
}
729
730impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
731    type Error = MetadataError;
732
733    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
734        let option = value.options.context(InvalidRawRegionRequestSnafu {
735            err: "missing options in UnsetIndex",
736        })?;
737
738        let opt = match option {
739            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
740                column_name: f.column_name,
741            },
742            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
743                column_name: i.column_name,
744            },
745            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
746                column_name: s.column_name,
747            },
748        };
749
750        Ok(opt)
751    }
752}
753
754impl AlterKind {
755    /// Returns an error if the alter kind is invalid.
756    ///
757    /// It allows adding column if not exists and dropping column if exists.
758    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
759        match self {
760            AlterKind::AddColumns { columns } => {
761                for col_to_add in columns {
762                    col_to_add.validate(metadata)?;
763                }
764            }
765            AlterKind::DropColumns { names } => {
766                for name in names {
767                    Self::validate_column_to_drop(name, metadata)?;
768                }
769            }
770            AlterKind::ModifyColumnTypes { columns } => {
771                for col_to_change in columns {
772                    col_to_change.validate(metadata)?;
773                }
774            }
775            AlterKind::SetRegionOptions { .. } => {}
776            AlterKind::UnsetRegionOptions { .. } => {}
777            AlterKind::SetIndexes { options } => {
778                for option in options {
779                    Self::validate_column_alter_index_option(
780                        option.column_name(),
781                        metadata,
782                        option.is_fulltext(),
783                    )?;
784                }
785            }
786            AlterKind::UnsetIndexes { options } => {
787                for option in options {
788                    Self::validate_column_alter_index_option(
789                        option.column_name(),
790                        metadata,
791                        option.is_fulltext(),
792                    )?;
793                }
794            }
795            AlterKind::DropDefaults { names } => {
796                names
797                    .iter()
798                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
799            }
800            AlterKind::SetDefaults { columns } => {
801                columns
802                    .iter()
803                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
804            }
805            AlterKind::SyncColumns { column_metadatas } => {
806                let new_primary_keys = column_metadatas
807                    .iter()
808                    .filter(|c| c.semantic_type == SemanticType::Tag)
809                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
810                    .collect::<HashMap<_, _>>();
811
812                let old_primary_keys = metadata
813                    .column_metadatas
814                    .iter()
815                    .filter(|c| c.semantic_type == SemanticType::Tag)
816                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
817
818                for (name, id) in old_primary_keys {
819                    let primary_key =
820                        new_primary_keys
821                            .get(name)
822                            .with_context(|| InvalidRegionRequestSnafu {
823                                region_id: metadata.region_id,
824                                err: format!("column {} is not a primary key", name),
825                            })?;
826
827                    ensure!(
828                        *primary_key == id,
829                        InvalidRegionRequestSnafu {
830                            region_id: metadata.region_id,
831                            err: format!(
832                                "column with same name {} has different id, existing: {}, got: {}",
833                                name, id, primary_key
834                            ),
835                        }
836                    );
837                }
838
839                let new_ts_column = column_metadatas
840                    .iter()
841                    .find(|c| c.semantic_type == SemanticType::Timestamp)
842                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
843                    .context(InvalidRegionRequestSnafu {
844                        region_id: metadata.region_id,
845                        err: "timestamp column not found",
846                    })?;
847
848                // Safety: timestamp column must exist.
849                let old_ts_column = metadata
850                    .column_metadatas
851                    .iter()
852                    .find(|c| c.semantic_type == SemanticType::Timestamp)
853                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
854                    .unwrap();
855
856                ensure!(
857                    new_ts_column == old_ts_column,
858                    InvalidRegionRequestSnafu {
859                        region_id: metadata.region_id,
860                        err: format!(
861                            "timestamp column {} has different id, existing: {}, got: {}",
862                            old_ts_column.0, old_ts_column.1, new_ts_column.1
863                        ),
864                    }
865                );
866            }
867        }
868        Ok(())
869    }
870
871    /// Returns true if we need to apply the alteration to the region.
872    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
873        debug_assert!(self.validate(metadata).is_ok());
874        match self {
875            AlterKind::AddColumns { columns } => columns
876                .iter()
877                .any(|col_to_add| col_to_add.need_alter(metadata)),
878            AlterKind::DropColumns { names } => names
879                .iter()
880                .any(|name| metadata.column_by_name(name).is_some()),
881            AlterKind::ModifyColumnTypes { columns } => columns
882                .iter()
883                .any(|col_to_change| col_to_change.need_alter(metadata)),
884            AlterKind::SetRegionOptions { .. } => true,
885            AlterKind::UnsetRegionOptions { .. } => true,
886            AlterKind::SetIndexes { options, .. } => options
887                .iter()
888                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
889            AlterKind::UnsetIndexes { options } => options
890                .iter()
891                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
892            AlterKind::DropDefaults { names } => names
893                .iter()
894                .any(|name| metadata.column_by_name(name).is_some()),
895
896            AlterKind::SetDefaults { columns } => columns
897                .iter()
898                .any(|x| metadata.column_by_name(&x.name).is_some()),
899            AlterKind::SyncColumns { column_metadatas } => {
900                metadata.column_metadatas != *column_metadatas
901            }
902        }
903    }
904
905    /// Returns an error if the column to drop is invalid.
906    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
907        let Some(column) = metadata.column_by_name(name) else {
908            return Ok(());
909        };
910        ensure!(
911            column.semantic_type == SemanticType::Field,
912            InvalidRegionRequestSnafu {
913                region_id: metadata.region_id,
914                err: format!("column {} is not a field and could not be dropped", name),
915            }
916        );
917        Ok(())
918    }
919
920    /// Returns an error if the column's alter index option is invalid.
921    fn validate_column_alter_index_option(
922        column_name: &String,
923        metadata: &RegionMetadata,
924        is_fulltext: bool,
925    ) -> Result<()> {
926        let column = metadata
927            .column_by_name(column_name)
928            .context(InvalidRegionRequestSnafu {
929                region_id: metadata.region_id,
930                err: format!("column {} not found", column_name),
931            })?;
932
933        if is_fulltext {
934            ensure!(
935                column.column_schema.data_type.is_string(),
936                InvalidRegionRequestSnafu {
937                    region_id: metadata.region_id,
938                    err: format!(
939                        "cannot change alter index options for non-string column {}",
940                        column_name
941                    ),
942                }
943            );
944        }
945
946        Ok(())
947    }
948
949    /// Returns an error if the column isn't exist.
950    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
951        metadata
952            .column_by_name(column_name)
953            .context(InvalidRegionRequestSnafu {
954                region_id: metadata.region_id,
955                err: format!("column {} not found", column_name),
956            })?;
957
958        Ok(())
959    }
960}
961
962impl TryFrom<alter_request::Kind> for AlterKind {
963    type Error = MetadataError;
964
965    fn try_from(kind: alter_request::Kind) -> Result<Self> {
966        let alter_kind = match kind {
967            alter_request::Kind::AddColumns(x) => {
968                let columns = x
969                    .add_columns
970                    .into_iter()
971                    .map(|x| x.try_into())
972                    .collect::<Result<Vec<_>>>()?;
973                AlterKind::AddColumns { columns }
974            }
975            alter_request::Kind::ModifyColumnTypes(x) => {
976                let columns = x
977                    .modify_column_types
978                    .into_iter()
979                    .map(|x| x.into())
980                    .collect::<Vec<_>>();
981                AlterKind::ModifyColumnTypes { columns }
982            }
983            alter_request::Kind::DropColumns(x) => {
984                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
985                AlterKind::DropColumns { names }
986            }
987            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
988                options: options
989                    .table_options
990                    .iter()
991                    .map(TryFrom::try_from)
992                    .collect::<Result<Vec<_>>>()?,
993            },
994            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
995                keys: options
996                    .keys
997                    .iter()
998                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
999                    .collect::<Result<Vec<_>>>()?,
1000            },
1001            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1002                options: vec![SetIndexOption::try_from(o)?],
1003            },
1004            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1005                options: vec![UnsetIndexOption::try_from(o)?],
1006            },
1007            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1008                options: o
1009                    .set_indexes
1010                    .into_iter()
1011                    .map(SetIndexOption::try_from)
1012                    .collect::<Result<Vec<_>>>()?,
1013            },
1014            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1015                options: o
1016                    .unset_indexes
1017                    .into_iter()
1018                    .map(UnsetIndexOption::try_from)
1019                    .collect::<Result<Vec<_>>>()?,
1020            },
1021            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1022                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1023            },
1024            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1025                columns: x
1026                    .set_defaults
1027                    .into_iter()
1028                    .map(|x| {
1029                        Ok(SetDefault {
1030                            name: x.column_name,
1031                            default_constraint: x.default_constraint.clone(),
1032                        })
1033                    })
1034                    .collect::<Result<Vec<_>>>()?,
1035            },
1036            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1037                column_metadatas: x
1038                    .column_defs
1039                    .into_iter()
1040                    .map(ColumnMetadata::try_from_column_def)
1041                    .collect::<Result<Vec<_>>>()?,
1042            },
1043        };
1044
1045        Ok(alter_kind)
1046    }
1047}
1048
/// A request to add one column to a region.
///
/// `AddColumn::validate` also accepts a column that already exists, but only when
/// its metadata is identical and no location is given.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata (schema, semantic type and column id) of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to insert the column. If `None`, the column is appended after
    /// all existing columns.
    pub location: Option<AddColumnLocation>,
}
1058
1059impl AddColumn {
1060    /// Returns an error if the column to add is invalid.
1061    ///
1062    /// It allows adding existing columns. However, the existing column must have the same metadata
1063    /// and the location must be None.
1064    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1065        ensure!(
1066            self.column_metadata.column_schema.is_nullable()
1067                || self
1068                    .column_metadata
1069                    .column_schema
1070                    .default_constraint()
1071                    .is_some(),
1072            InvalidRegionRequestSnafu {
1073                region_id: metadata.region_id,
1074                err: format!(
1075                    "no default value for column {}",
1076                    self.column_metadata.column_schema.name
1077                ),
1078            }
1079        );
1080
1081        if let Some(existing_column) =
1082            metadata.column_by_name(&self.column_metadata.column_schema.name)
1083        {
1084            // If the column already exists.
1085            ensure!(
1086                *existing_column == self.column_metadata,
1087                InvalidRegionRequestSnafu {
1088                    region_id: metadata.region_id,
1089                    err: format!(
1090                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1091                        self.column_metadata.column_schema.name,
1092                        existing_column,
1093                        self.column_metadata,
1094                    ),
1095                }
1096            );
1097            ensure!(
1098                self.location.is_none(),
1099                InvalidRegionRequestSnafu {
1100                    region_id: metadata.region_id,
1101                    err: format!(
1102                        "column {} already exists, but location is specified",
1103                        self.column_metadata.column_schema.name
1104                    ),
1105                }
1106            );
1107        }
1108
1109        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1110            // Ensures the existing column has the same name.
1111            ensure!(
1112                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1113                InvalidRegionRequestSnafu {
1114                    region_id: metadata.region_id,
1115                    err: format!(
1116                        "column id {} already exists with different name {}",
1117                        self.column_metadata.column_id, existing_column.column_schema.name
1118                    ),
1119                }
1120            );
1121        }
1122
1123        Ok(())
1124    }
1125
1126    /// Returns true if no column to add to the region.
1127    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1128        debug_assert!(self.validate(metadata).is_ok());
1129        metadata
1130            .column_by_name(&self.column_metadata.column_schema.name)
1131            .is_none()
1132    }
1133}
1134
1135impl TryFrom<v1::region::AddColumn> for AddColumn {
1136    type Error = MetadataError;
1137
1138    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1139        let column_def = add_column
1140            .column_def
1141            .context(InvalidRawRegionRequestSnafu {
1142                err: "missing column_def in AddColumn",
1143            })?;
1144
1145        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1146        let location = add_column
1147            .location
1148            .map(AddColumnLocation::try_from)
1149            .transpose()?;
1150
1151        Ok(AddColumn {
1152            column_metadata,
1153            location,
1154        })
1155    }
1156}
1157
/// Position at which a new column is inserted.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Add the column at the first position of columns.
    First,
    /// Add the column right after a specific column.
    After {
        /// Name of the column after which the new column is placed.
        column_name: String,
    },
}
1169
1170impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1171    type Error = MetadataError;
1172
1173    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1174        let location_type = LocationType::try_from(location.location_type)
1175            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1176        let add_column_location = match location_type {
1177            LocationType::First => AddColumnLocation::First,
1178            LocationType::After => AddColumnLocation::After {
1179                column_name: location.after_column_name,
1180            },
1181        };
1182
1183        Ok(add_column_location)
1184    }
1185}
1186
/// A request to change a column's data type.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to modify.
    pub column_name: String,
    /// Data type the column will be changed to.
    pub target_type: ConcreteDataType,
}
1195
1196impl ModifyColumnType {
1197    /// Returns an error if the column's datatype to change is invalid.
1198    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1199        let column_meta = metadata
1200            .column_by_name(&self.column_name)
1201            .with_context(|| InvalidRegionRequestSnafu {
1202                region_id: metadata.region_id,
1203                err: format!("column {} not found", self.column_name),
1204            })?;
1205
1206        ensure!(
1207            matches!(column_meta.semantic_type, SemanticType::Field),
1208            InvalidRegionRequestSnafu {
1209                region_id: metadata.region_id,
1210                err: "'timestamp' or 'tag' column cannot change type".to_string()
1211            }
1212        );
1213        ensure!(
1214            column_meta
1215                .column_schema
1216                .data_type
1217                .can_arrow_type_cast_to(&self.target_type),
1218            InvalidRegionRequestSnafu {
1219                region_id: metadata.region_id,
1220                err: format!(
1221                    "column '{}' cannot be cast automatically to type '{}'",
1222                    self.column_name, self.target_type
1223                ),
1224            }
1225        );
1226
1227        Ok(())
1228    }
1229
1230    /// Returns true if no column's datatype to change to the region.
1231    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1232        debug_assert!(self.validate(metadata).is_ok());
1233        metadata.column_by_name(&self.column_name).is_some()
1234    }
1235}
1236
1237impl From<v1::ModifyColumnType> for ModifyColumnType {
1238    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1239        let target_type = ColumnDataTypeWrapper::new(
1240            modify_column_type.target_type(),
1241            modify_column_type.target_type_extension,
1242        )
1243        .into();
1244
1245        ModifyColumnType {
1246            column_name: modify_column_type.column_name,
1247            target_type,
1248        }
1249    }
1250}
1251
/// A single option change carried by a "set region options" alter request.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Sets the TTL. `None` is produced when converting from `UnsetRegionOption::Ttl`.
    Ttl(Option<TimeToLive>),
    /// Modifies a TWCS option, given as `(option name, new value)`. An empty value is
    /// produced when converting from an unset request. Note: renaming the `Twsc`
    /// variant would change its serde representation.
    Twsc(String, String),
    /// Modifies the SST format. The value is stored as given, without validation here.
    Format(String),
}
1260
1261impl TryFrom<&PbOption> for SetRegionOption {
1262    type Error = MetadataError;
1263
1264    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1265        let PbOption { key, value } = value;
1266        match key.as_str() {
1267            TTL_KEY => {
1268                let ttl = TimeToLive::from_humantime_or_str(value)
1269                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1270
1271                Ok(Self::Ttl(Some(ttl)))
1272            }
1273            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1274                Ok(Self::Twsc(key.clone(), value.clone()))
1275            }
1276            SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1277            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1278        }
1279    }
1280}
1281
1282impl From<&UnsetRegionOption> for SetRegionOption {
1283    fn from(unset_option: &UnsetRegionOption) -> Self {
1284        match unset_option {
1285            UnsetRegionOption::TwcsTriggerFileNum => {
1286                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1287            }
1288            UnsetRegionOption::TwcsMaxOutputFileSize => {
1289                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1290            }
1291            UnsetRegionOption::TwcsTimeWindow => {
1292                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1293            }
1294            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1295        }
1296    }
1297}
1298
1299impl TryFrom<&str> for UnsetRegionOption {
1300    type Error = MetadataError;
1301
1302    fn try_from(key: &str) -> Result<Self> {
1303        match key.to_ascii_lowercase().as_str() {
1304            TTL_KEY => Ok(Self::Ttl),
1305            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1306            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1307            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1308            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1309        }
1310    }
1311}
1312
/// A region option that an alter request can unset.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// Option keyed by `TWCS_TRIGGER_FILE_NUM`.
    TwcsTriggerFileNum,
    /// Option keyed by `TWCS_MAX_OUTPUT_FILE_SIZE`.
    TwcsMaxOutputFileSize,
    /// Option keyed by `TWCS_TIME_WINDOW`.
    TwcsTimeWindow,
    /// Option keyed by `TTL_KEY`.
    Ttl,
}
1320
1321impl UnsetRegionOption {
1322    pub fn as_str(&self) -> &str {
1323        match self {
1324            Self::Ttl => TTL_KEY,
1325            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1326            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1327            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1328        }
1329    }
1330}
1331
1332impl Display for UnsetRegionOption {
1333    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1334        write!(f, "{}", self.as_str())
1335    }
1336}
1337
/// Request to flush a region.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size for the flush. Presumably `None` keeps the engine's
    /// default — NOTE(review): confirm against the flush implementation.
    pub row_group_size: Option<usize>,
}
1342
/// Request to compact a region. Its `Default` impl selects regular compaction
/// with no parallelism set.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options taken from the protobuf compact request.
    pub options: compact_request::Options,
    /// Optional parallelism. How `None` is interpreted is decided by the compaction
    /// implementation, which is not part of this file.
    pub parallelism: Option<u32>,
}
1348
1349impl Default for RegionCompactRequest {
1350    fn default() -> Self {
1351        Self {
1352            // Default to regular compaction.
1353            options: compact_request::Options::Regular(Default::default()),
1354            parallelism: None,
1355        }
1356    }
1357}
1358
/// Build-index request for a region. It currently carries no parameters.
#[derive(Debug, Clone, Default)]
pub struct RegionBuildIndexRequest {}
1361
/// Truncate region request.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// Truncate all data in the region.
    All,
    /// Truncate data within the given time ranges only.
    ByTimeRanges {
        /// Time ranges to truncate. Both bounds are inclusive.
        /// Only files that are fully contained in a time range are truncated,
        /// so there is no guarantee that all data in the time ranges is removed.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1374
/// Catchup region request.
///
/// Makes a read-only region catch up with changes made on the leader region.
/// It has no effect when applied to a leader region.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// Sets the region writable if it's available after it has caught up with all changes.
    pub set_writable: bool,
    /// The `entry_id` to replay up to.
    /// `None` means replaying to the latest entry.
    pub entry_id: Option<entry::Id>,
    /// Used for the metrics metadata region.
    /// The `entry_id` to replay up to.
    /// `None` means replaying to the latest entry.
    pub metadata_entry_id: Option<entry::Id>,
    /// Hint for replaying the memtable.
    pub location_id: Option<u64>,
    /// Replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
1395
/// Bulk insert request targeting a single region.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Region the data is written to.
    pub region_id: RegionId,
    /// Decoded record batch to insert; `estimated_size` measures this batch.
    pub payload: DfRecordBatch,
    /// Arrow IPC data as received. Presumably this is the encoded form of `payload`
    /// — TODO confirm against the decoding site.
    pub raw_data: ArrowIpc,
}
1402
1403impl RegionBulkInsertsRequest {
1404    pub fn estimated_size(&self) -> usize {
1405        self.payload.get_array_memory_size()
1406    }
1407}
1408
1409impl fmt::Display for RegionRequest {
1410    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1411        match self {
1412            RegionRequest::Put(_) => write!(f, "Put"),
1413            RegionRequest::Delete(_) => write!(f, "Delete"),
1414            RegionRequest::Create(_) => write!(f, "Create"),
1415            RegionRequest::Drop(_) => write!(f, "Drop"),
1416            RegionRequest::Open(_) => write!(f, "Open"),
1417            RegionRequest::Close(_) => write!(f, "Close"),
1418            RegionRequest::Alter(_) => write!(f, "Alter"),
1419            RegionRequest::Flush(_) => write!(f, "Flush"),
1420            RegionRequest::Compact(_) => write!(f, "Compact"),
1421            RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1422            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1423            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1424            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1425        }
1426    }
1427}
1428
1429#[cfg(test)]
1430mod tests {
1431
1432    use api::v1::region::RegionColumnDef;
1433    use api::v1::{ColumnDataType, ColumnDef};
1434    use datatypes::prelude::ConcreteDataType;
1435    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1436
1437    use super::*;
1438    use crate::metadata::RegionMetadataBuilder;
1439
1440    #[test]
1441    fn test_from_proto_location() {
1442        let proto_location = v1::AddColumnLocation {
1443            location_type: LocationType::First as i32,
1444            after_column_name: String::default(),
1445        };
1446        let location = AddColumnLocation::try_from(proto_location).unwrap();
1447        assert_eq!(location, AddColumnLocation::First);
1448
1449        let proto_location = v1::AddColumnLocation {
1450            location_type: 10,
1451            after_column_name: String::default(),
1452        };
1453        AddColumnLocation::try_from(proto_location).unwrap_err();
1454
1455        let proto_location = v1::AddColumnLocation {
1456            location_type: LocationType::After as i32,
1457            after_column_name: "a".to_string(),
1458        };
1459        let location = AddColumnLocation::try_from(proto_location).unwrap();
1460        assert_eq!(
1461            location,
1462            AddColumnLocation::After {
1463                column_name: "a".to_string()
1464            }
1465        );
1466    }
1467
1468    #[test]
1469    fn test_from_none_proto_add_column() {
1470        AddColumn::try_from(v1::region::AddColumn {
1471            column_def: None,
1472            location: None,
1473        })
1474        .unwrap_err();
1475    }
1476
1477    #[test]
1478    fn test_from_proto_alter_request() {
1479        RegionAlterRequest::try_from(AlterRequest {
1480            region_id: 0,
1481            schema_version: 1,
1482            kind: None,
1483        })
1484        .unwrap_err();
1485
1486        let request = RegionAlterRequest::try_from(AlterRequest {
1487            region_id: 0,
1488            schema_version: 1,
1489            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1490                add_columns: vec![v1::region::AddColumn {
1491                    column_def: Some(RegionColumnDef {
1492                        column_def: Some(ColumnDef {
1493                            name: "a".to_string(),
1494                            data_type: ColumnDataType::String as i32,
1495                            is_nullable: true,
1496                            default_constraint: vec![],
1497                            semantic_type: SemanticType::Field as i32,
1498                            comment: String::new(),
1499                            ..Default::default()
1500                        }),
1501                        column_id: 1,
1502                    }),
1503                    location: Some(v1::AddColumnLocation {
1504                        location_type: LocationType::First as i32,
1505                        after_column_name: String::default(),
1506                    }),
1507                }],
1508            })),
1509        })
1510        .unwrap();
1511
1512        assert_eq!(
1513            request,
1514            RegionAlterRequest {
1515                kind: AlterKind::AddColumns {
1516                    columns: vec![AddColumn {
1517                        column_metadata: ColumnMetadata {
1518                            column_schema: ColumnSchema::new(
1519                                "a",
1520                                ConcreteDataType::string_datatype(),
1521                                true,
1522                            ),
1523                            semantic_type: SemanticType::Field,
1524                            column_id: 1,
1525                        },
1526                        location: Some(AddColumnLocation::First),
1527                    }]
1528                },
1529            }
1530        );
1531    }
1532
1533    /// Returns a new region metadata for testing. Metadata:
1534    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1535    fn new_metadata() -> RegionMetadata {
1536        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1537        builder
1538            .push_column_metadata(ColumnMetadata {
1539                column_schema: ColumnSchema::new(
1540                    "ts",
1541                    ConcreteDataType::timestamp_millisecond_datatype(),
1542                    false,
1543                ),
1544                semantic_type: SemanticType::Timestamp,
1545                column_id: 1,
1546            })
1547            .push_column_metadata(ColumnMetadata {
1548                column_schema: ColumnSchema::new(
1549                    "tag_0",
1550                    ConcreteDataType::string_datatype(),
1551                    true,
1552                ),
1553                semantic_type: SemanticType::Tag,
1554                column_id: 2,
1555            })
1556            .push_column_metadata(ColumnMetadata {
1557                column_schema: ColumnSchema::new(
1558                    "field_0",
1559                    ConcreteDataType::string_datatype(),
1560                    true,
1561                ),
1562                semantic_type: SemanticType::Field,
1563                column_id: 3,
1564            })
1565            .push_column_metadata(ColumnMetadata {
1566                column_schema: ColumnSchema::new(
1567                    "field_1",
1568                    ConcreteDataType::boolean_datatype(),
1569                    true,
1570                ),
1571                semantic_type: SemanticType::Field,
1572                column_id: 4,
1573            })
1574            .primary_key(vec![2]);
1575        builder.build().unwrap()
1576    }
1577
1578    #[test]
1579    fn test_add_column_validate() {
1580        let metadata = new_metadata();
1581        let add_column = AddColumn {
1582            column_metadata: ColumnMetadata {
1583                column_schema: ColumnSchema::new(
1584                    "tag_1",
1585                    ConcreteDataType::string_datatype(),
1586                    true,
1587                ),
1588                semantic_type: SemanticType::Tag,
1589                column_id: 5,
1590            },
1591            location: None,
1592        };
1593        add_column.validate(&metadata).unwrap();
1594        assert!(add_column.need_alter(&metadata));
1595
1596        // Add not null column.
1597        AddColumn {
1598            column_metadata: ColumnMetadata {
1599                column_schema: ColumnSchema::new(
1600                    "tag_1",
1601                    ConcreteDataType::string_datatype(),
1602                    false,
1603                ),
1604                semantic_type: SemanticType::Tag,
1605                column_id: 5,
1606            },
1607            location: None,
1608        }
1609        .validate(&metadata)
1610        .unwrap_err();
1611
1612        // Add existing column.
1613        let add_column = AddColumn {
1614            column_metadata: ColumnMetadata {
1615                column_schema: ColumnSchema::new(
1616                    "tag_0",
1617                    ConcreteDataType::string_datatype(),
1618                    true,
1619                ),
1620                semantic_type: SemanticType::Tag,
1621                column_id: 2,
1622            },
1623            location: None,
1624        };
1625        add_column.validate(&metadata).unwrap();
1626        assert!(!add_column.need_alter(&metadata));
1627    }
1628
1629    #[test]
1630    fn test_add_duplicate_columns() {
1631        let kind = AlterKind::AddColumns {
1632            columns: vec![
1633                AddColumn {
1634                    column_metadata: ColumnMetadata {
1635                        column_schema: ColumnSchema::new(
1636                            "tag_1",
1637                            ConcreteDataType::string_datatype(),
1638                            true,
1639                        ),
1640                        semantic_type: SemanticType::Tag,
1641                        column_id: 5,
1642                    },
1643                    location: None,
1644                },
1645                AddColumn {
1646                    column_metadata: ColumnMetadata {
1647                        column_schema: ColumnSchema::new(
1648                            "tag_1",
1649                            ConcreteDataType::string_datatype(),
1650                            true,
1651                        ),
1652                        semantic_type: SemanticType::Field,
1653                        column_id: 6,
1654                    },
1655                    location: None,
1656                },
1657            ],
1658        };
1659        let metadata = new_metadata();
1660        kind.validate(&metadata).unwrap();
1661        assert!(kind.need_alter(&metadata));
1662    }
1663
1664    #[test]
1665    fn test_add_existing_column_different_metadata() {
1666        let metadata = new_metadata();
1667
1668        // Add existing column with different id.
1669        let kind = AlterKind::AddColumns {
1670            columns: vec![AddColumn {
1671                column_metadata: ColumnMetadata {
1672                    column_schema: ColumnSchema::new(
1673                        "tag_0",
1674                        ConcreteDataType::string_datatype(),
1675                        true,
1676                    ),
1677                    semantic_type: SemanticType::Tag,
1678                    column_id: 4,
1679                },
1680                location: None,
1681            }],
1682        };
1683        kind.validate(&metadata).unwrap_err();
1684
1685        // Add existing column with different type.
1686        let kind = AlterKind::AddColumns {
1687            columns: vec![AddColumn {
1688                column_metadata: ColumnMetadata {
1689                    column_schema: ColumnSchema::new(
1690                        "tag_0",
1691                        ConcreteDataType::int64_datatype(),
1692                        true,
1693                    ),
1694                    semantic_type: SemanticType::Tag,
1695                    column_id: 2,
1696                },
1697                location: None,
1698            }],
1699        };
1700        kind.validate(&metadata).unwrap_err();
1701
1702        // Add existing column with different name.
1703        let kind = AlterKind::AddColumns {
1704            columns: vec![AddColumn {
1705                column_metadata: ColumnMetadata {
1706                    column_schema: ColumnSchema::new(
1707                        "tag_1",
1708                        ConcreteDataType::string_datatype(),
1709                        true,
1710                    ),
1711                    semantic_type: SemanticType::Tag,
1712                    column_id: 2,
1713                },
1714                location: None,
1715            }],
1716        };
1717        kind.validate(&metadata).unwrap_err();
1718    }
1719
1720    #[test]
1721    fn test_add_existing_column_with_location() {
1722        let metadata = new_metadata();
1723        let kind = AlterKind::AddColumns {
1724            columns: vec![AddColumn {
1725                column_metadata: ColumnMetadata {
1726                    column_schema: ColumnSchema::new(
1727                        "tag_0",
1728                        ConcreteDataType::string_datatype(),
1729                        true,
1730                    ),
1731                    semantic_type: SemanticType::Tag,
1732                    column_id: 2,
1733                },
1734                location: Some(AddColumnLocation::First),
1735            }],
1736        };
1737        kind.validate(&metadata).unwrap_err();
1738    }
1739
1740    #[test]
1741    fn test_validate_drop_column() {
1742        let metadata = new_metadata();
1743        let kind = AlterKind::DropColumns {
1744            names: vec!["xxxx".to_string()],
1745        };
1746        kind.validate(&metadata).unwrap();
1747        assert!(!kind.need_alter(&metadata));
1748
1749        AlterKind::DropColumns {
1750            names: vec!["tag_0".to_string()],
1751        }
1752        .validate(&metadata)
1753        .unwrap_err();
1754
1755        let kind = AlterKind::DropColumns {
1756            names: vec!["field_0".to_string()],
1757        };
1758        kind.validate(&metadata).unwrap();
1759        assert!(kind.need_alter(&metadata));
1760    }
1761
1762    #[test]
1763    fn test_validate_modify_column_type() {
1764        let metadata = new_metadata();
1765        AlterKind::ModifyColumnTypes {
1766            columns: vec![ModifyColumnType {
1767                column_name: "xxxx".to_string(),
1768                target_type: ConcreteDataType::string_datatype(),
1769            }],
1770        }
1771        .validate(&metadata)
1772        .unwrap_err();
1773
1774        AlterKind::ModifyColumnTypes {
1775            columns: vec![ModifyColumnType {
1776                column_name: "field_1".to_string(),
1777                target_type: ConcreteDataType::date_datatype(),
1778            }],
1779        }
1780        .validate(&metadata)
1781        .unwrap_err();
1782
1783        AlterKind::ModifyColumnTypes {
1784            columns: vec![ModifyColumnType {
1785                column_name: "ts".to_string(),
1786                target_type: ConcreteDataType::date_datatype(),
1787            }],
1788        }
1789        .validate(&metadata)
1790        .unwrap_err();
1791
1792        AlterKind::ModifyColumnTypes {
1793            columns: vec![ModifyColumnType {
1794                column_name: "tag_0".to_string(),
1795                target_type: ConcreteDataType::date_datatype(),
1796            }],
1797        }
1798        .validate(&metadata)
1799        .unwrap_err();
1800
1801        let kind = AlterKind::ModifyColumnTypes {
1802            columns: vec![ModifyColumnType {
1803                column_name: "field_0".to_string(),
1804                target_type: ConcreteDataType::int32_datatype(),
1805            }],
1806        };
1807        kind.validate(&metadata).unwrap();
1808        assert!(kind.need_alter(&metadata));
1809    }
1810
1811    #[test]
1812    fn test_validate_add_columns() {
1813        let kind = AlterKind::AddColumns {
1814            columns: vec![
1815                AddColumn {
1816                    column_metadata: ColumnMetadata {
1817                        column_schema: ColumnSchema::new(
1818                            "tag_1",
1819                            ConcreteDataType::string_datatype(),
1820                            true,
1821                        ),
1822                        semantic_type: SemanticType::Tag,
1823                        column_id: 5,
1824                    },
1825                    location: None,
1826                },
1827                AddColumn {
1828                    column_metadata: ColumnMetadata {
1829                        column_schema: ColumnSchema::new(
1830                            "field_2",
1831                            ConcreteDataType::string_datatype(),
1832                            true,
1833                        ),
1834                        semantic_type: SemanticType::Field,
1835                        column_id: 6,
1836                    },
1837                    location: None,
1838                },
1839            ],
1840        };
1841        let request = RegionAlterRequest { kind };
1842        let mut metadata = new_metadata();
1843        metadata.schema_version = 1;
1844        request.validate(&metadata).unwrap();
1845    }
1846
1847    #[test]
1848    fn test_validate_create_region() {
1849        let column_metadatas = vec![
1850            ColumnMetadata {
1851                column_schema: ColumnSchema::new(
1852                    "ts",
1853                    ConcreteDataType::timestamp_millisecond_datatype(),
1854                    false,
1855                ),
1856                semantic_type: SemanticType::Timestamp,
1857                column_id: 1,
1858            },
1859            ColumnMetadata {
1860                column_schema: ColumnSchema::new(
1861                    "tag_0",
1862                    ConcreteDataType::string_datatype(),
1863                    true,
1864                ),
1865                semantic_type: SemanticType::Tag,
1866                column_id: 2,
1867            },
1868            ColumnMetadata {
1869                column_schema: ColumnSchema::new(
1870                    "field_0",
1871                    ConcreteDataType::string_datatype(),
1872                    true,
1873                ),
1874                semantic_type: SemanticType::Field,
1875                column_id: 3,
1876            },
1877        ];
1878        let create = RegionCreateRequest {
1879            engine: "mito".to_string(),
1880            column_metadatas,
1881            primary_key: vec![3, 4],
1882            options: HashMap::new(),
1883            table_dir: "path".to_string(),
1884            path_type: PathType::Bare,
1885            partition_expr_json: Some("".to_string()),
1886        };
1887
1888        assert!(create.validate().is_err());
1889    }
1890
1891    #[test]
1892    fn test_validate_modify_column_fulltext_options() {
1893        let kind = AlterKind::SetIndexes {
1894            options: vec![SetIndexOption::Fulltext {
1895                column_name: "tag_0".to_string(),
1896                options: FulltextOptions::new_unchecked(
1897                    true,
1898                    FulltextAnalyzer::Chinese,
1899                    false,
1900                    FulltextBackend::Bloom,
1901                    1000,
1902                    0.01,
1903                ),
1904            }],
1905        };
1906        let request = RegionAlterRequest { kind };
1907        let mut metadata = new_metadata();
1908        metadata.schema_version = 1;
1909        request.validate(&metadata).unwrap();
1910
1911        let kind = AlterKind::UnsetIndexes {
1912            options: vec![UnsetIndexOption::Fulltext {
1913                column_name: "tag_0".to_string(),
1914            }],
1915        };
1916        let request = RegionAlterRequest { kind };
1917        let mut metadata = new_metadata();
1918        metadata.schema_version = 1;
1919        request.validate(&metadata).unwrap();
1920    }
1921
1922    #[test]
1923    fn test_validate_sync_columns() {
1924        let metadata = new_metadata();
1925        let kind = AlterKind::SyncColumns {
1926            column_metadatas: vec![
1927                ColumnMetadata {
1928                    column_schema: ColumnSchema::new(
1929                        "tag_1",
1930                        ConcreteDataType::string_datatype(),
1931                        true,
1932                    ),
1933                    semantic_type: SemanticType::Tag,
1934                    column_id: 5,
1935                },
1936                ColumnMetadata {
1937                    column_schema: ColumnSchema::new(
1938                        "field_2",
1939                        ConcreteDataType::string_datatype(),
1940                        true,
1941                    ),
1942                    semantic_type: SemanticType::Field,
1943                    column_id: 6,
1944                },
1945            ],
1946        };
1947        let err = kind.validate(&metadata).unwrap_err();
1948        assert!(err.to_string().contains("not a primary key"));
1949
1950        // Change the timestamp column name.
1951        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
1952        let ts_column = column_metadatas_with_different_ts_column
1953            .iter_mut()
1954            .find(|c| c.semantic_type == SemanticType::Timestamp)
1955            .unwrap();
1956        ts_column.column_schema.name = "ts1".to_string();
1957
1958        let kind = AlterKind::SyncColumns {
1959            column_metadatas: column_metadatas_with_different_ts_column,
1960        };
1961        let err = kind.validate(&metadata).unwrap_err();
1962        assert!(
1963            err.to_string()
1964                .contains("timestamp column ts has different id")
1965        );
1966
1967        // Change the primary key column name.
1968        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
1969        let pk_column = column_metadatas_with_different_pk_column
1970            .iter_mut()
1971            .find(|c| c.column_schema.name == "tag_0")
1972            .unwrap();
1973        pk_column.column_id = 100;
1974        let kind = AlterKind::SyncColumns {
1975            column_metadatas: column_metadatas_with_different_pk_column,
1976        };
1977        let err = kind.validate(&metadata).unwrap_err();
1978        assert!(
1979            err.to_string()
1980                .contains("column with same name tag_0 has different id")
1981        );
1982
1983        // Add a new field column.
1984        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
1985        column_metadatas_with_new_field_column.push(ColumnMetadata {
1986            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
1987            semantic_type: SemanticType::Field,
1988            column_id: 4,
1989        });
1990        let kind = AlterKind::SyncColumns {
1991            column_metadatas: column_metadatas_with_new_field_column,
1992        };
1993        kind.validate(&metadata).unwrap();
1994    }
1995
1996    #[test]
1997    fn test_cast_path_type_to_primitive() {
1998        assert_eq!(PathType::Bare as u8, 0);
1999        assert_eq!(PathType::Data as u8, 1);
2000        assert_eq!(PathType::Metadata as u8, 2);
2001        assert_eq!(
2002            PathType::try_from(PathType::Bare as u8).unwrap(),
2003            PathType::Bare
2004        );
2005        assert_eq!(
2006            PathType::try_from(PathType::Data as u8).unwrap(),
2007            PathType::Data
2008        );
2009        assert_eq!(
2010            PathType::try_from(PathType::Metadata as u8).unwrap(),
2011            PathType::Metadata
2012        );
2013    }
2014}