mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23    to_proto_value,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38    AffectedRows, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest,
39    RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
40    RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::{FileId, RegionId};
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47    FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::{RegionEdit, TruncateKind};
50use crate::memtable::MemtableId;
51use crate::memtable::bulk::part::BulkPart;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::sst::file::FileMeta;
54use crate::sst::index::IndexBuildType;
55use crate::wal::EntryId;
56use crate::wal::entry_distributor::WalEntryReceiver;
57
58/// Request to write a region.
59#[derive(Debug)]
60pub struct WriteRequest {
61    /// Region to write.
62    pub region_id: RegionId,
63    /// Type of the write request.
64    pub op_type: OpType,
65    /// Rows to write.
66    pub rows: Rows,
67    /// Map column name to column index in `rows`.
68    pub name_to_index: HashMap<String, usize>,
69    /// Whether each column has null.
70    pub has_null: Vec<bool>,
71    /// Write hint.
72    pub hint: Option<WriteHint>,
73    /// Region metadata on the time of this request is created.
74    pub(crate) region_metadata: Option<RegionMetadataRef>,
75}
76
77impl WriteRequest {
78    /// Creates a new request.
79    ///
80    /// Returns `Err` if `rows` are invalid.
81    pub fn new(
82        region_id: RegionId,
83        op_type: OpType,
84        rows: Rows,
85        region_metadata: Option<RegionMetadataRef>,
86    ) -> Result<WriteRequest> {
87        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
88        for (index, column) in rows.schema.iter().enumerate() {
89            ensure!(
90                name_to_index
91                    .insert(column.column_name.clone(), index)
92                    .is_none(),
93                InvalidRequestSnafu {
94                    region_id,
95                    reason: format!("duplicate column {}", column.column_name),
96                }
97            );
98        }
99
100        let mut has_null = vec![false; rows.schema.len()];
101        for row in &rows.rows {
102            ensure!(
103                row.values.len() == rows.schema.len(),
104                InvalidRequestSnafu {
105                    region_id,
106                    reason: format!(
107                        "row has {} columns but schema has {}",
108                        row.values.len(),
109                        rows.schema.len()
110                    ),
111                }
112            );
113
114            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
115                validate_proto_value(region_id, value, column_schema)?;
116
117                if value.value_data.is_none() {
118                    has_null[i] = true;
119                }
120            }
121        }
122
123        Ok(WriteRequest {
124            region_id,
125            op_type,
126            rows,
127            name_to_index,
128            has_null,
129            hint: None,
130            region_metadata,
131        })
132    }
133
134    /// Sets the write hint.
135    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
136        self.hint = hint;
137        self
138    }
139
140    /// Returns the encoding hint.
141    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
142        infer_primary_key_encoding_from_hint(self.hint.as_ref())
143    }
144
145    /// Returns estimated size of the request.
146    pub(crate) fn estimated_size(&self) -> usize {
147        let row_size = self
148            .rows
149            .rows
150            .first()
151            .map(|row| row.encoded_len())
152            .unwrap_or(0);
153        row_size * self.rows.rows.len()
154    }
155
156    /// Gets column index by name.
157    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
158        self.name_to_index.get(name).copied()
159    }
160
161    /// Checks schema of rows is compatible with schema of the region.
162    ///
163    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
164    /// error.
165    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
166        debug_assert_eq!(self.region_id, metadata.region_id);
167
168        let region_id = self.region_id;
169        // Index all columns in rows.
170        let mut rows_columns: HashMap<_, _> = self
171            .rows
172            .schema
173            .iter()
174            .map(|column| (&column.column_name, column))
175            .collect();
176
177        let mut need_fill_default = false;
178        // Checks all columns in this region.
179        for column in &metadata.column_metadatas {
180            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
181                // Check data type.
182                ensure!(
183                    is_column_type_value_eq(
184                        input_col.datatype,
185                        input_col.datatype_extension.clone(),
186                        &column.column_schema.data_type
187                    ),
188                    InvalidRequestSnafu {
189                        region_id,
190                        reason: format!(
191                            "column {} expect type {:?}, given: {}({})",
192                            column.column_schema.name,
193                            column.column_schema.data_type,
194                            ColumnDataType::try_from(input_col.datatype)
195                                .map(|v| v.as_str_name())
196                                .unwrap_or("Unknown"),
197                            input_col.datatype,
198                        )
199                    }
200                );
201
202                // Check semantic type.
203                ensure!(
204                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
205                    InvalidRequestSnafu {
206                        region_id,
207                        reason: format!(
208                            "column {} has semantic type {:?}, given: {}({})",
209                            column.column_schema.name,
210                            column.semantic_type,
211                            api::v1::SemanticType::try_from(input_col.semantic_type)
212                                .map(|v| v.as_str_name())
213                                .unwrap_or("Unknown"),
214                            input_col.semantic_type
215                        ),
216                    }
217                );
218
219                // Check nullable.
220                // Safety: `rows_columns` ensures this column exists.
221                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
222                ensure!(
223                    !has_null || column.column_schema.is_nullable(),
224                    InvalidRequestSnafu {
225                        region_id,
226                        reason: format!(
227                            "column {} is not null but input has null",
228                            column.column_schema.name
229                        ),
230                    }
231                );
232            } else {
233                // Rows don't have this column.
234                self.check_missing_column(column)?;
235
236                need_fill_default = true;
237            }
238        }
239
240        // Checks all columns in rows exist in the region.
241        if !rows_columns.is_empty() {
242            let names: Vec<_> = rows_columns.into_keys().collect();
243            return InvalidRequestSnafu {
244                region_id,
245                reason: format!("unknown columns: {:?}", names),
246            }
247            .fail();
248        }
249
250        // If we need to fill default values, return a special error.
251        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
252
253        Ok(())
254    }
255
256    /// Tries to fill missing columns.
257    ///
258    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
259    /// values.
260    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
261        debug_assert_eq!(self.region_id, metadata.region_id);
262
263        let mut columns_to_fill = vec![];
264        for column in &metadata.column_metadatas {
265            if !self.name_to_index.contains_key(&column.column_schema.name) {
266                columns_to_fill.push(column);
267            }
268        }
269        self.fill_columns(columns_to_fill)?;
270
271        Ok(())
272    }
273
274    /// Checks the schema and fill missing columns.
275    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
276        if let Err(e) = self.check_schema(metadata) {
277            if e.is_fill_default() {
278                // TODO(yingwen): Add metrics for this case.
279                // We need to fill default value. The write request may be a request
280                // sent before changing the schema.
281                self.fill_missing_columns(metadata)?;
282            } else {
283                return Err(e);
284            }
285        }
286
287        Ok(())
288    }
289
290    /// Fills default value for specific `columns`.
291    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
292        let mut default_values = Vec::with_capacity(columns.len());
293        let mut columns_to_fill = Vec::with_capacity(columns.len());
294        for column in columns {
295            let default_value = self.column_default_value(column)?;
296            if default_value.value_data.is_some() {
297                default_values.push(default_value);
298                columns_to_fill.push(column);
299            }
300        }
301
302        for row in &mut self.rows.rows {
303            row.values.extend(default_values.iter().cloned());
304        }
305
306        for column in columns_to_fill {
307            let (datatype, datatype_ext) =
308                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
309                    .with_context(|_| ConvertColumnDataTypeSnafu {
310                        reason: format!(
311                            "no protobuf type for column {} ({:?})",
312                            column.column_schema.name, column.column_schema.data_type
313                        ),
314                    })?
315                    .to_parts();
316            self.rows.schema.push(ColumnSchema {
317                column_name: column.column_schema.name.clone(),
318                datatype: datatype as i32,
319                semantic_type: column.semantic_type as i32,
320                datatype_extension: datatype_ext,
321                options: options_from_column_schema(&column.column_schema),
322            });
323        }
324
325        Ok(())
326    }
327
328    /// Checks whether we should allow a row doesn't provide this column.
329    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
330        if self.op_type == OpType::Delete {
331            if column.semantic_type == SemanticType::Field {
332                // For delete request, all tags and timestamp is required. We don't fill default
333                // tag or timestamp while deleting rows.
334                return Ok(());
335            } else {
336                return InvalidRequestSnafu {
337                    region_id: self.region_id,
338                    reason: format!("delete requests need column {}", column.column_schema.name),
339                }
340                .fail();
341            }
342        }
343
344        // Not a delete request. Checks whether they have default value.
345        ensure!(
346            column.column_schema.is_nullable()
347                || column.column_schema.default_constraint().is_some(),
348            InvalidRequestSnafu {
349                region_id: self.region_id,
350                reason: format!("missing column {}", column.column_schema.name),
351            }
352        );
353
354        Ok(())
355    }
356
357    /// Returns the default value for specific column.
358    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
359        let default_value = match self.op_type {
360            OpType::Delete => {
361                ensure!(
362                    column.semantic_type == SemanticType::Field,
363                    InvalidRequestSnafu {
364                        region_id: self.region_id,
365                        reason: format!(
366                            "delete requests need column {}",
367                            column.column_schema.name
368                        ),
369                    }
370                );
371
372                // For delete request, we need a default value for padding so we
373                // can delete a row even a field doesn't have a default value. So the
374                // value doesn't need to following the default value constraint of the
375                // column.
376                if column.column_schema.is_nullable() {
377                    datatypes::value::Value::Null
378                } else {
379                    column.column_schema.data_type.default_value()
380                }
381            }
382            OpType::Put => {
383                // For put requests, we use the default value from column schema.
384                if column.column_schema.is_default_impure() {
385                    UnexpectedSnafu {
386                        reason: format!(
387                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
388                            self.region_id,
389                            column.column_schema.name,
390                            column.column_schema.default_constraint(),
391                        ),
392                    }
393                    .fail()?
394                }
395                column
396                    .column_schema
397                    .create_default()
398                    .context(CreateDefaultSnafu {
399                        region_id: self.region_id,
400                        column: &column.column_schema.name,
401                    })?
402                    // This column doesn't have default value.
403                    .with_context(|| InvalidRequestSnafu {
404                        region_id: self.region_id,
405                        reason: format!(
406                            "column {} does not have default value",
407                            column.column_schema.name
408                        ),
409                    })?
410            }
411        };
412
413        // Convert default value into proto's value.
414        Ok(to_proto_value(default_value))
415    }
416}
417
418/// Validate proto value schema.
419pub(crate) fn validate_proto_value(
420    region_id: RegionId,
421    value: &Value,
422    column_schema: &ColumnSchema,
423) -> Result<()> {
424    if let Some(value_type) = proto_value_type(value) {
425        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
426            InvalidRequestSnafu {
427                region_id,
428                reason: format!(
429                    "column {} has unknown type {}",
430                    column_schema.column_name, column_schema.datatype
431                ),
432            }
433            .build()
434        })?;
435        ensure!(
436            proto_value_type_match(column_type, value_type),
437            InvalidRequestSnafu {
438                region_id,
439                reason: format!(
440                    "value has type {:?}, but column {} has type {:?}({})",
441                    value_type, column_schema.column_name, column_type, column_schema.datatype,
442                ),
443            }
444        );
445    }
446
447    Ok(())
448}
449
450fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
451    match (column_type, value_type) {
452        (ct, vt) if ct == vt => true,
453        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
454        (ColumnDataType::Json, ColumnDataType::Binary) => true,
455        _ => false,
456    }
457}
458
459/// Oneshot output result sender.
460#[derive(Debug)]
461pub struct OutputTx(Sender<Result<AffectedRows>>);
462
463impl OutputTx {
464    /// Creates a new output sender.
465    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
466        OutputTx(sender)
467    }
468
469    /// Sends the `result`.
470    pub(crate) fn send(self, result: Result<AffectedRows>) {
471        // Ignores send result.
472        let _ = self.0.send(result);
473    }
474}
475
476/// Optional output result sender.
477#[derive(Debug)]
478pub(crate) struct OptionOutputTx(Option<OutputTx>);
479
480impl OptionOutputTx {
481    /// Creates a sender.
482    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
483        OptionOutputTx(sender)
484    }
485
486    /// Creates an empty sender.
487    pub(crate) fn none() -> OptionOutputTx {
488        OptionOutputTx(None)
489    }
490
491    /// Sends the `result` and consumes the inner sender.
492    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
493        if let Some(sender) = self.0.take() {
494            sender.send(result);
495        }
496    }
497
498    /// Sends the `result` and consumes the sender.
499    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
500        if let Some(sender) = self.0.take() {
501            sender.send(result);
502        }
503    }
504
505    /// Takes the inner sender.
506    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
507        self.0.take()
508    }
509}
510
511impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
512    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
513        Self::new(Some(OutputTx::new(sender)))
514    }
515}
516
517impl OnFailure for OptionOutputTx {
518    fn on_failure(&mut self, err: Error) {
519        self.send_mut(Err(err));
520    }
521}
522
523/// Callback on failure.
524pub(crate) trait OnFailure {
525    /// Handles `err` on failure.
526    fn on_failure(&mut self, err: Error);
527}
528
529/// Sender and write request.
530#[derive(Debug)]
531pub(crate) struct SenderWriteRequest {
532    /// Result sender.
533    pub(crate) sender: OptionOutputTx,
534    pub(crate) request: WriteRequest,
535}
536
537pub(crate) struct SenderBulkRequest {
538    pub(crate) sender: OptionOutputTx,
539    pub(crate) region_id: RegionId,
540    pub(crate) request: BulkPart,
541    pub(crate) region_metadata: RegionMetadataRef,
542}
543
544/// Request sent to a worker with timestamp
545#[derive(Debug)]
546pub(crate) struct WorkerRequestWithTime {
547    pub(crate) request: WorkerRequest,
548    pub(crate) created_at: Instant,
549}
550
551impl WorkerRequestWithTime {
552    pub(crate) fn new(request: WorkerRequest) -> Self {
553        Self {
554            request,
555            created_at: Instant::now(),
556        }
557    }
558}
559
560/// Request sent to a worker
561#[derive(Debug)]
562pub(crate) enum WorkerRequest {
563    /// Write to a region.
564    Write(SenderWriteRequest),
565
566    /// Ddl request to a region.
567    Ddl(SenderDdlRequest),
568
569    /// Notifications from internal background jobs.
570    Background {
571        /// Id of the region to send.
572        region_id: RegionId,
573        /// Internal notification.
574        notify: BackgroundNotify,
575    },
576
577    /// The internal commands.
578    SetRegionRoleStateGracefully {
579        /// Id of the region to send.
580        region_id: RegionId,
581        /// The [SettableRegionRoleState].
582        region_role_state: SettableRegionRoleState,
583        /// The sender of [SetReadonlyResponse].
584        sender: Sender<SetRegionRoleStateResponse>,
585    },
586
587    /// Notify a worker to stop.
588    Stop,
589
590    /// Use [RegionEdit] to edit a region directly.
591    EditRegion(RegionEditRequest),
592
593    /// Keep the manifest of a region up to date.
594    SyncRegion(RegionSyncRequest),
595
596    /// Bulk inserts request and region metadata.
597    BulkInserts {
598        metadata: Option<RegionMetadataRef>,
599        request: RegionBulkInsertsRequest,
600        sender: OptionOutputTx,
601    },
602}
603
604impl WorkerRequest {
605    /// Creates a new open region request.
606    pub(crate) fn new_open_region_request(
607        region_id: RegionId,
608        request: RegionOpenRequest,
609        entry_receiver: Option<WalEntryReceiver>,
610    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
611        let (sender, receiver) = oneshot::channel();
612
613        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
614            region_id,
615            sender: sender.into(),
616            request: DdlRequest::Open((request, entry_receiver)),
617        });
618
619        (worker_request, receiver)
620    }
621
622    /// Creates a new catchup region request.
623    pub(crate) fn new_catchup_region_request(
624        region_id: RegionId,
625        request: RegionCatchupRequest,
626        entry_receiver: Option<WalEntryReceiver>,
627    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
628        let (sender, receiver) = oneshot::channel();
629        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
630            region_id,
631            sender: sender.into(),
632            request: DdlRequest::Catchup((request, entry_receiver)),
633        });
634        (worker_request, receiver)
635    }
636
637    /// Converts request from a [RegionRequest].
638    pub(crate) fn try_from_region_request(
639        region_id: RegionId,
640        value: RegionRequest,
641        region_metadata: Option<RegionMetadataRef>,
642    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
643        let (sender, receiver) = oneshot::channel();
644        let worker_request = match value {
645            RegionRequest::Put(v) => {
646                let mut write_request =
647                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
648                        .with_hint(v.hint);
649                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
650                    && let Some(region_metadata) = &region_metadata
651                {
652                    write_request.maybe_fill_missing_columns(region_metadata)?;
653                }
654                WorkerRequest::Write(SenderWriteRequest {
655                    sender: sender.into(),
656                    request: write_request,
657                })
658            }
659            RegionRequest::Delete(v) => {
660                let mut write_request =
661                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
662                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
663                    && let Some(region_metadata) = &region_metadata
664                {
665                    write_request.maybe_fill_missing_columns(region_metadata)?;
666                }
667                WorkerRequest::Write(SenderWriteRequest {
668                    sender: sender.into(),
669                    request: write_request,
670                })
671            }
672            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
673                region_id,
674                sender: sender.into(),
675                request: DdlRequest::Create(v),
676            }),
677            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
678                region_id,
679                sender: sender.into(),
680                request: DdlRequest::Drop,
681            }),
682            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
683                region_id,
684                sender: sender.into(),
685                request: DdlRequest::Open((v, None)),
686            }),
687            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
688                region_id,
689                sender: sender.into(),
690                request: DdlRequest::Close(v),
691            }),
692            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
693                region_id,
694                sender: sender.into(),
695                request: DdlRequest::Alter(v),
696            }),
697            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
698                region_id,
699                sender: sender.into(),
700                request: DdlRequest::Flush(v),
701            }),
702            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
703                region_id,
704                sender: sender.into(),
705                request: DdlRequest::Compact(v),
706            }),
707            RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
708                region_id,
709                sender: sender.into(),
710                request: DdlRequest::BuildIndex(v),
711            }),
712            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
713                region_id,
714                sender: sender.into(),
715                request: DdlRequest::Truncate(v),
716            }),
717            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
718                region_id,
719                sender: sender.into(),
720                request: DdlRequest::Catchup((v, None)),
721            }),
722            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
723                metadata: region_metadata,
724                sender: sender.into(),
725                request: region_bulk_inserts_request,
726            },
727        };
728
729        Ok((worker_request, receiver))
730    }
731
732    pub(crate) fn new_set_readonly_gracefully(
733        region_id: RegionId,
734        region_role_state: SettableRegionRoleState,
735    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
736        let (sender, receiver) = oneshot::channel();
737
738        (
739            WorkerRequest::SetRegionRoleStateGracefully {
740                region_id,
741                region_role_state,
742                sender,
743            },
744            receiver,
745        )
746    }
747
748    pub(crate) fn new_sync_region_request(
749        region_id: RegionId,
750        manifest_version: ManifestVersion,
751    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
752        let (sender, receiver) = oneshot::channel();
753        (
754            WorkerRequest::SyncRegion(RegionSyncRequest {
755                region_id,
756                manifest_version,
757                sender,
758            }),
759            receiver,
760        )
761    }
762}
763
764/// DDL request to a region.
765#[derive(Debug)]
766pub(crate) enum DdlRequest {
767    Create(RegionCreateRequest),
768    Drop,
769    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
770    Close(RegionCloseRequest),
771    Alter(RegionAlterRequest),
772    Flush(RegionFlushRequest),
773    Compact(RegionCompactRequest),
774    BuildIndex(RegionBuildIndexRequest),
775    Truncate(RegionTruncateRequest),
776    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
777}
778
779/// Sender and Ddl request.
780#[derive(Debug)]
781pub(crate) struct SenderDdlRequest {
782    /// Region id of the request.
783    pub(crate) region_id: RegionId,
784    /// Result sender.
785    pub(crate) sender: OptionOutputTx,
786    /// Ddl request.
787    pub(crate) request: DdlRequest,
788}
789
790/// Notification from a background job.
791#[derive(Debug)]
792pub(crate) enum BackgroundNotify {
793    /// Flush has finished.
794    FlushFinished(FlushFinished),
795    /// Flush has failed.
796    FlushFailed(FlushFailed),
797    /// Index build has finished.
798    IndexBuildFinished(IndexBuildFinished),
799    /// Index build has been stopped (aborted or succeeded).
800    IndexBuildStopped(IndexBuildStopped),
801    /// Index build has failed.
802    IndexBuildFailed(IndexBuildFailed),
803    /// Compaction has finished.
804    CompactionFinished(CompactionFinished),
805    /// Compaction has failed.
806    CompactionFailed(CompactionFailed),
807    /// Truncate result.
808    Truncate(TruncateResult),
809    /// Region change result.
810    RegionChange(RegionChangeResult),
811    /// Region edit result.
812    RegionEdit(RegionEditResult),
813}
814
815/// Notifies a flush job is finished.
816#[derive(Debug)]
817pub(crate) struct FlushFinished {
818    /// Region id.
819    pub(crate) region_id: RegionId,
820    /// Entry id of flushed data.
821    pub(crate) flushed_entry_id: EntryId,
822    /// Flush result senders.
823    pub(crate) senders: Vec<OutputTx>,
824    /// Flush timer.
825    pub(crate) _timer: HistogramTimer,
826    /// Region edit to apply.
827    pub(crate) edit: RegionEdit,
828    /// Memtables to remove.
829    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
830}
831
832impl FlushFinished {
833    /// Marks the flush job as successful and observes the timer.
834    pub(crate) fn on_success(self) {
835        for sender in self.senders {
836            sender.send(Ok(0));
837        }
838    }
839}
840
841impl OnFailure for FlushFinished {
842    fn on_failure(&mut self, err: Error) {
843        let err = Arc::new(err);
844        for sender in self.senders.drain(..) {
845            sender.send(Err(err.clone()).context(FlushRegionSnafu {
846                region_id: self.region_id,
847            }));
848        }
849    }
850}
851
/// Notifies that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error source of the failure, behind an `Arc` so it can be cloned cheaply.
    pub(crate) err: Arc<Error>,
}
858
/// Notifies that an index build job has finished.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    /// Region id. `allow(dead_code)` silences the lint in case the field is never read.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
865
/// Notifies that an index build job has been stopped (aborted or succeeded).
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    /// Region id. `allow(dead_code)` silences the lint in case the field is never read.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Id of the file the stopped job was working on.
    /// NOTE(review): presumably the file whose index was being built; confirm.
    pub(crate) file_id: FileId,
}
872}
873
874/// Notifies an index build job has failed.
875#[derive(Debug)]
876pub(crate) struct IndexBuildFailed {
877    pub(crate) err: Arc<Error>,
878}
879
/// Notifies that a compaction job has finished.
///
/// Waiters in `senders` are answered either by `on_success` or by
/// `OnFailure::on_failure`.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Compaction result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Start time of the compaction task. `on_success` uses it to observe
    /// `COMPACTION_ELAPSED_TOTAL`.
    pub(crate) start_time: Instant,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
