1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23 ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
35use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
36use store_api::region_request::{
37 AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
38 RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
39 RegionOpenRequest, RegionRequest, RegionTruncateRequest,
40};
41use store_api::storage::{RegionId, SequenceNumber};
42use store_api::ManifestVersion;
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47 FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::RegionEdit;
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::MemtableId;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::wal::entry_distributor::WalEntryReceiver;
54use crate::wal::EntryId;
55
/// A request to write rows (put or delete) into a region.
#[derive(Debug)]
pub struct WriteRequest {
    /// Id of the region to write.
    pub region_id: RegionId,
    /// Type of the write operation (e.g. `OpType::Put` or `OpType::Delete`).
    pub op_type: OpType,
    /// Rows to write together with their column schema.
    pub rows: Rows,
    /// Maps each column name in `rows.schema` to its index in that schema.
    pub name_to_index: HashMap<String, usize>,
    /// `has_null[i]` is true if column `i` of `rows.schema` holds a null value in any row.
    pub has_null: Vec<bool>,
    /// Optional write hint; the primary key encoding is inferred from it.
    pub hint: Option<WriteHint>,
    /// Region metadata supplied when the request was created, if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
74
75impl WriteRequest {
76 pub fn new(
80 region_id: RegionId,
81 op_type: OpType,
82 rows: Rows,
83 region_metadata: Option<RegionMetadataRef>,
84 ) -> Result<WriteRequest> {
85 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
86 for (index, column) in rows.schema.iter().enumerate() {
87 ensure!(
88 name_to_index
89 .insert(column.column_name.clone(), index)
90 .is_none(),
91 InvalidRequestSnafu {
92 region_id,
93 reason: format!("duplicate column {}", column.column_name),
94 }
95 );
96 }
97
98 let mut has_null = vec![false; rows.schema.len()];
99 for row in &rows.rows {
100 ensure!(
101 row.values.len() == rows.schema.len(),
102 InvalidRequestSnafu {
103 region_id,
104 reason: format!(
105 "row has {} columns but schema has {}",
106 row.values.len(),
107 rows.schema.len()
108 ),
109 }
110 );
111
112 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
113 validate_proto_value(region_id, value, column_schema)?;
114
115 if value.value_data.is_none() {
116 has_null[i] = true;
117 }
118 }
119 }
120
121 Ok(WriteRequest {
122 region_id,
123 op_type,
124 rows,
125 name_to_index,
126 has_null,
127 hint: None,
128 region_metadata,
129 })
130 }
131
132 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
134 self.hint = hint;
135 self
136 }
137
138 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
140 infer_primary_key_encoding_from_hint(self.hint.as_ref())
141 }
142
143 pub(crate) fn estimated_size(&self) -> usize {
145 let row_size = self
146 .rows
147 .rows
148 .first()
149 .map(|row| row.encoded_len())
150 .unwrap_or(0);
151 row_size * self.rows.rows.len()
152 }
153
154 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
156 self.name_to_index.get(name).copied()
157 }
158
159 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
164 debug_assert_eq!(self.region_id, metadata.region_id);
165
166 let region_id = self.region_id;
167 let mut rows_columns: HashMap<_, _> = self
169 .rows
170 .schema
171 .iter()
172 .map(|column| (&column.column_name, column))
173 .collect();
174
175 let mut need_fill_default = false;
176 for column in &metadata.column_metadatas {
178 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
179 ensure!(
181 is_column_type_value_eq(
182 input_col.datatype,
183 input_col.datatype_extension,
184 &column.column_schema.data_type
185 ),
186 InvalidRequestSnafu {
187 region_id,
188 reason: format!(
189 "column {} expect type {:?}, given: {}({})",
190 column.column_schema.name,
191 column.column_schema.data_type,
192 ColumnDataType::try_from(input_col.datatype)
193 .map(|v| v.as_str_name())
194 .unwrap_or("Unknown"),
195 input_col.datatype,
196 )
197 }
198 );
199
200 ensure!(
202 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
203 InvalidRequestSnafu {
204 region_id,
205 reason: format!(
206 "column {} has semantic type {:?}, given: {}({})",
207 column.column_schema.name,
208 column.semantic_type,
209 api::v1::SemanticType::try_from(input_col.semantic_type)
210 .map(|v| v.as_str_name())
211 .unwrap_or("Unknown"),
212 input_col.semantic_type
213 ),
214 }
215 );
216
217 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
220 ensure!(
221 !has_null || column.column_schema.is_nullable(),
222 InvalidRequestSnafu {
223 region_id,
224 reason: format!(
225 "column {} is not null but input has null",
226 column.column_schema.name
227 ),
228 }
229 );
230 } else {
231 self.check_missing_column(column)?;
233
234 need_fill_default = true;
235 }
236 }
237
238 if !rows_columns.is_empty() {
240 let names: Vec<_> = rows_columns.into_keys().collect();
241 return InvalidRequestSnafu {
242 region_id,
243 reason: format!("unknown columns: {:?}", names),
244 }
245 .fail();
246 }
247
248 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
250
251 Ok(())
252 }
253
254 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
259 debug_assert_eq!(self.region_id, metadata.region_id);
260
261 let mut columns_to_fill = vec![];
262 for column in &metadata.column_metadatas {
263 if !self.name_to_index.contains_key(&column.column_schema.name) {
264 columns_to_fill.push(column);
265 }
266 }
267 self.fill_columns(columns_to_fill)?;
268
269 Ok(())
270 }
271
272 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
274 if let Err(e) = self.check_schema(metadata) {
275 if e.is_fill_default() {
276 self.fill_missing_columns(metadata)?;
280 } else {
281 return Err(e);
282 }
283 }
284
285 Ok(())
286 }
287
288 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
290 let mut default_values = Vec::with_capacity(columns.len());
291 let mut columns_to_fill = Vec::with_capacity(columns.len());
292 for column in columns {
293 let default_value = self.column_default_value(column)?;
294 if default_value.value_data.is_some() {
295 default_values.push(default_value);
296 columns_to_fill.push(column);
297 }
298 }
299
300 for row in &mut self.rows.rows {
301 row.values.extend(default_values.iter().cloned());
302 }
303
304 for column in columns_to_fill {
305 let (datatype, datatype_ext) =
306 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
307 .with_context(|_| ConvertColumnDataTypeSnafu {
308 reason: format!(
309 "no protobuf type for column {} ({:?})",
310 column.column_schema.name, column.column_schema.data_type
311 ),
312 })?
313 .to_parts();
314 self.rows.schema.push(ColumnSchema {
315 column_name: column.column_schema.name.clone(),
316 datatype: datatype as i32,
317 semantic_type: column.semantic_type as i32,
318 datatype_extension: datatype_ext,
319 options: options_from_column_schema(&column.column_schema),
320 });
321 }
322
323 Ok(())
324 }
325
326 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
328 if self.op_type == OpType::Delete {
329 if column.semantic_type == SemanticType::Field {
330 return Ok(());
333 } else {
334 return InvalidRequestSnafu {
335 region_id: self.region_id,
336 reason: format!("delete requests need column {}", column.column_schema.name),
337 }
338 .fail();
339 }
340 }
341
342 ensure!(
344 column.column_schema.is_nullable()
345 || column.column_schema.default_constraint().is_some(),
346 InvalidRequestSnafu {
347 region_id: self.region_id,
348 reason: format!("missing column {}", column.column_schema.name),
349 }
350 );
351
352 Ok(())
353 }
354
355 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
357 let default_value = match self.op_type {
358 OpType::Delete => {
359 ensure!(
360 column.semantic_type == SemanticType::Field,
361 InvalidRequestSnafu {
362 region_id: self.region_id,
363 reason: format!(
364 "delete requests need column {}",
365 column.column_schema.name
366 ),
367 }
368 );
369
370 if column.column_schema.is_nullable() {
375 datatypes::value::Value::Null
376 } else {
377 column.column_schema.data_type.default_value()
378 }
379 }
380 OpType::Put => {
381 if column.column_schema.is_default_impure() {
383 UnexpectedSnafu {
384 reason: format!(
385 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
386 self.region_id,
387 column.column_schema.name,
388 column.column_schema.default_constraint(),
389 ),
390 }
391 .fail()?
392 }
393 column
394 .column_schema
395 .create_default()
396 .context(CreateDefaultSnafu {
397 region_id: self.region_id,
398 column: &column.column_schema.name,
399 })?
400 .with_context(|| InvalidRequestSnafu {
402 region_id: self.region_id,
403 reason: format!(
404 "column {} does not have default value",
405 column.column_schema.name
406 ),
407 })?
408 }
409 };
410
411 to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
413 region_id: self.region_id,
414 reason: format!(
415 "no protobuf type for default value of column {} ({:?})",
416 column.column_schema.name, column.column_schema.data_type
417 ),
418 })
419 }
420}
421
422pub(crate) fn validate_proto_value(
424 region_id: RegionId,
425 value: &Value,
426 column_schema: &ColumnSchema,
427) -> Result<()> {
428 if let Some(value_type) = proto_value_type(value) {
429 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
430 InvalidRequestSnafu {
431 region_id,
432 reason: format!(
433 "column {} has unknown type {}",
434 column_schema.column_name, column_schema.datatype
435 ),
436 }
437 .build()
438 })?;
439 ensure!(
440 proto_value_type_match(column_type, value_type),
441 InvalidRequestSnafu {
442 region_id,
443 reason: format!(
444 "value has type {:?}, but column {} has type {:?}({})",
445 value_type, column_schema.column_name, column_type, column_schema.datatype,
446 ),
447 }
448 );
449 }
450
451 Ok(())
452}
453
454fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
455 match (column_type, value_type) {
456 (ct, vt) if ct == vt => true,
457 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
458 (ColumnDataType::Json, ColumnDataType::Binary) => true,
459 _ => false,
460 }
461}
462
/// Sender of a request's result; wraps a oneshot channel sender.
#[derive(Debug)]
pub struct OutputTx(Sender<Result<AffectedRows>>);
466
467impl OutputTx {
468 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
470 OutputTx(sender)
471 }
472
473 pub(crate) fn send(self, result: Result<AffectedRows>) {
475 let _ = self.0.send(result);
477 }
478}
479
/// An optional [`OutputTx`]; sending through an empty one does nothing.
#[derive(Debug)]
pub(crate) struct OptionOutputTx(Option<OutputTx>);
483
484impl OptionOutputTx {
485 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
487 OptionOutputTx(sender)
488 }
489
490 pub(crate) fn none() -> OptionOutputTx {
492 OptionOutputTx(None)
493 }
494
495 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
497 if let Some(sender) = self.0.take() {
498 sender.send(result);
499 }
500 }
501
502 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
504 if let Some(sender) = self.0.take() {
505 sender.send(result);
506 }
507 }
508
509 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
511 self.0.take()
512 }
513}
514
515impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
516 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
517 Self::new(Some(OutputTx::new(sender)))
518 }
519}
520
521impl OnFailure for OptionOutputTx {
522 fn on_failure(&mut self, err: Error) {
523 self.send_mut(Err(err));
524 }
525}
526
/// A handler for failures, e.g. reporting the error to waiting senders.
pub(crate) trait OnFailure {
    /// Handles the failure `err`.
    fn on_failure(&mut self, err: Error);
}
532
/// A write request paired with the sender that receives its result.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Sender of the write result.
    pub(crate) sender: OptionOutputTx,
    /// The write request.
    pub(crate) request: WriteRequest,
}
540
/// A bulk part to insert, paired with the sender that receives its result.
pub(crate) struct SenderBulkRequest {
    /// Sender of the insert result.
    pub(crate) sender: OptionOutputTx,
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// The bulk part to write.
    pub(crate) request: BulkPart,
    /// Metadata of the target region.
    pub(crate) region_metadata: RegionMetadataRef,
}
547
/// Request sent to a region worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Writes (puts or deletes) rows into a region.
    Write(SenderWriteRequest),

    /// DDL request for a region (create, drop, open, close, alter, flush,
    /// compact, truncate, catchup).
    Ddl(SenderDdlRequest),

    /// Notification from a background job about a region.
    Background {
        /// Id of the region the notification is about.
        region_id: RegionId,
        /// The notification.
        notify: BackgroundNotify,
    },

    /// Gracefully sets the role state of a region.
    SetRegionRoleStateGracefully {
        /// Id of the region.
        region_id: RegionId,
        /// The role state to set.
        region_role_state: SettableRegionRoleState,
        /// Sender of the response.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Stop signal for the worker.
    Stop,

    /// Applies a region edit.
    EditRegion(RegionEditRequest),

    /// Syncs a region with a given manifest version.
    SyncRegion(RegionSyncRequest),

    /// Bulk inserts into a region.
    BulkInserts {
        /// Region metadata supplied with the request, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk insert request.
        request: RegionBulkInsertsRequest,
        /// Sender of the insert result.
        sender: OptionOutputTx,
    },
}
591
592impl WorkerRequest {
593 pub(crate) fn new_open_region_request(
594 region_id: RegionId,
595 request: RegionOpenRequest,
596 entry_receiver: Option<WalEntryReceiver>,
597 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
598 let (sender, receiver) = oneshot::channel();
599
600 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
601 region_id,
602 sender: sender.into(),
603 request: DdlRequest::Open((request, entry_receiver)),
604 });
605
606 (worker_request, receiver)
607 }
608
609 pub(crate) fn try_from_region_request(
611 region_id: RegionId,
612 value: RegionRequest,
613 region_metadata: Option<RegionMetadataRef>,
614 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
615 let (sender, receiver) = oneshot::channel();
616 let worker_request = match value {
617 RegionRequest::Put(v) => {
618 let mut write_request =
619 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
620 .with_hint(v.hint);
621 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
622 && let Some(region_metadata) = ®ion_metadata
623 {
624 write_request.maybe_fill_missing_columns(region_metadata)?;
625 }
626 WorkerRequest::Write(SenderWriteRequest {
627 sender: sender.into(),
628 request: write_request,
629 })
630 }
631 RegionRequest::Delete(v) => {
632 let mut write_request =
633 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
634 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
635 && let Some(region_metadata) = ®ion_metadata
636 {
637 write_request.maybe_fill_missing_columns(region_metadata)?;
638 }
639 WorkerRequest::Write(SenderWriteRequest {
640 sender: sender.into(),
641 request: write_request,
642 })
643 }
644 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
645 region_id,
646 sender: sender.into(),
647 request: DdlRequest::Create(v),
648 }),
649 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
650 region_id,
651 sender: sender.into(),
652 request: DdlRequest::Drop,
653 }),
654 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
655 region_id,
656 sender: sender.into(),
657 request: DdlRequest::Open((v, None)),
658 }),
659 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
660 region_id,
661 sender: sender.into(),
662 request: DdlRequest::Close(v),
663 }),
664 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
665 region_id,
666 sender: sender.into(),
667 request: DdlRequest::Alter(v),
668 }),
669 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
670 region_id,
671 sender: sender.into(),
672 request: DdlRequest::Flush(v),
673 }),
674 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
675 region_id,
676 sender: sender.into(),
677 request: DdlRequest::Compact(v),
678 }),
679 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
680 region_id,
681 sender: sender.into(),
682 request: DdlRequest::Truncate(v),
683 }),
684 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
685 region_id,
686 sender: sender.into(),
687 request: DdlRequest::Catchup(v),
688 }),
689 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
690 metadata: region_metadata,
691 sender: sender.into(),
692 request: region_bulk_inserts_request,
693 },
694 };
695
696 Ok((worker_request, receiver))
697 }
698
699 pub(crate) fn new_set_readonly_gracefully(
700 region_id: RegionId,
701 region_role_state: SettableRegionRoleState,
702 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
703 let (sender, receiver) = oneshot::channel();
704
705 (
706 WorkerRequest::SetRegionRoleStateGracefully {
707 region_id,
708 region_role_state,
709 sender,
710 },
711 receiver,
712 )
713 }
714
715 pub(crate) fn new_sync_region_request(
716 region_id: RegionId,
717 manifest_version: ManifestVersion,
718 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
719 let (sender, receiver) = oneshot::channel();
720 (
721 WorkerRequest::SyncRegion(RegionSyncRequest {
722 region_id,
723 manifest_version,
724 sender,
725 }),
726 receiver,
727 )
728 }
729}
730
/// DDL operation to apply to a region.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Creates the region.
    Create(RegionCreateRequest),
    /// Drops the region.
    Drop,
    /// Opens the region, with an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Closes the region.
    Close(RegionCloseRequest),
    /// Alters the region.
    Alter(RegionAlterRequest),
    /// Flushes the region.
    Flush(RegionFlushRequest),
    /// Compacts the region.
    Compact(RegionCompactRequest),
    /// Truncates the region.
    Truncate(RegionTruncateRequest),
    /// Catches up the region.
    Catchup(RegionCatchupRequest),
}
744
/// A DDL request paired with the sender that receives its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// Sender of the result.
    pub(crate) sender: OptionOutputTx,
    /// The DDL operation.
    pub(crate) request: DdlRequest,
}
755
/// Notification from a background job.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// A flush finished successfully.
    FlushFinished(FlushFinished),
    /// A flush failed.
    FlushFailed(FlushFailed),
    /// A compaction finished successfully.
    CompactionFinished(CompactionFinished),
    /// A compaction failed.
    CompactionFailed(CompactionFailed),
    /// Result of a truncation.
    Truncate(TruncateResult),
    /// Result of a region metadata change.
    RegionChange(RegionChangeResult),
    /// Result of a region edit.
    RegionEdit(RegionEditResult),
}
774
/// Notification that a flush of a region finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Id of the flushed region.
    pub(crate) region_id: RegionId,
    /// Entry id reported as flushed.
    pub(crate) flushed_entry_id: EntryId,
    /// Senders to notify of the flush result.
    pub(crate) senders: Vec<OutputTx>,
    /// Flush timer; a prometheus `HistogramTimer` records the elapsed time when dropped.
    pub(crate) _timer: HistogramTimer,
    /// Region edit of this flush.
    pub(crate) edit: RegionEdit,
    /// Ids of memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
}
791
792impl FlushFinished {
793 pub(crate) fn on_success(self) {
795 for sender in self.senders {
796 sender.send(Ok(0));
797 }
798 }
799}
800
801impl OnFailure for FlushFinished {
802 fn on_failure(&mut self, err: Error) {
803 let err = Arc::new(err);
804 for sender in self.senders.drain(..) {
805 sender.send(Err(err.clone()).context(FlushRegionSnafu {
806 region_id: self.region_id,
807 }));
808 }
809 }
810}
811
/// Notification that a flush failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The flush error, shared via `Arc`.
    pub(crate) err: Arc<Error>,
}
818
/// Notification that a compaction of a region finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Id of the compacted region.
    pub(crate) region_id: RegionId,
    /// Senders to notify of the compaction result.
    pub(crate) senders: Vec<OutputTx>,
    /// When the compaction started; used to report elapsed time on success.
    pub(crate) start_time: Instant,
    /// Region edit of this compaction.
    pub(crate) edit: RegionEdit,
}
831
832impl CompactionFinished {
833 pub fn on_success(self) {
834 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
836
837 for sender in self.senders {
838 sender.send(Ok(0));
839 }
840 info!("Successfully compacted region: {}", self.region_id);
841 }
842}
843
844impl OnFailure for CompactionFinished {
845 fn on_failure(&mut self, err: Error) {
847 let err = Arc::new(err);
848 for sender in self.senders.drain(..) {
849 sender.send(Err(err.clone()).context(CompactRegionSnafu {
850 region_id: self.region_id,
851 }));
852 }
853 }
854}
855
/// Notification that a compaction failed.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// The compaction error, shared via `Arc`.
    pub(crate) err: Arc<Error>,
}
863
/// Result of truncating a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// Sender of the truncate result.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the truncation.
    pub(crate) result: Result<()>,
    /// Truncated entry id.
    pub(crate) truncated_entry_id: EntryId,
    /// Truncated sequence number.
    pub(crate) truncated_sequence: SequenceNumber,
}
878
/// Result of changing a region's metadata.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// The new region metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender of the result.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the change.
    pub(crate) result: Result<()>,
}
891
/// Request to apply a region edit.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Sender of the edit result.
    pub(crate) tx: Sender<Result<()>>,
}
900
/// Result of applying a region edit.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// Sender of the edit result.
    pub(crate) sender: Sender<Result<()>>,
    /// The applied edit.
    pub(crate) edit: RegionEdit,
    /// Outcome of the edit.
    pub(crate) result: Result<()>,
}
913
/// Request to sync a region with a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// Manifest version to sync with.
    pub(crate) manifest_version: ManifestVersion,
    /// Sender of the result: a manifest version and a flag (meaning defined by
    /// the sync handler — TODO confirm).
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
921
922#[cfg(test)]
923mod tests {
924 use api::v1::value::ValueData;
925 use api::v1::{Row, SemanticType};
926 use datatypes::prelude::ConcreteDataType;
927 use datatypes::schema::ColumnDefaultConstraint;
928 use mito_codec::test_util::i64_value;
929 use store_api::metadata::RegionMetadataBuilder;
930
931 use super::*;
932 use crate::error::Error;
933 use crate::test_util::ts_ms_value;
934
935 fn new_column_schema(
936 name: &str,
937 data_type: ColumnDataType,
938 semantic_type: SemanticType,
939 ) -> ColumnSchema {
940 ColumnSchema {
941 column_name: name.to_string(),
942 datatype: data_type as i32,
943 semantic_type: semantic_type as i32,
944 ..Default::default()
945 }
946 }
947
948 fn check_invalid_request(err: &Error, expect: &str) {
949 if let Error::InvalidRequest {
950 region_id: _,
951 reason,
952 location: _,
953 } = err
954 {
955 assert_eq!(reason, expect);
956 } else {
957 panic!("Unexpected error {err}")
958 }
959 }
960
961 #[test]
962 fn test_write_request_duplicate_column() {
963 let rows = Rows {
964 schema: vec![
965 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
966 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
967 ],
968 rows: vec![],
969 };
970
971 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
972 check_invalid_request(&err, "duplicate column c0");
973 }
974
975 #[test]
976 fn test_valid_write_request() {
977 let rows = Rows {
978 schema: vec![
979 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
980 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
981 ],
982 rows: vec![Row {
983 values: vec![i64_value(1), i64_value(2)],
984 }],
985 };
986
987 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
988 assert_eq!(0, request.column_index_by_name("c0").unwrap());
989 assert_eq!(1, request.column_index_by_name("c1").unwrap());
990 assert_eq!(None, request.column_index_by_name("c2"));
991 }
992
993 #[test]
994 fn test_write_request_column_num() {
995 let rows = Rows {
996 schema: vec![
997 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
998 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
999 ],
1000 rows: vec![Row {
1001 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1002 }],
1003 };
1004
1005 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1006 check_invalid_request(&err, "row has 3 columns but schema has 2");
1007 }
1008
1009 fn new_region_metadata() -> RegionMetadata {
1010 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1011 builder
1012 .push_column_metadata(ColumnMetadata {
1013 column_schema: datatypes::schema::ColumnSchema::new(
1014 "ts",
1015 ConcreteDataType::timestamp_millisecond_datatype(),
1016 false,
1017 ),
1018 semantic_type: SemanticType::Timestamp,
1019 column_id: 1,
1020 })
1021 .push_column_metadata(ColumnMetadata {
1022 column_schema: datatypes::schema::ColumnSchema::new(
1023 "k0",
1024 ConcreteDataType::int64_datatype(),
1025 true,
1026 ),
1027 semantic_type: SemanticType::Tag,
1028 column_id: 2,
1029 })
1030 .primary_key(vec![2]);
1031 builder.build().unwrap()
1032 }
1033
1034 #[test]
1035 fn test_check_schema() {
1036 let rows = Rows {
1037 schema: vec![
1038 new_column_schema(
1039 "ts",
1040 ColumnDataType::TimestampMillisecond,
1041 SemanticType::Timestamp,
1042 ),
1043 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1044 ],
1045 rows: vec![Row {
1046 values: vec![ts_ms_value(1), i64_value(2)],
1047 }],
1048 };
1049 let metadata = new_region_metadata();
1050
1051 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1052 request.check_schema(&metadata).unwrap();
1053 }
1054
1055 #[test]
1056 fn test_column_type() {
1057 let rows = Rows {
1058 schema: vec![
1059 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1060 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1061 ],
1062 rows: vec![Row {
1063 values: vec![i64_value(1), i64_value(2)],
1064 }],
1065 };
1066 let metadata = new_region_metadata();
1067
1068 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1069 let err = request.check_schema(&metadata).unwrap_err();
1070 check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1071 }
1072
1073 #[test]
1074 fn test_semantic_type() {
1075 let rows = Rows {
1076 schema: vec![
1077 new_column_schema(
1078 "ts",
1079 ColumnDataType::TimestampMillisecond,
1080 SemanticType::Tag,
1081 ),
1082 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1083 ],
1084 rows: vec![Row {
1085 values: vec![ts_ms_value(1), i64_value(2)],
1086 }],
1087 };
1088 let metadata = new_region_metadata();
1089
1090 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1091 let err = request.check_schema(&metadata).unwrap_err();
1092 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1093 }
1094
1095 #[test]
1096 fn test_column_nullable() {
1097 let rows = Rows {
1098 schema: vec![
1099 new_column_schema(
1100 "ts",
1101 ColumnDataType::TimestampMillisecond,
1102 SemanticType::Timestamp,
1103 ),
1104 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1105 ],
1106 rows: vec![Row {
1107 values: vec![Value { value_data: None }, i64_value(2)],
1108 }],
1109 };
1110 let metadata = new_region_metadata();
1111
1112 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1113 let err = request.check_schema(&metadata).unwrap_err();
1114 check_invalid_request(&err, "column ts is not null but input has null");
1115 }
1116
1117 #[test]
1118 fn test_column_default() {
1119 let rows = Rows {
1120 schema: vec![new_column_schema(
1121 "k0",
1122 ColumnDataType::Int64,
1123 SemanticType::Tag,
1124 )],
1125 rows: vec![Row {
1126 values: vec![i64_value(1)],
1127 }],
1128 };
1129 let metadata = new_region_metadata();
1130
1131 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1132 let err = request.check_schema(&metadata).unwrap_err();
1133 check_invalid_request(&err, "missing column ts");
1134 }
1135
1136 #[test]
1137 fn test_unknown_column() {
1138 let rows = Rows {
1139 schema: vec![
1140 new_column_schema(
1141 "ts",
1142 ColumnDataType::TimestampMillisecond,
1143 SemanticType::Timestamp,
1144 ),
1145 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1146 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1147 ],
1148 rows: vec![Row {
1149 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1150 }],
1151 };
1152 let metadata = new_region_metadata();
1153
1154 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1155 let err = request.check_schema(&metadata).unwrap_err();
1156 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1157 }
1158
1159 #[test]
1160 fn test_fill_impure_columns_err() {
1161 let rows = Rows {
1162 schema: vec![new_column_schema(
1163 "k0",
1164 ColumnDataType::Int64,
1165 SemanticType::Tag,
1166 )],
1167 rows: vec![Row {
1168 values: vec![i64_value(1)],
1169 }],
1170 };
1171 let metadata = {
1172 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1173 builder
1174 .push_column_metadata(ColumnMetadata {
1175 column_schema: datatypes::schema::ColumnSchema::new(
1176 "ts",
1177 ConcreteDataType::timestamp_millisecond_datatype(),
1178 false,
1179 )
1180 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1181 "now()".to_string(),
1182 )))
1183 .unwrap(),
1184 semantic_type: SemanticType::Timestamp,
1185 column_id: 1,
1186 })
1187 .push_column_metadata(ColumnMetadata {
1188 column_schema: datatypes::schema::ColumnSchema::new(
1189 "k0",
1190 ConcreteDataType::int64_datatype(),
1191 true,
1192 ),
1193 semantic_type: SemanticType::Tag,
1194 column_id: 2,
1195 })
1196 .primary_key(vec![2]);
1197 builder.build().unwrap()
1198 };
1199
1200 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1201 let err = request.check_schema(&metadata).unwrap_err();
1202 assert!(err.is_fill_default());
1203 assert!(request
1204 .fill_missing_columns(&metadata)
1205 .unwrap_err()
1206 .to_string()
1207 .contains("unexpected impure default value with region_id"));
1208 }
1209
1210 #[test]
1211 fn test_fill_missing_columns() {
1212 let rows = Rows {
1213 schema: vec![new_column_schema(
1214 "ts",
1215 ColumnDataType::TimestampMillisecond,
1216 SemanticType::Timestamp,
1217 )],
1218 rows: vec![Row {
1219 values: vec![ts_ms_value(1)],
1220 }],
1221 };
1222 let metadata = new_region_metadata();
1223
1224 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1225 let err = request.check_schema(&metadata).unwrap_err();
1226 assert!(err.is_fill_default());
1227 request.fill_missing_columns(&metadata).unwrap();
1228
1229 let expect_rows = Rows {
1230 schema: vec![new_column_schema(
1231 "ts",
1232 ColumnDataType::TimestampMillisecond,
1233 SemanticType::Timestamp,
1234 )],
1235 rows: vec![Row {
1236 values: vec![ts_ms_value(1)],
1237 }],
1238 };
1239 assert_eq!(expect_rows, request.rows);
1240 }
1241
1242 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1243 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1244 builder
1245 .push_column_metadata(ColumnMetadata {
1246 column_schema: datatypes::schema::ColumnSchema::new(
1247 "ts",
1248 ConcreteDataType::timestamp_millisecond_datatype(),
1249 false,
1250 ),
1251 semantic_type: SemanticType::Timestamp,
1252 column_id: 1,
1253 })
1254 .push_column_metadata(ColumnMetadata {
1255 column_schema: datatypes::schema::ColumnSchema::new(
1256 "k0",
1257 ConcreteDataType::int64_datatype(),
1258 true,
1259 ),
1260 semantic_type: SemanticType::Tag,
1261 column_id: 2,
1262 })
1263 .primary_key(vec![2]);
1264 builder
1265 }
1266
1267 fn region_metadata_two_fields() -> RegionMetadata {
1268 let mut builder = builder_with_ts_tag();
1269 builder
1270 .push_column_metadata(ColumnMetadata {
1271 column_schema: datatypes::schema::ColumnSchema::new(
1272 "f0",
1273 ConcreteDataType::int64_datatype(),
1274 true,
1275 ),
1276 semantic_type: SemanticType::Field,
1277 column_id: 3,
1278 })
1279 .push_column_metadata(ColumnMetadata {
1281 column_schema: datatypes::schema::ColumnSchema::new(
1282 "f1",
1283 ConcreteDataType::int64_datatype(),
1284 false,
1285 )
1286 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1287 datatypes::value::Value::Int64(100),
1288 )))
1289 .unwrap(),
1290 semantic_type: SemanticType::Field,
1291 column_id: 4,
1292 });
1293 builder.build().unwrap()
1294 }
1295
1296 #[test]
1297 fn test_fill_missing_for_delete() {
1298 let rows = Rows {
1299 schema: vec![new_column_schema(
1300 "ts",
1301 ColumnDataType::TimestampMillisecond,
1302 SemanticType::Timestamp,
1303 )],
1304 rows: vec![Row {
1305 values: vec![ts_ms_value(1)],
1306 }],
1307 };
1308 let metadata = region_metadata_two_fields();
1309
1310 let mut request =
1311 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1312 let err = request.check_schema(&metadata).unwrap_err();
1313 check_invalid_request(&err, "delete requests need column k0");
1314 let err = request.fill_missing_columns(&metadata).unwrap_err();
1315 check_invalid_request(&err, "delete requests need column k0");
1316
1317 let rows = Rows {
1318 schema: vec![
1319 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1320 new_column_schema(
1321 "ts",
1322 ColumnDataType::TimestampMillisecond,
1323 SemanticType::Timestamp,
1324 ),
1325 ],
1326 rows: vec![Row {
1327 values: vec![i64_value(100), ts_ms_value(1)],
1328 }],
1329 };
1330 let mut request =
1331 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1332 let err = request.check_schema(&metadata).unwrap_err();
1333 assert!(err.is_fill_default());
1334 request.fill_missing_columns(&metadata).unwrap();
1335
1336 let expect_rows = Rows {
1337 schema: vec![
1338 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1339 new_column_schema(
1340 "ts",
1341 ColumnDataType::TimestampMillisecond,
1342 SemanticType::Timestamp,
1343 ),
1344 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1345 ],
1346 rows: vec![Row {
1348 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1349 }],
1350 };
1351 assert_eq!(expect_rows, request.rows);
1352 }
1353
1354 #[test]
1355 fn test_fill_missing_without_default_in_delete() {
1356 let mut builder = builder_with_ts_tag();
1357 builder
1358 .push_column_metadata(ColumnMetadata {
1360 column_schema: datatypes::schema::ColumnSchema::new(
1361 "f0",
1362 ConcreteDataType::int64_datatype(),
1363 true,
1364 ),
1365 semantic_type: SemanticType::Field,
1366 column_id: 3,
1367 })
1368 .push_column_metadata(ColumnMetadata {
1370 column_schema: datatypes::schema::ColumnSchema::new(
1371 "f1",
1372 ConcreteDataType::int64_datatype(),
1373 false,
1374 ),
1375 semantic_type: SemanticType::Field,
1376 column_id: 4,
1377 });
1378 let metadata = builder.build().unwrap();
1379
1380 let rows = Rows {
1381 schema: vec![
1382 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1383 new_column_schema(
1384 "ts",
1385 ColumnDataType::TimestampMillisecond,
1386 SemanticType::Timestamp,
1387 ),
1388 ],
1389 rows: vec![Row {
1391 values: vec![i64_value(100), ts_ms_value(1)],
1392 }],
1393 };
1394 let mut request =
1395 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1396 let err = request.check_schema(&metadata).unwrap_err();
1397 assert!(err.is_fill_default());
1398 request.fill_missing_columns(&metadata).unwrap();
1399
1400 let expect_rows = Rows {
1401 schema: vec![
1402 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1403 new_column_schema(
1404 "ts",
1405 ColumnDataType::TimestampMillisecond,
1406 SemanticType::Timestamp,
1407 ),
1408 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1409 ],
1410 rows: vec![Row {
1412 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1413 }],
1414 };
1415 assert_eq!(expect_rows, request.rows);
1416 }
1417
1418 #[test]
1419 fn test_no_default() {
1420 let rows = Rows {
1421 schema: vec![new_column_schema(
1422 "k0",
1423 ColumnDataType::Int64,
1424 SemanticType::Tag,
1425 )],
1426 rows: vec![Row {
1427 values: vec![i64_value(1)],
1428 }],
1429 };
1430 let metadata = new_region_metadata();
1431
1432 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1433 let err = request.fill_missing_columns(&metadata).unwrap_err();
1434 check_invalid_request(&err, "column ts does not have default value");
1435 }
1436
1437 #[test]
1438 fn test_missing_and_invalid() {
1439 let rows = Rows {
1441 schema: vec![
1442 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1443 new_column_schema(
1444 "ts",
1445 ColumnDataType::TimestampMillisecond,
1446 SemanticType::Timestamp,
1447 ),
1448 new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1449 ],
1450 rows: vec![Row {
1451 values: vec![
1452 i64_value(100),
1453 ts_ms_value(1),
1454 Value {
1455 value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1456 },
1457 ],
1458 }],
1459 };
1460 let metadata = region_metadata_two_fields();
1461
1462 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1463 let err = request.check_schema(&metadata).unwrap_err();
1464 check_invalid_request(
1465 &err,
1466 "column f1 expect type Int64(Int64Type), given: STRING(12)",
1467 );
1468 }
1469
1470 #[test]
1471 fn test_write_request_metadata() {
1472 let rows = Rows {
1473 schema: vec![
1474 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1475 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1476 ],
1477 rows: vec![Row {
1478 values: vec![i64_value(1), i64_value(2)],
1479 }],
1480 };
1481
1482 let metadata = Arc::new(new_region_metadata());
1483 let request = WriteRequest::new(
1484 RegionId::new(1, 1),
1485 OpType::Put,
1486 rows,
1487 Some(metadata.clone()),
1488 )
1489 .unwrap();
1490
1491 assert!(request.region_metadata.is_some());
1492 assert_eq!(
1493 request.region_metadata.unwrap().region_id,
1494 RegionId::new(1, 1)
1495 );
1496 }
1497}