1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38 AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest,
39 RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest,
40 RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
41 RegionTruncateRequest,
42};
43use store_api::storage::{FileId, RegionId};
44use tokio::sync::oneshot::{self, Receiver, Sender};
45
46use crate::error::{
47 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
48 FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
49 Result, UnexpectedSnafu,
50};
51use crate::manifest::action::{RegionEdit, RegionManifest, TruncateKind};
52use crate::memtable::MemtableId;
53use crate::memtable::bulk::part::BulkPart;
54use crate::metrics::COMPACTION_ELAPSED_TOTAL;
55use crate::region::options::RegionOptions;
56use crate::sst::file::FileMeta;
57use crate::sst::index::IndexBuildType;
58use crate::wal::EntryId;
59use crate::wal::entry_distributor::WalEntryReceiver;
60
61#[derive(Debug)]
63pub struct WriteRequest {
64 pub region_id: RegionId,
66 pub op_type: OpType,
68 pub rows: Rows,
70 pub name_to_index: HashMap<String, usize>,
72 pub has_null: Vec<bool>,
74 pub hint: Option<WriteHint>,
76 pub(crate) region_metadata: Option<RegionMetadataRef>,
78}
79
80impl WriteRequest {
81 pub fn new(
85 region_id: RegionId,
86 op_type: OpType,
87 rows: Rows,
88 region_metadata: Option<RegionMetadataRef>,
89 ) -> Result<WriteRequest> {
90 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
91 for (index, column) in rows.schema.iter().enumerate() {
92 ensure!(
93 name_to_index
94 .insert(column.column_name.clone(), index)
95 .is_none(),
96 InvalidRequestSnafu {
97 region_id,
98 reason: format!("duplicate column {}", column.column_name),
99 }
100 );
101 }
102
103 let mut has_null = vec![false; rows.schema.len()];
104 for row in &rows.rows {
105 ensure!(
106 row.values.len() == rows.schema.len(),
107 InvalidRequestSnafu {
108 region_id,
109 reason: format!(
110 "row has {} columns but schema has {}",
111 row.values.len(),
112 rows.schema.len()
113 ),
114 }
115 );
116
117 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
118 validate_proto_value(region_id, value, column_schema)?;
119
120 if value.value_data.is_none() {
121 has_null[i] = true;
122 }
123 }
124 }
125
126 Ok(WriteRequest {
127 region_id,
128 op_type,
129 rows,
130 name_to_index,
131 has_null,
132 hint: None,
133 region_metadata,
134 })
135 }
136
137 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
139 self.hint = hint;
140 self
141 }
142
143 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
145 infer_primary_key_encoding_from_hint(self.hint.as_ref())
146 }
147
148 pub(crate) fn estimated_size(&self) -> usize {
150 let row_size = self
151 .rows
152 .rows
153 .first()
154 .map(|row| row.encoded_len())
155 .unwrap_or(0);
156 row_size * self.rows.rows.len()
157 }
158
159 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
161 self.name_to_index.get(name).copied()
162 }
163
164 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
169 debug_assert_eq!(self.region_id, metadata.region_id);
170
171 let region_id = self.region_id;
172 let mut rows_columns: HashMap<_, _> = self
174 .rows
175 .schema
176 .iter()
177 .map(|column| (&column.column_name, column))
178 .collect();
179
180 let mut need_fill_default = false;
181 for column in &metadata.column_metadatas {
183 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
184 ensure!(
186 is_column_type_value_eq(
187 input_col.datatype,
188 input_col.datatype_extension.clone(),
189 &column.column_schema.data_type
190 ),
191 InvalidRequestSnafu {
192 region_id,
193 reason: format!(
194 "column {} expect type {:?}, given: {}({})",
195 column.column_schema.name,
196 column.column_schema.data_type,
197 ColumnDataType::try_from(input_col.datatype)
198 .map(|v| v.as_str_name())
199 .unwrap_or("Unknown"),
200 input_col.datatype,
201 )
202 }
203 );
204
205 ensure!(
207 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
208 InvalidRequestSnafu {
209 region_id,
210 reason: format!(
211 "column {} has semantic type {:?}, given: {}({})",
212 column.column_schema.name,
213 column.semantic_type,
214 api::v1::SemanticType::try_from(input_col.semantic_type)
215 .map(|v| v.as_str_name())
216 .unwrap_or("Unknown"),
217 input_col.semantic_type
218 ),
219 }
220 );
221
222 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
225 ensure!(
226 !has_null || column.column_schema.is_nullable(),
227 InvalidRequestSnafu {
228 region_id,
229 reason: format!(
230 "column {} is not null but input has null",
231 column.column_schema.name
232 ),
233 }
234 );
235 } else {
236 self.check_missing_column(column)?;
238
239 need_fill_default = true;
240 }
241 }
242
243 if !rows_columns.is_empty() {
245 let names: Vec<_> = rows_columns.into_keys().collect();
246 return InvalidRequestSnafu {
247 region_id,
248 reason: format!("unknown columns: {:?}", names),
249 }
250 .fail();
251 }
252
253 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
255
256 Ok(())
257 }
258
259 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
264 debug_assert_eq!(self.region_id, metadata.region_id);
265
266 let mut columns_to_fill = vec![];
267 for column in &metadata.column_metadatas {
268 if !self.name_to_index.contains_key(&column.column_schema.name) {
269 columns_to_fill.push(column);
270 }
271 }
272 self.fill_columns(columns_to_fill)?;
273
274 Ok(())
275 }
276
277 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
279 if let Err(e) = self.check_schema(metadata) {
280 if e.is_fill_default() {
281 self.fill_missing_columns(metadata)?;
285 } else {
286 return Err(e);
287 }
288 }
289
290 Ok(())
291 }
292
293 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
295 let mut default_values = Vec::with_capacity(columns.len());
296 let mut columns_to_fill = Vec::with_capacity(columns.len());
297 for column in columns {
298 let default_value = self.column_default_value(column)?;
299 if default_value.value_data.is_some() {
300 default_values.push(default_value);
301 columns_to_fill.push(column);
302 }
303 }
304
305 for row in &mut self.rows.rows {
306 row.values.extend(default_values.iter().cloned());
307 }
308
309 for column in columns_to_fill {
310 let (datatype, datatype_ext) =
311 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
312 .with_context(|_| ConvertColumnDataTypeSnafu {
313 reason: format!(
314 "no protobuf type for column {} ({:?})",
315 column.column_schema.name, column.column_schema.data_type
316 ),
317 })?
318 .to_parts();
319 self.rows.schema.push(ColumnSchema {
320 column_name: column.column_schema.name.clone(),
321 datatype: datatype as i32,
322 semantic_type: column.semantic_type as i32,
323 datatype_extension: datatype_ext,
324 options: options_from_column_schema(&column.column_schema),
325 });
326 }
327
328 Ok(())
329 }
330
331 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
333 if self.op_type == OpType::Delete {
334 if column.semantic_type == SemanticType::Field {
335 return Ok(());
338 } else {
339 return InvalidRequestSnafu {
340 region_id: self.region_id,
341 reason: format!("delete requests need column {}", column.column_schema.name),
342 }
343 .fail();
344 }
345 }
346
347 ensure!(
349 column.column_schema.is_nullable()
350 || column.column_schema.default_constraint().is_some(),
351 InvalidRequestSnafu {
352 region_id: self.region_id,
353 reason: format!("missing column {}", column.column_schema.name),
354 }
355 );
356
357 Ok(())
358 }
359
360 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
362 let default_value = match self.op_type {
363 OpType::Delete => {
364 ensure!(
365 column.semantic_type == SemanticType::Field,
366 InvalidRequestSnafu {
367 region_id: self.region_id,
368 reason: format!(
369 "delete requests need column {}",
370 column.column_schema.name
371 ),
372 }
373 );
374
375 if column.column_schema.is_nullable() {
380 datatypes::value::Value::Null
381 } else {
382 column.column_schema.data_type.default_value()
383 }
384 }
385 OpType::Put => {
386 if column.column_schema.is_default_impure() {
388 UnexpectedSnafu {
389 reason: format!(
390 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
391 self.region_id,
392 column.column_schema.name,
393 column.column_schema.default_constraint(),
394 ),
395 }
396 .fail()?
397 }
398 column
399 .column_schema
400 .create_default()
401 .context(CreateDefaultSnafu {
402 region_id: self.region_id,
403 column: &column.column_schema.name,
404 })?
405 .with_context(|| InvalidRequestSnafu {
407 region_id: self.region_id,
408 reason: format!(
409 "column {} does not have default value",
410 column.column_schema.name
411 ),
412 })?
413 }
414 };
415
416 Ok(api::helper::to_grpc_value(default_value))
418 }
419}
420
421pub(crate) fn validate_proto_value(
423 region_id: RegionId,
424 value: &Value,
425 column_schema: &ColumnSchema,
426) -> Result<()> {
427 if let Some(value_type) = proto_value_type(value) {
428 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
429 InvalidRequestSnafu {
430 region_id,
431 reason: format!(
432 "column {} has unknown type {}",
433 column_schema.column_name, column_schema.datatype
434 ),
435 }
436 .build()
437 })?;
438 ensure!(
439 proto_value_type_match(column_type, value_type),
440 InvalidRequestSnafu {
441 region_id,
442 reason: format!(
443 "value has type {:?}, but column {} has type {:?}({})",
444 value_type, column_schema.column_name, column_type, column_schema.datatype,
445 ),
446 }
447 );
448 }
449
450 Ok(())
451}
452
453fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
454 match (column_type, value_type) {
455 (ct, vt) if ct == vt => true,
456 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
457 (ColumnDataType::Json, ColumnDataType::Binary) => true,
458 _ => false,
459 }
460}
461
462#[derive(Debug)]
464pub struct OutputTx(Sender<Result<AffectedRows>>);
465
466impl OutputTx {
467 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
469 OutputTx(sender)
470 }
471
472 pub(crate) fn send(self, result: Result<AffectedRows>) {
474 let _ = self.0.send(result);
476 }
477}
478
479#[derive(Debug)]
481pub(crate) struct OptionOutputTx(Option<OutputTx>);
482
483impl OptionOutputTx {
484 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
486 OptionOutputTx(sender)
487 }
488
489 pub(crate) fn none() -> OptionOutputTx {
491 OptionOutputTx(None)
492 }
493
494 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
496 if let Some(sender) = self.0.take() {
497 sender.send(result);
498 }
499 }
500
501 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
503 if let Some(sender) = self.0.take() {
504 sender.send(result);
505 }
506 }
507
508 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
510 self.0.take()
511 }
512}
513
514impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
515 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
516 Self::new(Some(OutputTx::new(sender)))
517 }
518}
519
520impl OnFailure for OptionOutputTx {
521 fn on_failure(&mut self, err: Error) {
522 self.send_mut(Err(err));
523 }
524}
525
526pub(crate) trait OnFailure {
528 fn on_failure(&mut self, err: Error);
530}
531
532#[derive(Debug)]
534pub(crate) struct SenderWriteRequest {
535 pub(crate) sender: OptionOutputTx,
537 pub(crate) request: WriteRequest,
538}
539
540pub(crate) struct SenderBulkRequest {
541 pub(crate) sender: OptionOutputTx,
542 pub(crate) region_id: RegionId,
543 pub(crate) request: BulkPart,
544 pub(crate) region_metadata: RegionMetadataRef,
545}
546
547#[derive(Debug)]
549pub(crate) struct WorkerRequestWithTime {
550 pub(crate) request: WorkerRequest,
551 pub(crate) created_at: Instant,
552}
553
554impl WorkerRequestWithTime {
555 pub(crate) fn new(request: WorkerRequest) -> Self {
556 Self {
557 request,
558 created_at: Instant::now(),
559 }
560 }
561}
562
563#[derive(Debug)]
565pub(crate) enum WorkerRequest {
566 Write(SenderWriteRequest),
568
569 Ddl(SenderDdlRequest),
571
572 Background {
574 region_id: RegionId,
576 notify: BackgroundNotify,
578 },
579
580 SetRegionRoleStateGracefully {
582 region_id: RegionId,
584 region_role_state: SettableRegionRoleState,
586 sender: Sender<SetRegionRoleStateResponse>,
588 },
589
590 Stop,
592
593 EditRegion(RegionEditRequest),
595
596 SyncRegion(RegionSyncRequest),
598
599 BulkInserts {
601 metadata: Option<RegionMetadataRef>,
602 request: RegionBulkInsertsRequest,
603 sender: OptionOutputTx,
604 },
605
606 RemapManifests(RemapManifestsRequest),
608}
609
610impl WorkerRequest {
611 pub(crate) fn new_open_region_request(
613 region_id: RegionId,
614 request: RegionOpenRequest,
615 entry_receiver: Option<WalEntryReceiver>,
616 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
617 let (sender, receiver) = oneshot::channel();
618
619 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
620 region_id,
621 sender: sender.into(),
622 request: DdlRequest::Open((request, entry_receiver)),
623 });
624
625 (worker_request, receiver)
626 }
627
628 pub(crate) fn new_catchup_region_request(
630 region_id: RegionId,
631 request: RegionCatchupRequest,
632 entry_receiver: Option<WalEntryReceiver>,
633 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
634 let (sender, receiver) = oneshot::channel();
635 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
636 region_id,
637 sender: sender.into(),
638 request: DdlRequest::Catchup((request, entry_receiver)),
639 });
640 (worker_request, receiver)
641 }
642
643 pub(crate) fn try_from_region_request(
645 region_id: RegionId,
646 value: RegionRequest,
647 region_metadata: Option<RegionMetadataRef>,
648 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
649 let (sender, receiver) = oneshot::channel();
650 let worker_request = match value {
651 RegionRequest::Put(v) => {
652 let mut write_request =
653 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
654 .with_hint(v.hint);
655 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
656 && let Some(region_metadata) = ®ion_metadata
657 {
658 write_request.maybe_fill_missing_columns(region_metadata)?;
659 }
660 WorkerRequest::Write(SenderWriteRequest {
661 sender: sender.into(),
662 request: write_request,
663 })
664 }
665 RegionRequest::Delete(v) => {
666 let mut write_request =
667 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
668 .with_hint(v.hint);
669 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
670 && let Some(region_metadata) = ®ion_metadata
671 {
672 write_request.maybe_fill_missing_columns(region_metadata)?;
673 }
674 WorkerRequest::Write(SenderWriteRequest {
675 sender: sender.into(),
676 request: write_request,
677 })
678 }
679 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
680 region_id,
681 sender: sender.into(),
682 request: DdlRequest::Create(v),
683 }),
684 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
685 region_id,
686 sender: sender.into(),
687 request: DdlRequest::Drop,
688 }),
689 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
690 region_id,
691 sender: sender.into(),
692 request: DdlRequest::Open((v, None)),
693 }),
694 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
695 region_id,
696 sender: sender.into(),
697 request: DdlRequest::Close(v),
698 }),
699 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
700 region_id,
701 sender: sender.into(),
702 request: DdlRequest::Alter(v),
703 }),
704 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
705 region_id,
706 sender: sender.into(),
707 request: DdlRequest::Flush(v),
708 }),
709 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
710 region_id,
711 sender: sender.into(),
712 request: DdlRequest::Compact(v),
713 }),
714 RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
715 region_id,
716 sender: sender.into(),
717 request: DdlRequest::BuildIndex(v),
718 }),
719 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
720 region_id,
721 sender: sender.into(),
722 request: DdlRequest::Truncate(v),
723 }),
724 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
725 region_id,
726 sender: sender.into(),
727 request: DdlRequest::Catchup((v, None)),
728 }),
729 RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
730 region_id,
731 sender: sender.into(),
732 request: DdlRequest::EnterStaging(v),
733 }),
734 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
735 metadata: region_metadata,
736 sender: sender.into(),
737 request: region_bulk_inserts_request,
738 },
739 };
740
741 Ok((worker_request, receiver))
742 }
743
744 pub(crate) fn new_set_readonly_gracefully(
745 region_id: RegionId,
746 region_role_state: SettableRegionRoleState,
747 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
748 let (sender, receiver) = oneshot::channel();
749
750 (
751 WorkerRequest::SetRegionRoleStateGracefully {
752 region_id,
753 region_role_state,
754 sender,
755 },
756 receiver,
757 )
758 }
759
760 pub(crate) fn new_sync_region_request(
761 region_id: RegionId,
762 manifest_version: ManifestVersion,
763 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
764 let (sender, receiver) = oneshot::channel();
765 (
766 WorkerRequest::SyncRegion(RegionSyncRequest {
767 region_id,
768 manifest_version,
769 sender,
770 }),
771 receiver,
772 )
773 }
774
775 #[allow(clippy::type_complexity)]
782 pub(crate) fn try_from_remap_manifests_request(
783 store_api::region_engine::RemapManifestsRequest {
784 region_id,
785 input_regions,
786 region_mapping,
787 new_partition_exprs,
788 }: store_api::region_engine::RemapManifestsRequest,
789 ) -> Result<(
790 WorkerRequest,
791 Receiver<Result<HashMap<RegionId, RegionManifest>>>,
792 )> {
793 let (sender, receiver) = oneshot::channel();
794 let new_partition_exprs = new_partition_exprs
795 .into_iter()
796 .map(|(k, v)| {
797 Ok((
798 k,
799 PartitionExpr::from_json_str(&v)
800 .context(InvalidPartitionExprSnafu { expr: v })?
801 .context(MissingPartitionExprSnafu { region_id: k })?,
802 ))
803 })
804 .collect::<Result<HashMap<_, _>>>()?;
805
806 let request = RemapManifestsRequest {
807 region_id,
808 input_regions,
809 region_mapping,
810 new_partition_exprs,
811 sender,
812 };
813
814 Ok((WorkerRequest::RemapManifests(request), receiver))
815 }
816}
817
818#[derive(Debug)]
820pub(crate) enum DdlRequest {
821 Create(RegionCreateRequest),
822 Drop,
823 Open((RegionOpenRequest, Option<WalEntryReceiver>)),
824 Close(RegionCloseRequest),
825 Alter(RegionAlterRequest),
826 Flush(RegionFlushRequest),
827 Compact(RegionCompactRequest),
828 BuildIndex(RegionBuildIndexRequest),
829 Truncate(RegionTruncateRequest),
830 Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
831 EnterStaging(EnterStagingRequest),
832}
833
834#[derive(Debug)]
836pub(crate) struct SenderDdlRequest {
837 pub(crate) region_id: RegionId,
839 pub(crate) sender: OptionOutputTx,
841 pub(crate) request: DdlRequest,
843}
844
845#[derive(Debug)]
847pub(crate) enum BackgroundNotify {
848 FlushFinished(FlushFinished),
850 FlushFailed(FlushFailed),
852 IndexBuildFinished(IndexBuildFinished),
854 IndexBuildStopped(IndexBuildStopped),
856 IndexBuildFailed(IndexBuildFailed),
858 CompactionFinished(CompactionFinished),
860 CompactionFailed(CompactionFailed),
862 Truncate(TruncateResult),
864 RegionChange(RegionChangeResult),
866 RegionEdit(RegionEditResult),
868 EnterStaging(EnterStagingResult),
870}
871
872#[derive(Debug)]
874pub(crate) struct FlushFinished {
875 pub(crate) region_id: RegionId,
877 pub(crate) flushed_entry_id: EntryId,
879 pub(crate) senders: Vec<OutputTx>,
881 pub(crate) _timer: HistogramTimer,
883 pub(crate) edit: RegionEdit,
885 pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
887 pub(crate) is_staging: bool,
889}
890
891impl FlushFinished {
892 pub(crate) fn on_success(self) {
894 for sender in self.senders {
895 sender.send(Ok(0));
896 }
897 }
898}
899
900impl OnFailure for FlushFinished {
901 fn on_failure(&mut self, err: Error) {
902 let err = Arc::new(err);
903 for sender in self.senders.drain(..) {
904 sender.send(Err(err.clone()).context(FlushRegionSnafu {
905 region_id: self.region_id,
906 }));
907 }
908 }
909}
910
911#[derive(Debug)]
913pub(crate) struct FlushFailed {
914 pub(crate) err: Arc<Error>,
916}
917
918#[derive(Debug)]
919pub(crate) struct IndexBuildFinished {
920 #[allow(dead_code)]
921 pub(crate) region_id: RegionId,
922 pub(crate) edit: RegionEdit,
923}
924
925#[derive(Debug)]
927pub(crate) struct IndexBuildStopped {
928 #[allow(dead_code)]
929 pub(crate) region_id: RegionId,
930 pub(crate) file_id: FileId,
931}
932
933#[derive(Debug)]
935pub(crate) struct IndexBuildFailed {
936 pub(crate) err: Arc<Error>,
937}
938
939#[derive(Debug)]
941pub(crate) struct CompactionFinished {
942 pub(crate) region_id: RegionId,
944 pub(crate) senders: Vec<OutputTx>,
946 pub(crate) start_time: Instant,
948 pub(crate) edit: RegionEdit,
950}
951
952impl CompactionFinished {
953 pub fn on_success(self) {
954 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
956
957 for sender in self.senders {
958 sender.send(Ok(0));
959 }
960 info!("Successfully compacted region: {}", self.region_id);
961 }
962}
963
964impl OnFailure for CompactionFinished {
965 fn on_failure(&mut self, err: Error) {
967 let err = Arc::new(err);
968 for sender in self.senders.drain(..) {
969 sender.send(Err(err.clone()).context(CompactRegionSnafu {
970 region_id: self.region_id,
971 }));
972 }
973 }
974}
975
976#[derive(Debug)]
978pub(crate) struct CompactionFailed {
979 pub(crate) region_id: RegionId,
980 pub(crate) err: Arc<Error>,
982}
983
984#[derive(Debug)]
986pub(crate) struct TruncateResult {
987 pub(crate) region_id: RegionId,
989 pub(crate) sender: OptionOutputTx,
991 pub(crate) result: Result<()>,
993 pub(crate) kind: TruncateKind,
994}
995
996#[derive(Debug)]
998pub(crate) struct RegionChangeResult {
999 pub(crate) region_id: RegionId,
1001 pub(crate) new_meta: RegionMetadataRef,
1003 pub(crate) sender: OptionOutputTx,
1005 pub(crate) result: Result<()>,
1007 pub(crate) need_index: bool,
1009 pub(crate) new_options: Option<RegionOptions>,
1011}
1012
1013#[derive(Debug)]
1015pub(crate) struct EnterStagingResult {
1016 pub(crate) region_id: RegionId,
1018 pub(crate) partition_expr: String,
1020 pub(crate) sender: OptionOutputTx,
1022 pub(crate) result: Result<()>,
1024}
1025
1026#[derive(Debug)]
1028pub(crate) struct RegionEditRequest {
1029 pub(crate) region_id: RegionId,
1030 pub(crate) edit: RegionEdit,
1031 pub(crate) tx: Sender<Result<()>>,
1033}
1034
1035#[derive(Debug)]
1037pub(crate) struct RegionEditResult {
1038 pub(crate) region_id: RegionId,
1040 pub(crate) sender: Sender<Result<()>>,
1042 pub(crate) edit: RegionEdit,
1044 pub(crate) result: Result<()>,
1046 pub(crate) update_region_state: bool,
1048}
1049
1050#[derive(Debug)]
1051pub(crate) struct BuildIndexRequest {
1052 pub(crate) region_id: RegionId,
1053 pub(crate) build_type: IndexBuildType,
1054 pub(crate) file_metas: Vec<FileMeta>,
1056}
1057
1058#[derive(Debug)]
1059pub(crate) struct RegionSyncRequest {
1060 pub(crate) region_id: RegionId,
1061 pub(crate) manifest_version: ManifestVersion,
1062 pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
1064}
1065
1066#[derive(Debug)]
1067pub(crate) struct RemapManifestsRequest {
1068 pub(crate) region_id: RegionId,
1070 pub(crate) input_regions: Vec<RegionId>,
1072 pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
1074 pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
1076 pub(crate) sender: Sender<Result<HashMap<RegionId, RegionManifest>>>,
1078}
1079
1080#[cfg(test)]
1081mod tests {
1082 use api::v1::value::ValueData;
1083 use api::v1::{Row, SemanticType};
1084 use datatypes::prelude::ConcreteDataType;
1085 use datatypes::schema::ColumnDefaultConstraint;
1086 use mito_codec::test_util::i64_value;
1087 use store_api::metadata::RegionMetadataBuilder;
1088
1089 use super::*;
1090 use crate::error::Error;
1091 use crate::test_util::ts_ms_value;
1092
1093 fn new_column_schema(
1094 name: &str,
1095 data_type: ColumnDataType,
1096 semantic_type: SemanticType,
1097 ) -> ColumnSchema {
1098 ColumnSchema {
1099 column_name: name.to_string(),
1100 datatype: data_type as i32,
1101 semantic_type: semantic_type as i32,
1102 ..Default::default()
1103 }
1104 }
1105
1106 fn check_invalid_request(err: &Error, expect: &str) {
1107 if let Error::InvalidRequest {
1108 region_id: _,
1109 reason,
1110 location: _,
1111 } = err
1112 {
1113 assert_eq!(reason, expect);
1114 } else {
1115 panic!("Unexpected error {err}")
1116 }
1117 }
1118
1119 #[test]
1120 fn test_write_request_duplicate_column() {
1121 let rows = Rows {
1122 schema: vec![
1123 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1124 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1125 ],
1126 rows: vec![],
1127 };
1128
1129 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1130 check_invalid_request(&err, "duplicate column c0");
1131 }
1132
1133 #[test]
1134 fn test_valid_write_request() {
1135 let rows = Rows {
1136 schema: vec![
1137 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1138 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1139 ],
1140 rows: vec![Row {
1141 values: vec![i64_value(1), i64_value(2)],
1142 }],
1143 };
1144
1145 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1146 assert_eq!(0, request.column_index_by_name("c0").unwrap());
1147 assert_eq!(1, request.column_index_by_name("c1").unwrap());
1148 assert_eq!(None, request.column_index_by_name("c2"));
1149 }
1150
1151 #[test]
1152 fn test_write_request_column_num() {
1153 let rows = Rows {
1154 schema: vec![
1155 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1156 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1157 ],
1158 rows: vec![Row {
1159 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1160 }],
1161 };
1162
1163 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1164 check_invalid_request(&err, "row has 3 columns but schema has 2");
1165 }
1166
1167 fn new_region_metadata() -> RegionMetadata {
1168 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1169 builder
1170 .push_column_metadata(ColumnMetadata {
1171 column_schema: datatypes::schema::ColumnSchema::new(
1172 "ts",
1173 ConcreteDataType::timestamp_millisecond_datatype(),
1174 false,
1175 ),
1176 semantic_type: SemanticType::Timestamp,
1177 column_id: 1,
1178 })
1179 .push_column_metadata(ColumnMetadata {
1180 column_schema: datatypes::schema::ColumnSchema::new(
1181 "k0",
1182 ConcreteDataType::int64_datatype(),
1183 true,
1184 ),
1185 semantic_type: SemanticType::Tag,
1186 column_id: 2,
1187 })
1188 .primary_key(vec![2]);
1189 builder.build().unwrap()
1190 }
1191
1192 #[test]
1193 fn test_check_schema() {
1194 let rows = Rows {
1195 schema: vec![
1196 new_column_schema(
1197 "ts",
1198 ColumnDataType::TimestampMillisecond,
1199 SemanticType::Timestamp,
1200 ),
1201 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1202 ],
1203 rows: vec![Row {
1204 values: vec![ts_ms_value(1), i64_value(2)],
1205 }],
1206 };
1207 let metadata = new_region_metadata();
1208
1209 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1210 request.check_schema(&metadata).unwrap();
1211 }
1212
1213 #[test]
1214 fn test_column_type() {
1215 let rows = Rows {
1216 schema: vec![
1217 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1218 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1219 ],
1220 rows: vec![Row {
1221 values: vec![i64_value(1), i64_value(2)],
1222 }],
1223 };
1224 let metadata = new_region_metadata();
1225
1226 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1227 let err = request.check_schema(&metadata).unwrap_err();
1228 check_invalid_request(
1229 &err,
1230 "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1231 );
1232 }
1233
1234 #[test]
1235 fn test_semantic_type() {
1236 let rows = Rows {
1237 schema: vec![
1238 new_column_schema(
1239 "ts",
1240 ColumnDataType::TimestampMillisecond,
1241 SemanticType::Tag,
1242 ),
1243 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1244 ],
1245 rows: vec![Row {
1246 values: vec![ts_ms_value(1), i64_value(2)],
1247 }],
1248 };
1249 let metadata = new_region_metadata();
1250
1251 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1252 let err = request.check_schema(&metadata).unwrap_err();
1253 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1254 }
1255
1256 #[test]
1257 fn test_column_nullable() {
1258 let rows = Rows {
1259 schema: vec![
1260 new_column_schema(
1261 "ts",
1262 ColumnDataType::TimestampMillisecond,
1263 SemanticType::Timestamp,
1264 ),
1265 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1266 ],
1267 rows: vec![Row {
1268 values: vec![Value { value_data: None }, i64_value(2)],
1269 }],
1270 };
1271 let metadata = new_region_metadata();
1272
1273 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1274 let err = request.check_schema(&metadata).unwrap_err();
1275 check_invalid_request(&err, "column ts is not null but input has null");
1276 }
1277
1278 #[test]
1279 fn test_column_default() {
1280 let rows = Rows {
1281 schema: vec![new_column_schema(
1282 "k0",
1283 ColumnDataType::Int64,
1284 SemanticType::Tag,
1285 )],
1286 rows: vec![Row {
1287 values: vec![i64_value(1)],
1288 }],
1289 };
1290 let metadata = new_region_metadata();
1291
1292 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1293 let err = request.check_schema(&metadata).unwrap_err();
1294 check_invalid_request(&err, "missing column ts");
1295 }
1296
1297 #[test]
1298 fn test_unknown_column() {
1299 let rows = Rows {
1300 schema: vec![
1301 new_column_schema(
1302 "ts",
1303 ColumnDataType::TimestampMillisecond,
1304 SemanticType::Timestamp,
1305 ),
1306 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1307 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1308 ],
1309 rows: vec![Row {
1310 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1311 }],
1312 };
1313 let metadata = new_region_metadata();
1314
1315 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1316 let err = request.check_schema(&metadata).unwrap_err();
1317 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1318 }
1319
1320 #[test]
1321 fn test_fill_impure_columns_err() {
1322 let rows = Rows {
1323 schema: vec![new_column_schema(
1324 "k0",
1325 ColumnDataType::Int64,
1326 SemanticType::Tag,
1327 )],
1328 rows: vec![Row {
1329 values: vec![i64_value(1)],
1330 }],
1331 };
1332 let metadata = {
1333 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1334 builder
1335 .push_column_metadata(ColumnMetadata {
1336 column_schema: datatypes::schema::ColumnSchema::new(
1337 "ts",
1338 ConcreteDataType::timestamp_millisecond_datatype(),
1339 false,
1340 )
1341 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1342 "now()".to_string(),
1343 )))
1344 .unwrap(),
1345 semantic_type: SemanticType::Timestamp,
1346 column_id: 1,
1347 })
1348 .push_column_metadata(ColumnMetadata {
1349 column_schema: datatypes::schema::ColumnSchema::new(
1350 "k0",
1351 ConcreteDataType::int64_datatype(),
1352 true,
1353 ),
1354 semantic_type: SemanticType::Tag,
1355 column_id: 2,
1356 })
1357 .primary_key(vec![2]);
1358 builder.build().unwrap()
1359 };
1360
1361 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1362 let err = request.check_schema(&metadata).unwrap_err();
1363 assert!(err.is_fill_default());
1364 assert!(
1365 request
1366 .fill_missing_columns(&metadata)
1367 .unwrap_err()
1368 .to_string()
1369 .contains("unexpected impure default value with region_id")
1370 );
1371 }
1372
1373 #[test]
1374 fn test_fill_missing_columns() {
1375 let rows = Rows {
1376 schema: vec![new_column_schema(
1377 "ts",
1378 ColumnDataType::TimestampMillisecond,
1379 SemanticType::Timestamp,
1380 )],
1381 rows: vec![Row {
1382 values: vec![ts_ms_value(1)],
1383 }],
1384 };
1385 let metadata = new_region_metadata();
1386
1387 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1388 let err = request.check_schema(&metadata).unwrap_err();
1389 assert!(err.is_fill_default());
1390 request.fill_missing_columns(&metadata).unwrap();
1391
1392 let expect_rows = Rows {
1393 schema: vec![new_column_schema(
1394 "ts",
1395 ColumnDataType::TimestampMillisecond,
1396 SemanticType::Timestamp,
1397 )],
1398 rows: vec![Row {
1399 values: vec![ts_ms_value(1)],
1400 }],
1401 };
1402 assert_eq!(expect_rows, request.rows);
1403 }
1404
1405 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1406 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1407 builder
1408 .push_column_metadata(ColumnMetadata {
1409 column_schema: datatypes::schema::ColumnSchema::new(
1410 "ts",
1411 ConcreteDataType::timestamp_millisecond_datatype(),
1412 false,
1413 ),
1414 semantic_type: SemanticType::Timestamp,
1415 column_id: 1,
1416 })
1417 .push_column_metadata(ColumnMetadata {
1418 column_schema: datatypes::schema::ColumnSchema::new(
1419 "k0",
1420 ConcreteDataType::int64_datatype(),
1421 true,
1422 ),
1423 semantic_type: SemanticType::Tag,
1424 column_id: 2,
1425 })
1426 .primary_key(vec![2]);
1427 builder
1428 }
1429
1430 fn region_metadata_two_fields() -> RegionMetadata {
1431 let mut builder = builder_with_ts_tag();
1432 builder
1433 .push_column_metadata(ColumnMetadata {
1434 column_schema: datatypes::schema::ColumnSchema::new(
1435 "f0",
1436 ConcreteDataType::int64_datatype(),
1437 true,
1438 ),
1439 semantic_type: SemanticType::Field,
1440 column_id: 3,
1441 })
1442 .push_column_metadata(ColumnMetadata {
1444 column_schema: datatypes::schema::ColumnSchema::new(
1445 "f1",
1446 ConcreteDataType::int64_datatype(),
1447 false,
1448 )
1449 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1450 datatypes::value::Value::Int64(100),
1451 )))
1452 .unwrap(),
1453 semantic_type: SemanticType::Field,
1454 column_id: 4,
1455 });
1456 builder.build().unwrap()
1457 }
1458
1459 #[test]
1460 fn test_fill_missing_for_delete() {
1461 let rows = Rows {
1462 schema: vec![new_column_schema(
1463 "ts",
1464 ColumnDataType::TimestampMillisecond,
1465 SemanticType::Timestamp,
1466 )],
1467 rows: vec![Row {
1468 values: vec![ts_ms_value(1)],
1469 }],
1470 };
1471 let metadata = region_metadata_two_fields();
1472
1473 let mut request =
1474 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1475 let err = request.check_schema(&metadata).unwrap_err();
1476 check_invalid_request(&err, "delete requests need column k0");
1477 let err = request.fill_missing_columns(&metadata).unwrap_err();
1478 check_invalid_request(&err, "delete requests need column k0");
1479
1480 let rows = Rows {
1481 schema: vec![
1482 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1483 new_column_schema(
1484 "ts",
1485 ColumnDataType::TimestampMillisecond,
1486 SemanticType::Timestamp,
1487 ),
1488 ],
1489 rows: vec![Row {
1490 values: vec![i64_value(100), ts_ms_value(1)],
1491 }],
1492 };
1493 let mut request =
1494 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1495 let err = request.check_schema(&metadata).unwrap_err();
1496 assert!(err.is_fill_default());
1497 request.fill_missing_columns(&metadata).unwrap();
1498
1499 let expect_rows = Rows {
1500 schema: vec![
1501 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1502 new_column_schema(
1503 "ts",
1504 ColumnDataType::TimestampMillisecond,
1505 SemanticType::Timestamp,
1506 ),
1507 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1508 ],
1509 rows: vec![Row {
1511 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1512 }],
1513 };
1514 assert_eq!(expect_rows, request.rows);
1515 }
1516
1517 #[test]
1518 fn test_fill_missing_without_default_in_delete() {
1519 let mut builder = builder_with_ts_tag();
1520 builder
1521 .push_column_metadata(ColumnMetadata {
1523 column_schema: datatypes::schema::ColumnSchema::new(
1524 "f0",
1525 ConcreteDataType::int64_datatype(),
1526 true,
1527 ),
1528 semantic_type: SemanticType::Field,
1529 column_id: 3,
1530 })
1531 .push_column_metadata(ColumnMetadata {
1533 column_schema: datatypes::schema::ColumnSchema::new(
1534 "f1",
1535 ConcreteDataType::int64_datatype(),
1536 false,
1537 ),
1538 semantic_type: SemanticType::Field,
1539 column_id: 4,
1540 });
1541 let metadata = builder.build().unwrap();
1542
1543 let rows = Rows {
1544 schema: vec![
1545 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1546 new_column_schema(
1547 "ts",
1548 ColumnDataType::TimestampMillisecond,
1549 SemanticType::Timestamp,
1550 ),
1551 ],
1552 rows: vec![Row {
1554 values: vec![i64_value(100), ts_ms_value(1)],
1555 }],
1556 };
1557 let mut request =
1558 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1559 let err = request.check_schema(&metadata).unwrap_err();
1560 assert!(err.is_fill_default());
1561 request.fill_missing_columns(&metadata).unwrap();
1562
1563 let expect_rows = Rows {
1564 schema: vec![
1565 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1566 new_column_schema(
1567 "ts",
1568 ColumnDataType::TimestampMillisecond,
1569 SemanticType::Timestamp,
1570 ),
1571 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1572 ],
1573 rows: vec![Row {
1575 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1576 }],
1577 };
1578 assert_eq!(expect_rows, request.rows);
1579 }
1580
1581 #[test]
1582 fn test_no_default() {
1583 let rows = Rows {
1584 schema: vec![new_column_schema(
1585 "k0",
1586 ColumnDataType::Int64,
1587 SemanticType::Tag,
1588 )],
1589 rows: vec![Row {
1590 values: vec![i64_value(1)],
1591 }],
1592 };
1593 let metadata = new_region_metadata();
1594
1595 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1596 let err = request.fill_missing_columns(&metadata).unwrap_err();
1597 check_invalid_request(&err, "column ts does not have default value");
1598 }
1599
1600 #[test]
1601 fn test_missing_and_invalid() {
1602 let rows = Rows {
1604 schema: vec![
1605 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1606 new_column_schema(
1607 "ts",
1608 ColumnDataType::TimestampMillisecond,
1609 SemanticType::Timestamp,
1610 ),
1611 new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1612 ],
1613 rows: vec![Row {
1614 values: vec![
1615 i64_value(100),
1616 ts_ms_value(1),
1617 Value {
1618 value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1619 },
1620 ],
1621 }],
1622 };
1623 let metadata = region_metadata_two_fields();
1624
1625 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1626 let err = request.check_schema(&metadata).unwrap_err();
1627 check_invalid_request(
1628 &err,
1629 "column f1 expect type Int64(Int64Type), given: STRING(12)",
1630 );
1631 }
1632
1633 #[test]
1634 fn test_write_request_metadata() {
1635 let rows = Rows {
1636 schema: vec![
1637 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1638 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1639 ],
1640 rows: vec![Row {
1641 values: vec![i64_value(1), i64_value(2)],
1642 }],
1643 };
1644
1645 let metadata = Arc::new(new_region_metadata());
1646 let request = WriteRequest::new(
1647 RegionId::new(1, 1),
1648 OpType::Put,
1649 rows,
1650 Some(metadata.clone()),
1651 )
1652 .unwrap();
1653
1654 assert!(request.region_metadata.is_some());
1655 assert_eq!(
1656 request.region_metadata.unwrap().region_id,
1657 RegionId::new(1, 1)
1658 );
1659 }
1660}