1use std::collections::BTreeMap;
20
21use snafu::{OptionExt, ResultExt};
22use urlencoding::decode;
23
24use crate::error::{
25 CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
26 FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
27 ProcessorMissingFieldSnafu, Result,
28};
29use crate::etl::field::Fields;
30use crate::etl::processor::{
31 yaml_bool, yaml_new_field, yaml_new_fields, Processor, FIELDS_NAME, FIELD_NAME,
32 IGNORE_MISSING_NAME,
33};
34use crate::etl::value::Value;
35
/// Name identifying this processor kind.
pub(crate) const PROCESSOR_CMCD: &str = "cmcd";

// Keys whose mere presence is recorded as a boolean flag.
const CMCD_KEY_BS: &str = "bs";
const CMCD_KEY_SU: &str = "su";

// Keys whose value is parsed as a signed 64-bit integer.
const CMCD_KEY_BR: &str = "br";
const CMCD_KEY_BL: &str = "bl";
const CMCD_KEY_D: &str = "d";
const CMCD_KEY_DL: &str = "dl";
const CMCD_KEY_MTP: &str = "mtp";
const CMCD_KEY_RTP: &str = "rtp";
const CMCD_KEY_TB: &str = "tb";

// Keys whose value is kept as a string.
const CMCD_KEY_CID: &str = "cid";
const CMCD_KEY_NRR: &str = "nrr";
const CMCD_KEY_OT: &str = "ot";
const CMCD_KEY_SF: &str = "sf";
const CMCD_KEY_SID: &str = "sid";
const CMCD_KEY_ST: &str = "st";
const CMCD_KEY_V: &str = "v";

// Key whose value is percent-decoded before being stored as a string.
const CMCD_KEY_NOR: &str = "nor";

// Key whose value is parsed as a 64-bit float.
const CMCD_KEY_PR: &str = "pr";

