1use std::collections::BTreeMap;
20
21use ordered_float::NotNan;
22use snafu::{OptionExt, ResultExt};
23use urlencoding::decode;
24use vrl::prelude::Bytes;
25use vrl::value::{KeyString, Value as VrlValue};
26
27use crate::error::{
28 CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
29 FailedToParseIntKeySnafu, FloatIsNanSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
30 ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
31};
32use crate::etl::field::Fields;
33use crate::etl::processor::{
34 yaml_bool, yaml_new_field, yaml_new_fields, Processor, FIELDS_NAME, FIELD_NAME,
35 IGNORE_MISSING_NAME,
36};
37
/// Processor kind identifier; this is the value `CmcdProcessor::kind` reports.
pub(crate) const PROCESSOR_CMCD: &str = "cmcd";

// Keys stored as a boolean `true` whenever they appear in the input.
const CMCD_KEY_BS: &str = "bs";
const CMCD_KEY_SU: &str = "su";

// Keys whose value is parsed as a 64-bit signed integer.
const CMCD_KEY_BR: &str = "br";
const CMCD_KEY_BL: &str = "bl";
const CMCD_KEY_D: &str = "d";
const CMCD_KEY_DL: &str = "dl";
const CMCD_KEY_MTP: &str = "mtp";
const CMCD_KEY_RTP: &str = "rtp";
const CMCD_KEY_TB: &str = "tb";

// Keys whose value is kept verbatim as a string.
const CMCD_KEY_CID: &str = "cid";
const CMCD_KEY_NRR: &str = "nrr";
const CMCD_KEY_OT: &str = "ot";
const CMCD_KEY_SF: &str = "sf";
const CMCD_KEY_SID: &str = "sid";
const CMCD_KEY_ST: &str = "st";
const CMCD_KEY_V: &str = "v";

// Key whose value is percent-decoded before being stored as a string.
const CMCD_KEY_NOR: &str = "nor";

// Key whose value is parsed as a floating point number.
const CMCD_KEY_PR: &str = "pr";