892
893impl CompactionFinished {
894    pub fn on_success(self) {
895        // only update compaction time on success
896        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
897
898        for sender in self.senders {
899            sender.send(Ok(0));
900        }
901        info!("Successfully compacted region: {}", self.region_id);
902    }
903}
904
905impl OnFailure for CompactionFinished {
906    /// Compaction succeeded but failed to update manifest or region's already been dropped.
907    fn on_failure(&mut self, err: Error) {
908        let err = Arc::new(err);
909        for sender in self.senders.drain(..) {
910            sender.send(Err(err.clone()).context(CompactRegionSnafu {
911                region_id: self.region_id,
912            }));
913        }
914    }
915}
916
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
924
/// Notifies the truncate result of a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// Kind of the truncate operation (see `TruncateKind`).
    pub(crate) kind: TruncateKind,
}
936
/// Notifies the region of the result of writing a region change action.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new region metadata to apply.
    pub(crate) new_meta: RegionMetadataRef,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Used for index build in schema change.
    /// NOTE(review): presumably whether an index build is needed after applying
    /// `new_meta`; confirm against the handler.
    pub(crate) need_index: bool,
}
951
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the region to edit.
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// One-shot sender used to notify the region engine of the result.
    pub(crate) tx: Sender<Result<()>>,
}
960
/// Notifies the region of the result of editing the region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// One-shot result sender.
    pub(crate) sender: Sender<Result<()>>,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
