use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use object_store::layers::LoggingLayer;
use object_store::services::Oss;
use object_store::{services, ObjectStore};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;

use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{
    EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
    SchemaNotFoundSnafu,
};
use crate::{database, Tool};

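/// A fully qualified table identifier: (catalog, schema, table).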
type TableReference = (String, String, String);

#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
    /// Export only the schemas (`CREATE DATABASE`/`CREATE TABLE`/`CREATE VIEW` statements).
    Schema,
    /// Export only the table data.
    Data,
    /// Export both schemas and data.
    #[default]
    All,
}

/// Command-line arguments for the export tool.
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
    /// Server address to connect to.
    #[clap(long)]
    addr: String,

    /// Directory to put the exported data; required unless exporting to S3 or OSS.
    #[clap(long)]
    output_dir: Option<String>,

    /// The name of the database to export, in `catalog-schema` form;
    /// the default `greptime-*` exports every schema in the `greptime` catalog.
    #[clap(long, default_value = "greptime-*")]
    database: String,

    /// The number of export jobs to run in parallel.
    #[clap(long, short = 'j', default_value = "1")]
    export_jobs: usize,

    /// The maximum number of retries per job.
    #[clap(long, default_value = "3")]
    max_retry: usize,

    /// What to export: table schemas, data, or both.
    #[clap(long, short = 't', value_enum, default_value = "all")]
    target: ExportTarget,

    /// Lower bound of the time range to export, forwarded verbatim as the
    /// `start_time` option of `COPY DATABASE`.
    #[clap(long)]
    start_time: Option<String>,

    /// Upper bound of the time range to export, forwarded verbatim as the
    /// `end_time` option of `COPY DATABASE`.
    #[clap(long)]
    end_time: Option<String>,

    /// Basic authentication for connecting to the server.
    #[clap(long)]
    auth_basic: Option<String>,

    /// Timeout for each request to the server, in a human-readable form such as
    /// `30s` or `5m`; when unset, `Duration::default()` (zero) is passed to the client.
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// The proxy server address to use when connecting to the server.
    #[clap(long)]
    proxy: Option<String>,

    /// Disable the use of a proxy server.
    #[clap(long)]
    no_proxy: bool,

    /// Export data to S3 instead of the local filesystem.
    #[clap(long)]
    s3: bool,

    /// When exporting to S3 or OSS, write the generated DDL files
    /// (`create_database.sql`, `create_tables.sql`, `copy_from.sql`) to this
    /// local directory instead of remote storage.
    #[clap(long)]
    ddl_local_dir: Option<String>,

    /// The S3 bucket name; required when `--s3` is set.
    #[clap(long)]
    s3_bucket: Option<String>,

    /// The root path within the S3 bucket.
    #[clap(long)]
    s3_root: Option<String>,

    /// The S3 endpoint; required when `--s3` is set.
    #[clap(long)]
    s3_endpoint: Option<String>,

    /// The S3 access key; required when `--s3` is set.
    #[clap(long)]
    s3_access_key: Option<String>,

    /// The S3 secret key; required when `--s3` is set.
    #[clap(long)]
    s3_secret_key: Option<String>,

    /// The S3 region; required when `--s3` is set.
    #[clap(long)]
    s3_region: Option<String>,

    /// Export data to Aliyun OSS instead of the local filesystem.
    #[clap(long)]
    oss: bool,

    /// The OSS bucket name.
    #[clap(long)]
    oss_bucket: Option<String>,

    /// The OSS endpoint.
    #[clap(long)]
    oss_endpoint: Option<String>,

    /// The OSS access key id.
    #[clap(long)]
    oss_access_key_id: Option<String>,

    /// The OSS access key secret.
    #[clap(long)]
    oss_access_key_secret: Option<String>,
}

impl ExportCommand {
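    /// Validates the storage options and assembles the `Export` tool. Note that
    /// only the S3 options are validated up front; missing OSS options surface
    /// later when the OSS operator is built.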
    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
        if self.s3
            && (self.s3_bucket.is_none()
                || self.s3_endpoint.is_none()
                || self.s3_access_key.is_none()
                || self.s3_secret_key.is_none()
                || self.s3_region.is_none())
        {
            return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
        }
        if !self.s3 && !self.oss && self.output_dir.is_none() {
            return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
        }
        let (catalog, schema) =
            database::split_database(&self.database).map_err(BoxedError::new)?;
        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
        let database_client = DatabaseClient::new(
            self.addr.clone(),
            catalog.clone(),
            self.auth_basic.clone(),
            self.timeout.unwrap_or_default(),
            proxy,
        );

        Ok(Box::new(Export {
            catalog,
            schema,
            database_client,
            output_dir: self.output_dir.clone(),
            parallelism: self.export_jobs,
            target: self.target.clone(),
            start_time: self.start_time.clone(),
            end_time: self.end_time.clone(),
            s3: self.s3,
            ddl_local_dir: self.ddl_local_dir.clone(),
            s3_bucket: self.s3_bucket.clone(),
            s3_root: self.s3_root.clone(),
            s3_endpoint: self.s3_endpoint.clone(),
            s3_access_key: self
                .s3_access_key
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            s3_secret_key: self
                .s3_secret_key
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            s3_region: self.s3_region.clone(),
            oss: self.oss,
            oss_bucket: self.oss_bucket.clone(),
            oss_endpoint: self.oss_endpoint.clone(),
            oss_access_key_id: self
                .oss_access_key_id
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
            oss_access_key_secret: self
                .oss_access_key_secret
                .as_ref()
                .map(|k| SecretString::from(k.clone())),
        }))
    }
}

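/// The export tool: dumps the schemas and/or data of one catalog
/// (optionally restricted to a single schema) to local or remote storage.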
#[derive(Clone)]
pub struct Export {
    catalog: String,
    schema: Option<String>,
    database_client: DatabaseClient,
    output_dir: Option<String>,
    parallelism: usize,
    target: ExportTarget,
    start_time: Option<String>,
    end_time: Option<String>,
    s3: bool,
    ddl_local_dir: Option<String>,
    s3_bucket: Option<String>,
    s3_root: Option<String>,
    s3_endpoint: Option<String>,
    s3_access_key: Option<SecretString>,
    s3_secret_key: Option<SecretString>,
    s3_region: Option<String>,
    oss: bool,
    oss_bucket: Option<String>,
    oss_endpoint: Option<String>,
    oss_access_key_id: Option<SecretString>,
    oss_access_key_secret: Option<SecretString>,
}

impl Export {
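    /// Root path for this catalog's files: just `<catalog>` on remote storage
    /// (the operator is already rooted at the bucket), `<output_dir>/<catalog>` locally.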
    fn catalog_path(&self) -> PathBuf {
        if self.s3 || self.oss {
            PathBuf::from(&self.catalog)
        } else if let Some(dir) = &self.output_dir {
            PathBuf::from(dir).join(&self.catalog)
        } else {
            unreachable!("catalog_path: output_dir must be set when not using remote storage")
        }
    }

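    /// Resolves the databases to export: every database in the catalog, or only
    /// the one matching the requested schema (case-insensitively).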
    async fn get_db_names(&self) -> Result<Vec<String>> {
        let db_names = self.all_db_names().await?;
        let Some(schema) = &self.schema else {
            return Ok(db_names);
        };

        db_names
            .into_iter()
            .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
            .map(|name| vec![name])
            .context(SchemaNotFoundSnafu {
                catalog: &self.catalog,
                schema,
            })
    }

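    /// Lists all databases via `SHOW DATABASES`, excluding the system schemas.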
    async fn all_db_names(&self) -> Result<Vec<String>> {
        let records = self
            .database_client
            .sql_in_public("SHOW DATABASES")
            .await?
            .context(EmptyResultSnafu)?;
        let mut result = Vec::with_capacity(records.len());
        for value in records {
            let Value::String(schema) = &value[0] else {
                unreachable!()
            };
            // `information_schema` and `pg_catalog` are built-in system schemas
            // and are not exported.
            if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
                continue;
            }
            if schema == common_catalog::consts::PG_CATALOG_NAME {
                continue;
            }
            result.push(schema.clone());
        }
        Ok(result)
    }

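    /// Returns the catalog's tables split into three groups: metric-engine
    /// physical tables, all remaining base tables, and views.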
    async fn get_table_list(
        &self,
        catalog: &str,
        schema: &str,
    ) -> Result<(
        Vec<TableReference>,
        Vec<TableReference>,
        Vec<TableReference>,
    )> {
        // Tables with the internal `__tsid` column are physical tables of the
        // metric engine; collect them first so they can be handled separately.
        let sql = format!(
            "SELECT table_catalog, table_schema, table_name \
            FROM information_schema.columns \
            WHERE column_name = '__tsid' \
            and table_catalog = '{catalog}' \
            and table_schema = '{schema}'"
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let mut metric_physical_tables = HashSet::with_capacity(records.len());
        for value in records {
            let mut t = Vec::with_capacity(3);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
        }

        let sql = format!(
            "SELECT table_catalog, table_schema, table_name, table_type \
            FROM information_schema.tables \
            WHERE (table_type = 'BASE TABLE' OR table_type = 'VIEW') \
            and table_catalog = '{catalog}' \
            and table_schema = '{schema}'",
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;

        debug!("Fetched table/view list: {:?}", records);

        if records.is_empty() {
            return Ok((vec![], vec![], vec![]));
        }

        let mut remaining_tables = Vec::with_capacity(records.len());
        let mut views = Vec::new();
        for value in records {
            let mut t = Vec::with_capacity(4);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            let table = (t[0].clone(), t[1].clone(), t[2].clone());
            let table_type = t[3].as_str();
            // Physical tables were collected above; split the rest into views
            // and ordinary tables.
            if !metric_physical_tables.contains(&table) {
                if table_type == "VIEW" {
                    views.push(table);
                } else {
                    remaining_tables.push(table);
                }
            }
        }

        Ok((
            metric_physical_tables.into_iter().collect(),
            remaining_tables,
            views,
        ))
    }

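    /// Runs `SHOW CREATE {DATABASE|TABLE|VIEW}` and returns the statement,
    /// terminated with `;` and a newline.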
    async fn show_create(
        &self,
        show_type: &str,
        catalog: &str,
        schema: &str,
        table: Option<&str>,
    ) -> Result<String> {
        let sql = match table {
            Some(table) => format!(
                r#"SHOW CREATE {} "{}"."{}"."{}""#,
                show_type, catalog, schema, table
            ),
            None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
        };
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let Value::String(create) = &records[0][1] else {
            unreachable!()
        };

        Ok(format!("{};\n", create))
    }

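    /// Writes a `create_database.sql` file for each exported database.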
    async fn export_create_database(&self) -> Result<()> {
        let timer = Instant::now();
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = self.build_prefer_fs_operator().await?;

        for schema in db_names {
            let create_database = self
                .show_create("DATABASE", &self.catalog, &schema, None)
                .await?;

            let file_path = self.get_file_path(&schema, "create_database.sql");
            self.write_to_storage(&operator, &file_path, create_database.into_bytes())
                .await?;

            info!(
                "Exported {}.{} database creation SQL to {}",
                self.catalog,
                schema,
                self.format_output_path(&file_path)
            );
        }

        let elapsed = timer.elapsed();
        info!("Success {db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

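    /// Writes a `create_tables.sql` file per database containing the
    /// `CREATE TABLE`/`CREATE VIEW` statements for all of its tables and views.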
    async fn export_create_table(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.parallelism));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = Arc::new(self.build_prefer_fs_operator().await?);
        let mut tasks = Vec::with_capacity(db_names.len());

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let operator = operator.clone();
            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();
                let (metric_physical_tables, remaining_tables, views) = export_self
                    .get_table_list(&export_self.catalog, &schema)
                    .await?;

                // On the local filesystem the database directory must exist
                // before any file can be written into it.
                if !export_self.s3 && !export_self.oss {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let file_path = export_self.get_file_path(&schema, "create_tables.sql");
                let mut content = Vec::new();

                for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
                    let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
                    content.extend_from_slice(create_table.as_bytes());
                }

                for (c, s, v) in &views {
                    let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
                    content.extend_from_slice(create_view.as_bytes());
                }

                export_self
                    .write_to_storage(&operator, &file_path, content)
                    .await?;

                info!(
                    "Finished exporting {}.{schema} with {} table schemas to path: {}",
                    export_self.catalog,
                    metric_physical_tables.len() + remaining_tables.len() + views.len(),
                    export_self.format_output_path(&file_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

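    /// Builds the object store operator for the configured backend:
    /// S3, OSS, or the local filesystem.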
    async fn build_operator(&self) -> Result<ObjectStore> {
        if self.s3 {
            self.build_s3_operator().await
        } else if self.oss {
            self.build_oss_operator().await
        } else {
            self.build_fs_operator().await
        }
    }

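    /// Like `build_operator`, but prefers a local filesystem operator rooted at
    /// `--ddl-local-dir` when it is set, so that small DDL files can stay local
    /// while the data itself goes to S3/OSS.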
    async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
        if (self.s3 || self.oss) && self.ddl_local_dir.is_some() {
            let root = self.ddl_local_dir.as_ref().unwrap();
            let op = ObjectStore::new(services::Fs::default().root(root))
                .context(OpenDalSnafu)?
                .layer(LoggingLayer::default())
                .finish();
            Ok(op)
        } else if self.s3 {
            self.build_s3_operator().await
        } else if self.oss {
            self.build_oss_operator().await
        } else {
            self.build_fs_operator().await
        }
    }

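    /// Builds an S3-backed operator from the `--s3-*` options; their presence
    /// was already validated in `ExportCommand::build`.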
    async fn build_s3_operator(&self) -> Result<ObjectStore> {
        let mut builder = services::S3::default().bucket(
            self.s3_bucket
                .as_ref()
                .expect("s3_bucket must be provided when s3 is enabled"),
        );

        if let Some(root) = self.s3_root.as_ref() {
            builder = builder.root(root);
        }

        if let Some(endpoint) = self.s3_endpoint.as_ref() {
            builder = builder.endpoint(endpoint);
        }

        if let Some(region) = self.s3_region.as_ref() {
            builder = builder.region(region);
        }

        if let Some(key_id) = self.s3_access_key.as_ref() {
            builder = builder.access_key_id(key_id.expose_secret());
        }

        if let Some(secret_key) = self.s3_secret_key.as_ref() {
            builder = builder.secret_access_key(secret_key.expose_secret());
        }

        let op = ObjectStore::new(builder)
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

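    /// Builds an Aliyun OSS-backed operator from the `--oss-*` options. Unlike
    /// S3, these options are not validated up front, so a missing bucket or
    /// endpoint panics here.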
    async fn build_oss_operator(&self) -> Result<ObjectStore> {
        let mut builder = Oss::default()
            .bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set"))
            .endpoint(
                self.oss_endpoint
                    .as_ref()
                    .expect("oss_endpoint must be set"),
            );

        if let Some(key_id) = self.oss_access_key_id.as_ref() {
            builder = builder.access_key_id(key_id.expose_secret());
        }
        if let Some(secret_key) = self.oss_access_key_secret.as_ref() {
            builder = builder.access_key_secret(secret_key.expose_secret());
        }

        let op = ObjectStore::new(builder)
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

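    /// Builds a local-filesystem operator rooted at `--output-dir`.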
    async fn build_fs_operator(&self) -> Result<ObjectStore> {
        let root = self
            .output_dir
            .as_ref()
            .context(OutputDirNotSetSnafu)?
            .clone();
        let op = ObjectStore::new(services::Fs::default().root(&root))
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

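    /// Exports each database's data with `COPY DATABASE ... TO`, and writes a
    /// matching `copy_from.sql` containing the inverse `COPY DATABASE ... FROM`
    /// statement for later re-import.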
    async fn export_database_data(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.parallelism));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let mut tasks = Vec::with_capacity(db_count);
        let operator = Arc::new(self.build_operator().await?);
        let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
        let with_options = build_with_options(&self.start_time, &self.end_time);

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let with_options_clone = with_options.clone();
            let operator = operator.clone();
            let fs_first_operator = fs_first_operator.clone();

            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();

                // On the local filesystem the database directory must exist
                // before `COPY DATABASE` can write into it.
                if !export_self.s3 && !export_self.oss {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let (path, connection_part) = export_self.get_storage_params(&schema);

                let sql = format!(
                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );

                // Log the statement with any credentials redacted.
                let safe_sql = export_self.mask_sensitive_sql(&sql);
                info!("Executing sql: {}", safe_sql);

                export_self.database_client.sql_in_public(&sql).await?;
                info!(
                    "Finished exporting {}.{} data to {}",
                    export_self.catalog, schema, path
                );

                // Persist the inverse statement so the data can be re-imported.
                let copy_database_from_sql = format!(
                    r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );

                let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
                export_self
                    .write_to_storage(
                        &fs_first_operator,
                        &copy_from_path,
                        copy_database_from_sql.into_bytes(),
                    )
                    .await?;

                info!(
                    "Finished exporting {}.{} copy_from.sql to {}",
                    export_self.catalog,
                    schema,
                    export_self.format_output_path(&copy_from_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

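    /// Replaces any configured S3/OSS credentials found in the SQL text with
    /// `[REDACTED]` so the statement can be logged safely.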
    fn mask_sensitive_sql(&self, sql: &str) -> String {
        let mut masked_sql = sql.to_string();

        if let Some(access_key) = &self.s3_access_key {
            masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]");
        }
        if let Some(secret_key) = &self.s3_secret_key {
            masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]");
        }

        if let Some(access_key_id) = &self.oss_access_key_id {
            masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]");
        }
        if let Some(access_key_secret) = &self.oss_access_key_secret {
            masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]");
        }

        masked_sql
    }

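    /// Relative storage path of an exported file: `<catalog>/<schema>/<file_name>`.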
    fn get_file_path(&self, schema: &str, file_name: &str) -> String {
        format!("{}/{}/{}", self.catalog, schema, file_name)
    }

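    /// Renders a user-facing location (`s3://`, `oss://`, or a local path) of
    /// an exported file for log messages.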
    fn format_output_path(&self, file_path: &str) -> String {
        if self.s3 {
            format!(
                "s3://{}{}/{}",
                self.s3_bucket.as_ref().unwrap_or(&String::new()),
                if let Some(root) = &self.s3_root {
                    format!("/{}", root)
                } else {
                    String::new()
                },
                file_path
            )
        } else if self.oss {
            // `file_path` already starts with the catalog, so append it to the
            // bucket directly.
            format!(
                "oss://{}/{}",
                self.oss_bucket.as_ref().unwrap_or(&String::new()),
                file_path
            )
        } else {
            format!(
                "{}/{}",
                self.output_dir.as_ref().unwrap_or(&String::new()),
                file_path
            )
        }
    }

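    /// Writes `content` to `file_path` through the given operator.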
    async fn write_to_storage(
        &self,
        op: &ObjectStore,
        file_path: &str,
        content: Vec<u8>,
    ) -> Result<()> {
        op.write(file_path, content)
            .await
            .context(OpenDalSnafu)
            .map(|_| ())
    }

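    /// Returns the `COPY DATABASE` destination path and, for remote backends,
    /// the trailing `CONNECTION (...)` clause carrying the credentials, e.g.
    /// `("s3://bucket/catalog/schema/", " CONNECTION (ACCESS_KEY_ID='...', ...)")`.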
    fn get_storage_params(&self, schema: &str) -> (String, String) {
        if self.s3 {
            let s3_path = format!(
                "s3://{}{}/{}/{}/",
                self.s3_bucket.as_ref().unwrap(),
                if let Some(root) = &self.s3_root {
                    format!("/{}", root)
                } else {
                    String::new()
                },
                self.catalog,
                schema
            );

            let endpoint_option = if let Some(endpoint) = self.s3_endpoint.as_ref() {
                format!(", ENDPOINT='{}'", endpoint)
            } else {
                String::new()
            };

            // The S3 credentials were validated in `ExportCommand::build`,
            // so the unwraps cannot fail here.
            let connection_options = format!(
                "ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
                self.s3_access_key.as_ref().unwrap().expose_secret(),
                self.s3_secret_key.as_ref().unwrap().expose_secret(),
                self.s3_region.as_ref().unwrap(),
                endpoint_option
            );

            (s3_path, format!(" CONNECTION ({})", connection_options))
        } else if self.oss {
            let oss_path = format!(
                "oss://{}/{}/{}/",
                self.oss_bucket.as_ref().unwrap(),
                self.catalog,
                schema
            );
            let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() {
                format!(", ENDPOINT='{}'", endpoint)
            } else {
                String::new()
            };

            let connection_options = format!(
                "ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}",
                self.oss_access_key_id.as_ref().unwrap().expose_secret(),
                self.oss_access_key_secret.as_ref().unwrap().expose_secret(),
                endpoint_option
            );
            (oss_path, format!(" CONNECTION ({})", connection_options))
        } else {
            (
                self.catalog_path()
                    .join(format!("{schema}/"))
                    .to_string_lossy()
                    .to_string(),
                String::new(),
            )
        }
    }

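    /// Awaits all export tasks and returns the number that succeeded; failed
    /// jobs are logged and excluded from the count.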
    async fn execute_tasks(
        &self,
        tasks: Vec<impl std::future::Future<Output = Result<()>>>,
    ) -> usize {
        futures::future::join_all(tasks)
            .await
            .into_iter()
            .filter(|r| match r {
                Ok(_) => true,
                Err(e) => {
                    error!(e; "export job failed");
                    false
                }
            })
            .count()
    }
}

#[async_trait]
impl Tool for Export {
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        match self.target {
            ExportTarget::Schema => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)
            }
            ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
            ExportTarget::All => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)?;
                self.export_database_data().await.map_err(BoxedError::new)
            }
        }
    }
}

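/// Builds the `WITH (...)` option list for `COPY DATABASE`: always
/// `format = 'parquet'`, plus optional `start_time`/`end_time` bounds, e.g.
/// `format = 'parquet', start_time = '2024-01-01 00:00:00'`.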
fn build_with_options(start_time: &Option<String>, end_time: &Option<String>) -> String {
    let mut options = vec!["format = 'parquet'".to_string()];
    if let Some(start) = start_time {
        options.push(format!("start_time = '{}'", start));
    }
    if let Some(end) = end_time {
        options.push(format!("end_time = '{}'", end));
    }
    options.join(", ")
}