use std::collections::HashMap;
use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::Schema;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;

use crate::error::{
    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
    EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
    UnexpectedSnafu,
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{
    DecodedPrimaryKeys, decode_primary_keys, time_index_column_index,
};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::stats::RowGroupPruningStats;

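/// Checks whether a row group contains delete operations by decoding the min
/// statistic of the `op_type` column, which is the last column of the SST.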
pub(crate) fn row_group_contains_delete(
    parquet_meta: &ParquetMetaData,
    row_group_index: usize,
    file_path: &str,
) -> Result<bool> {
    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];

    // The op_type column is the last column in the SST.
    let column_metadata = row_group_metadata.columns().last().unwrap();
    let stats = column_metadata
        .statistics()
        .context(StatsNotPresentSnafu { file_path })?;
    stats
        .min_bytes_opt()
        .context(StatsNotPresentSnafu { file_path })?
        .try_into()
        .map(i32::from_le_bytes)
        .map(|min_op_type| min_op_type == OpType::Delete as i32)
        .ok()
        .context(DecodeStatsSnafu { file_path })
}

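/// A range of rows to scan in a parquet SST. Currently each range maps to a
/// single row group.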
#[derive(Clone)]
pub struct FileRange {
    /// Shared context of ranges in the same file.
    context: FileRangeContextRef,
    /// Index of the row group to read.
    row_group_idx: usize,
    /// Rows to read in the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
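    /// Creates a new [FileRange] to read the given row group of a file.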
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

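    /// Returns true if the row selection selects all rows in the row group.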
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

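    /// Returns true if the row group may contain rows matching the dynamic
    /// filters. Returns true when there are no dynamic filters or when the
    /// statistics are inconclusive.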
    fn in_dynamic_filter_range(&self) -> bool {
        if self.context.base.dyn_filters.is_empty() {
            return true;
        }
        let curr_row_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx);
        let read_format = self.context.read_format();
        let prune_schema = &self.context.base.prune_schema;
        let stats = RowGroupPruningStats::new(
            std::slice::from_ref(curr_row_group),
            read_format,
            self.context.base.expected_metadata.clone(),
            self.compute_skip_fields(),
        );

        let pred = Predicate::new(vec![]).with_dyn_filters(self.context.base.dyn_filters.clone());

        // Prunes the single row group with its statistics; keeps the range
        // when the result is unknown.
        pred.prune_with_stats(&stats, prune_schema.arrow_schema())
            .first()
            .cloned()
            .unwrap_or(true)
    }

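    /// Computes whether field column statistics can be skipped for this
    /// range according to the pre-filter mode.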
    fn compute_skip_fields(&self) -> bool {
        match self.context.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Skips fields only if the row group contains delete ops.
                // Assumes it does when the statistics are unreadable.
                row_group_contains_delete(
                    self.context.reader_builder.parquet_metadata(),
                    self.row_group_idx,
                    self.context.reader_builder.file_path(),
                )
                .unwrap_or(true)
            }
        }
    }

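    /// Returns a reader to read the [FileRange]. Returns `None` if the row
    /// group is pruned by the dynamic filters.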
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<PruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only uses the last row reader if the row group contains no
            // delete ops and the selection covers the whole row group.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            false
        };

        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let prune_reader = if use_last_row_reader {
            // Wraps the row group reader with the last row cache.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            )
        };

        Ok(Some(prune_reader))
    }

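    /// Returns a reader that yields batches in the flat format. Returns
    /// `None` if the row group is pruned by the dynamic filters.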
    pub(crate) async fn flat_reader(
        &self,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<FlatPruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
        let flat_prune_reader = FlatPruneReader::new_with_row_group_reader(
            self.context.clone(),
            flat_row_group_reader,
            skip_fields,
        );

        Ok(Some(flat_prune_reader))
    }

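    /// Returns the helper to make batches compatible with the expected
    /// schema, if any.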
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

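    /// Returns the projection mapper for compaction, if any.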
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.context.compaction_projection_mapper()
    }

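    /// Returns the handle of the file to read.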
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}

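/// Context shared by [FileRange]s of the same parquet SST file.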
pub(crate) struct FileRangeContext {
    /// Builder of row group readers for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Common fields of the context.
    base: RangeBase,
}

/// Reference-counted pointer to a [FileRangeContext].
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
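    /// Creates a new [FileRangeContext].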
    pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
        Self {
            reader_builder,
            base,
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns the filters pushed down to the reader.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns true if the context has a partition filter.
    pub(crate) fn has_partition_filter(&self) -> bool {
        self.base.partition_filter.is_some()
    }

    /// Returns the format helper of the file.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the builder of row group readers.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to make batches compatible, if any.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Returns the projection mapper for compaction, if any.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.base.compaction_projection_mapper.as_ref()
    }

    /// Sets the helper to make batches compatible.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Filters the input batch by the pushed-down predicates. Returns `None`
    /// if all rows are filtered out.
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

    /// Filters the input record batch in the flat format. Returns `None`
    /// if all rows are filtered out.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

    /// Returns whether precise filtering should skip field columns for the
    /// given row group according to the pre-filter mode.
    pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
        match self.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                self.contains_delete(row_group_idx).unwrap_or(true)
            }
        }
    }

    /// Checks whether the given row group contains delete operations.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }

    /// Returns the estimated memory size of the context, currently the size
    /// of the cached parquet metadata.
    pub(crate) fn memory_size(&self) -> usize {
        crate::cache::cache_size::parquet_meta_size(self.reader_builder.parquet_metadata())
    }
}

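/// Mode that controls whether precise filtering evaluates field columns.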
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
    /// Evaluates filters on all columns.
    All,
    /// Skips evaluating filters on field columns when the row group contains
    /// delete operations.
    SkipFieldsOnDelete,
    /// Always skips evaluating filters on field columns.
    SkipFields,
}

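/// Context to evaluate the partition filter of a region.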
pub(crate) struct PartitionFilterContext {
    /// Physical expression of the region's partition rule.
    pub(crate) region_partition_physical_expr: Arc<dyn PhysicalExpr>,
    /// Schema of the record batch the expression is evaluated against.
    pub(crate) partition_schema: Arc<Schema>,
}

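/// Common fields for a range to read and filter batches.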
pub(crate) struct RangeBase {
    /// Filters pushed down to evaluate precisely.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Dynamic filters to prune row groups at read time.
    pub(crate) dyn_filters: Arc<Vec<DynamicFilterPhysicalExpr>>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Metadata of the region as the reader expects it, if any.
    pub(crate) expected_metadata: Option<RegionMetadataRef>,
    /// Schema used to prune row groups with statistics.
    pub(crate) prune_schema: Arc<Schema>,
    /// Codec to decode primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to make batches compatible with the expected schema.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Optional projection mapper for compaction.
    pub(crate) compaction_projection_mapper: Option<CompactionProjectionMapper>,
    /// Mode that controls which columns precise filtering evaluates.
    pub(crate) pre_filter_mode: PreFilterMode,
    /// Optional context to evaluate the partition filter.
    pub(crate) partition_filter: Option<PartitionFilterContext>,
}

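/// State reused across filters to decode tag columns from primary keys.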
pub(crate) struct TagDecodeState {
    /// Primary keys decoded from the input batch, if any.
    decoded_pks: Option<DecodedPrimaryKeys>,
    /// Cache of tag arrays decoded from the primary keys, keyed by column id.
    decoded_tag_cache: HashMap<ColumnId, ArrayRef>,
}

impl TagDecodeState {
    /// Creates an empty decode state.
    pub(crate) fn new() -> Self {
        Self {
            decoded_pks: None,
            decoded_tag_cache: HashMap::new(),
        }
    }
}

impl RangeBase {
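    /// Tries the best to filter the input batch with the pushed-down
    /// predicates: tag filters are evaluated against values decoded from the
    /// primary key, field filters against field vectors (unless
    /// `skip_fields` is set), and timestamp filters against the time index.
    /// Returns the filtered batch, or `None` if all rows are filtered out.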
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Evaluates the filters one by one and combines their results.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter is already matched, skips it.
                MaybeFilter::Matched => continue,
                // The filter never matches, filters out the whole batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a tag column, so it is in the primary key.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // All rows of the batch share the same primary key, so
                        // the whole batch is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    if skip_fields {
                        continue;
                    }
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        // Applies the partition filter if it is present.
        if let Some(partition_filter) = &self.partition_filter {
            let record_batch = self
                .build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema)?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        if mask.count_set_bits() == 0 {
            Ok(None)
        } else {
            input.filter(&BooleanArray::from(mask).into())?;
            Ok(Some(input))
        }
    }

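    /// Flat-format counterpart of [RangeBase::precise_filter]: filters the
    /// input record batch and returns `None` if all rows are filtered out.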
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        let mut tag_decode_state = TagDecodeState::new();
        let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;

        let Some(mut mask) = mask else {
            return Ok(None);
        };

        // Applies the partition filter if it is present.
        if let Some(partition_filter) = &self.partition_filter {
            let record_batch = self.project_record_batch_for_pruning_flat(
                &input,
                &partition_filter.partition_schema,
                &mut tag_decode_state,
            )?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }

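    /// Computes the boolean mask of rows that pass the pushed-down filters
    /// for a record batch in the flat format. Tag columns absent from the
    /// projection are decoded from the primary key on demand. Returns `None`
    /// if a filter prunes the whole batch.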
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<BooleanBuffer>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;
        let metadata = flat_format.metadata();

        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter is already matched, skips it.
                MaybeFilter::Matched => continue,
                // The filter never matches, filters out the whole batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            // Evaluates the filter against the projected column if it is
            // present in the record batch.
            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                let column = input.column(idx);
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                // The tag column is not in the projection, decodes it from
                // the primary key.
                let column_id = filter_ctx.column_id();

                if let Some(tag_column) =
                    self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
                {
                    let result = filter
                        .evaluate_array(&tag_column)
                        .context(RecordBatchSnafu)?;
                    mask = mask.bitand(&result);
                }
            } else if filter_ctx.semantic_type() == SemanticType::Timestamp {
                let time_index_pos = time_index_column_index(input.num_columns());
                let column = input.column(time_index_pos);
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
        }

        Ok(Some(mask))
    }

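    /// Decodes the tag column with `column_id` from the primary keys of the
    /// input record batch, caching decoded arrays in `tag_decode_state`.
    /// Returns `None` if the column is not a primary key column.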
    fn maybe_decode_tag_column(
        &self,
        metadata: &RegionMetadataRef,
        column_id: ColumnId,
        input: &RecordBatch,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<ArrayRef>> {
        let Some(pk_index) = metadata.primary_key_index(column_id) else {
            return Ok(None);
        };

        if let Some(cached_column) = tag_decode_state.decoded_tag_cache.get(&column_id) {
            return Ok(Some(cached_column.clone()));
        }

        if tag_decode_state.decoded_pks.is_none() {
            tag_decode_state.decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
        }

        // The sparse encoding looks up values by column id instead of by
        // position, so it doesn't need the pk index.
        let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
            None
        } else {
            Some(pk_index)
        };
        let Some(column_index) = metadata.column_index_by_id(column_id) else {
            return Ok(None);
        };
        let Some(decoded) = tag_decode_state.decoded_pks.as_ref() else {
            return Ok(None);
        };

        let column_metadata = &metadata.column_metadatas[column_index];
        let tag_column = decoded.get_tag_column(
            column_id,
            pk_index,
            &column_metadata.column_schema.data_type,
        )?;
        tag_decode_state
            .decoded_tag_cache
            .insert(column_id, tag_column.clone());

        Ok(Some(tag_column))
    }

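    /// Evaluates the partition filter expression against the record batch
    /// and returns the resulting boolean mask.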
    fn evaluate_partition_filter(
        &self,
        record_batch: &RecordBatch,
        partition_filter: &PartitionFilterContext,
    ) -> Result<BooleanBuffer> {
        let columnar_value = partition_filter
            .region_partition_physical_expr
            .evaluate(record_batch)
            .context(EvalPartitionFilterSnafu)?;
        let array = columnar_value
            .into_array(record_batch.num_rows())
            .context(EvalPartitionFilterSnafu)?;
        let boolean_array = array
            .as_any()
            .downcast_ref::<BooleanArray>()
            .context(UnexpectedSnafu {
                reason: "Failed to downcast to BooleanArray".to_string(),
            })?;

        Ok(boolean_array.values().clone())
    }

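    /// Builds a record batch matching `schema` for partition pruning from a
    /// batch in the primary key format. Tag columns are materialized from
    /// the decoded primary key; the time index and field columns are taken
    /// from the batch.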
    fn build_record_batch_for_pruning(
        &self,
        input: &mut Batch,
        schema: &Arc<Schema>,
    ) -> Result<RecordBatch> {
        let arrow_schema = schema.arrow_schema();
        let mut columns = Vec::with_capacity(arrow_schema.fields().len());

        // Ensures the primary key values are decoded.
        if input.pk_values().is_none() {
            input.set_pk_values(
                self.codec
                    .decode(input.primary_key())
                    .context(DecodeSnafu)?,
            );
        }

        for field in arrow_schema.fields() {
            let metadata = self.read_format.metadata();
            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);

            let Some(column_id) = column_id else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' but it is missing in \
                         region metadata",
                        field.name()
                    ),
                }
                .fail();
            };

            if let Some(pk_index) = metadata.primary_key_index(column_id) {
                // Tag column: expands the single decoded value to an array.
                let pk_values = input.pk_values().unwrap();
                let value = match pk_values {
                    CompositeValues::Dense(v) => &v[pk_index].1,
                    CompositeValues::Sparse(v) => v.get_or_null(column_id),
                };
                let concrete_type = ConcreteDataType::from_arrow_type(field.data_type());
                let arrow_scalar = value
                    .try_to_scalar_value(&concrete_type)
                    .context(DataTypeMismatchSnafu)?;
                let array = arrow_scalar
                    .to_array_of_size(input.num_rows())
                    .context(EvalPartitionFilterSnafu)?;
                columns.push(array);
            } else if metadata.time_index_column().column_id == column_id {
                columns.push(input.timestamps().to_arrow_array());
            } else if let Some(field_index) = self
                .read_format
                .as_primary_key()
                .and_then(|f| f.field_index_by_id(column_id))
            {
                columns.push(input.fields()[field_index].data.to_arrow_array());
            } else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' (id {}) but it is not \
                         present in input batch",
                        field.name(),
                        column_id
                    ),
                }
                .fail();
            }
        }

        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }

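    /// Flat-format counterpart of
    /// [RangeBase::build_record_batch_for_pruning]: projects the columns
    /// that `schema` expects from the record batch, decoding tag columns
    /// from the primary key when they are not in the projection.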
    fn project_record_batch_for_pruning_flat(
        &self,
        input: &RecordBatch,
        schema: &Arc<Schema>,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<RecordBatch> {
        let arrow_schema = schema.arrow_schema();
        let mut columns = Vec::with_capacity(arrow_schema.fields().len());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;
        let metadata = flat_format.metadata();

        for field in arrow_schema.fields() {
            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);

            let Some(column_id) = column_id else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' but it is missing in \
                         region metadata",
                        field.name()
                    ),
                }
                .fail();
            };

            // Takes the column from the projection if it is present.
            if let Some(idx) = flat_format.projected_index_by_id(column_id) {
                columns.push(input.column(idx).clone());
                continue;
            }

            if metadata.time_index_column().column_id == column_id {
                let time_index_pos = time_index_column_index(input.num_columns());
                columns.push(input.column(time_index_pos).clone());
                continue;
            }

            // Falls back to decoding the tag column from the primary key.
            if let Some(tag_column) =
                self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
            {
                columns.push(tag_column);
                continue;
            }

            return UnexpectedSnafu {
                reason: format!(
                    "Partition pruning schema expects column '{}' (id {}) but it is not \
                     present in projected record batch",
                    field.name(),
                    column_id
                ),
            }
            .fail();
        }

        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}