/// Every key this processor recognises; keys outside this list are skipped
/// while parsing.
const CMCD_KEYS: [&str; 18] = [
    CMCD_KEY_BR,
    CMCD_KEY_BL,
    CMCD_KEY_BS,
    CMCD_KEY_CID,
    CMCD_KEY_D,
    CMCD_KEY_DL,
    CMCD_KEY_MTP,
    CMCD_KEY_NOR,
    CMCD_KEY_NRR,
    CMCD_KEY_OT,
    CMCD_KEY_PR,
    CMCD_KEY_RTP,
    CMCD_KEY_SF,
    CMCD_KEY_SID,
    CMCD_KEY_ST,
    CMCD_KEY_SU,
    CMCD_KEY_TB,
    CMCD_KEY_V,
];
79
80fn bs_su(_: &str, _: &str, _: Option<&str>) -> Result<VrlValue> {
82 Ok(VrlValue::Boolean(true))
83}
84
85fn br_tb(s: &str, k: &str, v: Option<&str>) -> Result<VrlValue> {
87 let v = v.context(CmcdMissingValueSnafu { k, s })?;
88 let val: i64 = v
89 .parse()
90 .context(FailedToParseIntKeySnafu { key: k, value: v })?;
91 Ok(VrlValue::Integer(val))
92}
93
94fn cid_v(s: &str, k: &str, v: Option<&str>) -> Result<VrlValue> {
96 let v = v.context(CmcdMissingValueSnafu { k, s })?;
97 Ok(VrlValue::Bytes(Bytes::from(v.to_string())))
98}
99
100fn nor(s: &str, k: &str, v: Option<&str>) -> Result<VrlValue> {
102 let v = v.context(CmcdMissingValueSnafu { k, s })?;
103 let val = match decode(v) {
104 Ok(val) => val.to_string(),
105 Err(_) => v.to_string(),
106 };
107 Ok(VrlValue::Bytes(Bytes::from(val)))
108}
109
110fn pr(s: &str, k: &str, v: Option<&str>) -> Result<VrlValue> {
112 let v = v.context(CmcdMissingValueSnafu { k, s })?;
113 let val: f64 = v
114 .parse()
115 .context(FailedToParseFloatKeySnafu { key: k, value: v })?;
116 let val = NotNan::new(val).context(FloatIsNanSnafu)?;
117 Ok(VrlValue::Float(val))
118}
119
120#[derive(Debug, Default)]
156pub struct CmcdProcessor {
157 fields: Fields,
158 ignore_missing: bool,
159}
160
161impl CmcdProcessor {
162 fn generate_key(prefix: &str, key: &str) -> KeyString {
163 KeyString::from(format!("{}_{}", prefix, key))
164 }
165
166 fn parse(&self, name: &str, value: &str) -> Result<BTreeMap<KeyString, VrlValue>> {
167 let mut working_set = BTreeMap::new();
168
169 let parts = value.split(',');
170
171 for part in parts {
172 let mut kv = part.split('=');
173 let k = kv.next().context(CmcdMissingKeySnafu { part, s: value })?;
174 let v = kv.next();
175
176 for cmcd_key in CMCD_KEYS {
177 if cmcd_key == k {
178 match cmcd_key {
179 CMCD_KEY_BS | CMCD_KEY_SU => {
180 working_set
181 .insert(Self::generate_key(name, cmcd_key), bs_su(value, k, v)?);
182 }
183 CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP
184 | CMCD_KEY_RTP | CMCD_KEY_TB => {
185 working_set
186 .insert(Self::generate_key(name, cmcd_key), br_tb(value, k, v)?);
187 }
188 CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID
189 | CMCD_KEY_ST | CMCD_KEY_V => {
190 working_set
191 .insert(Self::generate_key(name, cmcd_key), cid_v(value, k, v)?);
192 }
193 CMCD_KEY_NOR => {
194 working_set
195 .insert(Self::generate_key(name, cmcd_key), nor(value, k, v)?);
196 }
197 CMCD_KEY_PR => {
198 working_set
199 .insert(Self::generate_key(name, cmcd_key), pr(value, k, v)?);
200 }
201
202 _ => {}
203 }
204 }
205 }
206 }
207 Ok(working_set)
208 }
209}
210
211impl TryFrom<&yaml_rust::yaml::Hash> for CmcdProcessor {
212 type Error = Error;
213
214 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
215 let mut fields = Fields::default();
216 let mut ignore_missing = false;
217
218 for (k, v) in value.iter() {
219 let key = k
220 .as_str()
221 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
222 match key {
223 FIELD_NAME => {
224 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
225 }
226 FIELDS_NAME => {
227 fields = yaml_new_fields(v, FIELDS_NAME)?;
228 }
229
230 IGNORE_MISSING_NAME => {
231 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
232 }
233
234 _ => {}
235 }
236 }
237
238 let proc = CmcdProcessor {
239 fields,
240 ignore_missing,
241 };
242
243 Ok(proc)
244 }
245}
246
247impl Processor for CmcdProcessor {
248 fn kind(&self) -> &str {
249 PROCESSOR_CMCD
250 }
251
252 fn ignore_missing(&self) -> bool {
253 self.ignore_missing
254 }
255
256 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
257 for field in self.fields.iter() {
258 let name = field.input_field();
259 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
260 match val.get(name) {
261 Some(VrlValue::Bytes(s)) => {
262 let s = String::from_utf8_lossy(s);
263 let results = self.parse(field.target_or_input_field(), &s)?;
264
265 val.extend(results);
266 }
267 Some(VrlValue::Null) | None => {
268 if !self.ignore_missing {
269 return ProcessorMissingFieldSnafu {
270 processor: self.kind().to_string(),
271 field: name.to_string(),
272 }
273 .fail();
274 }
275 }
276 Some(v) => {
277 return ProcessorExpectStringSnafu {
278 processor: self.kind().to_string(),
279 v: v.clone(),
280 }
281 .fail();
282 }
283 }
284 }
285
286 Ok(val)
287 }
288}
289
#[cfg(test)]
mod tests {
    use urlencoding::decode;

    use super::*;
    use crate::etl::field::{Field, Fields};

    /// Quoted session id shared by most test inputs.
    const SID: &str = "\"6e2fb550-c457-11e9-bb97-0800200c9a66\"";

    /// Expected value for a string-typed key.
    fn bytes(text: &'static str) -> VrlValue {
        VrlValue::Bytes(Bytes::from(text))
    }

