1use std::io::{self, Read, Seek, SeekFrom};
2
3use crate::{datatypes::*, error::*, helpers::read_as, SectionBody, SectionInfo};
4
// Magic byte sequences.

/// Byte sequence searched for to locate the start of Section 0.
const SECT0_IS_MAGIC: &[u8] = b"GRIB";
/// Byte sequence that Section 8 must consist of.
const SECT8_ES_MAGIC: &[u8] = b"7777";

// Sizes, all in bytes.

/// Length of [`SECT0_IS_MAGIC`].
const SECT0_IS_MAGIC_SIZE: usize = SECT0_IS_MAGIC.len();
/// Size of the whole of Section 0, magic included.
const SECT0_IS_SIZE: usize = 16;
/// Size of the header in front of every section payload: a `u32` section
/// length followed by a one-byte section number.
const SECT_HEADER_SIZE: usize = 5;
/// Size of Section 8, which is nothing but [`SECT8_ES_MAGIC`].
pub(crate) const SECT8_ES_SIZE: usize = SECT8_ES_MAGIC.len();
11
/// Stream of the sections found in the data supplied by `reader`.
///
/// When `R` implements `Grib2Read`, the stream is an [`Iterator`] that yields
/// one `SectionInfo` (or a `ParseError`) per section, message after message.
pub struct Grib2SectionStream<R> {
    /// Source the sections are read from.
    reader: R,
    /// Offset of the end of the message currently being read. It is `0`
    /// until the first Section 0 has been looked for.
    whole_size: usize,
    /// Bytes of the current message that have not been consumed yet; `0`
    /// means the next thing to look for is a new Section 0.
    rest_size: usize,
}
44
45impl<R> Grib2SectionStream<R> {
46 pub fn new(reader: R) -> Self {
61 Self {
62 reader,
63 whole_size: 0,
64 rest_size: 0,
65 }
66 }
67
68 pub fn into_reader(self) -> R {
69 self.reader
70 }
71}
72
73impl<R> Grib2SectionStream<R>
74where
75 R: Grib2Read,
76{
77 #[inline]
78 fn next_sect0(&mut self) -> Option<Result<SectionInfo, ParseError>> {
79 if self.whole_size == 0 {
80 let result = self.reset_pos();
83 if let Err(e) = result {
84 return Some(Err(ParseError::ReadError(format!(
85 "resetting the initial position failed: {e}"
86 ))));
87 }
88 }
89 let result = self
90 .reader
91 .read_sect0()
92 .transpose()?
93 .map(|(offset, indicator)| {
94 self.whole_size += offset;
95 let offset = self.whole_size;
96 let message_size = indicator.total_length as usize;
97 self.whole_size += message_size;
98 let sect_info = SectionInfo {
99 num: 0,
100 offset,
101 size: SECT0_IS_SIZE,
102 body: Some(SectionBody::Section0(indicator)),
103 };
104 self.rest_size = message_size - SECT0_IS_SIZE;
105 sect_info
106 });
107 Some(result)
108 }
109
110 fn reset_pos(&mut self) -> Result<(), io::Error> {
111 let pos = self.reader.stream_position()?;
112 self.whole_size = pos as usize;
113 Ok(())
114 }
115
116 #[inline]
117 fn next_sect8(&mut self) -> Option<Result<SectionInfo, ParseError>> {
118 let result = self.reader.read_sect8().transpose()?.map(|_| {
119 let sect_info = SectionInfo {
120 num: 8,
121 offset: self.whole_size - self.rest_size,
122 size: SECT8_ES_SIZE,
123 body: None,
124 };
125 self.rest_size -= SECT8_ES_SIZE;
126 sect_info
127 });
128 Some(result)
129 }
130
131 #[inline]
132 fn next_sect(&mut self) -> Option<Result<SectionInfo, ParseError>> {
133 let result = self.reader.read_sect_header().transpose()?;
134 match result {
135 Ok(header) => {
136 let offset = self.whole_size - self.rest_size;
137 match self.reader.read_sect_payload(&header) {
138 Ok(body) => {
139 let body = Some(body);
140 let (size, num) = header;
141 self.rest_size -= size;
142 Some(Ok(SectionInfo {
143 num,
144 offset,
145 size,
146 body,
147 }))
148 }
149 Err(e) => Some(Err(e)),
150 }
151 }
152 Err(e) => Some(Err(e)),
153 }
154 }
155}
156
157impl<R> Iterator for Grib2SectionStream<R>
158where
159 R: Grib2Read,
160{
161 type Item = Result<SectionInfo, ParseError>;
162
163 fn next(&mut self) -> Option<Self::Item> {
164 match self.rest_size {
165 0 => self.next_sect0(),
166 SECT8_ES_SIZE => self.next_sect8(),
167 _ => self.next_sect(),
168 }
169 }
170}
171
172pub trait Grib2Read: Read + Seek {
173 fn read_sect0(&mut self) -> Result<Option<(usize, Indicator)>, ParseError>;
175
176 fn read_sect8(&mut self) -> Result<Option<()>, ParseError>;
178
179 fn read_sect_header(&mut self) -> Result<Option<SectHeader>, ParseError>;
182 fn read_sect_payload(&mut self, header: &SectHeader) -> Result<SectionBody, ParseError>;
183 fn read_sect_payload_as_slice(&mut self, sect: &SectionInfo) -> Result<Box<[u8]>, ParseError>;
184 fn read_sect6_payload(&mut self, size: usize) -> Result<SectionBody, ParseError>;
185 fn skip_sect7_payload(&mut self, size: usize) -> Result<SectionBody, ParseError>;
186 fn read_slice_without_offset_check(&mut self, size: usize) -> Result<Box<[u8]>, ParseError>;
187}
188
/// Wrapper that forwards `Read` and `Seek` to the inner reader and, for
/// readers that support both, provides the `Grib2Read` operations.
pub struct SeekableGrib2Reader<R> {
    /// The wrapped reader all I/O is delegated to.
    reader: R,
}
192
193impl<R> SeekableGrib2Reader<R> {
194 pub fn new(r: R) -> Self {
195 Self { reader: r }
196 }
197}
198
199impl<R: Read> Read for SeekableGrib2Reader<R> {
200 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
201 self.reader.read(buf)
202 }
203
204 fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
205 self.reader.read_exact(buf)
206 }
207}
208
209impl<S: Seek> Seek for SeekableGrib2Reader<S> {
210 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
211 self.reader.seek(pos)
212 }
213}
214
/// Classifies the outcome of a fixed-size read inside a function returning
/// `Result<Option<_>, _>`.
///
/// * a size of `0` makes the enclosing function return `Ok(None)`;
/// * any other size different from the expected one makes it return an
///   `UnexpectedEof` I/O error converted with `.into()`;
/// * the expected size lets execution continue.
macro_rules! check_size {
    ($size:expr, $expected_size:expr) => {{
        match $size {
            0 => return Ok(None),
            actual if actual != $expected_size => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                )
                .into());
            }
            _ => {}
        }
    }};
}
229
230impl<R: Read + Seek> Grib2Read for SeekableGrib2Reader<R> {
231 fn read_sect0(&mut self) -> Result<Option<(usize, Indicator)>, ParseError> {
232 let mut buf = [0; 4096];
233 let mut offset = 0;
234
235 loop {
236 let size = self.read(&mut buf[..])?;
237 if size < SECT0_IS_SIZE {
238 return Ok(None);
239 }
240 let next_offset = size - SECT0_IS_SIZE + 1;
241 for pos in 0..next_offset {
242 if &buf[pos..pos + SECT0_IS_MAGIC_SIZE] == SECT0_IS_MAGIC {
243 offset += pos;
244 self.seek(SeekFrom::Current(
245 (pos + SECT0_IS_SIZE) as i64 - size as i64,
246 ))?;
247
248 let indicator = Indicator::from_slice(&buf[pos..pos + SECT0_IS_SIZE])?;
249 return Ok(Some((offset, indicator)));
250 }
251 }
252 self.seek(SeekFrom::Current(next_offset as i64 - size as i64))?;
253 offset += next_offset;
254 }
255 }
256
257 fn read_sect8(&mut self) -> Result<Option<()>, ParseError> {
258 let mut buf = [0; SECT8_ES_SIZE];
259 let size = self.read(&mut buf[..])?;
260 check_size!(size, buf.len());
261
262 if buf[..] != SECT8_ES_MAGIC[..] {
263 return Err(ParseError::EndSectionMismatch);
264 }
265
266 Ok(Some(()))
267 }
268
269 fn read_sect_header(&mut self) -> Result<Option<SectHeader>, ParseError> {
270 let mut buf = [0; SECT_HEADER_SIZE];
271 let size = self.read(&mut buf[..])?;
272 check_size!(size, buf.len());
273
274 let sect_size = read_as!(u32, buf, 0) as usize;
275 let sect_num = buf[4];
276
277 Ok(Some((sect_size, sect_num)))
278 }
279
280 fn read_sect_payload(&mut self, header: &SectHeader) -> Result<SectionBody, ParseError> {
281 let (size, num) = header;
282 let body_size = size - SECT_HEADER_SIZE;
283 let body = match num {
284 1 => SectionBody::Section1(Identification::from_payload(
285 self.read_slice_without_offset_check(body_size)?,
286 )?),
287 2 => SectionBody::Section2(LocalUse::from_payload(
288 self.read_slice_without_offset_check(body_size)?,
289 )),
290 3 => SectionBody::Section3(GridDefinition::from_payload(
291 self.read_slice_without_offset_check(body_size)?,
292 )?),
293 4 => SectionBody::Section4(ProdDefinition::from_payload(
294 self.read_slice_without_offset_check(body_size)?,
295 )?),
296 5 => SectionBody::Section5(ReprDefinition::from_payload(
297 self.read_slice_without_offset_check(body_size)?,
298 )?),
299 6 => self.read_sect6_payload(body_size)?,
300 7 => self.skip_sect7_payload(body_size)?,
301 _ => return Err(ParseError::UnknownSectionNumber(*num)),
302 };
303
304 Ok(body)
305 }
306
307 fn read_sect_payload_as_slice(&mut self, sect: &SectionInfo) -> Result<Box<[u8]>, ParseError> {
308 let body_offset = sect.offset + SECT_HEADER_SIZE;
309 self.seek(SeekFrom::Start(body_offset as u64))?;
310
311 let body_size = sect.size - SECT_HEADER_SIZE;
312 let mut buf = vec![0; body_size];
313 self.read_exact(buf.as_mut_slice())?;
314
315 Ok(buf.into_boxed_slice())
316 }
317
318 fn read_sect6_payload(&mut self, body_size: usize) -> Result<SectionBody, ParseError> {
319 let mut buf = [0; 1]; self.read_exact(&mut buf[..])?;
321
322 let len_extra = body_size - buf.len();
323 if len_extra > 0 {
324 let mut buf = vec![0; len_extra];
325 self.read_exact(&mut buf[..])?;
326 }
327
328 Ok(SectionBody::Section6(BitMap {
329 bitmap_indicator: buf[0],
330 }))
331 }
332
333 fn skip_sect7_payload(&mut self, body_size: usize) -> Result<SectionBody, ParseError> {
334 self.seek(SeekFrom::Current(body_size as i64))?;
335
336 Ok(SectionBody::Section7)
337 }
338
339 fn read_slice_without_offset_check(&mut self, size: usize) -> Result<Box<[u8]>, ParseError> {
340 let mut buf = vec![0; size];
341 self.read_exact(&mut buf[..])?;
342 Ok(buf.into_boxed_slice())
343 }
344}
345
/// Decoded section header: the section size in bytes (header included)
/// followed by the section number.
type SectHeader = (usize, u8);
347
#[cfg(test)]
mod tests {
    use std::io::{Cursor, Write};

