// grib/reader.rs

1use std::io::{self, Read, Seek, SeekFrom};
2
3use crate::{datatypes::*, error::*, helpers::read_as, SectionBody, SectionInfo};
4
// --- Section 0 (Indicator Section) ---

/// Magic bytes that open the Indicator Section; `read_sect0` scans the stream for them.
const SECT0_IS_MAGIC: &[u8] = b"GRIB";
/// Length in bytes of [`SECT0_IS_MAGIC`].
const SECT0_IS_MAGIC_SIZE: usize = SECT0_IS_MAGIC.len();
/// Fixed length in bytes of the Indicator Section, magic included.
const SECT0_IS_SIZE: usize = 16;

// --- Sections 1-7 ---

/// Length of the header shared by Sections 1-7: a 4-byte section length followed by a
/// 1-byte section number.
const SECT_HEADER_SIZE: usize = 5;

// --- Section 8 (End Section) ---

/// Bytes that make up the End Section.
const SECT8_ES_MAGIC: &[u8] = b"7777";
/// Length in bytes of the End Section.
pub(crate) const SECT8_ES_SIZE: usize = SECT8_ES_MAGIC.len();
11
/// An iterator over the sections of the GRIB2 messages found in an underlying reader.
///
/// Each item is the [`SectionInfo`] of the next section (0 through 8) or the error hit while
/// reading it. Iteration ends (`None`) once no further Section 0 can be located, or when the
/// stream ends exactly where the next section would begin. The stream keeps no error state,
/// so calling `next` after an `Err` resumes from wherever the reader was left.
///
/// # Example
/// ```
/// use grib::{Grib2SectionStream, Indicator, SectionBody, SectionInfo, SeekableGrib2Reader};
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let f = std::fs::File::open(
///         "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2",
///     )?;
///     let f = std::io::BufReader::new(f);
///     let grib2_reader = SeekableGrib2Reader::new(f);
///
///     let mut sect_stream = Grib2SectionStream::new(grib2_reader);
///     assert_eq!(
///         sect_stream.next(),
///         Some(Ok(SectionInfo {
///             num: 0,
///             offset: 0,
///             size: 16,
///             body: Some(SectionBody::Section0(Indicator {
///                 discipline: 0,
///                 total_length: 193,
///             })),
///         }))
///     );
///     Ok(())
/// }
/// ```
pub struct Grib2SectionStream<R> {
    /// Bytes of the current message that have not been consumed yet; `0` between messages.
    rest_size: usize,
    /// Absolute stream position just past the end of the current message. Before the first
    /// message is found, this is the position the stream started at.
    whole_size: usize,
    /// Source of the GRIB2 bytes.
    reader: R,
}

impl<R> Grib2SectionStream<R> {
    /// Creates a section stream over `reader`.
    ///
    /// Nothing is read here. The reader's current position is captured on the first call to
    /// `next`, and all reported offsets are absolute positions in the reader.
    ///
    /// # Example
    /// ```
    /// use grib::{Grib2SectionStream, SeekableGrib2Reader};
    ///
    /// fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let f = std::fs::File::open(
    ///         "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2",
    ///     )?;
    ///     let mut f = std::io::BufReader::new(f);
    ///     let grib2_reader = SeekableGrib2Reader::new(f);
    ///     let _sect_stream = Grib2SectionStream::new(grib2_reader);
    ///     Ok(())
    /// }
    /// ```
    pub fn new(reader: R) -> Self {
        Grib2SectionStream {
            rest_size: 0,
            whole_size: 0,
            reader,
        }
    }

    /// Consumes the stream and returns the underlying reader.
    pub fn into_reader(self) -> R {
        let Self { reader, .. } = self;
        reader
    }
}
72
73impl<R> Grib2SectionStream<R>
74where
75    R: Grib2Read,
76{
77    #[inline]
78    fn next_sect0(&mut self) -> Option<Result<SectionInfo, ParseError>> {
79        if self.whole_size == 0 {
80            // if the offset value is left at the initial value, reset it to the current
81            // position
82            let result = self.reset_pos();
83            if let Err(e) = result {
84                return Some(Err(ParseError::ReadError(format!(
85                    "resetting the initial position failed: {e}"
86                ))));
87            }
88        }
89        let result = self
90            .reader
91            .read_sect0()
92            .transpose()?
93            .map(|(offset, indicator)| {
94                self.whole_size += offset;
95                let offset = self.whole_size;
96                let message_size = indicator.total_length as usize;
97                self.whole_size += message_size;
98                let sect_info = SectionInfo {
99                    num: 0,
100                    offset,
101                    size: SECT0_IS_SIZE,
102                    body: Some(SectionBody::Section0(indicator)),
103                };
104                self.rest_size = message_size - SECT0_IS_SIZE;
105                sect_info
106            });
107        Some(result)
108    }
109
110    fn reset_pos(&mut self) -> Result<(), io::Error> {
111        let pos = self.reader.stream_position()?;
112        self.whole_size = pos as usize;
113        Ok(())
114    }
115
116    #[inline]
117    fn next_sect8(&mut self) -> Option<Result<SectionInfo, ParseError>> {
118        let result = self.reader.read_sect8().transpose()?.map(|_| {
119            let sect_info = SectionInfo {
120                num: 8,
121                offset: self.whole_size - self.rest_size,
122                size: SECT8_ES_SIZE,
123                body: None,
124            };
125            self.rest_size -= SECT8_ES_SIZE;
126            sect_info
127        });
128        Some(result)
129    }
130
131    #[inline]
132    fn next_sect(&mut self) -> Option<Result<SectionInfo, ParseError>> {
133        let result = self.reader.read_sect_header().transpose()?;
134        match result {
135            Ok(header) => {
136                let offset = self.whole_size - self.rest_size;
137                match self.reader.read_sect_payload(&header) {
138                    Ok(body) => {
139                        let body = Some(body);
140                        let (size, num) = header;
141                        self.rest_size -= size;
142                        Some(Ok(SectionInfo {
143                            num,
144                            offset,
145                            size,
146                            body,
147                        }))
148                    }
149                    Err(e) => Some(Err(e)),
150                }
151            }
152            Err(e) => Some(Err(e)),
153        }
154    }
155}
156
157impl<R> Iterator for Grib2SectionStream<R>
158where
159    R: Grib2Read,
160{
161    type Item = Result<SectionInfo, ParseError>;
162
163    fn next(&mut self) -> Option<Self::Item> {
164        match self.rest_size {
165            0 => self.next_sect0(),
166            SECT8_ES_SIZE => self.next_sect8(),
167            _ => self.next_sect(),
168        }
169    }
170}
171
172pub trait Grib2Read: Read + Seek {
173    /// Reads Section 0.
174    fn read_sect0(&mut self) -> Result<Option<(usize, Indicator)>, ParseError>;
175
176    /// Reads Section 8.
177    fn read_sect8(&mut self) -> Result<Option<()>, ParseError>;
178
179    /// Reads a common header for Sections 1-7 and returns the section
180    /// size and number.
181    fn read_sect_as_slice(&mut self, sect: &SectionInfo) -> Result<Vec<u8>, ParseError>;
182    fn read_sect_header(&mut self) -> Result<Option<SectHeader>, ParseError>;
183    fn read_sect_payload(&mut self, header: &SectHeader) -> Result<SectionBody, ParseError>;
184    fn read_sect6_payload(&mut self, size: usize) -> Result<SectionBody, ParseError>;
185    fn skip_sect7_payload(&mut self, size: usize) -> Result<SectionBody, ParseError>;
186    fn read_slice_without_offset_check(&mut self, size: usize) -> Result<Box<[u8]>, ParseError>;
187}
188
/// A GRIB2 reader on top of any seekable byte source.
///
/// Every [`Read`] and [`Seek`] call is forwarded unchanged to the wrapped reader.
pub struct SeekableGrib2Reader<R> {
    reader: R,
}

impl<R> SeekableGrib2Reader<R> {
    /// Wraps `r` so that GRIB2 sections can be read from it.
    pub fn new(r: R) -> Self {
        SeekableGrib2Reader { reader: r }
    }
}

impl<R: Read> Read for SeekableGrib2Reader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        Read::read(&mut self.reader, buf)
    }

    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
        Read::read_exact(&mut self.reader, buf)
    }
}

