1use std::any::Any;
16use std::sync::Arc;
17
18use common_query::request::{
19 DynFilterPayload, INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
20 INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES, InitialDynFilterReg,
21 InitialDynFilterRegs, InitialDynFilterSnapshot,
22};
23use datafusion_common::Result;
24use datafusion_physical_expr::PhysicalExpr;
25use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
26use session::context::{QueryContext, QueryContextRef};
27use store_api::storage::RegionId;
28
29use crate::dist_plan::filter_id::build_remote_dyn_filter_id;
30use crate::dist_plan::{FilterId, QueryDynFilterRegistry, RemoteDynFilterProducerId, Subscriber};
31
/// A dynamic filter captured from a set of parent filters for remote pushdown.
///
/// Bundles the filter's identifier, the initial registration built for it, and
/// a handle to the live expression it was captured from.
#[derive(Debug, Clone)]
pub(crate) struct CapturedDynFilter {
    /// Identifier produced by `build_remote_dyn_filter_id` for this filter.
    filter_id: FilterId,
    /// Registration built from the filter id and the filter's children; it may
    /// carry an initial snapshot when one could be read and encoded.
    initial_registration: InitialDynFilterReg,
    /// The live dynamic filter expression that was captured.
    pub(crate) alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
}
38
/// Outcome of capturing dynamic filters from a list of parent filters.
#[derive(Debug, Clone)]
pub(crate) struct RemoteDynFilterPushdown {
    /// The dynamic filters that were successfully captured.
    pub(crate) captured_dyn_filters: Vec<CapturedDynFilter>,
    /// One flag per parent filter, in input order: `true` when that filter was
    /// captured for pushdown, `false` otherwise.
    pub(crate) pushed_down: Vec<bool>,
}
45
46pub(crate) fn capture_remote_dyn_filters_for_pushdown(
47 remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
48 parent_filters: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
49) -> RemoteDynFilterPushdown {
50 let mut pushed_down = Vec::with_capacity(parent_filters.len());
51 let mut captured_dyn_filters = Vec::new();
52
53 for (producer_local_ordinal, filter) in parent_filters.into_iter().enumerate() {
54 let Some(alive_dyn_filter) = downcast_dynamic_filter(filter) else {
55 pushed_down.push(false);
56 continue;
57 };
58
59 match build_captured_dyn_filter(
60 remote_dyn_filter_producer_id,
61 producer_local_ordinal,
62 alive_dyn_filter,
63 ) {
64 Ok(captured_dyn_filter) => {
65 pushed_down.push(true);
66 captured_dyn_filters.push(captured_dyn_filter);
67 }
68 Err(error) => {
69 common_telemetry::warn!(error; "Remote dyn filter is not pushed down because initial registration cannot be built");
70 pushed_down.push(false);
71 }
72 }
73 }
74
75 if let Err(error) = validate_initial_registrations_for_pushdown(&captured_dyn_filters) {
76 common_telemetry::warn!(error; "Remote dyn filters are not pushed down because initial registrations are invalid");
77 return RemoteDynFilterPushdown {
78 captured_dyn_filters: Vec::new(),
79 pushed_down: vec![false; pushed_down.len()],
80 };
81 }
82
83 RemoteDynFilterPushdown {
84 captured_dyn_filters,
85 pushed_down,
86 }
87}
88
89fn downcast_dynamic_filter(
90 expr: Arc<dyn datafusion::physical_plan::PhysicalExpr>,
91) -> Option<Arc<DynamicFilterPhysicalExpr>> {
92 (expr as Arc<dyn Any + Send + Sync + 'static>)
93 .downcast::<DynamicFilterPhysicalExpr>()
94 .ok()
95}
96
97pub(crate) fn register_dyn_filters_for_region(
98 registry: &QueryDynFilterRegistry,
99 region_id: RegionId,
100 captured_dyn_filters: &[CapturedDynFilter],
101) {
102 for captured_dyn_filter in captured_dyn_filters {
103 let _ = registry.register_remote_dyn_filter(
104 captured_dyn_filter.filter_id.clone(),
105 captured_dyn_filter.alive_dyn_filter.clone(),
106 );
107 let _ = registry
108 .register_subscriber(&captured_dyn_filter.filter_id, Subscriber::new(region_id));
109 }
110}
111
112fn build_captured_dyn_filter(
113 remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
114 producer_local_ordinal: usize,
115 alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
116) -> Result<CapturedDynFilter> {
117 let children = alive_dyn_filter
118 .children()
119 .into_iter()
120 .cloned()
121 .collect::<Vec<_>>();
122 let filter_id = build_remote_dyn_filter_id(
123 remote_dyn_filter_producer_id,
124 producer_local_ordinal,
125 &children,
126 )?;
127 let initial_registration =
128 InitialDynFilterReg::from_filter_id_and_children(filter_id.to_string(), &children)?;
129
130 Ok(CapturedDynFilter {
131 filter_id,
132 initial_registration: attach_initial_snapshot(initial_registration, &alive_dyn_filter),
133 alive_dyn_filter,
134 })
135}
136
137fn validate_initial_registrations_for_pushdown(
138 captured_dyn_filters: &[CapturedDynFilter],
139) -> std::result::Result<(), String> {
140 let regs = build_initial_dyn_filter_regs_for_region(captured_dyn_filters);
141 regs.validate_default_bounds()?;
142 regs.to_extension_value()
143 .map_err(|error| error.to_string())?;
144 Ok(())
145}
146
147fn attach_initial_snapshot(
148 initial_registration: InitialDynFilterReg,
149 alive_dyn_filter: &DynamicFilterPhysicalExpr,
150) -> InitialDynFilterReg {
151 let Some(initial_snapshot) = initial_snapshot(alive_dyn_filter) else {
152 return initial_registration;
153 };
154
155 initial_registration.with_initial_snapshot(initial_snapshot)
156}
157
158fn initial_snapshot(
159 alive_dyn_filter: &DynamicFilterPhysicalExpr,
160) -> Option<InitialDynFilterSnapshot> {
161 let generation = alive_dyn_filter.snapshot_generation();
162 let current = match alive_dyn_filter.current() {
163 Ok(current) => current,
164 Err(error) => {
165 common_telemetry::warn!(error; "Failed to read remote dyn filter initial snapshot");
166 return None;
167 }
168 };
169
170 let payload = match DynFilterPayload::from_datafusion_expr(
171 ¤t,
172 INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES,
173 ) {
174 Ok(payload) => payload,
175 Err(error) => {
176 common_telemetry::warn!(error; "Failed to encode remote dyn filter initial snapshot");
177 return None;
178 }
179 };
180
181 let is_complete = false;
183 Some(InitialDynFilterSnapshot::new(
184 payload,
185 generation,
186 is_complete,
187 ))
188}
189
190fn build_initial_dyn_filter_regs_for_region(
191 captured_dyn_filters: &[CapturedDynFilter],
192) -> InitialDynFilterRegs {
193 InitialDynFilterRegs::new(
194 captured_dyn_filters
195 .iter()
196 .map(|captured| captured.initial_registration.clone())
197 .collect(),
198 )
199}
200
201pub(crate) fn query_context_with_initial_dyn_filter_regs(
202 query_ctx: &QueryContextRef,
203 region_id: RegionId,
204 captured_dyn_filters: &[CapturedDynFilter],
205) -> QueryContext {
206 let mut region_query_ctx = query_ctx.as_ref().clone();
207 let regs = build_initial_dyn_filter_regs_for_region(captured_dyn_filters);
208 if regs.is_empty() {
209 return region_query_ctx;
210 }
211
212 if let Err(error) = regs.validate_default_bounds() {
213 common_telemetry::warn!(error; "Dropping initial remote dyn filter registrations for region {} that exceed configured bounds", region_id);
214 return region_query_ctx;
215 }
216
217 match regs.to_extension_value() {
218 Ok(serialized) => region_query_ctx.set_extension(
219 INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
220 serialized,
221 ),
222 Err(error) => {
223 common_telemetry::warn!(error; "Failed to serialize initial remote dyn filter registrations");
224 }
225 }
226
227 region_query_ctx
228}
229
/// Unit tests for capturing dynamic filters, registering them per region, and
/// attaching their initial registrations to a region query context.
#[cfg(test)]
mod tests {
    use std::fmt;
    use std::hash::{Hash, Hasher};

    use datafusion::execution::TaskContext;
    use datafusion_common::ScalarValue;
    use datafusion_expr::ColumnarValue;
    use datafusion_physical_expr::expressions::{Column, lit};
    use session::query_id::QueryId;
    use uuid::Uuid;

    use super::*;

    /// Stand-in child expression that the `rejects_unencodable_registration`
    /// test expects the capture path to refuse.
    #[derive(Debug)]
    struct UnserializableExpr;

    impl fmt::Display for UnserializableExpr {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "unserializable_expr")
        }
    }

    impl Hash for UnserializableExpr {
        fn hash<H: Hasher>(&self, state: &mut H) {
            "unserializable_expr".hash(state);
        }
    }

    // The type has no fields, so all instances compare equal.
    impl PartialEq for UnserializableExpr {
        fn eq(&self, _other: &Self) -> bool {
            true
        }
    }

    impl Eq for UnserializableExpr {}

    impl datafusion_physical_expr::PhysicalExpr for UnserializableExpr {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn data_type(
            &self,
            _input_schema: &arrow_schema::Schema,
        ) -> datafusion_common::Result<arrow_schema::DataType> {
            Ok(arrow_schema::DataType::Boolean)
        }

        fn nullable(
            &self,
            _input_schema: &arrow_schema::Schema,
        ) -> datafusion_common::Result<bool> {
            Ok(false)
        }

        fn evaluate(
            &self,
            _batch: &common_recordbatch::DfRecordBatch,
        ) -> datafusion_common::Result<ColumnarValue> {
            Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))))
        }

        fn children(&self) -> Vec<&Arc<dyn datafusion_physical_expr::PhysicalExpr>> {
            Vec::new()
        }

        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn datafusion_physical_expr::PhysicalExpr>>,
        ) -> datafusion_common::Result<Arc<dyn datafusion_physical_expr::PhysicalExpr>> {
            Ok(self)
        }

        fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{self}")
        }
    }

    /// Builds a deterministic `QueryId` from `value`.
    fn test_query_id(value: u128) -> QueryId {
        QueryId::from(Uuid::from_u128(value))
    }

    /// Builds a `RemoteDynFilterProducerId` wrapping `value`.
    fn test_remote_dyn_filter_producer_id(value: u64) -> RemoteDynFilterProducerId {
        RemoteDynFilterProducerId::new(value)
    }

    /// Captures a fresh dynamic filter over a single column, panicking if the
    /// capture fails.
    fn test_captured_dyn_filter(
        remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
        producer_local_ordinal: usize,
        column_name: &str,
        column_index: usize,
    ) -> CapturedDynFilter {
        build_captured_dyn_filter(
            remote_dyn_filter_producer_id,
            producer_local_ordinal,
            Arc::new(DynamicFilterPhysicalExpr::new(
                vec![Arc::new(Column::new(column_name, column_index)) as Arc<_>],
                lit(true) as _,
            )),
        )
        .unwrap()
    }

    /// Builds a dynamic filter over a single column and updates it to a string
    /// literal of `payload_bytes` characters, so its snapshot has a
    /// controllable size.
    fn test_dyn_filter_with_snapshot_payload(
        column_name: &str,
        column_index: usize,
        payload_bytes: usize,
    ) -> Arc<DynamicFilterPhysicalExpr> {
        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(Column::new(column_name, column_index)) as Arc<_>],
            lit(true) as _,
        ));
        dyn_filter
            .update(lit(ScalarValue::Utf8(Some("x".repeat(payload_bytes)))) as _)
            .unwrap();
        dyn_filter
    }

    /// Captured filters keep the ordinal of their position among all parent
    /// filters, not among the dynamic filters only.
    #[test]
    fn capture_remote_dyn_filters_for_pushdown_preserves_parent_filter_ordinals() {
        let parent_filters = vec![
            Arc::new(Column::new("service", 0)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
            Arc::new(DynamicFilterPhysicalExpr::new(
                vec![Arc::new(Column::new("host", 1)) as Arc<_>],
                lit(true) as _,
            )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
            Arc::new(Column::new("zone", 2)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
            Arc::new(DynamicFilterPhysicalExpr::new(
                vec![Arc::new(Column::new("pod", 3)) as Arc<_>],
                lit(true) as _,
            )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
        ];

        let remote_dyn_filter_producer_id = test_remote_dyn_filter_producer_id(42);
        let captured =
            capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters)
                .captured_dyn_filters;

        assert_eq!(captured.len(), 2);
        assert_eq!(
            captured[0].filter_id.remote_dyn_filter_producer_id(),
            remote_dyn_filter_producer_id
        );
        assert_eq!(
            captured[1].filter_id.remote_dyn_filter_producer_id(),
            remote_dyn_filter_producer_id
        );
        assert_eq!(captured[0].filter_id.producer_ordinal(), 1);
        assert_eq!(captured[1].filter_id.producer_ordinal(), 3);
    }

    /// Only the dynamic filter among the parent filters is flagged as pushed
    /// down, and it carries an initial snapshot.
    #[test]
    fn capture_remote_dyn_filters_for_pushdown_marks_only_valid_initial_regs() {
        let parent_filters = vec![
            Arc::new(Column::new("service", 0)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
            Arc::new(DynamicFilterPhysicalExpr::new(
                vec![Arc::new(Column::new("host", 1)) as Arc<_>],
                lit(true) as _,
            )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
            Arc::new(Column::new("zone", 2)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
        ];

        let remote_dyn_filter_producer_id = test_remote_dyn_filter_producer_id(42);
        let pushdown =
            capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters);

        assert_eq!(pushdown.pushed_down, vec![false, true, false]);
        assert_eq!(pushdown.captured_dyn_filters.len(), 1);
        assert_eq!(
            pushdown.captured_dyn_filters[0]
                .filter_id
                .remote_dyn_filter_producer_id(),
            remote_dyn_filter_producer_id
        );
        assert_eq!(
            pushdown.captured_dyn_filters[0]
                .filter_id
                .producer_ordinal(),
            1
        );
        assert!(
            pushdown.captured_dyn_filters[0]
                .initial_registration
                .initial_snapshot
                .is_some()
        );
    }

    /// A dynamic filter whose child is `UnserializableExpr` is not pushed down.
    #[test]
    fn capture_remote_dyn_filters_for_pushdown_rejects_unencodable_registration() {
        let parent_filters = vec![Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(UnserializableExpr) as Arc<_>],
            lit(true) as _,
        ))
            as Arc<dyn datafusion::physical_plan::PhysicalExpr>];

        let pushdown = capture_remote_dyn_filters_for_pushdown(
            test_remote_dyn_filter_producer_id(42),
            parent_filters,
        );

        assert_eq!(pushdown.pushed_down, vec![false]);
        assert!(pushdown.captured_dyn_filters.is_empty());
    }

    /// A freshly created dynamic filter is captured with an initial snapshot.
    #[test]
    fn capture_remote_dyn_filters_for_pushdown_attaches_initial_snapshot() {
        let parent_filters = vec![Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(Column::new("host", 1)) as Arc<_>],
            lit(true) as _,
        ))
            as Arc<dyn datafusion::physical_plan::PhysicalExpr>];

        let pushdown = capture_remote_dyn_filters_for_pushdown(
            test_remote_dyn_filter_producer_id(42),
            parent_filters,
        );

        assert_eq!(pushdown.pushed_down, vec![true]);
        assert!(
            pushdown.captured_dyn_filters[0]
                .initial_registration
                .initial_snapshot
                .is_some()
        );
    }

    /// After one `update`, the snapshot reports generation 2, is not complete,
    /// and carries a non-empty DataFusion payload.
    #[test]
    fn capture_remote_dyn_filters_for_pushdown_attaches_initial_snapshot_after_update() {
        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(Column::new("host", 1)) as Arc<_>],
            lit(true) as _,
        ));
        dyn_filter.update(lit(false) as _).unwrap();
        let parent_filters = vec![dyn_filter as Arc<dyn datafusion::physical_plan::PhysicalExpr>];

        let pushdown = capture_remote_dyn_filters_for_pushdown(
            test_remote_dyn_filter_producer_id(42),
            parent_filters,
        );

        assert_eq!(pushdown.pushed_down, vec![true]);
        let snapshot = pushdown.captured_dyn_filters[0]
            .initial_registration
            .initial_snapshot
            .as_ref()
            .unwrap();
        assert_eq!(snapshot.generation, 2);
        assert!(!snapshot.is_complete);
        assert!(matches!(
            snapshot.payload,
            DynFilterPayload::Datafusion(ref bytes) if !bytes.is_empty()
        ));
    }

    /// Two filters with 40 KiB string snapshots each are rejected together.
    // NOTE(review): presumably their combined size exceeds the registration
    // byte budget — confirm against `validate_default_bounds`.
    #[test]
    fn capture_remote_dyn_filters_for_pushdown_rejects_oversized_snapshots() {
        let parent_filters = vec![
            test_dyn_filter_with_snapshot_payload("host", 0, 40 * 1024)
                as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
            test_dyn_filter_with_snapshot_payload("pod", 1, 40 * 1024)
                as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
        ];

        let pushdown = capture_remote_dyn_filters_for_pushdown(
            test_remote_dyn_filter_producer_id(42),
            parent_filters,
        );

        assert_eq!(pushdown.pushed_down, vec![false, false]);
        assert!(pushdown.captured_dyn_filters.is_empty());
    }

    /// 65 filters with tiny updated snapshots are rejected as a whole.
    // NOTE(review): 65 is presumably one more than the allowed registration
    // count — confirm against `validate_default_bounds`.
    #[test]
    fn capture_remote_dyn_filters_for_pushdown_rejects_too_many_regs_with_snapshots() {
        const TOO_MANY_INITIAL_REGS: usize = 65;

        let parent_filters = (0..TOO_MANY_INITIAL_REGS)
            .map(|ordinal| {
                test_dyn_filter_with_snapshot_payload(&format!("host_{ordinal}"), ordinal, 1)
                    as Arc<dyn datafusion::physical_plan::PhysicalExpr>
            })
            .collect::<Vec<_>>();

        let pushdown = capture_remote_dyn_filters_for_pushdown(
            test_remote_dyn_filter_producer_id(42),
            parent_filters,
        );

        assert!(pushdown.captured_dyn_filters.is_empty());
        assert_eq!(pushdown.pushed_down, vec![false; TOO_MANY_INITIAL_REGS]);
    }

    /// 65 freshly created filters over the same column are rejected as a whole.
    #[test]
    fn capture_remote_dyn_filters_for_pushdown_rejects_regs_exceeding_bounds() {
        const TOO_MANY_INITIAL_REGS: usize = 65;

        let parent_filters = (0..TOO_MANY_INITIAL_REGS)
            .map(|_| {
                Arc::new(DynamicFilterPhysicalExpr::new(
                    vec![Arc::new(Column::new("host", 0)) as Arc<_>],
                    lit(true) as _,
                )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>
            })
            .collect::<Vec<_>>();

        let pushdown = capture_remote_dyn_filters_for_pushdown(
            test_remote_dyn_filter_producer_id(42),
            parent_filters,
        );

        assert!(pushdown.captured_dyn_filters.is_empty());
        assert_eq!(pushdown.pushed_down, vec![false; TOO_MANY_INITIAL_REGS]);
    }

    /// Registering the same captured filter for two regions yields one registry
    /// entry with both regions as subscribers.
    #[test]
    fn register_dyn_filters_for_region_reuses_existing_entry() {
        let registry = QueryDynFilterRegistry::new(test_query_id(1));
        let captured_dyn_filters = vec![test_captured_dyn_filter(
            test_remote_dyn_filter_producer_id(42),
            2,
            "host",
            0,
        )];
        let first_region_id = RegionId::new(1024, 7);
        let second_region_id = RegionId::new(1024, 8);

        register_dyn_filters_for_region(&registry, first_region_id, &captured_dyn_filters);
        register_dyn_filters_for_region(&registry, second_region_id, &captured_dyn_filters);

        assert_eq!(registry.entry_count(), 1);
        let entry = registry.entries().pop().unwrap();
        assert_eq!(
            entry.filter_id().remote_dyn_filter_producer_id(),
            test_remote_dyn_filter_producer_id(42)
        );
        assert_eq!(entry.filter_id().producer_ordinal(), 2);
        let subscribers = entry.subscribers();
        assert_eq!(subscribers.len(), 2);
        assert!(
            subscribers
                .iter()
                .any(|subscriber| subscriber.region_id() == first_region_id)
        );
        assert!(
            subscribers
                .iter()
                .any(|subscriber| subscriber.region_id() == second_region_id)
        );
    }

    /// Filters that differ only in producer id get separate registry entries.
    #[test]
    fn register_dyn_filters_for_region_keeps_independent_producer_ids_distinct() {
        let registry = QueryDynFilterRegistry::new(test_query_id(1));
        let region_id = RegionId::new(1024, 7);
        let make_filter = |remote_dyn_filter_producer_id| {
            test_captured_dyn_filter(remote_dyn_filter_producer_id, 2, "host", 0)
        };

        register_dyn_filters_for_region(
            &registry,
            region_id,
            &[make_filter(test_remote_dyn_filter_producer_id(42))],
        );
        register_dyn_filters_for_region(
            &registry,
            region_id,
            &[make_filter(test_remote_dyn_filter_producer_id(43))],
        );

        assert_eq!(registry.entry_count(), 2);
    }

    /// The region query context carries the registrations as an extension that
    /// decodes back to the same filter id and a single `Column` child.
    #[test]
    fn query_context_includes_region_initial_dyn_filter_regs() {
        let captured_dyn_filters = vec![test_captured_dyn_filter(
            test_remote_dyn_filter_producer_id(42),
            2,
            "host",
            0,
        )];
        let region_id = RegionId::new(1024, 7);
        let query_ctx = QueryContext::arc();

        let region_query_ctx = query_context_with_initial_dyn_filter_regs(
            &query_ctx,
            region_id,
            &captured_dyn_filters,
        );
        let extension = region_query_ctx
            .extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
            .unwrap();
        let regs = InitialDynFilterRegs::from_extension_value(extension).unwrap();
        // Check the count before indexing so a wrong count fails as an
        // assertion instead of an out-of-bounds panic.
        assert_eq!(regs.regs.len(), 1);
        let decoded_children = regs.regs[0]
            .decode_children(
                &TaskContext::default(),
                &arrow_schema::Schema::new(vec![arrow_schema::Field::new(
                    "host",
                    arrow_schema::DataType::Utf8,
                    false,
                )]),
                1024,
            )
            .unwrap();
        assert_eq!(
            regs.regs[0].filter_id,
            captured_dyn_filters[0].filter_id.to_string()
        );
        assert_eq!(decoded_children.len(), 1);
        assert!(decoded_children[0].as_any().is::<Column>());
    }

    /// Two captured filters with identical ids leave the extension unset.
    #[test]
    fn query_context_drops_initial_regs_when_duplicate_filter_ids_exceed_bounds() {
        let captured_dyn_filters = vec![
            test_captured_dyn_filter(test_remote_dyn_filter_producer_id(42), 2, "host", 0),
            test_captured_dyn_filter(test_remote_dyn_filter_producer_id(42), 2, "host", 0),
        ];
        let region_id = RegionId::new(1024, 7);
        let query_ctx = QueryContext::arc();

        let region_query_ctx = query_context_with_initial_dyn_filter_regs(
            &query_ctx,
            region_id,
            &captured_dyn_filters,
        );

        assert!(
            region_query_ctx
                .extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
                .is_none()
        );
    }
}