1use snafu::{OptionExt, ResultExt};
20use urlencoding::decode;
21
22use crate::error::{
23 CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
24 FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
25 ProcessorMissingFieldSnafu, Result,
26};
27use crate::etl::field::Fields;
28use crate::etl::processor::{
29 yaml_bool, yaml_new_field, yaml_new_fields, Processor, FIELDS_NAME, FIELD_NAME,
30 IGNORE_MISSING_NAME,
31};
32use crate::etl::value::Value;
33use crate::etl::PipelineMap;
34
/// Processor kind name for the CMCD processor; returned by `CmcdProcessor::kind`.
pub(crate) const PROCESSOR_CMCD: &str = "cmcd";
36
// Standard CMCD key names. The note on each key records how `CmcdProcessor::parse`
// converts its value.

/// `br` (encoded bitrate): parsed as an integer.
const CMCD_KEY_BR: &str = "br";
/// `bl` (buffer length): parsed as an integer.
const CMCD_KEY_BL: &str = "bl";
/// `bs` (buffer starvation): flag key, presence yields `true`.
const CMCD_KEY_BS: &str = "bs";
/// `cid` (content ID): kept as a string.
const CMCD_KEY_CID: &str = "cid";
/// `d` (object duration): parsed as an integer.
const CMCD_KEY_D: &str = "d";
/// `dl` (deadline): parsed as an integer.
const CMCD_KEY_DL: &str = "dl";
/// `mtp` (measured throughput): parsed as an integer.
const CMCD_KEY_MTP: &str = "mtp";
/// `nor` (next object request): percent-decoded string.
const CMCD_KEY_NOR: &str = "nor";
/// `nrr` (next range request): kept as a string.
const CMCD_KEY_NRR: &str = "nrr";
/// `ot` (object type): kept as a string.
const CMCD_KEY_OT: &str = "ot";
/// `pr` (playback rate): parsed as a float.
const CMCD_KEY_PR: &str = "pr";
/// `rtp` (requested maximum throughput): parsed as an integer.
const CMCD_KEY_RTP: &str = "rtp";
/// `sf` (streaming format): kept as a string.
const CMCD_KEY_SF: &str = "sf";
/// `sid` (session ID): kept as a string.
const CMCD_KEY_SID: &str = "sid";
/// `st` (stream type): kept as a string.
const CMCD_KEY_ST: &str = "st";
/// `su` (startup): flag key, presence yields `true`.
const CMCD_KEY_SU: &str = "su";
/// `tb` (top bitrate): parsed as an integer.
const CMCD_KEY_TB: &str = "tb";
/// `v` (CMCD version): kept as a string.
const CMCD_KEY_V: &str = "v";