impl<S: Seek> Seek for SeekableGrib2Reader<S> {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        Seek::seek(&mut self.reader, pos)
    }
}
214
/// Checks the byte count returned by a read into a fixed-size buffer.
///
/// - Zero bytes read (end of stream): the enclosing function returns `Ok(None)`.
/// - Only part of the buffer filled: the enclosing function returns an
///   [`io::ErrorKind::UnexpectedEof`] error, converted with `.into()`.
/// - Exactly `$expected_size` bytes read: execution continues.
macro_rules! check_size {
    ($size:expr, $expected_size:expr) => {{
        match $size {
            0 => return Ok(None),
            n if n == $expected_size => {}
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                )
                .into())
            }
        }
    }};
}
229
230impl<R: Read + Seek> Grib2Read for SeekableGrib2Reader<R> {
231    fn read_sect0(&mut self) -> Result<Option<(usize, Indicator)>, ParseError> {
232        let mut buf = [0; 4096];
233        let mut offset = 0;
234
235        loop {
236            let size = self.read(&mut buf[..])?;
237            if size < SECT0_IS_SIZE {
238                return Ok(None);
239            }
240            let next_offset = size - SECT0_IS_SIZE + 1;
241            for pos in 0..next_offset {
242                if &buf[pos..pos + SECT0_IS_MAGIC_SIZE] == SECT0_IS_MAGIC {
243                    offset += pos;
244                    self.seek(SeekFrom::Current(
245                        (pos + SECT0_IS_SIZE) as i64 - size as i64,
246                    ))?;
247
248                    let indicator = Indicator::from_slice(&buf[pos..pos + SECT0_IS_SIZE])?;
249                    return Ok(Some((offset, indicator)));
250                }
251            }
252            self.seek(SeekFrom::Current(next_offset as i64 - size as i64))?;
253            offset += next_offset;
254        }
255    }
256
257    fn read_sect8(&mut self) -> Result<Option<()>, ParseError> {
258        let mut buf = [0; SECT8_ES_SIZE];
259        let size = self.read(&mut buf[..])?;
260        check_size!(size, buf.len());
261
262        if buf[..] != SECT8_ES_MAGIC[..] {
263            return Err(ParseError::EndSectionMismatch);
264        }
265
266        Ok(Some(()))
267    }
268
269    fn read_sect_as_slice(&mut self, sect: &SectionInfo) -> Result<Vec<u8>, ParseError> {
270        self.seek(SeekFrom::Start(sect.offset as u64))?;
271
272        let mut buf = vec![0; sect.size];
273        self.read_exact(buf.as_mut_slice())?;
274
275        Ok(buf)
276    }
277
278    fn read_sect_header(&mut self) -> Result<Option<SectHeader>, ParseError> {
279        let mut buf = [0; SECT_HEADER_SIZE];
280        let size = self.read(&mut buf[..])?;
281        check_size!(size, buf.len());
282
283        let sect_size = read_as!(u32, buf, 0) as usize;
284        let sect_num = buf[4];
285
286        Ok(Some((sect_size, sect_num)))
287    }
288
289    fn read_sect_payload(&mut self, header: &SectHeader) -> Result<SectionBody, ParseError> {
290        let (size, num) = header;
291        let body_size = size - SECT_HEADER_SIZE;
292        let body = match num {
293            1 => SectionBody::Section1(Identification::from_payload(
294                self.read_slice_without_offset_check(body_size)?,
295            )?),
296            2 => SectionBody::Section2(LocalUse::from_payload(
297                self.read_slice_without_offset_check(body_size)?,
298            )),
299            3 => SectionBody::Section3(GridDefinition::from_payload(
300                self.read_slice_without_offset_check(body_size)?,
301            )?),
302            4 => SectionBody::Section4(ProdDefinition::from_payload(
303                self.read_slice_without_offset_check(body_size)?,
304            )?),
305            5 => SectionBody::Section5(ReprDefinition::from_payload(
306                self.read_slice_without_offset_check(body_size)?,
307            )?),
308            6 => self.read_sect6_payload(body_size)?,
309            7 => self.skip_sect7_payload(body_size)?,
310            _ => return Err(ParseError::UnknownSectionNumber(*num)),
311        };
312
313        Ok(body)
314    }
315
316    fn read_sect6_payload(&mut self, body_size: usize) -> Result<SectionBody, ParseError> {
317        let mut buf = [0; 1]; // octet 6
318        self.read_exact(&mut buf[..])?;
319
320        let len_extra = body_size - buf.len();
321        if len_extra > 0 {
322            let mut buf = vec![0; len_extra];
323            self.read_exact(&mut buf[..])?;
324        }
325
326        Ok(SectionBody::Section6(BitMap {
327            bitmap_indicator: buf[0],
328        }))
329    }
330
331    fn skip_sect7_payload(&mut self, body_size: usize) -> Result<SectionBody, ParseError> {
332        self.seek(SeekFrom::Current(body_size as i64))?;
333
334        Ok(SectionBody::Section7)
335    }
336
337    fn read_slice_without_offset_check(&mut self, size: usize) -> Result<Box<[u8]>, ParseError> {
338        let mut buf = vec![0; size];
339        self.read_exact(&mut buf[..])?;
340        Ok(buf.into_boxed_slice())
341    }
342}
343
/// Common header of Sections 1-7: `(section length in bytes, including this 5-byte header;
/// section number)`.
type SectHeader = (usize, u8);
345
#[cfg(test)]
mod tests {
    use std::io::{Cursor, Write};