973
/// Request to build indexes for files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Type of the index build (see `IndexBuildType`).
    pub(crate) build_type: IndexBuildType,
    /// Files that need an index built; an empty list means all files.
    pub(crate) file_metas: Vec<FileMeta>,
}
981
/// Request to sync the manifest of a region.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Manifest version carried by the request.
    /// NOTE(review): presumably the version to sync to; confirm.
    pub(crate) manifest_version: ManifestVersion,
    /// Returns the latest manifest version and a boolean indicating whether a new manifest is installed.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
989
#[cfg(test)]
mod tests {
    // Unit tests for `WriteRequest`: construction-time validation (`new`), schema checks
    // against region metadata (`check_schema`) and default filling (`fill_missing_columns`).

    use api::v1::value::ValueData;
    use api::v1::{Row, SemanticType};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnDefaultConstraint;
    use mito_codec::test_util::i64_value;
    use store_api::metadata::RegionMetadataBuilder;

    use super::*;
    use crate::error::Error;
    use crate::test_util::ts_ms_value;

    /// Builds a proto `ColumnSchema` with the given name, data type and semantic type.
    /// Every other field keeps its default value.
    fn new_column_schema(
        name: &str,
        data_type: ColumnDataType,
        semantic_type: SemanticType,
    ) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: data_type as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Asserts that `err` is `Error::InvalidRequest` with exactly `expect` as its reason.
    /// Panics on any other error variant.
    fn check_invalid_request(err: &Error, expect: &str) {
        if let Error::InvalidRequest {
            region_id: _,
            reason,
            location: _,
        } = err
        {
            assert_eq!(reason, expect);
        } else {
            panic!("Unexpected error {err}")
        }
    }