/// Every key the processor extracts; anything else in a payload is skipped.
const CMCD_KEYS: [&str; 18] = [
    CMCD_KEY_BR,
    CMCD_KEY_BL,
    CMCD_KEY_BS,
    CMCD_KEY_CID,
    CMCD_KEY_D,
    CMCD_KEY_DL,
    CMCD_KEY_MTP,
    CMCD_KEY_NOR,
    CMCD_KEY_NRR,
    CMCD_KEY_OT,
    CMCD_KEY_PR,
    CMCD_KEY_RTP,
    CMCD_KEY_SF,
    CMCD_KEY_SID,
    CMCD_KEY_ST,
    CMCD_KEY_SU,
    CMCD_KEY_TB,
    CMCD_KEY_V,
];
76
77fn bs_su(_: &str, _: &str, _: Option<&str>) -> Result<Value> {
79 Ok(Value::Boolean(true))
80}
81
82fn br_tb(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
84 let v = v.context(CmcdMissingValueSnafu { k, s })?;
85 let val: i64 = v
86 .parse()
87 .context(FailedToParseIntKeySnafu { key: k, value: v })?;
88 Ok(Value::Int64(val))
89}
90
91fn cid_v(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
93 let v = v.context(CmcdMissingValueSnafu { k, s })?;
94 Ok(Value::String(v.to_string()))
95}
96
97fn nor(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
99 let v = v.context(CmcdMissingValueSnafu { k, s })?;
100 let val = match decode(v) {
101 Ok(val) => val.to_string(),
102 Err(_) => v.to_string(),
103 };
104 Ok(Value::String(val))
105}
106
107fn pr(s: &str, k: &str, v: Option<&str>) -> Result<Value> {
109 let v = v.context(CmcdMissingValueSnafu { k, s })?;
110 let val: f64 = v
111 .parse()
112 .context(FailedToParseFloatKeySnafu { key: k, value: v })?;
113 Ok(Value::Float64(val))
114}
115
/// Pipeline processor that extracts CMCD key/value entries from string fields.
///
/// For each configured field, the input string is split on `,` into `key[=value]`
/// entries. Recognised keys are converted to typed values and written back into the
/// pipeline map as `<target>_<key>`. `<target>` comes from
/// `Field::target_or_input_field`.
#[derive(Debug, Default)]
pub struct CmcdProcessor {
    /// Input fields whose string values are parsed as CMCD payloads.
    fields: Fields,
    /// When `true`, a missing or null input field is skipped instead of raising an error.
    ignore_missing: bool,
}
156
157impl CmcdProcessor {
158 fn generate_key(prefix: &str, key: &str) -> String {
159 format!("{}_{}", prefix, key)
160 }
161
162 fn parse(&self, name: &str, value: &str) -> Result<PipelineMap> {
163 let mut working_set = PipelineMap::new();
164
165 let parts = value.split(',');
166
167 for part in parts {
168 let mut kv = part.split('=');
169 let k = kv.next().context(CmcdMissingKeySnafu { part, s: value })?;
170 let v = kv.next();
171
172 for cmcd_key in CMCD_KEYS {
173 if cmcd_key == k {
174 match cmcd_key {
175 CMCD_KEY_BS | CMCD_KEY_SU => {
176 working_set
177 .insert(Self::generate_key(name, cmcd_key), bs_su(value, k, v)?);
178 }
179 CMCD_KEY_BR | CMCD_KEY_BL | CMCD_KEY_D | CMCD_KEY_DL | CMCD_KEY_MTP
180 | CMCD_KEY_RTP | CMCD_KEY_TB => {
181 working_set
182 .insert(Self::generate_key(name, cmcd_key), br_tb(value, k, v)?);
183 }
184 CMCD_KEY_CID | CMCD_KEY_NRR | CMCD_KEY_OT | CMCD_KEY_SF | CMCD_KEY_SID
185 | CMCD_KEY_ST | CMCD_KEY_V => {
186 working_set
187 .insert(Self::generate_key(name, cmcd_key), cid_v(value, k, v)?);
188 }
189 CMCD_KEY_NOR => {
190 working_set
191 .insert(Self::generate_key(name, cmcd_key), nor(value, k, v)?);
192 }
193 CMCD_KEY_PR => {
194 working_set
195 .insert(Self::generate_key(name, cmcd_key), pr(value, k, v)?);
196 }
197
198 _ => {}
199 }
200 }
201 }
202 }
203 Ok(working_set)
204 }
205}
206
207impl TryFrom<&yaml_rust::yaml::Hash> for CmcdProcessor {
208 type Error = Error;
209
210 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
211 let mut fields = Fields::default();
212 let mut ignore_missing = false;
213
214 for (k, v) in value.iter() {
215 let key = k
216 .as_str()
217 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
218 match key {
219 FIELD_NAME => {
220 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
221 }
222 FIELDS_NAME => {
223 fields = yaml_new_fields(v, FIELDS_NAME)?;
224 }
225
226 IGNORE_MISSING_NAME => {
227 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
228 }
229
230 _ => {}
231 }
232 }
233
234 let proc = CmcdProcessor {
235 fields,
236 ignore_missing,
237 };
238
239 Ok(proc)
240 }
241}
242
243impl Processor for CmcdProcessor {
244 fn kind(&self) -> &str {
245 PROCESSOR_CMCD
246 }
247
248 fn ignore_missing(&self) -> bool {
249 self.ignore_missing
250 }
251
252 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
253 for field in self.fields.iter() {
254 let name = field.input_field();
255
256 match val.get(name) {
257 Some(Value::String(s)) => {
258 let results = self.parse(field.target_or_input_field(), s)?;
259 val.extend(results);
260 }
261 Some(Value::Null) | None => {
262 if !self.ignore_missing {
263 return ProcessorMissingFieldSnafu {
264 processor: self.kind().to_string(),
265 field: name.to_string(),
266 }
267 .fail();
268 }
269 }
270 Some(v) => {
271 return ProcessorExpectStringSnafu {
272 processor: self.kind().to_string(),
273 v: v.clone(),
274 }
275 .fail();
276 }
277 }
278 }
279
280 Ok(())
281 }
282}
283
#[cfg(test)]
mod tests {
    use urlencoding::decode;

    use super::*;
    use crate::etl::field::{Field, Fields};
    use crate::etl::value::Value;