/// Every CMCD key this processor recognises; any other key in the input is
/// ignored by the parser.
const CMCD_KEYS: [&str; 18] = [
    CMCD_KEY_BR,
    CMCD_KEY_BL,
    CMCD_KEY_BS,
    CMCD_KEY_CID,
    CMCD_KEY_D,
    CMCD_KEY_DL,
    CMCD_KEY_MTP,
    CMCD_KEY_NOR,
    CMCD_KEY_NRR,
    CMCD_KEY_OT,
    CMCD_KEY_PR,
    CMCD_KEY_RTP,
    CMCD_KEY_SF,
    CMCD_KEY_SID,
    CMCD_KEY_ST,
    CMCD_KEY_SU,
    CMCD_KEY_TB,
    CMCD_KEY_V,
];
77
78fn bs_su(_: &str, _: &str, _: Option<&str>) -> Result<Value> {
80 Ok(Value::Boolean(true))
81}
82
83fn br_tb(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
85 let v = v.context(CmcdMissingValueSnafu { k, s })?;
86 let val: i64 = v
87 .parse()
88 .context(FailedToParseIntKeySnafu { key: k, value: v })?;
89 Ok(Value::Int64(val))
90}
91
92fn cid_v(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
94 let v = v.context(CmcdMissingValueSnafu { k, s })?;
95 Ok(Value::String(v.to_string()))
96}
97
98fn nor(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
100 let v = v.context(CmcdMissingValueSnafu { k, s })?;
101 let val = match decode(v) {
102 Ok(val) => val.to_string(),
103 Err(_) => v.to_string(),
104 };
105 Ok(Value::String(val))
106}
107
108fn pr(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
110 let v = v.context(CmcdMissingValueSnafu { k, s })?;
111 let val: f64 = v
112 .parse()
113 .context(FailedToParseFloatKeySnafu { key: k, value: v })?;
114 Ok(Value::Float64(val))
115}
116
117#[derive(Debug, Default)]
153pub struct CmcdProcessor {
154 fields: Fields,
155 ignore_missing: bool,
156}
157
158impl CmcdProcessor {
159 fn generate_key(prefix: &str, key: &str) -> String {
160 format!("{}_{}", prefix, key)
161 }
162
163 fn parse(&self, name: &str, value: &str) -> Result<BTreeMap<String, Value>> {
164 let mut working_set = BTreeMap::new();
165
166 let parts = value.split(',');
167
168 for part in parts {
169 let mut kv = part.split('=');
170 let k = kv.next().context(CmcdMissingKeySnafu { part, s: value })?;
171 let v = kv.next();
172
173 for cmcd_key in CMCD_KEYS {
174 if cmcd_key == k {
175 match cmcd_key {
176 CMCD_KEY_BS | CMCD_KEY_SU => {
177 working_set
178 .insert(Self::generate_key(name, cmcd_key), bs_su(value, k, v)?);
179 }
180 CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP
181 | CMCD_KEY_RTP | CMCD_KEY_TB => {
182 working_set
183 .insert(Self::generate_key(name, cmcd_key), br_tb(value, k, v)?);
184 }
185 CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID
186 | CMCD_KEY_ST | CMCD_KEY_V => {
187 working_set
188 .insert(Self::generate_key(name, cmcd_key), cid_v(value, k, v)?);
189 }
190 CMCD_KEY_NOR => {
191 working_set
192 .insert(Self::generate_key(name, cmcd_key), nor(value, k, v)?);
193 }
194 CMCD_KEY_PR => {
195 working_set
196 .insert(Self::generate_key(name, cmcd_key), pr(value, k, v)?);
197 }
198
199 _ => {}
200 }
201 }
202 }
203 }
204 Ok(working_set)
205 }
206}
207
208impl TryFrom<&yaml_rust::yaml::Hash> for CmcdProcessor {
209 type Error = Error;
210
211 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
212 let mut fields = Fields::default();
213 let mut ignore_missing = false;
214
215 for (k, v) in value.iter() {
216 let key = k
217 .as_str()
218 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
219 match key {
220 FIELD_NAME => {
221 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
222 }
223 FIELDS_NAME => {
224 fields = yaml_new_fields(v, FIELDS_NAME)?;
225 }
226
227 IGNORE_MISSING_NAME => {
228 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
229 }
230
231 _ => {}
232 }
233 }
234
235 let proc = CmcdProcessor {
236 fields,
237 ignore_missing,
238 };
239
240 Ok(proc)
241 }
242}
243
244impl Processor for CmcdProcessor {
245 fn kind(&self) -> &str {
246 PROCESSOR_CMCD
247 }
248
249 fn ignore_missing(&self) -> bool {
250 self.ignore_missing
251 }
252
253 fn exec_mut(&self, mut val: Value) -> Result<Value> {
254 for field in self.fields.iter() {
255 let name = field.input_field();
256
257 match val.get(name) {
258 Some(Value::String(s)) => {
259 let results = self.parse(field.target_or_input_field(), s)?;
260 val.extend(results.into())?;
261 }
262 Some(Value::Null) | None => {
263 if !self.ignore_missing {
264 return ProcessorMissingFieldSnafu {
265 processor: self.kind().to_string(),
266 field: name.to_string(),
267 }
268 .fail();
269 }
270 }
271 Some(v) => {
272 return ProcessorExpectStringSnafu {
273 processor: self.kind().to_string(),
274 v: v.clone(),
275 }
276 .fail();
277 }
278 }
279 }
280
281 Ok(val)
282 }
283}
284
#[cfg(test)]
mod tests {
    use urlencoding::decode;

    use super::*;
    use crate::etl::field::{Field, Fields};
    use crate::etl::value::Value;