    use super::*;

    type TestResult = Result<(), Box<dyn std::error::Error>>;
    /// `(section number, offset, size)` or the error yielded in its place.
    type SectSummary = Result<(u8, usize, usize), ParseError>;

    /// Single-message GRIB2 file every test is based on.
    const TEST_FILE: &str =
        "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2";
    /// Total length in bytes of the message stored in `TEST_FILE`.
    const MESSAGE_SIZE: usize = 193;
    /// Sections of the message in `TEST_FILE`, relative to the message start.
    const MESSAGE_LAYOUT: [(u8, usize, usize); 9] = [
        (0, 0, 16),
        (1, 16, 21),
        (2, 37, 27),
        (3, 64, 35),
        (4, 99, 58),
        (5, 157, 21),
        (6, 178, 6),
        (7, 184, 5),
        (8, 189, 4),
    ];

    /// Opens `TEST_FILE` behind a buffered reader.
    fn open_test_file() -> Result<std::io::BufReader<std::fs::File>, Box<dyn std::error::Error>> {
        let f = std::fs::File::open(TEST_FILE)?;
        Ok(std::io::BufReader::new(f))
    }

    /// Returns the whole content of `TEST_FILE`.
    fn read_test_message() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let mut bytes = Vec::new();
        open_test_file()?.read_to_end(&mut bytes)?;
        Ok(bytes)
    }

    /// Streams at most `limit` sections out of `reader` and summarizes them.
    fn summarize<R: Read + Seek>(reader: R, limit: usize) -> Vec<SectSummary> {
        Grib2SectionStream::new(SeekableGrib2Reader::new(reader))
            .take(limit)
            .map(|result| result.map(|sect| (sect.num, sect.offset, sect.size)))
            .collect()
    }

    /// Expected summaries for one complete message starting at `base`.
    fn message_layout(base: usize) -> Vec<SectSummary> {
        MESSAGE_LAYOUT
            .iter()
            .map(|&(num, offset, size)| Ok((num, base + offset, size)))
            .collect()
    }

    /// Expected item when a read hits the end of the data too early.
    fn eof_error() -> SectSummary {
        Err(ParseError::ReadError(
            "failed to fill whole buffer".to_owned(),
        ))
    }

    #[test]
    fn read_one_grib2_message() -> TestResult {
        assert_eq!(summarize(open_test_file()?, 10), message_layout(0));

        Ok(())
    }

    #[test]
    fn read_multiple_grib2_messages() -> TestResult {
        let repeated_message = read_test_message()?.repeat(2);

        let mut expected = message_layout(0);
        expected.extend(message_layout(MESSAGE_SIZE));
        assert_eq!(summarize(Cursor::new(repeated_message), 19), expected);

        Ok(())
    }

    #[test]
    fn read_grib2_message_with_incomplete_section_0() -> TestResult {
        let mut bytes = read_test_message()?;
        bytes.extend_from_slice("extra".as_bytes());

        assert_eq!(summarize(Cursor::new(bytes), 10), message_layout(0));

        Ok(())
    }

    #[test]
    fn read_grib2_message_with_incomplete_section_1() -> TestResult {
        let mut bytes = read_test_message()?;
        let message_2_bytes = bytes[..(SECT0_IS_SIZE + 1)].to_vec();
        bytes.extend(message_2_bytes);

        let mut expected = message_layout(0);
        expected.push(Ok((0, MESSAGE_SIZE, 16)));
        expected.push(eof_error());
        assert_eq!(summarize(Cursor::new(bytes), 19), expected);

        Ok(())
    }

    #[test]
    fn read_grib2_message_with_incomplete_section_8() -> TestResult {
        let mut repeated_message = read_test_message()?.repeat(2);
        repeated_message.pop();

        // The second message loses its Section 8 to the truncation.
        let mut expected = message_layout(0);
        expected.extend(message_layout(MESSAGE_SIZE));
        expected.pop();
        expected.push(eof_error());
        assert_eq!(summarize(Cursor::new(repeated_message), 19), expected);

        Ok(())
    }

    /// Returns `header` followed by the content of `TEST_FILE`.
    fn create_grib2_message_starting_from_non_zero_position(
        header: &[u8],
    ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let mut buf = Vec::new();
        buf.write_all(header)?;
        open_test_file()?.read_to_end(&mut buf)?;

        Ok(buf)
    }

    #[test]
    fn read_grib2_message_starting_from_non_zero_position_after_seeking() -> TestResult {
        let header_bytes_skipped = b"HEADER TO BE SKIPPED\n";
        let bytes = create_grib2_message_starting_from_non_zero_position(header_bytes_skipped)?;

        let mut f = Cursor::new(bytes);
        f.set_position(header_bytes_skipped.len() as u64);

        assert_eq!(summarize(f, 10), message_layout(21));

        Ok(())
    }

    macro_rules! test_reading_message_starting_from_non_zero_position {
        ($(($name:ident, $header:expr, $base_offset:expr),)*) => ($(
            #[test]
            fn $name() -> TestResult {
                let header_bytes_skipped = $header;
                let bytes =
                    create_grib2_message_starting_from_non_zero_position(&header_bytes_skipped)?;

                assert_eq!(
                    summarize(Cursor::new(bytes), 10),
                    message_layout($base_offset)
                );

                Ok(())
            }
        )*);
    }

    test_reading_message_starting_from_non_zero_position! {
        (reading_message_using_read_sect0_0th_iteration, [0; 16], 16),
        (reading_message_using_end_of_read_sect0_0th_iteration, [0; 4096 - 16], 4096 - 16),
        (reading_message_using_read_sect0_0th_and_1st_iterations, [0; 4096 - 15], 4096 - 15),
        (reading_message_using_read_sect0_1st_iteration, [0; 4096], 4096),
    }
}