1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23 ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::manifest::ManifestVersion;
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38 AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
39 RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
40 RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::{RegionId, SequenceNumber};
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47 FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
48};
49use crate::manifest::action::RegionEdit;
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::MemtableId;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::wal::entry_distributor::WalEntryReceiver;
54use crate::wal::EntryId;
55
/// A request to write (put or delete) rows into a region.
#[derive(Debug)]
pub struct WriteRequest {
    /// Id of the region this request targets.
    pub region_id: RegionId,
    /// Kind of operation to apply to the rows.
    pub op_type: OpType,
    /// Rows (schema plus values) carried by the request.
    pub rows: Rows,
    /// Maps each column name in `rows.schema` to its position in the schema.
    pub name_to_index: HashMap<String, usize>,
    /// Per-column flag, indexed like `rows.schema`: `true` when at least one row
    /// has no value (`value_data` is `None`) for that column.
    pub has_null: Vec<bool>,
    /// Optional write hint; `primary_key_encoding` infers the encoding from it.
    pub hint: Option<WriteHint>,
    /// Region metadata handed to `WriteRequest::new` by the caller, if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
74
75impl WriteRequest {
76 pub fn new(
80 region_id: RegionId,
81 op_type: OpType,
82 rows: Rows,
83 region_metadata: Option<RegionMetadataRef>,
84 ) -> Result<WriteRequest> {
85 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
86 for (index, column) in rows.schema.iter().enumerate() {
87 ensure!(
88 name_to_index
89 .insert(column.column_name.clone(), index)
90 .is_none(),
91 InvalidRequestSnafu {
92 region_id,
93 reason: format!("duplicate column {}", column.column_name),
94 }
95 );
96 }
97
98 let mut has_null = vec![false; rows.schema.len()];
99 for row in &rows.rows {
100 ensure!(
101 row.values.len() == rows.schema.len(),
102 InvalidRequestSnafu {
103 region_id,
104 reason: format!(
105 "row has {} columns but schema has {}",
106 row.values.len(),
107 rows.schema.len()
108 ),
109 }
110 );
111
112 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
113 validate_proto_value(region_id, value, column_schema)?;
114
115 if value.value_data.is_none() {
116 has_null[i] = true;
117 }
118 }
119 }
120
121 Ok(WriteRequest {
122 region_id,
123 op_type,
124 rows,
125 name_to_index,
126 has_null,
127 hint: None,
128 region_metadata,
129 })
130 }
131
132 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
134 self.hint = hint;
135 self
136 }
137
138 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
140 infer_primary_key_encoding_from_hint(self.hint.as_ref())
141 }
142
143 pub(crate) fn estimated_size(&self) -> usize {
145 let row_size = self
146 .rows
147 .rows
148 .first()
149 .map(|row| row.encoded_len())
150 .unwrap_or(0);
151 row_size * self.rows.rows.len()
152 }
153
154 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
156 self.name_to_index.get(name).copied()
157 }
158
159 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
164 debug_assert_eq!(self.region_id, metadata.region_id);
165
166 let region_id = self.region_id;
167 let mut rows_columns: HashMap<_, _> = self
169 .rows
170 .schema
171 .iter()
172 .map(|column| (&column.column_name, column))
173 .collect();
174
175 let mut need_fill_default = false;
176 for column in &metadata.column_metadatas {
178 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
179 ensure!(
181 is_column_type_value_eq(
182 input_col.datatype,
183 input_col.datatype_extension,
184 &column.column_schema.data_type
185 ),
186 InvalidRequestSnafu {
187 region_id,
188 reason: format!(
189 "column {} expect type {:?}, given: {}({})",
190 column.column_schema.name,
191 column.column_schema.data_type,
192 ColumnDataType::try_from(input_col.datatype)
193 .map(|v| v.as_str_name())
194 .unwrap_or("Unknown"),
195 input_col.datatype,
196 )
197 }
198 );
199
200 ensure!(
202 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
203 InvalidRequestSnafu {
204 region_id,
205 reason: format!(
206 "column {} has semantic type {:?}, given: {}({})",
207 column.column_schema.name,
208 column.semantic_type,
209 api::v1::SemanticType::try_from(input_col.semantic_type)
210 .map(|v| v.as_str_name())
211 .unwrap_or("Unknown"),
212 input_col.semantic_type
213 ),
214 }
215 );
216
217 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
220 ensure!(
221 !has_null || column.column_schema.is_nullable(),
222 InvalidRequestSnafu {
223 region_id,
224 reason: format!(
225 "column {} is not null but input has null",
226 column.column_schema.name
227 ),
228 }
229 );
230 } else {
231 self.check_missing_column(column)?;
233
234 need_fill_default = true;
235 }
236 }
237
238 if !rows_columns.is_empty() {
240 let names: Vec<_> = rows_columns.into_keys().collect();
241 return InvalidRequestSnafu {
242 region_id,
243 reason: format!("unknown columns: {:?}", names),
244 }
245 .fail();
246 }
247
248 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
250
251 Ok(())
252 }
253
254 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
259 debug_assert_eq!(self.region_id, metadata.region_id);
260
261 let mut columns_to_fill = vec![];
262 for column in &metadata.column_metadatas {
263 if !self.name_to_index.contains_key(&column.column_schema.name) {
264 columns_to_fill.push(column);
265 }
266 }
267 self.fill_columns(columns_to_fill)?;
268
269 Ok(())
270 }
271
272 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
274 if let Err(e) = self.check_schema(metadata) {
275 if e.is_fill_default() {
276 self.fill_missing_columns(metadata)?;
280 } else {
281 return Err(e);
282 }
283 }
284
285 Ok(())
286 }
287
288 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
290 let mut default_values = Vec::with_capacity(columns.len());
291 let mut columns_to_fill = Vec::with_capacity(columns.len());
292 for column in columns {
293 let default_value = self.column_default_value(column)?;
294 if default_value.value_data.is_some() {
295 default_values.push(default_value);
296 columns_to_fill.push(column);
297 }
298 }
299
300 for row in &mut self.rows.rows {
301 row.values.extend(default_values.iter().cloned());
302 }
303
304 for column in columns_to_fill {
305 let (datatype, datatype_ext) =
306 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
307 .with_context(|_| ConvertColumnDataTypeSnafu {
308 reason: format!(
309 "no protobuf type for column {} ({:?})",
310 column.column_schema.name, column.column_schema.data_type
311 ),
312 })?
313 .to_parts();
314 self.rows.schema.push(ColumnSchema {
315 column_name: column.column_schema.name.clone(),
316 datatype: datatype as i32,
317 semantic_type: column.semantic_type as i32,
318 datatype_extension: datatype_ext,
319 options: options_from_column_schema(&column.column_schema),
320 });
321 }
322
323 Ok(())
324 }
325
326 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
328 if self.op_type == OpType::Delete {
329 if column.semantic_type == SemanticType::Field {
330 return Ok(());
333 } else {
334 return InvalidRequestSnafu {
335 region_id: self.region_id,
336 reason: format!("delete requests need column {}", column.column_schema.name),
337 }
338 .fail();
339 }
340 }
341
342 ensure!(
344 column.column_schema.is_nullable()
345 || column.column_schema.default_constraint().is_some(),
346 InvalidRequestSnafu {
347 region_id: self.region_id,
348 reason: format!("missing column {}", column.column_schema.name),
349 }
350 );
351
352 Ok(())
353 }
354
355 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
357 let default_value = match self.op_type {
358 OpType::Delete => {
359 ensure!(
360 column.semantic_type == SemanticType::Field,
361 InvalidRequestSnafu {
362 region_id: self.region_id,
363 reason: format!(
364 "delete requests need column {}",
365 column.column_schema.name
366 ),
367 }
368 );
369
370 if column.column_schema.is_nullable() {
375 datatypes::value::Value::Null
376 } else {
377 column.column_schema.data_type.default_value()
378 }
379 }
380 OpType::Put => {
381 if column.column_schema.is_default_impure() {
383 UnexpectedImpureDefaultSnafu {
384 region_id: self.region_id,
385 column: &column.column_schema.name,
386 default_value: format!("{:?}", column.column_schema.default_constraint()),
387 }
388 .fail()?
389 }
390 column
391 .column_schema
392 .create_default()
393 .context(CreateDefaultSnafu {
394 region_id: self.region_id,
395 column: &column.column_schema.name,
396 })?
397 .with_context(|| InvalidRequestSnafu {
399 region_id: self.region_id,
400 reason: format!(
401 "column {} does not have default value",
402 column.column_schema.name
403 ),
404 })?
405 }
406 };
407
408 to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
410 region_id: self.region_id,
411 reason: format!(
412 "no protobuf type for default value of column {} ({:?})",
413 column.column_schema.name, column.column_schema.data_type
414 ),
415 })
416 }
417}
418
419pub(crate) fn validate_proto_value(
421 region_id: RegionId,
422 value: &Value,
423 column_schema: &ColumnSchema,
424) -> Result<()> {
425 if let Some(value_type) = proto_value_type(value) {
426 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
427 InvalidRequestSnafu {
428 region_id,
429 reason: format!(
430 "column {} has unknown type {}",
431 column_schema.column_name, column_schema.datatype
432 ),
433 }
434 .build()
435 })?;
436 ensure!(
437 proto_value_type_match(column_type, value_type),
438 InvalidRequestSnafu {
439 region_id,
440 reason: format!(
441 "value has type {:?}, but column {} has type {:?}({})",
442 value_type, column_schema.column_name, column_type, column_schema.datatype,
443 ),
444 }
445 );
446 }
447
448 Ok(())
449}
450
451fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
452 match (column_type, value_type) {
453 (ct, vt) if ct == vt => true,
454 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
455 (ColumnDataType::Json, ColumnDataType::Binary) => true,
456 _ => false,
457 }
458}
459
/// Wrapper around the oneshot sender that delivers a request's
/// `Result<AffectedRows>` to whoever holds the receiver.
#[derive(Debug)]
pub struct OutputTx(Sender<Result<AffectedRows>>);
463
464impl OutputTx {
465 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
467 OutputTx(sender)
468 }
469
470 pub(crate) fn send(self, result: Result<AffectedRows>) {
472 let _ = self.0.send(result);
474 }
475}
476
/// An optional [`OutputTx`]; once the inner sender is taken or used, later
/// sends through `send_mut` are no-ops.
#[derive(Debug)]
pub(crate) struct OptionOutputTx(Option<OutputTx>);
480
481impl OptionOutputTx {
482 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
484 OptionOutputTx(sender)
485 }
486
487 pub(crate) fn none() -> OptionOutputTx {
489 OptionOutputTx(None)
490 }
491
492 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
494 if let Some(sender) = self.0.take() {
495 sender.send(result);
496 }
497 }
498
499 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
501 if let Some(sender) = self.0.take() {
502 sender.send(result);
503 }
504 }
505
506 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
508 self.0.take()
509 }
510}
511
512impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
513 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
514 Self::new(Some(OutputTx::new(sender)))
515 }
516}
517
518impl OnFailure for OptionOutputTx {
519 fn on_failure(&mut self, err: Error) {
520 self.send_mut(Err(err));
521 }
522}
523
/// Implemented by types that need to be told when the operation they belong to
/// has failed.
pub(crate) trait OnFailure {
    /// Handles the failure described by `err`.
    fn on_failure(&mut self, err: Error);
}
529
/// A write request paired with the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The write request itself.
    pub(crate) request: WriteRequest,
}
537
/// A bulk part paired with the sender used to report its result.
pub(crate) struct SenderBulkRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// The bulk part to write.
    pub(crate) request: BulkPart,
    /// Region metadata associated with the request.
    // NOTE(review): presumably the metadata the part was built against — confirm with the producer.
    pub(crate) region_metadata: RegionMetadataRef,
}
544
/// Requests that can be sent to a region worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// A write (put/delete) request together with its result sender.
    Write(SenderWriteRequest),

    /// A DDL request together with its result sender.
    Ddl(SenderDdlRequest),

    /// A notification about a background job.
    Background {
        /// Id of the region the notification is about.
        region_id: RegionId,
        /// The notification payload.
        notify: BackgroundNotify,
    },

    /// Request to change the role state of a region; the response is delivered
    /// through `sender`.
    SetRegionRoleStateGracefully {
        /// Id of the region to update.
        region_id: RegionId,
        /// The role state to set.
        region_role_state: SettableRegionRoleState,
        /// Sender for the response.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Stop request.
    // NOTE(review): presumably asks the worker loop to exit — confirm in the worker.
    Stop,

    /// Request to apply a [`RegionEdit`] to a region.
    EditRegion(RegionEditRequest),

    /// Request to sync a region to a manifest version.
    SyncRegion(RegionSyncRequest),

    /// Bulk insert request.
    BulkInserts {
        /// Region metadata supplied by the caller, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk insert payload.
        request: RegionBulkInsertsRequest,
        /// Result sender.
        sender: OptionOutputTx,
    },
}
588
589impl WorkerRequest {
590 pub(crate) fn new_open_region_request(
591 region_id: RegionId,
592 request: RegionOpenRequest,
593 entry_receiver: Option<WalEntryReceiver>,
594 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
595 let (sender, receiver) = oneshot::channel();
596
597 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
598 region_id,
599 sender: sender.into(),
600 request: DdlRequest::Open((request, entry_receiver)),
601 });
602
603 (worker_request, receiver)
604 }
605
606 pub(crate) fn try_from_region_request(
608 region_id: RegionId,
609 value: RegionRequest,
610 region_metadata: Option<RegionMetadataRef>,
611 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
612 let (sender, receiver) = oneshot::channel();
613 let worker_request = match value {
614 RegionRequest::Put(v) => {
615 let mut write_request =
616 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
617 .with_hint(v.hint);
618 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
619 && let Some(region_metadata) = ®ion_metadata
620 {
621 write_request.maybe_fill_missing_columns(region_metadata)?;
622 }
623 WorkerRequest::Write(SenderWriteRequest {
624 sender: sender.into(),
625 request: write_request,
626 })
627 }
628 RegionRequest::Delete(v) => {
629 let mut write_request =
630 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
631 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
632 && let Some(region_metadata) = ®ion_metadata
633 {
634 write_request.maybe_fill_missing_columns(region_metadata)?;
635 }
636 WorkerRequest::Write(SenderWriteRequest {
637 sender: sender.into(),
638 request: write_request,
639 })
640 }
641 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
642 region_id,
643 sender: sender.into(),
644 request: DdlRequest::Create(v),
645 }),
646 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
647 region_id,
648 sender: sender.into(),
649 request: DdlRequest::Drop,
650 }),
651 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
652 region_id,
653 sender: sender.into(),
654 request: DdlRequest::Open((v, None)),
655 }),
656 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
657 region_id,
658 sender: sender.into(),
659 request: DdlRequest::Close(v),
660 }),
661 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
662 region_id,
663 sender: sender.into(),
664 request: DdlRequest::Alter(v),
665 }),
666 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
667 region_id,
668 sender: sender.into(),
669 request: DdlRequest::Flush(v),
670 }),
671 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
672 region_id,
673 sender: sender.into(),
674 request: DdlRequest::Compact(v),
675 }),
676 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
677 region_id,
678 sender: sender.into(),
679 request: DdlRequest::Truncate(v),
680 }),
681 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
682 region_id,
683 sender: sender.into(),
684 request: DdlRequest::Catchup(v),
685 }),
686 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
687 metadata: region_metadata,
688 sender: sender.into(),
689 request: region_bulk_inserts_request,
690 },
691 };
692
693 Ok((worker_request, receiver))
694 }
695
696 pub(crate) fn new_set_readonly_gracefully(
697 region_id: RegionId,
698 region_role_state: SettableRegionRoleState,
699 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
700 let (sender, receiver) = oneshot::channel();
701
702 (
703 WorkerRequest::SetRegionRoleStateGracefully {
704 region_id,
705 region_role_state,
706 sender,
707 },
708 receiver,
709 )
710 }
711
712 pub(crate) fn new_sync_region_request(
713 region_id: RegionId,
714 manifest_version: ManifestVersion,
715 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
716 let (sender, receiver) = oneshot::channel();
717 (
718 WorkerRequest::SyncRegion(RegionSyncRequest {
719 region_id,
720 manifest_version,
721 sender,
722 }),
723 receiver,
724 )
725 }
726}
727
/// DDL requests handled by a region worker.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Create a region.
    Create(RegionCreateRequest),
    /// Drop a region.
    Drop,
    /// Open a region, optionally with a WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Close a region.
    Close(RegionCloseRequest),
    /// Alter a region.
    Alter(RegionAlterRequest),
    /// Flush a region.
    Flush(RegionFlushRequest),
    /// Compact a region.
    Compact(RegionCompactRequest),
    /// Truncate a region.
    Truncate(RegionTruncateRequest),
    /// Catch up a region.
    Catchup(RegionCatchupRequest),
}
741
/// A DDL request paired with the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Id of the region the request targets.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The DDL request itself.
    pub(crate) request: DdlRequest,
}
752
/// Notifications about background jobs, carried by `WorkerRequest::Background`.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// A flush has finished.
    FlushFinished(FlushFinished),
    /// A flush has failed.
    FlushFailed(FlushFailed),
    /// A compaction has finished.
    CompactionFinished(CompactionFinished),
    /// A compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Result of a truncate operation.
    Truncate(TruncateResult),
    /// Result of a region change operation.
    RegionChange(RegionChangeResult),
    /// Result of a region edit operation.
    RegionEdit(RegionEditResult),
}
771
/// Notification that a flush job has finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Id of the flushed region.
    pub(crate) region_id: RegionId,
    /// Entry id associated with the flush.
    // NOTE(review): presumably the last WAL entry covered by the flush — confirm with the flush job.
    pub(crate) flushed_entry_id: EntryId,
    /// Senders notified by `on_success` / `on_failure`.
    pub(crate) senders: Vec<OutputTx>,
    /// Histogram timer kept alive for as long as this notification exists.
    pub(crate) _timer: HistogramTimer,
    /// Region edit produced by the flush.
    pub(crate) edit: RegionEdit,
    /// Ids of memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
}
788
789impl FlushFinished {
790 pub(crate) fn on_success(self) {
792 for sender in self.senders {
793 sender.send(Ok(0));
794 }
795 }
796}
797
798impl OnFailure for FlushFinished {
799 fn on_failure(&mut self, err: Error) {
800 let err = Arc::new(err);
801 for sender in self.senders.drain(..) {
802 sender.send(Err(err.clone()).context(FlushRegionSnafu {
803 region_id: self.region_id,
804 }));
805 }
806 }
807}
808
/// Notification that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error that caused the failure.
    pub(crate) err: Arc<Error>,
}
815
/// Notification that a compaction job has finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Id of the compacted region.
    pub(crate) region_id: RegionId,
    /// Senders notified by `on_success` / `on_failure`.
    pub(crate) senders: Vec<OutputTx>,
    /// Instant from which `on_success` measures the elapsed compaction time.
    pub(crate) start_time: Instant,
    /// Region edit produced by the compaction.
    pub(crate) edit: RegionEdit,
}
828
829impl CompactionFinished {
830 pub fn on_success(self) {
831 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
833
834 for sender in self.senders {
835 sender.send(Ok(0));
836 }
837 info!("Successfully compacted region: {}", self.region_id);
838 }
839}
840
841impl OnFailure for CompactionFinished {
842 fn on_failure(&mut self, err: Error) {
844 let err = Arc::new(err);
845 for sender in self.senders.drain(..) {
846 sender.send(Err(err.clone()).context(CompactRegionSnafu {
847 region_id: self.region_id,
848 }));
849 }
850 }
851}
852
/// Notification that a compaction job has failed.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Id of the region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// The error that caused the failure.
    pub(crate) err: Arc<Error>,
}
860
/// Outcome of a truncate operation, reported back to the worker.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Id of the truncated region.
    pub(crate) region_id: RegionId,
    /// Result sender of the original request.
    pub(crate) sender: OptionOutputTx,
    /// Whether the truncate operation succeeded.
    pub(crate) result: Result<()>,
    /// Entry id associated with the truncation.
    pub(crate) truncated_entry_id: EntryId,
    /// Sequence number associated with the truncation.
    pub(crate) truncated_sequence: SequenceNumber,
}
875
/// Outcome of a region change operation, reported back to the worker.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Id of the changed region.
    pub(crate) region_id: RegionId,
    /// The new region metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Result sender of the original request.
    pub(crate) sender: OptionOutputTx,
    /// Whether the change succeeded.
    pub(crate) result: Result<()>,
}
888
/// Request to apply a [`RegionEdit`] to a region.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the region to edit.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Sender used to report the result of the edit.
    pub(crate) tx: Sender<Result<()>>,
}
897
/// Outcome of a region edit operation, reported back to the worker.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Id of the edited region.
    pub(crate) region_id: RegionId,
    /// Sender used to report the result to the requester.
    pub(crate) sender: Sender<Result<()>>,
    /// The edit that was requested.
    pub(crate) edit: RegionEdit,
    /// Whether the edit succeeded.
    pub(crate) result: Result<()>,
}
910
/// Request to sync a region to a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the region to sync.
    pub(crate) region_id: RegionId,
    /// The manifest version passed by the requester.
    pub(crate) manifest_version: ManifestVersion,
    /// Sender used to report the result: a manifest version and a flag.
    // NOTE(review): the meaning of the `bool` is not visible here — confirm with the handler.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