    /// Feeds URL-encoded CMCD samples through `CmcdProcessor::parse` and
    /// compares the result with the expected key/value map. The samples cover
    /// single keys, flag-only keys, unknown keys that must be ignored, and a
    /// payload containing every recognised key.
    #[test]
    fn test_cmcd() {
        // Session id shared by most samples, including its surrounding quotes.
        const SID: &str = "\"6e2fb550-c457-11e9-bb97-0800200c9a66\"";

        let text = |s: &str| Value::String(s.to_string());
        let int = |n: i64| Value::Int64(n);
        let flag = || Value::Boolean(true);

        // (URL-encoded input, expected entries); the order of the expected
        // entries is irrelevant because they are collected into a map.
        let cases = [
            (
                "sid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![("prefix_sid", text(SID))],
            ),
            (
                "br%3D3200%2Cbs%2Cd%3D4004%2Cmtp%3D25400%2Cot%3Dv%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Ctb%3D6000",
                vec![
                    ("prefix_br", int(3200)),
                    ("prefix_bs", flag()),
                    ("prefix_d", int(4004)),
                    ("prefix_mtp", int(25400)),
                    ("prefix_ot", text("v")),
                    ("prefix_rtp", int(15000)),
                    ("prefix_sid", text(SID)),
                    ("prefix_tb", int(6000)),
                ],
            ),
            (
                // `b` is not a recognised key and must be ignored.
                "b%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![("prefix_rtp", int(15000)), ("prefix_sid", text(SID))],
            ),
            (
                "bs%2Csu",
                vec![("prefix_bs", flag()), ("prefix_su", flag())],
            ),
            (
                // The two `com.example…` keys are not recognised and must be ignored.
                "d%3D4004%2Ccom.example-myNumericKey%3D500%2Ccom.examplemyStringKey%3D%22myStringValue%22",
                vec![("prefix_d", int(4004))],
            ),
            (
                "nor%3D%22..%252F300kbps%252Fsegment35.m4v%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nor", text("\"../300kbps/segment35.m4v\"")),
                    ("prefix_sid", text(SID)),
                ],
            ),
            (
                "nrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", text("\"12323-48763\"")),
                    ("prefix_sid", text(SID)),
                ],
            ),
            (
                "nor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nor", text("\"../300kbps/track.m4v\"")),
                    ("prefix_nrr", text("\"12323-48763\"")),
                    ("prefix_sid", text(SID)),
                ],
            ),
            (
                "bl%3D21300%2Cbr%3D3200%2Cbs%2Ccid%3D%22faec5fc2-ac30-11eabb37-0242ac130002%22%2Cd%3D4004%2Cdl%3D18500%2Cmtp%3D48100%2Cnor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Cot%3Dv%2Cpr%3D1.08%2Crtp%3D12000%2Csf%3Dd%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Cst%3Dv%2Csu%2Ctb%3D6000",
                vec![
                    ("prefix_bl", int(21300)),
                    ("prefix_br", int(3200)),
                    ("prefix_bs", flag()),
                    (
                        "prefix_cid",
                        text("\"faec5fc2-ac30-11eabb37-0242ac130002\""),
                    ),
                    ("prefix_d", int(4004)),
                    ("prefix_dl", int(18500)),
                    ("prefix_mtp", int(48100)),
                    ("prefix_nor", text("\"../300kbps/track.m4v\"")),
                    ("prefix_nrr", text("\"12323-48763\"")),
                    ("prefix_ot", text("v")),
                    ("prefix_pr", Value::Float64(1.08)),
                    ("prefix_rtp", int(12000)),
                    ("prefix_sf", text("d")),
                    ("prefix_sid", text(SID)),
                    ("prefix_st", text("v")),
                    ("prefix_su", flag()),
                    ("prefix_tb", int(6000)),
                ],
            ),
        ];

        let processor = CmcdProcessor {
            fields: Fields::new(vec![Field::new("prefix", None)]),
            ignore_missing: false,
        };

        for (encoded, pairs) in cases.into_iter() {
            let decoded = decode(encoded).unwrap().to_string();

            let expected: BTreeMap<String, Value> = pairs
                .into_iter()
                .map(|(key, value)| (key.to_string(), value))
                .collect();

            let actual = processor.parse("prefix", &decoded).unwrap();
            assert_eq!(actual, expected);
        }
    }
}