    /// Expected value for an integer-typed key.
    fn int(n: i64) -> VrlValue {
        VrlValue::Integer(n)
    }

    /// Expected value for a flag-style key.
    fn flag() -> VrlValue {
        VrlValue::Boolean(true)
    }

    #[test]
    fn test_cmcd() {
        // Each case is (percent-encoded input, expected entries). The input is
        // decoded once before parsing, so `%252F` reaches the parser as `%2F`
        // and is then decoded to `/` by the `nor` handler.
        let cases = [
            (
                "sid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![("prefix_sid", bytes(SID))],
            ),
            (
                "br%3D3200%2Cbs%2Cd%3D4004%2Cmtp%3D25400%2Cot%3Dv%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Ctb%3D6000",
                vec![
                    ("prefix_br", int(3200)),
                    ("prefix_bs", flag()),
                    ("prefix_d", int(4004)),
                    ("prefix_mtp", int(25400)),
                    ("prefix_ot", bytes("v")),
                    ("prefix_rtp", int(15000)),
                    ("prefix_sid", bytes(SID)),
                    ("prefix_tb", int(6000)),
                ],
            ),
            (
                // `b` is not a recognised key and must be skipped.
                "b%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![("prefix_rtp", int(15000)), ("prefix_sid", bytes(SID))],
            ),
            (
                "bs%2Csu",
                vec![("prefix_bs", flag()), ("prefix_su", flag())],
            ),
            (
                // Custom keys are skipped; only `d` is extracted.
                "d%3D4004%2Ccom.example-myNumericKey%3D500%2Ccom.examplemyStringKey%3D%22myStringValue%22",
                vec![("prefix_d", int(4004))],
            ),
            (
                "nor%3D%22..%252F300kbps%252Fsegment35.m4v%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nor", bytes("\"../300kbps/segment35.m4v\"")),
                    ("prefix_sid", bytes(SID)),
                ],
            ),
            (
                "nrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", bytes("\"12323-48763\"")),
                    ("prefix_sid", bytes(SID)),
                ],
            ),
            (
                "nor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nor", bytes("\"../300kbps/track.m4v\"")),
                    ("prefix_nrr", bytes("\"12323-48763\"")),
                    ("prefix_sid", bytes(SID)),
                ],
            ),
            (
                "bl%3D21300%2Cbr%3D3200%2Cbs%2Ccid%3D%22faec5fc2-ac30-11eabb37-0242ac130002%22%2Cd%3D4004%2Cdl%3D18500%2Cmtp%3D48100%2Cnor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Cot%3Dv%2Cpr%3D1.08%2Crtp%3D12000%2Csf%3Dd%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Cst%3Dv%2Csu%2Ctb%3D6000",
                vec![
                    ("prefix_bl", int(21300)),
                    ("prefix_br", int(3200)),
                    ("prefix_bs", flag()),
                    (
                        "prefix_cid",
                        bytes("\"faec5fc2-ac30-11eabb37-0242ac130002\""),
                    ),
                    ("prefix_d", int(4004)),
                    ("prefix_dl", int(18500)),
                    ("prefix_mtp", int(48100)),
                    ("prefix_nor", bytes("\"../300kbps/track.m4v\"")),
                    ("prefix_nrr", bytes("\"12323-48763\"")),
                    ("prefix_ot", bytes("v")),
                    ("prefix_pr", VrlValue::Float(NotNan::new(1.08).unwrap())),
                    ("prefix_rtp", int(12000)),
                    ("prefix_sf", bytes("d")),
                    ("prefix_sid", bytes(SID)),
                    ("prefix_st", bytes("v")),
                    ("prefix_su", flag()),
                    ("prefix_tb", int(6000)),
                ],
            ),
        ];

        let processor = CmcdProcessor {
            fields: Fields::new(vec![Field::new("prefix", None)]),
            ignore_missing: false,
        };

        for (encoded, pairs) in cases {
            let decoded = decode(encoded).unwrap().to_string();

            let expected: BTreeMap<KeyString, VrlValue> = pairs
                .into_iter()
                .map(|(key, value)| (KeyString::from(key.to_string()), value))
                .collect();

            let actual = processor.parse("prefix", &decoded).unwrap();
            assert_eq!(actual, expected);
        }
    }
}