mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23    ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::manifest::ManifestVersion;
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38    AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
39    RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
40    RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::{RegionId, SequenceNumber};
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47    FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
48};
49use crate::manifest::action::RegionEdit;
50use crate::memtable::MemtableId;
51use crate::metrics::COMPACTION_ELAPSED_TOTAL;
52use crate::wal::entry_distributor::WalEntryReceiver;
53use crate::wal::EntryId;
54
55/// Request to write a region.
56#[derive(Debug)]
57pub struct WriteRequest {
58    /// Region to write.
59    pub region_id: RegionId,
60    /// Type of the write request.
61    pub op_type: OpType,
62    /// Rows to write.
63    pub rows: Rows,
64    /// Map column name to column index in `rows`.
65    name_to_index: HashMap<String, usize>,
66    /// Whether each column has null.
67    has_null: Vec<bool>,
68    /// Write hint.
69    pub hint: Option<WriteHint>,
70    /// Region metadata on the time of this request is created.
71    pub(crate) region_metadata: Option<RegionMetadataRef>,
72}
73
74impl WriteRequest {
75    /// Creates a new request.
76    ///
77    /// Returns `Err` if `rows` are invalid.
78    pub fn new(
79        region_id: RegionId,
80        op_type: OpType,
81        rows: Rows,
82        region_metadata: Option<RegionMetadataRef>,
83    ) -> Result<WriteRequest> {
84        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
85        for (index, column) in rows.schema.iter().enumerate() {
86            ensure!(
87                name_to_index
88                    .insert(column.column_name.clone(), index)
89                    .is_none(),
90                InvalidRequestSnafu {
91                    region_id,
92                    reason: format!("duplicate column {}", column.column_name),
93                }
94            );
95        }
96
97        let mut has_null = vec![false; rows.schema.len()];
98        for row in &rows.rows {
99            ensure!(
100                row.values.len() == rows.schema.len(),
101                InvalidRequestSnafu {
102                    region_id,
103                    reason: format!(
104                        "row has {} columns but schema has {}",
105                        row.values.len(),
106                        rows.schema.len()
107                    ),
108                }
109            );
110
111            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
112                validate_proto_value(region_id, value, column_schema)?;
113
114                if value.value_data.is_none() {
115                    has_null[i] = true;
116                }
117            }
118        }
119
120        Ok(WriteRequest {
121            region_id,
122            op_type,
123            rows,
124            name_to_index,
125            has_null,
126            hint: None,
127            region_metadata,
128        })
129    }
130
131    /// Sets the write hint.
132    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
133        self.hint = hint;
134        self
135    }
136
137    /// Returns the encoding hint.
138    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
139        infer_primary_key_encoding_from_hint(self.hint.as_ref())
140    }
141
142    /// Returns estimated size of the request.
143    pub(crate) fn estimated_size(&self) -> usize {
144        let row_size = self
145            .rows
146            .rows
147            .first()
148            .map(|row| row.encoded_len())
149            .unwrap_or(0);
150        row_size * self.rows.rows.len()
151    }
152
153    /// Gets column index by name.
154    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
155        self.name_to_index.get(name).copied()
156    }
157
158    /// Checks schema of rows is compatible with schema of the region.
159    ///
160    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
161    /// error.
162    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
163        debug_assert_eq!(self.region_id, metadata.region_id);
164
165        let region_id = self.region_id;
166        // Index all columns in rows.
167        let mut rows_columns: HashMap<_, _> = self
168            .rows
169            .schema
170            .iter()
171            .map(|column| (&column.column_name, column))
172            .collect();
173
174        let mut need_fill_default = false;
175        // Checks all columns in this region.
176        for column in &metadata.column_metadatas {
177            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
178                // Check data type.
179                ensure!(
180                    is_column_type_value_eq(
181                        input_col.datatype,
182                        input_col.datatype_extension,
183                        &column.column_schema.data_type
184                    ),
185                    InvalidRequestSnafu {
186                        region_id,
187                        reason: format!(
188                            "column {} expect type {:?}, given: {}({})",
189                            column.column_schema.name,
190                            column.column_schema.data_type,
191                            ColumnDataType::try_from(input_col.datatype)
192                                .map(|v| v.as_str_name())
193                                .unwrap_or("Unknown"),
194                            input_col.datatype,
195                        )
196                    }
197                );
198
199                // Check semantic type.
200                ensure!(
201                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
202                    InvalidRequestSnafu {
203                        region_id,
204                        reason: format!(
205                            "column {} has semantic type {:?}, given: {}({})",
206                            column.column_schema.name,
207                            column.semantic_type,
208                            api::v1::SemanticType::try_from(input_col.semantic_type)
209                                .map(|v| v.as_str_name())
210                                .unwrap_or("Unknown"),
211                            input_col.semantic_type
212                        ),
213                    }
214                );
215
216                // Check nullable.
217                // Safety: `rows_columns` ensures this column exists.
218                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
219                ensure!(
220                    !has_null || column.column_schema.is_nullable(),
221                    InvalidRequestSnafu {
222                        region_id,
223                        reason: format!(
224                            "column {} is not null but input has null",
225                            column.column_schema.name
226                        ),
227                    }
228                );
229            } else {
230                // Rows don't have this column.
231                self.check_missing_column(column)?;
232
233                need_fill_default = true;
234            }
235        }
236
237        // Checks all columns in rows exist in the region.
238        if !rows_columns.is_empty() {
239            let names: Vec<_> = rows_columns.into_keys().collect();
240            return InvalidRequestSnafu {
241                region_id,
242                reason: format!("unknown columns: {:?}", names),
243            }
244            .fail();
245        }
246
247        // If we need to fill default values, return a special error.
248        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
249
250        Ok(())
251    }
252
253    /// Tries to fill missing columns.
254    ///
255    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
256    /// values.
257    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
258        debug_assert_eq!(self.region_id, metadata.region_id);
259
260        let mut columns_to_fill = vec![];
261        for column in &metadata.column_metadatas {
262            if !self.name_to_index.contains_key(&column.column_schema.name) {
263                columns_to_fill.push(column);
264            }
265        }
266        self.fill_columns(columns_to_fill)?;
267
268        Ok(())
269    }
270
271    /// Checks the schema and fill missing columns.
272    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
273        if let Err(e) = self.check_schema(metadata) {
274            if e.is_fill_default() {
275                // TODO(yingwen): Add metrics for this case.
276                // We need to fill default value. The write request may be a request
277                // sent before changing the schema.
278                self.fill_missing_columns(metadata)?;
279            } else {
280                return Err(e);
281            }
282        }
283
284        Ok(())
285    }
286
287    /// Fills default value for specific `columns`.
288    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
289        let mut default_values = Vec::with_capacity(columns.len());
290        let mut columns_to_fill = Vec::with_capacity(columns.len());
291        for column in columns {
292            let default_value = self.column_default_value(column)?;
293            if default_value.value_data.is_some() {
294                default_values.push(default_value);
295                columns_to_fill.push(column);
296            }
297        }
298
299        for row in &mut self.rows.rows {
300            row.values.extend(default_values.iter().cloned());
301        }
302
303        for column in columns_to_fill {
304            let (datatype, datatype_ext) =
305                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
306                    .with_context(|_| ConvertColumnDataTypeSnafu {
307                        reason: format!(
308                            "no protobuf type for column {} ({:?})",
309                            column.column_schema.name, column.column_schema.data_type
310                        ),
311                    })?
312                    .to_parts();
313            self.rows.schema.push(ColumnSchema {
314                column_name: column.column_schema.name.clone(),
315                datatype: datatype as i32,
316                semantic_type: column.semantic_type as i32,
317                datatype_extension: datatype_ext,
318                options: options_from_column_schema(&column.column_schema),
319            });
320        }
321
322        Ok(())
323    }
324
325    /// Checks whether we should allow a row doesn't provide this column.
326    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
327        if self.op_type == OpType::Delete {
328            if column.semantic_type == SemanticType::Field {
329                // For delete request, all tags and timestamp is required. We don't fill default
330                // tag or timestamp while deleting rows.
331                return Ok(());
332            } else {
333                return InvalidRequestSnafu {
334                    region_id: self.region_id,
335                    reason: format!("delete requests need column {}", column.column_schema.name),
336                }
337                .fail();
338            }
339        }
340
341        // Not a delete request. Checks whether they have default value.
342        ensure!(
343            column.column_schema.is_nullable()
344                || column.column_schema.default_constraint().is_some(),
345            InvalidRequestSnafu {
346                region_id: self.region_id,
347                reason: format!("missing column {}", column.column_schema.name),
348            }
349        );
350
351        Ok(())
352    }
353
354    /// Returns the default value for specific column.
355    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
356        let default_value = match self.op_type {
357            OpType::Delete => {
358                ensure!(
359                    column.semantic_type == SemanticType::Field,
360                    InvalidRequestSnafu {
361                        region_id: self.region_id,
362                        reason: format!(
363                            "delete requests need column {}",
364                            column.column_schema.name
365                        ),
366                    }
367                );
368
369                // For delete request, we need a default value for padding so we
370                // can delete a row even a field doesn't have a default value. So the
371                // value doesn't need to following the default value constraint of the
372                // column.
373                if column.column_schema.is_nullable() {
374                    datatypes::value::Value::Null
375                } else {
376                    column.column_schema.data_type.default_value()
377                }
378            }
379            OpType::Put => {
380                // For put requests, we use the default value from column schema.
381                if column.column_schema.is_default_impure() {
382                    UnexpectedImpureDefaultSnafu {
383                        region_id: self.region_id,
384                        column: &column.column_schema.name,
385                        default_value: format!("{:?}", column.column_schema.default_constraint()),
386                    }
387                    .fail()?
388                }
389                column
390                    .column_schema
391                    .create_default()
392                    .context(CreateDefaultSnafu {
393                        region_id: self.region_id,
394                        column: &column.column_schema.name,
395                    })?
396                    // This column doesn't have default value.
397                    .with_context(|| InvalidRequestSnafu {
398                        region_id: self.region_id,
399                        reason: format!(
400                            "column {} does not have default value",
401                            column.column_schema.name
402                        ),
403                    })?
404            }
405        };
406
407        // Convert default value into proto's value.
408        to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
409            region_id: self.region_id,
410            reason: format!(
411                "no protobuf type for default value of column {} ({:?})",
412                column.column_schema.name, column.column_schema.data_type
413            ),
414        })
415    }
416}
417
418/// Validate proto value schema.
419pub(crate) fn validate_proto_value(
420    region_id: RegionId,
421    value: &Value,
422    column_schema: &ColumnSchema,
423) -> Result<()> {
424    if let Some(value_type) = proto_value_type(value) {
425        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
426            InvalidRequestSnafu {
427                region_id,
428                reason: format!(
429                    "column {} has unknown type {}",
430                    column_schema.column_name, column_schema.datatype
431                ),
432            }
433            .build()
434        })?;
435        ensure!(
436            proto_value_type_match(column_type, value_type),
437            InvalidRequestSnafu {
438                region_id,
439                reason: format!(
440                    "value has type {:?}, but column {} has type {:?}({})",
441                    value_type, column_schema.column_name, column_type, column_schema.datatype,
442                ),
443            }
444        );
445    }
446
447    Ok(())
448}
449
450fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
451    match (column_type, value_type) {
452        (ct, vt) if ct == vt => true,
453        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
454        (ColumnDataType::Json, ColumnDataType::Binary) => true,
455        _ => false,
456    }
457}
458
459/// Oneshot output result sender.
460#[derive(Debug)]
461pub struct OutputTx(Sender<Result<AffectedRows>>);
462
463impl OutputTx {
464    /// Creates a new output sender.
465    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
466        OutputTx(sender)
467    }
468
469    /// Sends the `result`.
470    pub(crate) fn send(self, result: Result<AffectedRows>) {
471        // Ignores send result.
472        let _ = self.0.send(result);
473    }
474}
475
476/// Optional output result sender.
477#[derive(Debug)]
478pub(crate) struct OptionOutputTx(Option<OutputTx>);
479
480impl OptionOutputTx {
481    /// Creates a sender.
482    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
483        OptionOutputTx(sender)
484    }
485
486    /// Creates an empty sender.
487    pub(crate) fn none() -> OptionOutputTx {
488        OptionOutputTx(None)
489    }
490
491    /// Sends the `result` and consumes the inner sender.
492    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
493        if let Some(sender) = self.0.take() {
494            sender.send(result);
495        }
496    }
497
498    /// Sends the `result` and consumes the sender.
499    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
500        if let Some(sender) = self.0.take() {
501            sender.send(result);
502        }
503    }
504
505    /// Takes the inner sender.
506    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
507        self.0.take()
508    }
509}
510
511impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
512    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
513        Self::new(Some(OutputTx::new(sender)))
514    }
515}
516
517impl OnFailure for OptionOutputTx {
518    fn on_failure(&mut self, err: Error) {
519        self.send_mut(Err(err));
520    }
521}
522
523/// Callback on failure.
524pub(crate) trait OnFailure {
525    /// Handles `err` on failure.
526    fn on_failure(&mut self, err: Error);
527}
528
529/// Sender and write request.
530#[derive(Debug)]
531pub(crate) struct SenderWriteRequest {
532    /// Result sender.
533    pub(crate) sender: OptionOutputTx,
534    pub(crate) request: WriteRequest,
535}
536
537/// Request sent to a worker
538#[derive(Debug)]
539pub(crate) enum WorkerRequest {
540    /// Write to a region.
541    Write(SenderWriteRequest),
542
543    /// Ddl request to a region.
544    Ddl(SenderDdlRequest),
545
546    /// Notifications from internal background jobs.
547    Background {
548        /// Id of the region to send.
549        region_id: RegionId,
550        /// Internal notification.
551        notify: BackgroundNotify,
552    },
553
554    /// The internal commands.
555    SetRegionRoleStateGracefully {
556        /// Id of the region to send.
557        region_id: RegionId,
558        /// The [SettableRegionRoleState].
559        region_role_state: SettableRegionRoleState,
560        /// The sender of [SetReadonlyResponse].
561        sender: Sender<SetRegionRoleStateResponse>,
562    },
563
564    /// Notify a worker to stop.
565    Stop,
566
567    /// Use [RegionEdit] to edit a region directly.
568    EditRegion(RegionEditRequest),
569
570    /// Keep the manifest of a region up to date.
571    SyncRegion(RegionSyncRequest),
572
573    /// Bulk inserts request and region metadata.
574    BulkInserts {
575        metadata: Option<RegionMetadataRef>,
576        request: RegionBulkInsertsRequest,
577        sender: OptionOutputTx,
578    },
579}
580
581impl WorkerRequest {
582    pub(crate) fn new_open_region_request(
583        region_id: RegionId,
584        request: RegionOpenRequest,
585        entry_receiver: Option<WalEntryReceiver>,
586    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
587        let (sender, receiver) = oneshot::channel();
588
589        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
590            region_id,
591            sender: sender.into(),
592            request: DdlRequest::Open((request, entry_receiver)),
593        });
594
595        (worker_request, receiver)
596    }
597
598    /// Converts request from a [RegionRequest].
599    pub(crate) fn try_from_region_request(
600        region_id: RegionId,
601        value: RegionRequest,
602        region_metadata: Option<RegionMetadataRef>,
603    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
604        let (sender, receiver) = oneshot::channel();
605        let worker_request = match value {
606            RegionRequest::Put(v) => {
607                let mut write_request =
608                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
609                        .with_hint(v.hint);
610                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
611                    && let Some(region_metadata) = &region_metadata
612                {
613                    write_request.maybe_fill_missing_columns(region_metadata)?;
614                }
615                WorkerRequest::Write(SenderWriteRequest {
616                    sender: sender.into(),
617                    request: write_request,
618                })
619            }
620            RegionRequest::Delete(v) => {
621                let mut write_request =
622                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
623                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
624                    && let Some(region_metadata) = &region_metadata
625                {
626                    write_request.maybe_fill_missing_columns(region_metadata)?;
627                }
628                WorkerRequest::Write(SenderWriteRequest {
629                    sender: sender.into(),
630                    request: write_request,
631                })
632            }
633            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
634                region_id,
635                sender: sender.into(),
636                request: DdlRequest::Create(v),
637            }),
638            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
639                region_id,
640                sender: sender.into(),
641                request: DdlRequest::Drop,
642            }),
643            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
644                region_id,
645                sender: sender.into(),
646                request: DdlRequest::Open((v, None)),
647            }),
648            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
649                region_id,
650                sender: sender.into(),
651                request: DdlRequest::Close(v),
652            }),
653            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
654                region_id,
655                sender: sender.into(),
656                request: DdlRequest::Alter(v),
657            }),
658            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
659                region_id,
660                sender: sender.into(),
661                request: DdlRequest::Flush(v),
662            }),
663            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
664                region_id,
665                sender: sender.into(),
666                request: DdlRequest::Compact(v),
667            }),
668            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
669                region_id,
670                sender: sender.into(),
671                request: DdlRequest::Truncate(v),
672            }),
673            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
674                region_id,
675                sender: sender.into(),
676                request: DdlRequest::Catchup(v),
677            }),
678            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
679                metadata: region_metadata,
680                sender: sender.into(),
681                request: region_bulk_inserts_request,
682            },
683        };
684
685        Ok((worker_request, receiver))
686    }
687
688    pub(crate) fn new_set_readonly_gracefully(
689        region_id: RegionId,
690        region_role_state: SettableRegionRoleState,
691    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
692        let (sender, receiver) = oneshot::channel();
693
694        (
695            WorkerRequest::SetRegionRoleStateGracefully {
696                region_id,
697                region_role_state,
698                sender,
699            },
700            receiver,
701        )
702    }
703
704    pub(crate) fn new_sync_region_request(
705        region_id: RegionId,
706        manifest_version: ManifestVersion,
707    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
708        let (sender, receiver) = oneshot::channel();
709        (
710            WorkerRequest::SyncRegion(RegionSyncRequest {
711                region_id,
712                manifest_version,
713                sender,
714            }),
715            receiver,
716        )
717    }
718}
719
720/// DDL request to a region.
721#[derive(Debug)]
722pub(crate) enum DdlRequest {
723    Create(RegionCreateRequest),
724    Drop,
725    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
726    Close(RegionCloseRequest),
727    Alter(RegionAlterRequest),
728    Flush(RegionFlushRequest),
729    Compact(RegionCompactRequest),
730    Truncate(RegionTruncateRequest),
731    Catchup(RegionCatchupRequest),
732}
733
734/// Sender and Ddl request.
735#[derive(Debug)]
736pub(crate) struct SenderDdlRequest {
737    /// Region id of the request.
738    pub(crate) region_id: RegionId,
739    /// Result sender.
740    pub(crate) sender: OptionOutputTx,
741    /// Ddl request.
742    pub(crate) request: DdlRequest,
743}
744
745/// Notification from a background job.
746#[derive(Debug)]
747pub(crate) enum BackgroundNotify {
748    /// Flush has finished.
749    FlushFinished(FlushFinished),
750    /// Flush has failed.
751    FlushFailed(FlushFailed),
752    /// Compaction has finished.
753    CompactionFinished(CompactionFinished),
754    /// Compaction has failed.
755    CompactionFailed(CompactionFailed),
756    /// Truncate result.
757    Truncate(TruncateResult),
758    /// Region change result.
759    RegionChange(RegionChangeResult),
760    /// Region edit result.
761    RegionEdit(RegionEditResult),
762}
763
764/// Notifies a flush job is finished.
765#[derive(Debug)]
766pub(crate) struct FlushFinished {
767    /// Region id.
768    pub(crate) region_id: RegionId,
769    /// Entry id of flushed data.
770    pub(crate) flushed_entry_id: EntryId,
771    /// Flush result senders.
772    pub(crate) senders: Vec<OutputTx>,
773    /// Flush timer.
774    pub(crate) _timer: HistogramTimer,
775    /// Region edit to apply.
776    pub(crate) edit: RegionEdit,
777    /// Memtables to remove.
778    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
779}
780
781impl FlushFinished {
782    /// Marks the flush job as successful and observes the timer.
783    pub(crate) fn on_success(self) {
784        for sender in self.senders {
785            sender.send(Ok(0));
786        }
787    }
788}
789
790impl OnFailure for FlushFinished {
791    fn on_failure(&mut self, err: Error) {
792        let err = Arc::new(err);
793        for sender in self.senders.drain(..) {
794            sender.send(Err(err.clone()).context(FlushRegionSnafu {
795                region_id: self.region_id,
796            }));
797        }
798    }
799}
800
801/// Notifies a flush job is failed.
802#[derive(Debug)]
803pub(crate) struct FlushFailed {
804    /// The error source of the failure.
805    pub(crate) err: Arc<Error>,
806}
807
808/// Notifies a compaction job has finished.
809#[derive(Debug)]
810pub(crate) struct CompactionFinished {
811    /// Region id.
812    pub(crate) region_id: RegionId,
813    /// Compaction result senders.
814    pub(crate) senders: Vec<OutputTx>,
815    /// Start time of compaction task.
816    pub(crate) start_time: Instant,
817    /// Region edit to apply.
818    pub(crate) edit: RegionEdit,
819}
820
821impl CompactionFinished {
822    pub fn on_success(self) {
823        // only update compaction time on success
824        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
825
826        for sender in self.senders {
827            sender.send(Ok(0));
828        }
829        info!("Successfully compacted region: {}", self.region_id);
830    }
831}
832
833impl OnFailure for CompactionFinished {
834    /// Compaction succeeded but failed to update manifest or region's already been dropped.
835    fn on_failure(&mut self, err: Error) {
836        let err = Arc::new(err);
837        for sender in self.senders.drain(..) {
838            sender.send(Err(err.clone()).context(CompactRegionSnafu {
839                region_id: self.region_id,
840            }));
841        }
842    }
843}
844
845/// A failing compaction result.
846#[derive(Debug)]
847pub(crate) struct CompactionFailed {
848    pub(crate) region_id: RegionId,
849    /// The error source of the failure.
850    pub(crate) err: Arc<Error>,
851}
852
853/// Notifies the truncate result of a region.
854#[derive(Debug)]
855pub(crate) struct TruncateResult {
856    /// Region id.
857    pub(crate) region_id: RegionId,
858    /// Result sender.
859    pub(crate) sender: OptionOutputTx,
860    /// Truncate result.
861    pub(crate) result: Result<()>,
862    /// Truncated entry id.
863    pub(crate) truncated_entry_id: EntryId,
864    /// Truncated sequence.
865    pub(crate) truncated_sequence: SequenceNumber,
866}
867
868/// Notifies the region the result of writing region change action.
869#[derive(Debug)]
870pub(crate) struct RegionChangeResult {
871    /// Region id.
872    pub(crate) region_id: RegionId,
873    /// The new region metadata to apply.
874    pub(crate) new_meta: RegionMetadataRef,
875    /// Result sender.
876    pub(crate) sender: OptionOutputTx,
877    /// Result from the manifest manager.
878    pub(crate) result: Result<()>,
879}
880
881/// Request to edit a region directly.
882#[derive(Debug)]
883pub(crate) struct RegionEditRequest {
884    pub(crate) region_id: RegionId,
885    pub(crate) edit: RegionEdit,
886    /// The sender to notify the result to the region engine.
887    pub(crate) tx: Sender<Result<()>>,
888}
889
890/// Notifies the regin the result of editing region.
891#[derive(Debug)]
892pub(crate) struct RegionEditResult {
893    /// Region id.
894    pub(crate) region_id: RegionId,
895    /// Result sender.
896    pub(crate) sender: Sender<Result<()>>,
897    /// Region edit to apply.
898    pub(crate) edit: RegionEdit,
899    /// Result from the manifest manager.
900    pub(crate) result: Result<()>,
901}
902
903#[derive(Debug)]
904pub(crate) struct RegionSyncRequest {
905    pub(crate) region_id: RegionId,
906    pub(crate) manifest_version: ManifestVersion,
907    /// Returns the latest manifest version and a boolean indicating whether new maniefst is installed.
908    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
909}
910
911#[cfg(test)]
912mod tests {
913    use api::v1::value::ValueData;
914    use api::v1::{Row, SemanticType};
915    use datatypes::prelude::ConcreteDataType;
916    use datatypes::schema::ColumnDefaultConstraint;
917    use store_api::metadata::RegionMetadataBuilder;
918
919    use super::*;
920    use crate::error::Error;
921    use crate::test_util::{i64_value, ts_ms_value};
922
923    fn new_column_schema(
924        name: &str,
925        data_type: ColumnDataType,
926        semantic_type: SemanticType,
927    ) -> ColumnSchema {
928        ColumnSchema {
929            column_name: name.to_string(),
930            datatype: data_type as i32,
931            semantic_type: semantic_type as i32,
932            ..Default::default()
933        }
934    }
935
936    fn check_invalid_request(err: &Error, expect: &str) {
937        if let Error::InvalidRequest {
938            region_id: _,
939            reason,
940            location: _,
941        } = err
942        {
943            assert_eq!(reason, expect);
944        } else {
945            panic!("Unexpected error {err}")
946        }
947    }
948
949    #[test]
950    fn test_write_request_duplicate_column() {
951        let rows = Rows {
952            schema: vec![
953                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
954                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
955            ],
956            rows: vec![],
957        };
958
959        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
960        check_invalid_request(&err, "duplicate column c0");
961    }
962
963    #[test]
964    fn test_valid_write_request() {
965        let rows = Rows {
966            schema: vec![
967                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
968                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
969            ],
970            rows: vec![Row {
971                values: vec![i64_value(1), i64_value(2)],
972            }],
973        };
974
975        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
976        assert_eq!(0, request.column_index_by_name("c0").unwrap());
977        assert_eq!(1, request.column_index_by_name("c1").unwrap());
978        assert_eq!(None, request.column_index_by_name("c2"));
979    }
980
981    #[test]
982    fn test_write_request_column_num() {
983        let rows = Rows {
984            schema: vec![
985                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
986                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
987            ],
988            rows: vec![Row {
989                values: vec![i64_value(1), i64_value(2), i64_value(3)],
990            }],
991        };
992
993        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
994        check_invalid_request(&err, "row has 3 columns but schema has 2");
995    }
996
997    fn new_region_metadata() -> RegionMetadata {
998        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
999        builder
1000            .push_column_metadata(ColumnMetadata {
1001                column_schema: datatypes::schema::ColumnSchema::new(
1002                    "ts",
1003                    ConcreteDataType::timestamp_millisecond_datatype(),
1004                    false,
1005                ),
1006                semantic_type: SemanticType::Timestamp,
1007                column_id: 1,
1008            })
1009            .push_column_metadata(ColumnMetadata {
1010                column_schema: datatypes::schema::ColumnSchema::new(
1011                    "k0",
1012                    ConcreteDataType::int64_datatype(),
1013                    true,
1014                ),
1015                semantic_type: SemanticType::Tag,
1016                column_id: 2,
1017            })
1018            .primary_key(vec![2]);
1019        builder.build().unwrap()
1020    }
1021
1022    #[test]
1023    fn test_check_schema() {
1024        let rows = Rows {
1025            schema: vec![
1026                new_column_schema(
1027                    "ts",
1028                    ColumnDataType::TimestampMillisecond,
1029                    SemanticType::Timestamp,
1030                ),
1031                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1032            ],
1033            rows: vec![Row {
1034                values: vec![ts_ms_value(1), i64_value(2)],
1035            }],
1036        };
1037        let metadata = new_region_metadata();
1038
1039        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1040        request.check_schema(&metadata).unwrap();
1041    }
1042
1043    #[test]
1044    fn test_column_type() {
1045        let rows = Rows {
1046            schema: vec![
1047                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1048                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1049            ],
1050            rows: vec![Row {
1051                values: vec![i64_value(1), i64_value(2)],
1052            }],
1053        };
1054        let metadata = new_region_metadata();
1055
1056        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1057        let err = request.check_schema(&metadata).unwrap_err();
1058        check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1059    }
1060
1061    #[test]
1062    fn test_semantic_type() {
1063        let rows = Rows {
1064            schema: vec![
1065                new_column_schema(
1066                    "ts",
1067                    ColumnDataType::TimestampMillisecond,
1068                    SemanticType::Tag,
1069                ),
1070                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1071            ],
1072            rows: vec![Row {
1073                values: vec![ts_ms_value(1), i64_value(2)],
1074            }],
1075        };
1076        let metadata = new_region_metadata();
1077
1078        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1079        let err = request.check_schema(&metadata).unwrap_err();
1080        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1081    }
1082
1083    #[test]
1084    fn test_column_nullable() {
1085        let rows = Rows {
1086            schema: vec![
1087                new_column_schema(
1088                    "ts",
1089                    ColumnDataType::TimestampMillisecond,
1090                    SemanticType::Timestamp,
1091                ),
1092                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1093            ],
1094            rows: vec![Row {
1095                values: vec![Value { value_data: None }, i64_value(2)],
1096            }],
1097        };
1098        let metadata = new_region_metadata();
1099
1100        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1101        let err = request.check_schema(&metadata).unwrap_err();
1102        check_invalid_request(&err, "column ts is not null but input has null");
1103    }
1104
1105    #[test]
1106    fn test_column_default() {
1107        let rows = Rows {
1108            schema: vec![new_column_schema(
1109                "k0",
1110                ColumnDataType::Int64,
1111                SemanticType::Tag,
1112            )],
1113            rows: vec![Row {
1114                values: vec![i64_value(1)],
1115            }],
1116        };
1117        let metadata = new_region_metadata();
1118
1119        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1120        let err = request.check_schema(&metadata).unwrap_err();
1121        check_invalid_request(&err, "missing column ts");
1122    }
1123
1124    #[test]
1125    fn test_unknown_column() {
1126        let rows = Rows {
1127            schema: vec![
1128                new_column_schema(
1129                    "ts",
1130                    ColumnDataType::TimestampMillisecond,
1131                    SemanticType::Timestamp,
1132                ),
1133                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1134                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1135            ],
1136            rows: vec![Row {
1137                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1138            }],
1139        };
1140        let metadata = new_region_metadata();
1141
1142        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1143        let err = request.check_schema(&metadata).unwrap_err();
1144        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1145    }
1146
1147    #[test]
1148    fn test_fill_impure_columns_err() {
1149        let rows = Rows {
1150            schema: vec![new_column_schema(
1151                "k0",
1152                ColumnDataType::Int64,
1153                SemanticType::Tag,
1154            )],
1155            rows: vec![Row {
1156                values: vec![i64_value(1)],
1157            }],
1158        };
1159        let metadata = {
1160            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1161            builder
1162                .push_column_metadata(ColumnMetadata {
1163                    column_schema: datatypes::schema::ColumnSchema::new(
1164                        "ts",
1165                        ConcreteDataType::timestamp_millisecond_datatype(),
1166                        false,
1167                    )
1168                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1169                        "now()".to_string(),
1170                    )))
1171                    .unwrap(),
1172                    semantic_type: SemanticType::Timestamp,
1173                    column_id: 1,
1174                })
1175                .push_column_metadata(ColumnMetadata {
1176                    column_schema: datatypes::schema::ColumnSchema::new(
1177                        "k0",
1178                        ConcreteDataType::int64_datatype(),
1179                        true,
1180                    ),
1181                    semantic_type: SemanticType::Tag,
1182                    column_id: 2,
1183                })
1184                .primary_key(vec![2]);
1185            builder.build().unwrap()
1186        };
1187
1188        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1189        let err = request.check_schema(&metadata).unwrap_err();
1190        assert!(err.is_fill_default());
1191        assert!(request
1192            .fill_missing_columns(&metadata)
1193            .unwrap_err()
1194            .to_string()
1195            .contains("Unexpected impure default value with region_id"));
1196    }
1197
1198    #[test]
1199    fn test_fill_missing_columns() {
1200        let rows = Rows {
1201            schema: vec![new_column_schema(
1202                "ts",
1203                ColumnDataType::TimestampMillisecond,
1204                SemanticType::Timestamp,
1205            )],
1206            rows: vec![Row {
1207                values: vec![ts_ms_value(1)],
1208            }],
1209        };
1210        let metadata = new_region_metadata();
1211
1212        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1213        let err = request.check_schema(&metadata).unwrap_err();
1214        assert!(err.is_fill_default());
1215        request.fill_missing_columns(&metadata).unwrap();
1216
1217        let expect_rows = Rows {
1218            schema: vec![new_column_schema(
1219                "ts",
1220                ColumnDataType::TimestampMillisecond,
1221                SemanticType::Timestamp,
1222            )],
1223            rows: vec![Row {
1224                values: vec![ts_ms_value(1)],
1225            }],
1226        };
1227        assert_eq!(expect_rows, request.rows);
1228    }
1229
1230    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1231        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1232        builder
1233            .push_column_metadata(ColumnMetadata {
1234                column_schema: datatypes::schema::ColumnSchema::new(
1235                    "ts",
1236                    ConcreteDataType::timestamp_millisecond_datatype(),
1237                    false,
1238                ),
1239                semantic_type: SemanticType::Timestamp,
1240                column_id: 1,
1241            })
1242            .push_column_metadata(ColumnMetadata {
1243                column_schema: datatypes::schema::ColumnSchema::new(
1244                    "k0",
1245                    ConcreteDataType::int64_datatype(),
1246                    true,
1247                ),
1248                semantic_type: SemanticType::Tag,
1249                column_id: 2,
1250            })
1251            .primary_key(vec![2]);
1252        builder
1253    }
1254
1255    fn region_metadata_two_fields() -> RegionMetadata {
1256        let mut builder = builder_with_ts_tag();
1257        builder
1258            .push_column_metadata(ColumnMetadata {
1259                column_schema: datatypes::schema::ColumnSchema::new(
1260                    "f0",
1261                    ConcreteDataType::int64_datatype(),
1262                    true,
1263                ),
1264                semantic_type: SemanticType::Field,
1265                column_id: 3,
1266            })
1267            // Column is not nullable.
1268            .push_column_metadata(ColumnMetadata {
1269                column_schema: datatypes::schema::ColumnSchema::new(
1270                    "f1",
1271                    ConcreteDataType::int64_datatype(),
1272                    false,
1273                )
1274                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1275                    datatypes::value::Value::Int64(100),
1276                )))
1277                .unwrap(),
1278                semantic_type: SemanticType::Field,
1279                column_id: 4,
1280            });
1281        builder.build().unwrap()
1282    }
1283
1284    #[test]
1285    fn test_fill_missing_for_delete() {
1286        let rows = Rows {
1287            schema: vec![new_column_schema(
1288                "ts",
1289                ColumnDataType::TimestampMillisecond,
1290                SemanticType::Timestamp,
1291            )],
1292            rows: vec![Row {
1293                values: vec![ts_ms_value(1)],
1294            }],
1295        };
1296        let metadata = region_metadata_two_fields();
1297
1298        let mut request =
1299            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1300        let err = request.check_schema(&metadata).unwrap_err();
1301        check_invalid_request(&err, "delete requests need column k0");
1302        let err = request.fill_missing_columns(&metadata).unwrap_err();
1303        check_invalid_request(&err, "delete requests need column k0");
1304
1305        let rows = Rows {
1306            schema: vec![
1307                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1308                new_column_schema(
1309                    "ts",
1310                    ColumnDataType::TimestampMillisecond,
1311                    SemanticType::Timestamp,
1312                ),
1313            ],
1314            rows: vec![Row {
1315                values: vec![i64_value(100), ts_ms_value(1)],
1316            }],
1317        };
1318        let mut request =
1319            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1320        let err = request.check_schema(&metadata).unwrap_err();
1321        assert!(err.is_fill_default());
1322        request.fill_missing_columns(&metadata).unwrap();
1323
1324        let expect_rows = Rows {
1325            schema: vec![
1326                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1327                new_column_schema(
1328                    "ts",
1329                    ColumnDataType::TimestampMillisecond,
1330                    SemanticType::Timestamp,
1331                ),
1332                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1333            ],
1334            // Column f1 is not nullable and we use 0 for padding.
1335            rows: vec![Row {
1336                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1337            }],
1338        };
1339        assert_eq!(expect_rows, request.rows);
1340    }
1341
1342    #[test]
1343    fn test_fill_missing_without_default_in_delete() {
1344        let mut builder = builder_with_ts_tag();
1345        builder
1346            // f0 is nullable.
1347            .push_column_metadata(ColumnMetadata {
1348                column_schema: datatypes::schema::ColumnSchema::new(
1349                    "f0",
1350                    ConcreteDataType::int64_datatype(),
1351                    true,
1352                ),
1353                semantic_type: SemanticType::Field,
1354                column_id: 3,
1355            })
1356            // f1 is not nullable and don't has default.
1357            .push_column_metadata(ColumnMetadata {
1358                column_schema: datatypes::schema::ColumnSchema::new(
1359                    "f1",
1360                    ConcreteDataType::int64_datatype(),
1361                    false,
1362                ),
1363                semantic_type: SemanticType::Field,
1364                column_id: 4,
1365            });
1366        let metadata = builder.build().unwrap();
1367
1368        let rows = Rows {
1369            schema: vec![
1370                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1371                new_column_schema(
1372                    "ts",
1373                    ColumnDataType::TimestampMillisecond,
1374                    SemanticType::Timestamp,
1375                ),
1376            ],
1377            // Missing f0 (nullable), f1 (not nullable).
1378            rows: vec![Row {
1379                values: vec![i64_value(100), ts_ms_value(1)],
1380            }],
1381        };
1382        let mut request =
1383            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1384        let err = request.check_schema(&metadata).unwrap_err();
1385        assert!(err.is_fill_default());
1386        request.fill_missing_columns(&metadata).unwrap();
1387
1388        let expect_rows = Rows {
1389            schema: vec![
1390                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1391                new_column_schema(
1392                    "ts",
1393                    ColumnDataType::TimestampMillisecond,
1394                    SemanticType::Timestamp,
1395                ),
1396                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1397            ],
1398            // Column f1 is not nullable and we use 0 for padding.
1399            rows: vec![Row {
1400                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1401            }],
1402        };
1403        assert_eq!(expect_rows, request.rows);
1404    }
1405
1406    #[test]
1407    fn test_no_default() {
1408        let rows = Rows {
1409            schema: vec![new_column_schema(
1410                "k0",
1411                ColumnDataType::Int64,
1412                SemanticType::Tag,
1413            )],
1414            rows: vec![Row {
1415                values: vec![i64_value(1)],
1416            }],
1417        };
1418        let metadata = new_region_metadata();
1419
1420        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1421        let err = request.fill_missing_columns(&metadata).unwrap_err();
1422        check_invalid_request(&err, "column ts does not have default value");
1423    }
1424
1425    #[test]
1426    fn test_missing_and_invalid() {
1427        // Missing f0 and f1 has invalid type (string).
1428        let rows = Rows {
1429            schema: vec![
1430                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1431                new_column_schema(
1432                    "ts",
1433                    ColumnDataType::TimestampMillisecond,
1434                    SemanticType::Timestamp,
1435                ),
1436                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1437            ],
1438            rows: vec![Row {
1439                values: vec![
1440                    i64_value(100),
1441                    ts_ms_value(1),
1442                    Value {
1443                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1444                    },
1445                ],
1446            }],
1447        };
1448        let metadata = region_metadata_two_fields();
1449
1450        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1451        let err = request.check_schema(&metadata).unwrap_err();
1452        check_invalid_request(
1453            &err,
1454            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1455        );
1456    }
1457
1458    #[test]
1459    fn test_write_request_metadata() {
1460        let rows = Rows {
1461            schema: vec![
1462                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1463                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1464            ],
1465            rows: vec![Row {
1466                values: vec![i64_value(1), i64_value(2)],
1467            }],
1468        };
1469
1470        let metadata = Arc::new(new_region_metadata());
1471        let request = WriteRequest::new(
1472            RegionId::new(1, 1),
1473            OpType::Put,
1474            rows,
1475            Some(metadata.clone()),
1476        )
1477        .unwrap();
1478
1479        assert!(request.region_metadata.is_some());
1480        assert_eq!(
1481            request.region_metadata.unwrap().region_id,
1482            RegionId::new(1, 1)
1483        );
1484    }
1485}