    use super::*;

    // The fixture used throughout holds a single 193-byte GRIB2 message. Each test asks
    // `take` for more items than it expects, so the assertions also pin down where
    // iteration stops.

    // A lone message yields its nine sections, then the stream ends.
    #[test]
    fn read_one_grib2_message() -> Result<(), Box<dyn std::error::Error>> {
        let f = std::fs::File::open(
            "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2",
        )?;
        let f = std::io::BufReader::new(f);

        let grib2_reader = SeekableGrib2Reader::new(f);
        let sect_stream = Grib2SectionStream::new(grib2_reader);
        assert_eq!(
            sect_stream
                .take(10)
                .map(|result| result.map(|sect| (sect.num, sect.offset, sect.size)))
                .collect::<Vec<_>>(),
            vec![
                Ok((0, 0, 16)),
                Ok((1, 16, 21)),
                Ok((2, 37, 27)),
                Ok((3, 64, 35)),
                Ok((4, 99, 58)),
                Ok((5, 157, 21)),
                Ok((6, 178, 6)),
                Ok((7, 184, 5)),
                Ok((8, 189, 4)),
            ]
        );

        Ok(())
    }

    // Two back-to-back copies: offsets in the second copy are shifted by the message length.
    #[test]
    fn read_multiple_grib2_messages() -> Result<(), Box<dyn std::error::Error>> {
        let f = std::fs::File::open(
            "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2",
        )?;
        let mut f = std::io::BufReader::new(f);
        let mut buf = Vec::new();
        f.read_to_end(&mut buf)?;
        let repeated_message = buf.repeat(2);
        let f = Cursor::new(repeated_message);

        let grib2_reader = SeekableGrib2Reader::new(f);
        let sect_stream = Grib2SectionStream::new(grib2_reader);
        assert_eq!(
            sect_stream
                .take(19)
                .map(|result| result.map(|sect| (sect.num, sect.offset, sect.size)))
                .collect::<Vec<_>>(),
            vec![
                Ok((0, 0, 16)),
                Ok((1, 16, 21)),
                Ok((2, 37, 27)),
                Ok((3, 64, 35)),
                Ok((4, 99, 58)),
                Ok((5, 157, 21)),
                Ok((6, 178, 6)),
                Ok((7, 184, 5)),
                Ok((8, 189, 4)),
                Ok((0, 193, 16)),
                Ok((1, 209, 21)),
                Ok((2, 230, 27)),
                Ok((3, 257, 35)),
                Ok((4, 292, 58)),
                Ok((5, 350, 21)),
                Ok((6, 371, 6)),
                Ok((7, 377, 5)),
                Ok((8, 382, 4)),
            ]
        );

        Ok(())
    }