    /// `WriteRequest::new` rejects a schema that lists the same column twice.
    #[test]
    fn test_write_request_duplicate_column() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![],
        };

        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
        check_invalid_request(&err, "duplicate column c0");
    }

    /// A well-formed request builds successfully. `column_index_by_name` returns the
    /// schema position of known columns and `None` for unknown names.
    #[test]
    fn test_valid_write_request() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2)],
            }],
        };

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        assert_eq!(0, request.column_index_by_name("c0").unwrap());
        assert_eq!(1, request.column_index_by_name("c1").unwrap());
        assert_eq!(None, request.column_index_by_name("c2"));
    }

    /// `WriteRequest::new` rejects a row whose value count differs from the schema width.
    #[test]
    fn test_write_request_column_num() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2), i64_value(3)],
            }],
        };

        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
        check_invalid_request(&err, "row has 3 columns but schema has 2");
    }

    /// Region metadata with two columns: a non-nullable millisecond timestamp `ts`
    /// (id 1) and a nullable int64 tag `k0` (id 2). `k0` is the primary key.
    fn new_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "k0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .primary_key(vec![2]);
        builder.build().unwrap()
    }

    /// Rows that exactly match `new_region_metadata()` pass `check_schema`.
    #[test]
    fn test_check_schema() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![ts_ms_value(1), i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        request.check_schema(&metadata).unwrap();
    }

    /// `check_schema` rejects a column whose data type differs from the metadata.
    #[test]
    fn test_column_type() {
        let rows = Rows {
            schema: vec![
                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(
            &err,
            "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
        );
    }

    /// `check_schema` rejects a column whose semantic type differs from the metadata.
    #[test]
    fn test_semantic_type() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Tag,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![ts_ms_value(1), i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
    }

    /// `check_schema` rejects a null value in the non-nullable `ts` column.
    #[test]
    fn test_column_nullable() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![Value { value_data: None }, i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts is not null but input has null");
    }

    /// `check_schema` reports the non-nullable `ts` column, which has no default,
    /// as missing.
    #[test]
    fn test_column_default() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "k0",
                ColumnDataType::Int64,
                SemanticType::Tag,
            )],
            rows: vec![Row {
                values: vec![i64_value(1)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "missing column ts");
    }

    /// `check_schema` reports columns absent from the region metadata as unknown.
    #[test]
    fn test_unknown_column() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
    }

    /// The missing `ts` column has an impure default (`now()`). `check_schema`
    /// signals that defaults must be filled, but `fill_missing_columns` refuses
    /// to fill the impure default.
    #[test]
    fn test_fill_impure_columns_err() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "k0",
                ColumnDataType::Int64,
                SemanticType::Tag,
            )],
            rows: vec![Row {
                values: vec![i64_value(1)],
            }],
        };
        let metadata = {
            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
            builder
                .push_column_metadata(ColumnMetadata {
                    column_schema: datatypes::schema::ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
                        "now()".to_string(),
                    )))
                    .unwrap(),
                    semantic_type: SemanticType::Timestamp,
                    column_id: 1,
                })
                .push_column_metadata(ColumnMetadata {
                    column_schema: datatypes::schema::ColumnSchema::new(
                        "k0",
                        ConcreteDataType::int64_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 2,
                })
                .primary_key(vec![2]);
            builder.build().unwrap()
        };

        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        assert!(
            request
                .fill_missing_columns(&metadata)
                .unwrap_err()
                .to_string()
                .contains("unexpected impure default value with region_id")
        );
    }

    /// A Put request that only lacks the nullable tag `k0` needs default filling
    /// according to `check_schema`. After `fill_missing_columns`, the rows are
    /// unchanged: no `k0` column is added.
    #[test]
    fn test_fill_missing_columns() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            )],
            rows: vec![Row {
                values: vec![ts_ms_value(1)],
            }],
        };
        let metadata = new_region_metadata();

        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        let expect_rows = Rows {
            schema: vec![new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            )],
            rows: vec![Row {
                values: vec![ts_ms_value(1)],
            }],
        };
        assert_eq!(expect_rows, request.rows);
    }

    /// Metadata builder pre-populated with the same `ts` and `k0` columns (and
    /// primary key) as `new_region_metadata()`, so callers can append more columns.
    fn builder_with_ts_tag() -> RegionMetadataBuilder {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "k0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .primary_key(vec![2]);
        builder
    }

    /// `builder_with_ts_tag()` plus two int64 fields: `f0` (nullable, id 3) and
    /// `f1` (non-nullable, default value 100, id 4).
    fn region_metadata_two_fields() -> RegionMetadata {
        let mut builder = builder_with_ts_tag();
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            // Column is not nullable.
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f1",
                    ConcreteDataType::int64_datatype(),
                    false,
                )
                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
                    datatypes::value::Value::Int64(100),
                )))
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 4,
            });
        builder.build().unwrap()
    }

    /// Delete requests must include the primary key column `k0`; both
    /// `check_schema` and `fill_missing_columns` reject a request without it.
    /// Once `k0` is present, filling appends only the non-nullable field `f1`,
    /// padded with 0 rather than its default value 100.
    #[test]
    fn test_fill_missing_for_delete() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            )],
            rows: vec![Row {
                values: vec![ts_ms_value(1)],
            }],
        };
        let metadata = region_metadata_two_fields();

        let mut request =
            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "delete requests need column k0");
        let err = request.fill_missing_columns(&metadata).unwrap_err();
        check_invalid_request(&err, "delete requests need column k0");

        let rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
            ],
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1)],
            }],
        };
        let mut request =
            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        let expect_rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
            ],
            // Column f1 is not nullable and we use 0 for padding.
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
            }],
        };
        assert_eq!(expect_rows, request.rows);
    }

    /// In a delete request, a non-nullable field without a default (`f1`) is still
    /// padded with 0. The nullable field `f0` is not added.
    #[test]
    fn test_fill_missing_without_default_in_delete() {
        let mut builder = builder_with_ts_tag();
        builder
            // f0 is nullable.
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            // f1 is not nullable and doesn't have a default value.
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            });
        let metadata = builder.build().unwrap();

        let rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
            ],
            // Missing f0 (nullable), f1 (not nullable).
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1)],
            }],
        };
        let mut request =
            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        let expect_rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
            ],
            // Column f1 is not nullable and we use 0 for padding.
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
            }],
        };
        assert_eq!(expect_rows, request.rows);
    }

    /// For a Put request, `fill_missing_columns` fails when the missing
    /// non-nullable column `ts` has no default value.
    #[test]
    fn test_no_default() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "k0",
                ColumnDataType::Int64,
                SemanticType::Tag,
            )],
            rows: vec![Row {
                values: vec![i64_value(1)],
            }],
        };
        let metadata = new_region_metadata();

        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.fill_missing_columns(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts does not have default value");
    }

    /// The request lacks `f0` and gives `f1` the wrong type. `check_schema`
    /// reports the type mismatch on `f1`.
    #[test]
    fn test_missing_and_invalid() {
        // Missing f0 and f1 has invalid type (string).
        let rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
            ],
            rows: vec![Row {
                values: vec![
                    i64_value(100),
                    ts_ms_value(1),
                    Value {
                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
                    },
                ],
            }],
        };
        let metadata = region_metadata_two_fields();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(
            &err,
            "column f1 expect type Int64(Int64Type), given: STRING(12)",
        );
    }

    /// `WriteRequest::new` stores the region metadata passed to it.
    #[test]
    fn test_write_request_metadata() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2)],
            }],
        };

        let metadata = Arc::new(new_region_metadata());
        let request = WriteRequest::new(
            RegionId::new(1, 1),
            OpType::Put,
            rows,
            Some(metadata.clone()),
        )
        .unwrap();

        assert!(request.region_metadata.is_some());
        assert_eq!(
            request.region_metadata.unwrap().region_id,
            RegionId::new(1, 1)
        );
    }
}
1570}