grib/
reader.rs

1use std::io::{self, Read, Seek, SeekFrom};
2
3use crate::{datatypes::*, error::*, helpers::read_as, SectionBody, SectionInfo};
4
/// Magic bytes that open Section 0 (the indicator section) of a message.
5const SECT0_IS_MAGIC: &[u8] = b"GRIB";
/// Length in bytes of [`SECT0_IS_MAGIC`].
6const SECT0_IS_MAGIC_SIZE: usize = SECT0_IS_MAGIC.len();
/// Size in bytes of Section 0, magic included.
7const SECT0_IS_SIZE: usize = 16;
/// Size in bytes of the header shared by Sections 1-7: a 4-byte section size
/// followed by a 1-byte section number.
8const SECT_HEADER_SIZE: usize = 5;
/// Magic bytes that make up Section 8 (the end section) of a message.
9const SECT8_ES_MAGIC: &[u8] = b"7777";
/// Size in bytes of Section 8, which consists of [`SECT8_ES_MAGIC`] only.
10pub(crate) const SECT8_ES_SIZE: usize = SECT8_ES_MAGIC.len();
11
/// An iterator over the sections of the GRIB2 messages read from a reader.
///
/// Every item is the [`SectionInfo`] of the next section in the stream, or
/// the [`ParseError`] hit while reading it. Messages are walked one after
/// another, so a stream holding several messages yields Sections 0-8 of the
/// first message, then Sections 0-8 of the second one, and so on.
///
/// # Example
/// ```
/// use grib::{Grib2SectionStream, Indicator, SectionBody, SectionInfo, SeekableGrib2Reader};
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let f = std::fs::File::open(
///         "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2",
///     )?;
///     let f = std::io::BufReader::new(f);
///     let grib2_reader = SeekableGrib2Reader::new(f);
///
///     let mut sect_stream = Grib2SectionStream::new(grib2_reader);
///     assert_eq!(
///         sect_stream.next(),
///         Some(Ok(SectionInfo {
///             num: 0,
///             offset: 0,
///             size: 16,
///             body: Some(SectionBody::Section0(Indicator {
///                 discipline: 0,
///                 total_length: 193,
///             })),
///         }))
///     );
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct Grib2SectionStream<R> {
    /// Reader the sections are pulled from.
    reader: R,
    /// Offset of the end of the message currently being read. It is zero
    /// until the first Section 0 is requested, at which point it is seeded
    /// with the reader's current position.
    whole_size: usize,
    /// Number of bytes of the current message that have not been yielded yet;
    /// zero means the next thing to read is Section 0 of a new message.
    rest_size: usize,
}
44
45impl<R> Grib2SectionStream<R> {
46    /// # Example
47    /// ```
48    /// use grib::{Grib2SectionStream, SeekableGrib2Reader};
49    ///
50    /// fn main() -> Result<(), Box<dyn std::error::Error>> {
51    ///     let f = std::fs::File::open(
52    ///         "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2",
53    ///     )?;
54    ///     let mut f = std::io::BufReader::new(f);
55    ///     let grib2_reader = SeekableGrib2Reader::new(f);
56    ///     let _sect_stream = Grib2SectionStream::new(grib2_reader);
57    ///     Ok(())
58    /// }
59    /// ```
60    pub fn new(reader: R) -> Self {
61        Self {
62            reader,
63            whole_size: 0,
64            rest_size: 0,
65        }
66    }
67
68    pub fn into_reader(self) -> R {
69        self.reader
70    }
71}
72
73impl<R> Grib2SectionStream<R>
74where
75    R: Grib2Read,
76{
77    #[inline]
78    fn next_sect0(&mut self) -> Option<Result<SectionInfo, ParseError>> {
79        if self.whole_size == 0 {
80            // if the offset value is left at the initial value, reset it to the current
81            // position
82            let result = self.reset_pos();
83            if let Err(e) = result {
84                return Some(Err(ParseError::ReadError(format!(
85                    "resetting the initial position failed: {e}"
86                ))));
87            }
88        }
89        let result = self
90            .reader
91            .read_sect0()
92            .transpose()?
93            .map(|(offset, indicator)| {
94                self.whole_size += offset;
95                let offset = self.whole_size;
96                let message_size = indicator.total_length as usize;
97                self.whole_size += message_size;
98                let sect_info = SectionInfo {
99                    num: 0,
100                    offset,
101                    size: SECT0_IS_SIZE,
102                    body: Some(SectionBody::Section0(indicator)),
103                };
104                self.rest_size = message_size - SECT0_IS_SIZE;
105                sect_info
106            });
107        Some(result)
108    }
109
110    fn reset_pos(&mut self) -> Result<(), io::Error> {
111        let pos = self.reader.stream_position()?;
112        self.whole_size = pos as usize;
113        Ok(())
114    }
115
116    #[inline]
117    fn next_sect8(&mut self) -> Option<Result<SectionInfo, ParseError>> {
118        let result = self.reader.read_sect8().transpose()?.map(|_| {
119            let sect_info = SectionInfo {
120                num: 8,
121                offset: self.whole_size - self.rest_size,
122                size: SECT8_ES_SIZE,
123                body: None,
124            };
125            self.rest_size -= SECT8_ES_SIZE;
126            sect_info
127        });
128        Some(result)
129    }
130
131    #[inline]
132    fn next_sect(&mut self) -> Option<Result<SectionInfo, ParseError>> {
133        let result = self.reader.read_sect_header().transpose()?;
134        match result {
135            Ok(header) => {
136                let offset = self.whole_size - self.rest_size;
137                match self.reader.read_sect_payload(&header) {
138                    Ok(body) => {
139                        let body = Some(body);
140                        let (size, num) = header;
141                        self.rest_size -= size;
142                        Some(Ok(SectionInfo {
143                            num,
144                            offset,
145                            size,
146                            body,
147                        }))
148                    }
149                    Err(e) => Some(Err(e)),
150                }
151            }
152            Err(e) => Some(Err(e)),
153        }
154    }
155}
156
157impl<R> Iterator for Grib2SectionStream<R>
158where
159    R: Grib2Read,
160{
161    type Item = Result<SectionInfo, ParseError>;
162
163    fn next(&mut self) -> Option<Self::Item> {
164        match self.rest_size {
165            0 => self.next_sect0(),
166            SECT8_ES_SIZE => self.next_sect8(),
167            _ => self.next_sect(),
168        }
169    }
170}
171
/// Section-level reading operations on a seekable byte stream holding GRIB2
/// data. `Grib2SectionStream` drives an implementor of this trait to walk the
/// sections of each message.
172pub trait Grib2Read: Read + Seek {
173    /// Reads Section 0.
    ///
    /// Returns `Ok(None)` when no further Section 0 can be read. On success
    /// the `usize` is added by `Grib2SectionStream` to its running offset to
    /// obtain the position of the section, i.e. it is the number of bytes
    /// that preceded the section from where the search started.
174    fn read_sect0(&mut self) -> Result<Option<(usize, Indicator)>, ParseError>;
175
176    /// Reads Section 8.
    ///
    /// In `SeekableGrib2Reader`, `Ok(None)` means that there was nothing
    /// left to read.
177    fn read_sect8(&mut self) -> Result<Option<()>, ParseError>;
178
179    /// Reads a common header for Sections 1-7 and returns the section
180    /// size and number.
    ///
    /// In `SeekableGrib2Reader`, `Ok(None)` means that there was nothing
    /// left to read.
181    fn read_sect_header(&mut self) -> Result<Option<SectHeader>, ParseError>;
    /// Reads the payload of the section described by `header` and turns it
    /// into the matching `SectionBody`.
182    fn read_sect_payload(&mut self, header: &SectHeader) -> Result<SectionBody, ParseError>;
    /// Returns the raw payload bytes (everything after the section header)
    /// of the section described by `sect`.
183    fn read_sect_payload_as_slice(&mut self, sect: &SectionInfo) -> Result<Box<[u8]>, ParseError>;
    /// Reads a Section 6 payload that is `size` bytes long.
184    fn read_sect6_payload(&mut self, size: usize) -> Result<SectionBody, ParseError>;
    /// Moves past a Section 7 payload that is `size` bytes long without
    /// decoding it.
185    fn skip_sect7_payload(&mut self, size: usize) -> Result<SectionBody, ParseError>;
    /// Reads `size` bytes starting at the current position of the stream.
186    fn read_slice_without_offset_check(&mut self, size: usize) -> Result<Box<[u8]>, ParseError>;
187}
188
/// A thin wrapper that gives any readable and seekable byte stream the
/// section-level reading operations of [`Grib2Read`].
#[derive(Debug)]
pub struct SeekableGrib2Reader<R> {
    /// Wrapped byte stream; all I/O is forwarded to it.
    reader: R,
}
192
193impl<R> SeekableGrib2Reader<R> {
194    pub fn new(r: R) -> Self {
195        Self { reader: r }
196    }
197}
198
199impl<R: Read> Read for SeekableGrib2Reader<R> {
200    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
201        self.reader.read(buf)
202    }
203
204    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
205        self.reader.read_exact(buf)
206    }
207}
208
209impl<S: Seek> Seek for SeekableGrib2Reader<S> {
210    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
211        self.reader.seek(pos)
212    }
213}
214
/// Validates the number of bytes obtained by a read, from inside a function
/// that returns `Result<Option<_>, E>` where `E: From<io::Error>`.
///
/// * a size of zero makes the enclosing function return `Ok(None)`, i.e.
///   "nothing left to read";
/// * any other size that differs from the expected one makes it return an
///   `UnexpectedEof` I/O error;
/// * otherwise execution continues after the macro.
///
/// Each argument is evaluated at most once.
macro_rules! check_size {
    ($size:expr, $expected_size:expr) => {{
        let size = $size;
        if size == 0 {
            return Ok(None);
        }
        if size != $expected_size {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "failed to fill whole buffer",
            )
            .into());
        }
    }};
}
229
230impl<R: Read + Seek> Grib2Read for SeekableGrib2Reader<R> {
231    fn read_sect0(&mut self) -> Result<Option<(usize, Indicator)>, ParseError> {
232        let mut buf = [0; 4096];
233        let mut offset = 0;
234
235        loop {
236            let size = self.read(&mut buf[..])?;
237            if size < SECT0_IS_SIZE {
238                return Ok(None);
239            }
240            let next_offset = size - SECT0_IS_SIZE + 1;
241            for pos in 0..next_offset {
242                if &buf[pos..pos + SECT0_IS_MAGIC_SIZE] == SECT0_IS_MAGIC {
243                    offset += pos;
244                    self.seek(SeekFrom::Current(
245                        (pos + SECT0_IS_SIZE) as i64 - size as i64,
246                    ))?;
247
248                    let indicator = Indicator::from_slice(&buf[pos..pos + SECT0_IS_SIZE])?;
249                    return Ok(Some((offset, indicator)));
250                }
251            }
252            self.seek(SeekFrom::Current(next_offset as i64 - size as i64))?;
253            offset += next_offset;
254        }
255    }
256
257    fn read_sect8(&mut self) -> Result<Option<()>, ParseError> {
258        let mut buf = [0; SECT8_ES_SIZE];
259        let size = self.read(&mut buf[..])?;
260        check_size!(size, buf.len());
261
262        if buf[..] != SECT8_ES_MAGIC[..] {
263            return Err(ParseError::EndSectionMismatch);
264        }
265
266        Ok(Some(()))
267    }
268
269    fn read_sect_header(&mut self) -> Result<Option<SectHeader>, ParseError> {
270        let mut buf = [0; SECT_HEADER_SIZE];
271        let size = self.read(&mut buf[..])?;
272        check_size!(size, buf.len());
273
274        let sect_size = read_as!(u32, buf, 0) as usize;
275        let sect_num = buf[4];
276
277        Ok(Some((sect_size, sect_num)))
278    }
279
280    fn read_sect_payload(&mut self, header: &SectHeader) -> Result<SectionBody, ParseError> {
281        let (size, num) = header;
282        let body_size = size - SECT_HEADER_SIZE;
283        let body = match num {
284            1 => SectionBody::Section1(Identification::from_payload(
285                self.read_slice_without_offset_check(body_size)?,
286            )?),
287            2 => SectionBody::Section2(LocalUse::from_payload(
288                self.read_slice_without_offset_check(body_size)?,
289            )),
290            3 => SectionBody::Section3(GridDefinition::from_payload(
291                self.read_slice_without_offset_check(body_size)?,
292            )?),
293            4 => SectionBody::Section4(ProdDefinition::from_payload(
294                self.read_slice_without_offset_check(body_size)?,
295            )?),
296            5 => SectionBody::Section5(ReprDefinition::from_payload(
297                self.read_slice_without_offset_check(body_size)?,
298            )?),
299            6 => self.read_sect6_payload(body_size)?,
300            7 => self.skip_sect7_payload(body_size)?,
301            _ => return Err(ParseError::UnknownSectionNumber(*num)),
302        };
303
304        Ok(body)
305    }
306
307    fn read_sect_payload_as_slice(&mut self, sect: &SectionInfo) -> Result<Box<[u8]>, ParseError> {
308        let body_offset = sect.offset + SECT_HEADER_SIZE;
309        self.seek(SeekFrom::Start(body_offset as u64))?;
310
311        let body_size = sect.size - SECT_HEADER_SIZE;
312        let mut buf = vec![0; body_size];
313        self.read_exact(buf.as_mut_slice())?;
314
315        Ok(buf.into_boxed_slice())
316    }
317
318    fn read_sect6_payload(&mut self, body_size: usize) -> Result<SectionBody, ParseError> {
319        let mut buf = [0; 1]; // octet 6
320        self.read_exact(&mut buf[..])?;
321
322        let len_extra = body_size - buf.len();
323        if len_extra > 0 {
324            let mut buf = vec![0; len_extra];
325            self.read_exact(&mut buf[..])?;
326        }
327
328        Ok(SectionBody::Section6(BitMap {
329            bitmap_indicator: buf[0],
330        }))
331    }
332
333    fn skip_sect7_payload(&mut self, body_size: usize) -> Result<SectionBody, ParseError> {
334        self.seek(SeekFrom::Current(body_size as i64))?;
335
336        Ok(SectionBody::Section7)
337    }
338
339    fn read_slice_without_offset_check(&mut self, size: usize) -> Result<Box<[u8]>, ParseError> {
340        let mut buf = vec![0; size];
341        self.read_exact(&mut buf[..])?;
342        Ok(buf.into_boxed_slice())
343    }
344}
345
/// Header shared by Sections 1-7, as `(section size in bytes, section number)`.
346type SectHeader = (usize, u8);
347
#[cfg(test)]
mod tests {
    use std::io::{Cursor, Write};