    // Trailing bytes too short to contain a Section 0 are ignored; iteration simply ends.
    #[test]
    fn read_grib2_message_with_incomplete_section_0() -> Result<(), Box<dyn std::error::Error>> {
        let f = std::fs::File::open(
            "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2",
        )?;
        let mut f = std::io::BufReader::new(f);
        let mut buf = Vec::new();
        f.read_to_end(&mut buf)?;

        let mut extra_bytes = "extra".as_bytes().to_vec();
        buf.append(&mut extra_bytes);
        let f = Cursor::new(buf);

        let grib2_reader = SeekableGrib2Reader::new(f);
        let sect_stream = Grib2SectionStream::new(grib2_reader);
        assert_eq!(
            sect_stream
                .take(10)
                .map(|result| result.map(|sect| (sect.num, sect.offset, sect.size)))
                .collect::<Vec<_>>(),
            vec![
                Ok((0, 0, 16)),
                Ok((1, 16, 21)),
                Ok((2, 37, 27)),
                Ok((3, 64, 35)),
                Ok((4, 99, 58)),
                Ok((5, 157, 21)),
                Ok((6, 178, 6)),
                Ok((7, 184, 5)),
                Ok((8, 189, 4)),
            ]
        );

        Ok(())
    }

    // A second message cut off one byte into Section 1 yields its Section 0, then a read
    // error for the partial header, then the stream ends.
    #[test]
    fn read_grib2_message_with_incomplete_section_1() -> Result<(), Box<dyn std::error::Error>> {
        let f = std::fs::File::open(
            "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2",
        )?;
        let mut f = std::io::BufReader::new(f);
        let mut buf = Vec::new();
        f.read_to_end(&mut buf)?;

        let mut message_2_bytes = buf[..(SECT0_IS_SIZE + 1)].to_vec();
        buf.append(&mut message_2_bytes);
        let f = Cursor::new(buf);

        let grib2_reader = SeekableGrib2Reader::new(f);
        let sect_stream = Grib2SectionStream::new(grib2_reader);
        assert_eq!(
            sect_stream
                .take(19)
                .map(|result| result.map(|sect| (sect.num, sect.offset, sect.size)))
                .collect::<Vec<_>>(),
            vec![
                Ok((0, 0, 16)),
                Ok((1, 16, 21)),
                Ok((2, 37, 27)),
                Ok((3, 64, 35)),
                Ok((4, 99, 58)),
                Ok((5, 157, 21)),
                Ok((6, 178, 6)),
                Ok((7, 184, 5)),
                Ok((8, 189, 4)),
                Ok((0, 193, 16)),
                Err(ParseError::ReadError(
                    "failed to fill whole buffer".to_owned()
                ))
            ]
        );

        Ok(())
    }

