1use std::collections::BTreeMap;
18use std::time::Duration;
19
20use common_telemetry::debug;
21use common_telemetry::tracing::warn;
22use common_time::Timestamp;
23use datatypes::value::Value;
24use session::context::QueryContextRef;
25use snafu::{OptionExt, ResultExt, ensure};
26use tokio::sync::oneshot;
27use tokio::time::Instant;
28
29use crate::batching_mode::task::BatchingTask;
30use crate::batching_mode::time_window::TimeWindowExpr;
31use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
32use crate::metrics::{
33 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
34 METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
35};
36use crate::{Error, FlowId};
37
38#[derive(Debug)]
40pub struct TaskState {
41 pub(crate) query_ctx: QueryContextRef,
43 last_update_time: Instant,
45 last_query_duration: Duration,
47 pub(crate) dirty_time_windows: DirtyTimeWindows,
50 exec_state: ExecState,
51 pub(crate) shutdown_rx: oneshot::Receiver<()>,
53 pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
55}
56impl TaskState {
57 pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
58 Self {
59 query_ctx,
60 last_update_time: Instant::now(),
61 last_query_duration: Duration::from_secs(0),
62 dirty_time_windows: Default::default(),
63 exec_state: ExecState::Idle,
64 shutdown_rx,
65 task_handle: None,
66 }
67 }
68
69 pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
72 self.exec_state = ExecState::Idle;
73 self.last_query_duration = elapsed;
74 self.last_update_time = Instant::now();
75 }
76
77 pub fn get_next_start_query_time(
88 &self,
89 flow_id: FlowId,
90 time_window_size: &Option<Duration>,
91 min_refresh_duration: Duration,
92 max_timeout: Option<Duration>,
93 max_filter_num_per_query: usize,
94 ) -> Instant {
95 let lower = time_window_size.unwrap_or(min_refresh_duration);
97 let next_duration = self.last_query_duration.max(lower);
98 let next_duration = if let Some(max_timeout) = max_timeout {
99 next_duration.min(max_timeout)
100 } else {
101 next_duration
102 };
103
104 let cur_dirty_window_size = self.dirty_time_windows.window_size();
105 let max_query_update_range = (*time_window_size)
107 .unwrap_or_default()
108 .mul_f64(max_filter_num_per_query as f64);
109 if cur_dirty_window_size < max_query_update_range {
112 self.last_update_time + next_duration
113 } else {
114 debug!(
117 "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
118 flow_id,
119 self.dirty_time_windows.windows.len(),
120 self.dirty_time_windows.windows
121 );
122 Instant::now()
123 }
124 }
125}
126
127#[derive(Debug, Clone)]
130pub struct DirtyTimeWindows {
131 windows: BTreeMap<Timestamp, Option<Timestamp>>,
134 max_filter_num_per_query: usize,
136 time_window_merge_threshold: usize,
139}
140
141impl DirtyTimeWindows {
142 pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self {
143 Self {
144 windows: BTreeMap::new(),
145 max_filter_num_per_query,
146 time_window_merge_threshold,
147 }
148 }
149}
150
151impl Default for DirtyTimeWindows {
152 fn default() -> Self {
153 Self {
154 windows: BTreeMap::new(),
155 max_filter_num_per_query: 20,
156 time_window_merge_threshold: 3,
157 }
158 }
159}
160
161impl DirtyTimeWindows {
162 pub const MERGE_DIST: i32 = 3;
166
167 pub fn add_lower_bounds(&mut self, lower_bounds: impl Iterator<Item = Timestamp>) {
173 for lower_bound in lower_bounds {
174 let entry = self.windows.entry(lower_bound);
175 entry.or_insert(None);
176 }
177 }
178
179 pub fn window_size(&self) -> Duration {
180 let mut ret = Duration::from_secs(0);
181 for (start, end) in &self.windows {
182 if let Some(end) = end
183 && let Some(duration) = end.sub(start)
184 {
185 ret += duration.to_std().unwrap_or_default();
186 }
187 }
188 ret
189 }
190
191 pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
192 self.windows.insert(start, end);
193 }
194
195 pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) {
196 for (start, end) in time_ranges {
197 self.windows.insert(start, Some(end));
198 }
199 }
200
201 pub fn clean(&mut self) {
203 self.windows.clear();
204 }
205
206 pub fn set_dirty(&mut self) {
209 self.windows.insert(Timestamp::new_second(0), None);
210 }
211
212 pub fn len(&self) -> usize {
214 self.windows.len()
215 }
216
217 pub fn is_empty(&self) -> bool {
218 self.windows.is_empty()
219 }
220
221 pub fn effective_count(&self, window_size: &Duration) -> usize {
224 if self.windows.is_empty() {
225 return 0;
226 }
227 let window_size =
228 chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
229 let total_window_time_range =
230 self.windows
231 .iter()
232 .fold(chrono::Duration::zero(), |acc, (start, end)| {
233 if let Some(end) = end {
234 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
235 } else {
236 acc + window_size
237 }
238 });
239
240 if window_size.num_seconds() == 0 {
242 0
243 } else {
244 (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
245 }
246 }
247
248 pub fn gen_filter_exprs(
254 &mut self,
255 col_name: &str,
256 expire_lower_bound: Option<Timestamp>,
257 window_size: chrono::Duration,
258 window_cnt: usize,
259 flow_id: FlowId,
260 task_ctx: Option<&BatchingTask>,
261 ) -> Result<Option<FilterExprInfo>, Error> {
262 ensure!(
263 window_size.num_seconds() > 0,
264 UnexpectedSnafu {
265 reason: "window_size is zero, can't generate filter exprs",
266 }
267 );
268
269 debug!(
270 "expire_lower_bound: {:?}, window_size: {:?}",
271 expire_lower_bound.map(|t| t.to_iso8601_string()),
272 window_size
273 );
274 self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
275
276 if self.windows.len() > self.max_filter_num_per_query {
277 let first_time_window = self.windows.first_key_value();
278 let last_time_window = self.windows.last_key_value();
279
280 if let Some(task_ctx) = task_ctx {
281 warn!(
282 "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
283 task_ctx.config.flow_id,
284 self.windows.len(),
285 self.max_filter_num_per_query,
286 task_ctx.config.time_window_expr,
287 task_ctx.config.expire_after,
288 first_time_window,
289 last_time_window,
290 task_ctx.config.query
291 );
292 } else {
293 warn!(
294 "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
295 flow_id,
296 self.windows.len(),
297 self.max_filter_num_per_query,
298 first_time_window,
299 last_time_window
300 )
301 }
302 }
303
304 let max_time_range = window_size * window_cnt as i32;
306
307 let mut to_be_query = BTreeMap::new();
308 let mut new_windows = self.windows.clone();
309 let mut cur_time_range = chrono::Duration::zero();
310 for (idx, (start, end)) in self.windows.iter().enumerate() {
311 let first_end = start
312 .add_duration(window_size.to_std().unwrap())
313 .context(TimeSnafu)?;
314 let end = end.unwrap_or(first_end);
315
316 if cur_time_range >= max_time_range {
318 break;
319 }
320
321 if idx >= window_cnt {
323 break;
324 }
325
326 let Some(x) = end.sub(start) else {
327 continue;
328 };
329 if cur_time_range + x <= max_time_range {
330 to_be_query.insert(*start, Some(end));
331 new_windows.remove(start);
332 cur_time_range += x;
333 } else {
334 let surplus = max_time_range - cur_time_range;
337 if surplus.num_seconds() <= window_size.num_seconds() {
338 break;
340 }
341 let times = surplus.num_seconds() / window_size.num_seconds();
342
343 let split_offset = window_size * times as i32;
344 let split_at = start
345 .add_duration(split_offset.to_std().unwrap())
346 .context(TimeSnafu)?;
347 to_be_query.insert(*start, Some(split_at));
348
349 new_windows.remove(start);
351 new_windows.insert(split_at, Some(end));
352 cur_time_range += split_offset;
353 break;
354 }
355 }
356
357 self.windows = new_windows;
358
359 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
360 .with_label_values(&[flow_id.to_string().as_str()])
361 .observe(to_be_query.len() as f64);
362
363 let full_time_range = to_be_query
364 .iter()
365 .fold(chrono::Duration::zero(), |acc, (start, end)| {
366 if let Some(end) = end {
367 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
368 } else {
369 acc + window_size
370 }
371 })
372 .num_seconds() as f64;
373 METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
374 .with_label_values(&[flow_id.to_string().as_str()])
375 .observe(full_time_range);
376
377 let stalled_time_range =
378 self.windows
379 .iter()
380 .fold(chrono::Duration::zero(), |acc, (start, end)| {
381 if let Some(end) = end {
382 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
383 } else {
384 acc + window_size
385 }
386 });
387
388 METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
389 .with_label_values(&[flow_id.to_string().as_str()])
390 .observe(stalled_time_range.num_seconds() as f64);
391
392 let std_window_size = window_size.to_std().map_err(|e| {
393 InternalSnafu {
394 reason: e.to_string(),
395 }
396 .build()
397 })?;
398
399 let mut expr_lst = vec![];
400 let mut time_ranges = vec![];
401 for (start, end) in to_be_query.into_iter() {
402 let (start, end) = if let Some(ctx) = task_ctx {
404 let Some(time_window_expr) = &ctx.config.time_window_expr else {
405 UnexpectedSnafu {
406 reason: "time_window_expr is not set",
407 }
408 .fail()?
409 };
410 self.align_time_window(start, end, time_window_expr)?
411 } else {
412 (start, end)
413 };
414 let end = end.unwrap_or(start.add_duration(std_window_size).context(TimeSnafu)?);
415 time_ranges.push((start, end));
416
417 debug!(
418 "Time window start: {:?}, end: {:?}",
419 start.to_iso8601_string(),
420 end.to_iso8601_string()
421 );
422
423 use datafusion_expr::{col, lit};
424 let lower = to_df_literal(start)?;
425 let upper = to_df_literal(end)?;
426 let expr = col(col_name)
427 .gt_eq(lit(lower))
428 .and(col(col_name).lt(lit(upper)));
429 expr_lst.push(expr);
430 }
431 let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
432 let ret = expr.map(|expr| FilterExprInfo {
433 expr,
434 col_name: col_name.to_string(),
435 time_ranges,
436 window_size,
437 });
438 Ok(ret)
439 }
440
441 fn align_time_window(
442 &self,
443 start: Timestamp,
444 end: Option<Timestamp>,
445 time_window_expr: &TimeWindowExpr,
446 ) -> Result<(Timestamp, Option<Timestamp>), Error> {
447 let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
448 reason: format!(
449 "Failed to align start time {:?} with time window expr {:?}",
450 start, time_window_expr
451 ),
452 })?;
453 let align_end = end
454 .and_then(|end| {
455 time_window_expr
456 .eval(end)
457 .map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
459 .transpose()
460 })
461 .transpose()?;
462 Ok((align_start, align_end))
463 }
464
465 pub fn merge_dirty_time_windows(
469 &mut self,
470 window_size: chrono::Duration,
471 expire_lower_bound: Option<Timestamp>,
472 ) -> Result<(), Error> {
473 if self.windows.is_empty() {
474 return Ok(());
475 }
476
477 let mut new_windows = BTreeMap::new();
478
479 let std_window_size = window_size.to_std().map_err(|e| {
480 InternalSnafu {
481 reason: e.to_string(),
482 }
483 .build()
484 })?;
485
486 let mut prev_tw = None;
488 for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
489 if let Some(expire_lower_bound) = expire_lower_bound
491 && lower_bound < expire_lower_bound
492 {
493 continue;
494 }
495
496 let Some(prev_tw) = &mut prev_tw else {
497 prev_tw = Some((lower_bound, upper_bound));
498 continue;
499 };
500
501 let prev_upper = prev_tw
504 .1
505 .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
506 prev_tw.1 = Some(prev_upper);
507
508 let cur_upper = upper_bound.unwrap_or(
509 lower_bound
510 .add_duration(std_window_size)
511 .context(TimeSnafu)?,
512 );
513
514 if lower_bound
515 .sub(&prev_upper)
516 .map(|dist| dist <= window_size * self.time_window_merge_threshold as i32)
517 .unwrap_or(false)
518 {
519 prev_tw.1 = Some(cur_upper);
520 } else {
521 new_windows.insert(prev_tw.0, prev_tw.1);
522 *prev_tw = (lower_bound, Some(cur_upper));
523 }
524 }
525
526 if let Some(prev_tw) = prev_tw {
527 new_windows.insert(prev_tw.0, prev_tw.1);
528 }
529
530 self.windows = new_windows;
531
532 Ok(())
533 }
534}
535
536fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Error> {
537 let value = Value::from(value);
538 let value = value
539 .try_to_scalar_value(&value.data_type())
540 .with_context(|_| DatatypesSnafu {
541 extra: format!("Failed to convert to scalar value: {}", value),
542 })?;
543 Ok(value)
544}
545
546#[derive(Debug, Clone)]
547enum ExecState {
548 Idle,
549 Executing,
550}
551
552#[derive(Debug, Clone)]
554pub struct FilterExprInfo {
555 pub expr: datafusion_expr::Expr,
556 pub col_name: String,
557 pub time_ranges: Vec<(Timestamp, Timestamp)>,
558 pub window_size: chrono::Duration,
559}
560
561impl FilterExprInfo {
562 pub fn total_window_length(&self) -> chrono::Duration {
563 self.time_ranges
564 .iter()
565 .fold(chrono::Duration::zero(), |acc, (start, end)| {
566 acc + end.sub(start).unwrap_or(chrono::Duration::zero())
567 })
568 }
569}
570
571#[cfg(test)]
572mod test {
573 use pretty_assertions::assert_eq;
574 use session::context::QueryContext;
575
576 use super::*;
577 use crate::batching_mode::time_window::find_time_window_expr;
578 use crate::batching_mode::utils::sql_to_df_plan;
579 use crate::test_utils::create_test_query_engine;
580
581 #[test]
582 fn test_merge_dirty_time_windows() {
583 let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold;
584 let testcases = vec![
585 (
587 vec![
588 Timestamp::new_second(0),
589 Timestamp::new_second((1 + merge_dist as i64) * 5 * 60),
590 ],
591 (chrono::Duration::seconds(5 * 60), None),
592 BTreeMap::from([(
593 Timestamp::new_second(0),
594 Some(Timestamp::new_second((2 + merge_dist as i64) * 5 * 60)),
595 )]),
596 Some(
597 "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:25:00' AS TIMESTAMP)))",
598 ),
599 ),
600 (
602 vec![
603 Timestamp::new_second(0),
604 Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
605 ],
606 (chrono::Duration::seconds(5 * 60), None),
607 BTreeMap::from([
608 (
609 Timestamp::new_second(0),
610 Some(Timestamp::new_second(5 * 60)),
611 ),
612 (
613 Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
614 Some(Timestamp::new_second((3 + merge_dist as i64) * 5 * 60)),
615 ),
616 ]),
617 Some(
618 "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:05:00' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:25:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:30:00' AS TIMESTAMP))))",
619 ),
620 ),
621 (
623 vec![
624 Timestamp::new_second(0),
625 Timestamp::new_second((merge_dist as i64) * 5 * 60),
626 ],
627 (chrono::Duration::seconds(5 * 60), None),
628 BTreeMap::from([(
629 Timestamp::new_second(0),
630 Some(Timestamp::new_second((1 + merge_dist as i64) * 5 * 60)),
631 )]),
632 Some(
633 "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:20:00' AS TIMESTAMP)))",
634 ),
635 ),
636 (
638 vec![
639 Timestamp::new_second(0),
640 Timestamp::new_second((merge_dist as i64) * 3),
641 Timestamp::new_second((merge_dist as i64) * 3 * 2),
642 ],
643 (chrono::Duration::seconds(3), None),
644 BTreeMap::from([(
645 Timestamp::new_second(0),
646 Some(Timestamp::new_second((merge_dist as i64) * 7)),
647 )]),
648 Some(
649 "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
650 ),
651 ),
652 (
654 Vec::from_iter((0..20).map(|i| Timestamp::new_second(i * 3)).chain(
655 std::iter::once(Timestamp::new_second(
656 60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1),
657 )),
658 )),
659 (chrono::Duration::seconds(3), None),
660 BTreeMap::from([
661 (Timestamp::new_second(0), Some(Timestamp::new_second(60))),
662 (
663 Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
664 Some(Timestamp::new_second(
665 60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3,
666 )),
667 ),
668 ]),
669 Some(
670 "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
671 ),
672 ),
673 (
675 Vec::from_iter((0..40).map(|i| Timestamp::new_second(i * 3))),
676 (chrono::Duration::seconds(3), None),
677 BTreeMap::from([(
678 Timestamp::new_second(0),
679 Some(Timestamp::new_second(40 * 3)),
680 )]),
681 Some(
682 "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
683 ),
684 ),
685 (
687 Vec::from_iter(
688 std::iter::once(Timestamp::new_second(0))
689 .chain((0..40).map(|i| Timestamp::new_second(20 + i * 3))),
690 ),
691 (chrono::Duration::seconds(3), None),
692 BTreeMap::from([
693 (Timestamp::new_second(0), Some(Timestamp::new_second(3))),
694 (Timestamp::new_second(20), Some(Timestamp::new_second(140))),
695 ]),
696 Some(
697 "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
698 ),
699 ),
700 (
702 vec![
703 Timestamp::new_second(0),
704 Timestamp::new_second((merge_dist as i64) * 5 * 60),
705 ],
706 (
707 chrono::Duration::seconds(5 * 60),
708 Some(Timestamp::new_second((merge_dist as i64) * 6 * 60)),
709 ),
710 BTreeMap::from([]),
711 None,
712 ),
713 ];
714 for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
717 testcases
718 {
719 let mut dirty = DirtyTimeWindows::default();
720 dirty.add_lower_bounds(lower_bounds.into_iter());
721 dirty
722 .merge_dirty_time_windows(window_size, expire_lower_bound)
723 .unwrap();
724 assert_eq!(expected, dirty.windows);
725 let filter_expr = dirty
726 .gen_filter_exprs(
727 "ts",
728 expire_lower_bound,
729 window_size,
730 dirty.max_filter_num_per_query,
731 0,
732 None,
733 )
734 .unwrap()
735 .map(|e| e.expr);
736
737 let unparser = datafusion::sql::unparser::Unparser::default();
738 let to_sql = filter_expr
739 .as_ref()
740 .map(|e| unparser.expr_to_sql(e).unwrap().to_string());
741 assert_eq!(expected_filter_expr, to_sql.as_deref());
742 }
743 }
744
745 #[tokio::test]
746 async fn test_align_time_window() {
747 type TimeWindow = (Timestamp, Option<Timestamp>);
748 struct TestCase {
749 sql: String,
750 aligns: Vec<(TimeWindow, TimeWindow)>,
751 }
752 let testcases: Vec<TestCase> = vec![TestCase{
753 sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
754 aligns: vec![
755 ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
756 ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
757 ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
758 ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
759 ],
760 }];
761
762 let query_engine = create_test_query_engine();
763 let ctx = QueryContext::arc();
764 for TestCase { sql, aligns } in testcases {
765 let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
766 .await
767 .unwrap();
768
769 let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
770 &plan,
771 query_engine.engine_state().catalog_manager().clone(),
772 ctx.clone(),
773 )
774 .await
775 .unwrap();
776
777 let time_window_expr = time_window_expr
778 .map(|expr| {
779 TimeWindowExpr::from_expr(
780 &expr,
781 &column_name,
782 &df_schema,
783 &query_engine.engine_state().session_state(),
784 )
785 })
786 .transpose()
787 .unwrap()
788 .unwrap();
789
790 let dirty = DirtyTimeWindows::default();
791 for (before_align, expected_after_align) in aligns {
792 let after_align = dirty
793 .align_time_window(before_align.0, before_align.1, &time_window_expr)
794 .unwrap();
795 assert_eq!(expected_after_align, after_align);
796 }
797 }
798 }
799}