918
919#[cfg(test)]
920mod tests {
921 use api::v1::value::ValueData;
922 use api::v1::{Row, SemanticType};
923 use datatypes::prelude::ConcreteDataType;
924 use datatypes::schema::ColumnDefaultConstraint;
925 use store_api::metadata::RegionMetadataBuilder;
926
927 use super::*;
928 use crate::error::Error;
929 use crate::test_util::{i64_value, ts_ms_value};
930
931 fn new_column_schema(
932 name: &str,
933 data_type: ColumnDataType,
934 semantic_type: SemanticType,
935 ) -> ColumnSchema {
936 ColumnSchema {
937 column_name: name.to_string(),
938 datatype: data_type as i32,
939 semantic_type: semantic_type as i32,
940 ..Default::default()
941 }
942 }
943
944 fn check_invalid_request(err: &Error, expect: &str) {
945 if let Error::InvalidRequest {
946 region_id: _,
947 reason,
948 location: _,
949 } = err
950 {
951 assert_eq!(reason, expect);
952 } else {
953 panic!("Unexpected error {err}")
954 }
955 }
956
957 #[test]
958 fn test_write_request_duplicate_column() {
959 let rows = Rows {
960 schema: vec![
961 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
962 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
963 ],
964 rows: vec![],
965 };
966
967 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
968 check_invalid_request(&err, "duplicate column c0");
969 }
970
971 #[test]
972 fn test_valid_write_request() {
973 let rows = Rows {
974 schema: vec![
975 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
976 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
977 ],
978 rows: vec![Row {
979 values: vec![i64_value(1), i64_value(2)],
980 }],
981 };
982
983 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
984 assert_eq!(0, request.column_index_by_name("c0").unwrap());
985 assert_eq!(1, request.column_index_by_name("c1").unwrap());
986 assert_eq!(None, request.column_index_by_name("c2"));
987 }
988
989 #[test]
990 fn test_write_request_column_num() {
991 let rows = Rows {
992 schema: vec![
993 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
994 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
995 ],
996 rows: vec![Row {
997 values: vec![i64_value(1), i64_value(2), i64_value(3)],
998 }],
999 };
1000
1001 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1002 check_invalid_request(&err, "row has 3 columns but schema has 2");
1003 }
1004
1005 fn new_region_metadata() -> RegionMetadata {
1006 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1007 builder
1008 .push_column_metadata(ColumnMetadata {
1009 column_schema: datatypes::schema::ColumnSchema::new(
1010 "ts",
1011 ConcreteDataType::timestamp_millisecond_datatype(),
1012 false,
1013 ),
1014 semantic_type: SemanticType::Timestamp,
1015 column_id: 1,
1016 })
1017 .push_column_metadata(ColumnMetadata {
1018 column_schema: datatypes::schema::ColumnSchema::new(
1019 "k0",
1020 ConcreteDataType::int64_datatype(),
1021 true,
1022 ),
1023 semantic_type: SemanticType::Tag,
1024 column_id: 2,
1025 })
1026 .primary_key(vec![2]);
1027 builder.build().unwrap()
1028 }
1029
1030 #[test]
1031 fn test_check_schema() {
1032 let rows = Rows {
1033 schema: vec![
1034 new_column_schema(
1035 "ts",
1036 ColumnDataType::TimestampMillisecond,
1037 SemanticType::Timestamp,
1038 ),
1039 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1040 ],
1041 rows: vec![Row {
1042 values: vec![ts_ms_value(1), i64_value(2)],
1043 }],
1044 };
1045 let metadata = new_region_metadata();
1046
1047 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1048 request.check_schema(&metadata).unwrap();
1049 }
1050
1051 #[test]
1052 fn test_column_type() {
1053 let rows = Rows {
1054 schema: vec![
1055 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1056 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1057 ],
1058 rows: vec![Row {
1059 values: vec![i64_value(1), i64_value(2)],
1060 }],
1061 };
1062 let metadata = new_region_metadata();
1063
1064 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1065 let err = request.check_schema(&metadata).unwrap_err();
1066 check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1067 }
1068
1069 #[test]
1070 fn test_semantic_type() {
1071 let rows = Rows {
1072 schema: vec![
1073 new_column_schema(
1074 "ts",
1075 ColumnDataType::TimestampMillisecond,
1076 SemanticType::Tag,
1077 ),
1078 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1079 ],
1080 rows: vec![Row {
1081 values: vec![ts_ms_value(1), i64_value(2)],
1082 }],
1083 };
1084 let metadata = new_region_metadata();
1085
1086 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1087 let err = request.check_schema(&metadata).unwrap_err();
1088 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1089 }
1090
1091 #[test]
1092 fn test_column_nullable() {
1093 let rows = Rows {
1094 schema: vec![
1095 new_column_schema(
1096 "ts",
1097 ColumnDataType::TimestampMillisecond,
1098 SemanticType::Timestamp,
1099 ),
1100 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1101 ],
1102 rows: vec![Row {
1103 values: vec![Value { value_data: None }, i64_value(2)],
1104 }],
1105 };
1106 let metadata = new_region_metadata();
1107
1108 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1109 let err = request.check_schema(&metadata).unwrap_err();
1110 check_invalid_request(&err, "column ts is not null but input has null");
1111 }
1112
1113 #[test]
1114 fn test_column_default() {
1115 let rows = Rows {
1116 schema: vec![new_column_schema(
1117 "k0",
1118 ColumnDataType::Int64,
1119 SemanticType::Tag,
1120 )],
1121 rows: vec![Row {
1122 values: vec![i64_value(1)],
1123 }],
1124 };
1125 let metadata = new_region_metadata();
1126
1127 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1128 let err = request.check_schema(&metadata).unwrap_err();
1129 check_invalid_request(&err, "missing column ts");
1130 }
1131
1132 #[test]
1133 fn test_unknown_column() {
1134 let rows = Rows {
1135 schema: vec![
1136 new_column_schema(
1137 "ts",
1138 ColumnDataType::TimestampMillisecond,
1139 SemanticType::Timestamp,
1140 ),
1141 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1142 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1143 ],
1144 rows: vec![Row {
1145 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1146 }],
1147 };
1148 let metadata = new_region_metadata();
1149
1150 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1151 let err = request.check_schema(&metadata).unwrap_err();
1152 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1153 }
1154
1155 #[test]
1156 fn test_fill_impure_columns_err() {
1157 let rows = Rows {
1158 schema: vec![new_column_schema(
1159 "k0",
1160 ColumnDataType::Int64,
1161 SemanticType::Tag,
1162 )],
1163 rows: vec![Row {
1164 values: vec![i64_value(1)],
1165 }],
1166 };
1167 let metadata = {
1168 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1169 builder
1170 .push_column_metadata(ColumnMetadata {
1171 column_schema: datatypes::schema::ColumnSchema::new(
1172 "ts",
1173 ConcreteDataType::timestamp_millisecond_datatype(),
1174 false,
1175 )
1176 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1177 "now()".to_string(),
1178 )))
1179 .unwrap(),
1180 semantic_type: SemanticType::Timestamp,
1181 column_id: 1,
1182 })
1183 .push_column_metadata(ColumnMetadata {
1184 column_schema: datatypes::schema::ColumnSchema::new(
1185 "k0",
1186 ConcreteDataType::int64_datatype(),
1187 true,
1188 ),
1189 semantic_type: SemanticType::Tag,
1190 column_id: 2,
1191 })
1192 .primary_key(vec![2]);
1193 builder.build().unwrap()
1194 };
1195
1196 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1197 let err = request.check_schema(&metadata).unwrap_err();
1198 assert!(err.is_fill_default());
1199 assert!(request
1200 .fill_missing_columns(&metadata)
1201 .unwrap_err()
1202 .to_string()
1203 .contains("Unexpected impure default value with region_id"));
1204 }
1205
1206 #[test]
1207 fn test_fill_missing_columns() {
1208 let rows = Rows {
1209 schema: vec![new_column_schema(
1210 "ts",
1211 ColumnDataType::TimestampMillisecond,
1212 SemanticType::Timestamp,
1213 )],
1214 rows: vec![Row {
1215 values: vec![ts_ms_value(1)],
1216 }],
1217 };
1218 let metadata = new_region_metadata();
1219
1220 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1221 let err = request.check_schema(&metadata).unwrap_err();
1222 assert!(err.is_fill_default());
1223 request.fill_missing_columns(&metadata).unwrap();
1224
1225 let expect_rows = Rows {
1226 schema: vec![new_column_schema(
1227 "ts",
1228 ColumnDataType::TimestampMillisecond,
1229 SemanticType::Timestamp,
1230 )],
1231 rows: vec![Row {
1232 values: vec![ts_ms_value(1)],
1233 }],
1234 };
1235 assert_eq!(expect_rows, request.rows);
1236 }
1237
1238 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1239 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1240 builder
1241 .push_column_metadata(ColumnMetadata {
1242 column_schema: datatypes::schema::ColumnSchema::new(
1243 "ts",
1244 ConcreteDataType::timestamp_millisecond_datatype(),
1245 false,
1246 ),
1247 semantic_type: SemanticType::Timestamp,
1248 column_id: 1,
1249 })
1250 .push_column_metadata(ColumnMetadata {
1251 column_schema: datatypes::schema::ColumnSchema::new(
1252 "k0",
1253 ConcreteDataType::int64_datatype(),
1254 true,
1255 ),
1256 semantic_type: SemanticType::Tag,
1257 column_id: 2,
1258 })
1259 .primary_key(vec![2]);
1260 builder
1261 }
1262
1263 fn region_metadata_two_fields() -> RegionMetadata {
1264 let mut builder = builder_with_ts_tag();
1265 builder
1266 .push_column_metadata(ColumnMetadata {
1267 column_schema: datatypes::schema::ColumnSchema::new(
1268 "f0",
1269 ConcreteDataType::int64_datatype(),
1270 true,
1271 ),
1272 semantic_type: SemanticType::Field,
1273 column_id: 3,
1274 })
1275 .push_column_metadata(ColumnMetadata {
1277 column_schema: datatypes::schema::ColumnSchema::new(
1278 "f1",
1279 ConcreteDataType::int64_datatype(),
1280 false,
1281 )
1282 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1283 datatypes::value::Value::Int64(100),
1284 )))
1285 .unwrap(),
1286 semantic_type: SemanticType::Field,
1287 column_id: 4,
1288 });
1289 builder.build().unwrap()
1290 }
1291
/// Exercises `fill_missing_columns` for delete requests against metadata
/// whose `f1` field declares a default of 100.
///
/// Two cases are checked:
/// 1. A delete without the tag `k0` is rejected by both `check_schema` and
///    `fill_missing_columns`.
/// 2. A delete carrying `k0` and `ts` is completed with an `f1` column whose
///    value is expected to be `i64_value(0)`, not the declared default.
#[test]
fn test_fill_missing_for_delete() {
    let metadata = region_metadata_two_fields();

    // Case 1: only the time index is supplied, so the tag column is absent.
    let ts_only_rows = Rows {
        schema: vec![new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };
    let mut without_tag =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, ts_only_rows, None).unwrap();

    let check_err = without_tag.check_schema(&metadata).unwrap_err();
    check_invalid_request(&check_err, "delete requests need column k0");
    let fill_err = without_tag.fill_missing_columns(&metadata).unwrap_err();
    check_invalid_request(&fill_err, "delete requests need column k0");

    // Case 2: tag and time index are supplied; only field columns are missing.
    let key_rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut with_tag =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, key_rows, None).unwrap();

    // The schema check flags the request as needing defaults rather than as invalid.
    let check_err = with_tag.check_schema(&metadata).unwrap_err();
    assert!(check_err.is_fill_default());
    with_tag.fill_missing_columns(&metadata).unwrap();

    // Only `f1` is expected to be appended, and with 0 instead of its default 100.
    let expect_rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expect_rows, with_tag.rows);
}
1349
/// Checks that a delete request can still be completed when the missing
/// field `f1` declares no default constraint at all.
///
/// The metadata holds `ts`, `k0`, and two int64 fields `f0`/`f1` built
/// without defaults. After filling, the request is expected to gain an `f1`
/// column holding `i64_value(0)`.
#[test]
fn test_fill_missing_without_default_in_delete() {
    let first_field = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f0",
            ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 3,
    };
    // Unlike `region_metadata_two_fields`, `f1` has no default constraint here.
    let second_field = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f1",
            ConcreteDataType::int64_datatype(),
            false,
        ),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };

    let mut builder = builder_with_ts_tag();
    builder.push_column_metadata(first_field);
    builder.push_column_metadata(second_field);
    let metadata = builder.build().unwrap();

    // The delete supplies the tag and the time index only.
    let key_rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut delete_request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, key_rows, None).unwrap();

    let check_err = delete_request.check_schema(&metadata).unwrap_err();
    assert!(check_err.is_fill_default());
    delete_request.fill_missing_columns(&metadata).unwrap();

    // `f1` is expected to be appended with the value 0.
    let expect_rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expect_rows, delete_request.rows);
}
1413
/// Verifies that filling a put request which omits `ts` fails with an
/// invalid-request error stating that the column has no default value.
#[test]
fn test_no_default() {
    let metadata = new_region_metadata();

    // Only the tag column is provided; `ts` is left out on purpose.
    let tag_only_rows = Rows {
        schema: vec![new_column_schema(
            "k0",
            ColumnDataType::Int64,
            SemanticType::Tag,
        )],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };
    let mut put_request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Put, tag_only_rows, None).unwrap();

    let fill_result = put_request.fill_missing_columns(&metadata);
    check_invalid_request(
        &fill_result.unwrap_err(),
        "column ts does not have default value",
    );
}
1432
/// Verifies that `check_schema` reports a type mismatch for `f1` when the
/// request omits `f0` and also sends `f1` as a string instead of int64.
#[test]
fn test_missing_and_invalid() {
    let metadata = region_metadata_two_fields();

    // `f1` is an int64 column in the metadata but is sent as a string here.
    let mistyped_f1 = Value {
        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
    };
    let request_rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), mistyped_f1],
        }],
    };

    let put_request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Put, request_rows, None).unwrap();
    let schema_err = put_request.check_schema(&metadata).unwrap_err();

    let expected_message = "column f1 expect type Int64(Int64Type), given: STRING(12)";
    check_invalid_request(&schema_err, expected_message);
}
1465
/// Verifies that region metadata passed to `WriteRequest::new` is stored on
/// the request and exposes the expected region id.
#[test]
fn test_write_request_metadata() {
    let metadata = Arc::new(new_region_metadata());

    let tag_rows = Rows {
        schema: vec![
            new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
        ],
        rows: vec![Row {
            values: vec![i64_value(1), i64_value(2)],
        }],
    };

    let request = WriteRequest::new(
        RegionId::new(1, 1),
        OpType::Put,
        tag_rows,
        Some(Arc::clone(&metadata)),
    )
    .unwrap();

    // The request must keep the metadata handle it was constructed with.
    let attached = request.region_metadata;
    assert!(attached.is_some());
    assert_eq!(attached.unwrap().region_id, RegionId::new(1, 1));
}
1493}