    // Dropping the final byte truncates the second message's End Section, which is reported
    // as a read error.
    #[test]
    fn read_grib2_message_with_incomplete_section_8() -> Result<(), Box<dyn std::error::Error>> {
        let f = std::fs::File::open(
            "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2",
        )?;
        let mut f = std::io::BufReader::new(f);
        let mut buf = Vec::new();
        f.read_to_end(&mut buf)?;

        let mut repeated_message = buf.repeat(2);
        repeated_message.pop();
        let f = Cursor::new(repeated_message);

        let grib2_reader = SeekableGrib2Reader::new(f);
        let sect_stream = Grib2SectionStream::new(grib2_reader);
        assert_eq!(
            sect_stream
                .take(19)
                .map(|result| result.map(|sect| (sect.num, sect.offset, sect.size)))
                .collect::<Vec<_>>(),
            vec![
                Ok((0, 0, 16)),
                Ok((1, 16, 21)),
                Ok((2, 37, 27)),
                Ok((3, 64, 35)),
                Ok((4, 99, 58)),
                Ok((5, 157, 21)),
                Ok((6, 178, 6)),
                Ok((7, 184, 5)),
                Ok((8, 189, 4)),
                Ok((0, 193, 16)),
                Ok((1, 209, 21)),
                Ok((2, 230, 27)),
                Ok((3, 257, 35)),
                Ok((4, 292, 58)),
                Ok((5, 350, 21)),
                Ok((6, 371, 6)),
                Ok((7, 377, 5)),
                Err(ParseError::ReadError(
                    "failed to fill whole buffer".to_owned()
                ))
            ]
        );

        Ok(())
    }