    use super::*;

    /// GRIB2 file holding the single message every test is built from.
    const TEST_FILE: &str =
        "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2";

    /// Size in bytes of the message stored in [`TEST_FILE`].
    const MESSAGE_SIZE: usize = 193;

    /// `(num, offset, size)` of every section of the test message, with the
    /// offsets relative to the start of the message.
    const MESSAGE_SECTIONS: [(u8, usize, usize); 9] = [
        (0, 0, 16),
        (1, 16, 21),
        (2, 37, 27),
        (3, 64, 35),
        (4, 99, 58),
        (5, 157, 21),
        (6, 178, 6),
        (7, 184, 5),
        (8, 189, 4),
    ];

    /// `(num, offset, size)` of a section, or the error hit while reading it.
    type Summary = Result<(u8, usize, usize), ParseError>;

    /// Loads the bytes of the test message.
    fn test_message() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        Ok(std::fs::read(TEST_FILE)?)
    }

    /// Streams the sections found in `reader` and summarises at most `limit`
    /// of them.
    fn summarize<R: Read + Seek>(reader: R, limit: usize) -> Vec<Summary> {
        Grib2SectionStream::new(SeekableGrib2Reader::new(reader))
            .take(limit)
            .map(|result| result.map(|sect| (sect.num, sect.offset, sect.size)))
            .collect()
    }

    /// Expected summaries of one complete test message starting at `base`.
    fn message_at(base: usize) -> Vec<Summary> {
        MESSAGE_SECTIONS
            .iter()
            .map(|&(num, offset, size)| Ok((num, base + offset, size)))
            .collect()
    }

    /// Expected summary of a read that ran into the end of the input.
    fn eof_error() -> Summary {
        Err(ParseError::ReadError(
            "failed to fill whole buffer".to_owned(),
        ))
    }

    #[test]
    fn read_one_grib2_message() -> Result<(), Box<dyn std::error::Error>> {
        let f = std::fs::File::open(TEST_FILE)?;
        let f = std::io::BufReader::new(f);

        assert_eq!(summarize(f, 10), message_at(0));

        Ok(())
    }

    #[test]
    fn read_multiple_grib2_messages() -> Result<(), Box<dyn std::error::Error>> {
        let f = Cursor::new(test_message()?.repeat(2));

        let mut expected = message_at(0);
        expected.extend(message_at(MESSAGE_SIZE));
        assert_eq!(summarize(f, 19), expected);

        Ok(())
    }

    #[test]
    fn read_grib2_message_with_incomplete_section_0() -> Result<(), Box<dyn std::error::Error>> {
        let mut buf = test_message()?;
        buf.extend_from_slice("extra".as_bytes());
        let f = Cursor::new(buf);

        // the trailing bytes are too few to hold another Section 0
        assert_eq!(summarize(f, 10), message_at(0));

        Ok(())
    }

    #[test]
    fn read_grib2_message_with_incomplete_section_1() -> Result<(), Box<dyn std::error::Error>> {
        let mut buf = test_message()?;
        let message_2_bytes = buf[..(SECT0_IS_SIZE + 1)].to_vec();
        buf.extend(message_2_bytes);
        let f = Cursor::new(buf);

        let mut expected = message_at(0);
        expected.push(Ok((0, MESSAGE_SIZE, 16)));
        expected.push(eof_error());
        assert_eq!(summarize(f, 19), expected);

        Ok(())
    }

    #[test]
    fn read_grib2_message_with_incomplete_section_8() -> Result<(), Box<dyn std::error::Error>> {
        let mut repeated_message = test_message()?.repeat(2);
        repeated_message.pop();
        let f = Cursor::new(repeated_message);

        let mut expected = message_at(0);
        expected.extend(message_at(MESSAGE_SIZE));
        // Section 8 of the second message is the truncated one
        expected.pop();
        expected.push(eof_error());
        assert_eq!(summarize(f, 19), expected);

        Ok(())
    }

    /// Builds a byte sequence made of `header` followed by the test message.
    fn create_grib2_message_starting_from_non_zero_position(
        header: &[u8],
    ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let mut buf = Vec::new();
        buf.write_all(header)?;
        buf.extend(test_message()?);

        Ok(buf)
    }

    #[test]
    fn read_grib2_message_starting_from_non_zero_position_after_seeking(
    ) -> Result<(), Box<dyn std::error::Error>> {
        let header_bytes_skipped = b"HEADER TO BE SKIPPED\n";
        let buf = create_grib2_message_starting_from_non_zero_position(header_bytes_skipped)?;

        let mut f = Cursor::new(buf);
        f.seek(SeekFrom::Current(header_bytes_skipped.len() as i64))?;

        assert_eq!(summarize(f, 10), message_at(21));

        Ok(())
    }

    macro_rules! test_reading_message_starting_from_non_zero_position {
        ($(($name:ident, $header:expr, $base_offset:expr),)*) => ($(
            #[test]
            fn $name() -> Result<(), Box<dyn std::error::Error>> {
                let header_bytes_skipped = $header;
                let buf =
                    create_grib2_message_starting_from_non_zero_position(&header_bytes_skipped)?;

                let f = Cursor::new(buf);
                assert_eq!(summarize(f, 10), message_at($base_offset));

                Ok(())
            }
        )*);
    }

    test_reading_message_starting_from_non_zero_position! {
        (reading_message_using_read_sect0_0th_iteration, [0; 16], 16),
        (reading_message_using_end_of_read_sect0_0th_iteration, [0; 4096 - 16], 4096 - 16),
        (reading_message_using_read_sect0_0th_and_1st_iterations, [0; 4096 - 15], 4096 - 15),
        (reading_message_using_read_sect0_1st_iteration, [0; 4096], 4096),
    }
}