    /// Checks `CmcdProcessor::parse` against a table of URL-encoded CMCD payloads.
    ///
    /// Each case is `(encoded payload, expected (key, value) pairs)`. The payload is
    /// percent-decoded once before parsing. Every expected key carries the `prefix_`
    /// produced from the `name` argument passed to `parse`.
    #[test]
    fn test_cmcd() {
        let ss = [
            // Single string key; surrounding quotes are preserved in the value.
            (
                "sid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![(
                    "prefix_sid",
                    Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                )],
            ),
            // Mixed integer, flag (`bs`) and string keys.
            (
                "br%3D3200%2Cbs%2Cd%3D4004%2Cmtp%3D25400%2Cot%3Dv%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Ctb%3D6000",
                vec![
                    ("prefix_bs", Value::Boolean(true)),
                    ("prefix_ot", Value::String("v".into())),
                    ("prefix_rtp", Value::Int64(15000)),
                    ("prefix_br", Value::Int64(3200)),
                    ("prefix_tb", Value::Int64(6000)),
                    ("prefix_d", Value::Int64(4004)),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_mtp", Value::Int64(25400)),
                ],
            ),
            // `b` is not a recognised key and produces no output.
            (
                "b%2Crtp%3D15000%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_rtp", Value::Int64(15000)),
                ],
            ),
            // Two flag keys without values.
            (
                "bs%2Csu",
                vec![
                    ("prefix_su", Value::Boolean(true)),
                    ("prefix_bs", Value::Boolean(true)),
                ],
            ),
            // Custom `com.example*` keys are skipped; only `d` is extracted.
            (
                "d%3D4004%2Ccom.example-myNumericKey%3D500%2Ccom.examplemyStringKey%3D%22myStringValue%22",
                vec![
                    ("prefix_d", Value::Int64(4004)),
                ],
            ),
            // `nor` is doubly encoded (`%252F`): one decode here, one inside `nor()`.
            (
                "nor%3D%22..%252F300kbps%252Fsegment35.m4v%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/segment35.m4v\"".into()),

                    ),
                ],
            ),
            // `nrr` is kept verbatim as a string.
            (
                "nrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                ],
            ),
            (
                "nor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22",
                vec![
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/track.m4v\"".into()),
                    ),
                ],
            ),
            // Every recognised key at once, including the float `pr`.
            (
                "bl%3D21300%2Cbr%3D3200%2Cbs%2Ccid%3D%22faec5fc2-ac30-11eabb37-0242ac130002%22%2Cd%3D4004%2Cdl%3D18500%2Cmtp%3D48100%2Cnor%3D%22..%252F300kbps%252Ftrack.m4v%22%2Cnrr%3D%2212323-48763%22%2Cot%3Dv%2Cpr%3D1.08%2Crtp%3D12000%2Csf%3Dd%2Csid%3D%226e2fb550-c457-11e9-bb97-0800200c9a66%22%2Cst%3Dv%2Csu%2Ctb%3D6000",
                vec![
                    ("prefix_bl", Value::Int64(21300)),
                    ("prefix_bs", Value::Boolean(true)),
                    ("prefix_st", Value::String("v".into())),
                    ("prefix_ot", Value::String("v".into())),
                    (
                        "prefix_sid",
                        Value::String("\"6e2fb550-c457-11e9-bb97-0800200c9a66\"".into()),
                    ),
                    ("prefix_tb", Value::Int64(6000)),
                    ("prefix_d", Value::Int64(4004)),
                    (
                        "prefix_cid",
                        Value::String("\"faec5fc2-ac30-11eabb37-0242ac130002\"".into()),
                    ),
                    ("prefix_mtp", Value::Int64(48100)),
                    ("prefix_rtp", Value::Int64(12000)),
                    (
                        "prefix_nor",
                        Value::String("\"../300kbps/track.m4v\"".into()),
                    ),
                    ("prefix_sf", Value::String("d".into())),
                    ("prefix_br", Value::Int64(3200)),
                    ("prefix_nrr", Value::String("\"12323-48763\"".into())),
                    ("prefix_pr", Value::Float64(1.08)),
                    ("prefix_su", Value::Boolean(true)),
                    ("prefix_dl", Value::Int64(18500)),
                ],
            ),
        ];

        let field = Field::new("prefix", None);

        // `parse` does not read `fields`; the prefix is passed to it explicitly below.
        let processor = CmcdProcessor {
            fields: Fields::new(vec![field]),
            ignore_missing: false,
        };

        for (s, vec) in ss.into_iter() {
            // Undo the outer URL encoding so `parse` sees `key=value,...` text.
            let decoded = decode(s).unwrap().to_string();

            let expected = vec
                .into_iter()
                .map(|(k, v)| (k.to_string(), v))
                .collect::<PipelineMap>();

            let actual = processor.parse("prefix", &decoded).unwrap();
            assert_eq!(actual, expected);
        }
    }
}