    // Builds a buffer containing `header` followed by the fixture message.
    fn create_grib2_message_starting_from_non_zero_position(
        header: &[u8],
    ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let mut buf = Vec::new();
        buf.write_all(header)?;

        let f = std::fs::File::open(
            "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2",
        )?;
        let mut f = std::io::BufReader::new(f);
        f.read_to_end(&mut buf)?;

        Ok(buf)
    }

    // The reader is positioned past the header before the stream is created. Reported
    // offsets are still absolute positions in the underlying buffer.
    #[test]
    fn read_grib2_message_starting_from_non_zero_position_after_seeking(
    ) -> Result<(), Box<dyn std::error::Error>> {
        let header_bytes_skipped = b"HEADER TO BE SKIPPED\n";
        let buf = create_grib2_message_starting_from_non_zero_position(header_bytes_skipped)?;

        let mut f = Cursor::new(buf);
        f.seek(SeekFrom::Current(header_bytes_skipped.len() as i64))?;

        let grib2_reader = SeekableGrib2Reader::new(f);
        let sect_stream = Grib2SectionStream::new(grib2_reader);
        assert_eq!(
            sect_stream
                .take(10)
                .map(|result| result.map(|sect| (sect.num, sect.offset, sect.size)))
                .collect::<Vec<_>>(),
            vec![
                Ok((0, 21, 16)),
                Ok((1, 37, 21)),
                Ok((2, 58, 27)),
                Ok((3, 85, 35)),
                Ok((4, 120, 58)),
                Ok((5, 178, 21)),
                Ok((6, 199, 6)),
                Ok((7, 205, 5)),
                Ok((8, 210, 4)),
            ]
        );

        Ok(())
    }

    // Generates tests where the header is NOT skipped beforehand, so Section 0 has to be
    // found by scanning. `$base_offset` is the header length and therefore the expected
    // offset of Section 0.
    macro_rules! test_reading_message_starting_from_non_zero_position {
        ($(($name:ident, $header:expr, $base_offset:expr),)*) => ($(
            #[test]
            fn $name() -> Result<(), Box<dyn std::error::Error>> {
            let header_bytes_skipped = $header;
            let buf = create_grib2_message_starting_from_non_zero_position(&header_bytes_skipped)?;

            let f = Cursor::new(buf);
            let grib2_reader = SeekableGrib2Reader::new(f);
            let sect_stream = Grib2SectionStream::new(grib2_reader);
            assert_eq!(
                sect_stream
                    .take(10)
                    .map(|result| result.map(|sect| (sect.num, sect.offset, sect.size)))
                    .collect::<Vec<_>>(),
                vec![
                    Ok((0, $base_offset + 0, 16)),
                    Ok((1, $base_offset + 16, 21)),
                    Ok((2, $base_offset + 37, 27)),
                    Ok((3, $base_offset + 64, 35)),
                    Ok((4, $base_offset + 99, 58)),
                    Ok((5, $base_offset + 157, 21)),
                    Ok((6, $base_offset + 178, 6)),
                    Ok((7, $base_offset + 184, 5)),
                    Ok((8, $base_offset + 189, 4)),
                ]
            );

            Ok(())
            }
        )*);
    }

    // Header lengths chosen around the 4096-byte scan buffer used by `read_sect0`:
    // - the magic is found early in the first chunk;
    // - the magic is at the last candidate position of the first chunk (4096 - 16);
    // - the magic is one byte past that position, forcing a second chunk;
    // - the magic lies entirely in the second chunk.
    test_reading_message_starting_from_non_zero_position! {
        (reading_message_using_read_sect0_0th_iteration, [0; 16], 16),
        (reading_message_using_end_of_read_sect0_0th_iteration, [0; 4096 - 16], 4096 - 16),
        (reading_message_using_read_sect0_0th_and_1st_iterations, [0; 4096 - 15], 4096 - 15),
        (reading_message_using_read_sect0_1st_iteration, [0; 4096], 4096),
    }
}