1use std::io::{self, Read, Seek, SeekFrom};
2
3use crate::{datatypes::*, error::*, helpers::read_as, SectionBody, SectionInfo};
4
/// Magic bytes at the start of Section 0 (the Indicator Section) of every message.
const SECT0_IS_MAGIC: &[u8] = b"GRIB";
/// Length of [`SECT0_IS_MAGIC`] in bytes.
const SECT0_IS_MAGIC_SIZE: usize = SECT0_IS_MAGIC.len();
/// Size of Section 0 in bytes, as read and parsed by this module.
const SECT0_IS_SIZE: usize = 16;
/// Size of the header that precedes sections 1-7: a 4-byte section length
/// followed by a 1-byte section number.
const SECT_HEADER_SIZE: usize = 5;
/// Bytes making up Section 8 (the End Section) that terminates every message.
const SECT8_ES_MAGIC: &[u8] = b"7777";
/// Size of Section 8 in bytes.
pub(crate) const SECT8_ES_SIZE: usize = SECT8_ES_MAGIC.len();
11
/// Iterator over the sections of the GRIB2 messages found in a reader.
///
/// Sections are reported in file order (0 through 8 for each message) together
/// with their absolute offsets and sizes.
pub struct Grib2SectionStream<R> {
    reader: R,
    // Absolute offset of the end of the message currently being read (or of the
    // last finished one). Before the first message it holds the reader's
    // starting position.
    whole_size: usize,
    // Bytes of the current message not consumed yet; 0 between messages.
    rest_size: usize,
}

impl<R> Grib2SectionStream<R> {
    /// Wraps `reader` in a section stream.
    ///
    /// No I/O is performed here: the reader's position is recorded lazily, when
    /// the first section is requested.
    pub fn new(reader: R) -> Self {
        let (whole_size, rest_size) = (0, 0);
        Self {
            reader,
            whole_size,
            rest_size,
        }
    }

    /// Consumes the stream and hands back the underlying reader, positioned
    /// wherever parsing left it.
    pub fn into_reader(self) -> R {
        let Self { reader, .. } = self;
        reader
    }
}
72
73impl<R> Grib2SectionStream<R>
74where
75 R: Grib2Read,
76{
77 #[inline]
78 fn next_sect0(&mut self) -> Option<Result<SectionInfo, ParseError>> {
79 if self.whole_size == 0 {
80 let result = self.reset_pos();
83 if let Err(e) = result {
84 return Some(Err(ParseError::ReadError(format!(
85 "resetting the initial position failed: {e}"
86 ))));
87 }
88 }
89 let result = self
90 .reader
91 .read_sect0()
92 .transpose()?
93 .map(|(offset, indicator)| {
94 self.whole_size += offset;
95 let offset = self.whole_size;
96 let message_size = indicator.total_length as usize;
97 self.whole_size += message_size;
98 let sect_info = SectionInfo {
99 num: 0,
100 offset,
101 size: SECT0_IS_SIZE,
102 body: Some(SectionBody::Section0(indicator)),
103 };
104 self.rest_size = message_size - SECT0_IS_SIZE;
105 sect_info
106 });
107 Some(result)
108 }
109
110 fn reset_pos(&mut self) -> Result<(), io::Error> {
111 let pos = self.reader.stream_position()?;
112 self.whole_size = pos as usize;
113 Ok(())
114 }
115
116 #[inline]
117 fn next_sect8(&mut self) -> Option<Result<SectionInfo, ParseError>> {
118 let result = self.reader.read_sect8().transpose()?.map(|_| {
119 let sect_info = SectionInfo {
120 num: 8,
121 offset: self.whole_size - self.rest_size,
122 size: SECT8_ES_SIZE,
123 body: None,
124 };
125 self.rest_size -= SECT8_ES_SIZE;
126 sect_info
127 });
128 Some(result)
129 }
130
131 #[inline]
132 fn next_sect(&mut self) -> Option<Result<SectionInfo, ParseError>> {
133 let result = self.reader.read_sect_header().transpose()?;
134 match result {
135 Ok(header) => {
136 let offset = self.whole_size - self.rest_size;
137 match self.reader.read_sect_payload(&header) {
138 Ok(body) => {
139 let body = Some(body);
140 let (size, num) = header;
141 self.rest_size -= size;
142 Some(Ok(SectionInfo {
143 num,
144 offset,
145 size,
146 body,
147 }))
148 }
149 Err(e) => Some(Err(e)),
150 }
151 }
152 Err(e) => Some(Err(e)),
153 }
154 }
155}
156
157impl<R> Iterator for Grib2SectionStream<R>
158where
159 R: Grib2Read,
160{
161 type Item = Result<SectionInfo, ParseError>;
162
163 fn next(&mut self) -> Option<Self::Item> {
164 match self.rest_size {
165 0 => self.next_sect0(),
166 SECT8_ES_SIZE => self.next_sect8(),
167 _ => self.next_sect(),
168 }
169 }
170}
171
/// Low-level read operations used by [`Grib2SectionStream`] to walk GRIB2 messages.
pub trait Grib2Read: Read + Seek {
    /// Looks for the next Section 0 from the current position.
    ///
    /// On success returns the number of bytes skipped before it together with
    /// the parsed [`Indicator`]; `None` means no further message was found.
    fn read_sect0(&mut self) -> Result<Option<(usize, Indicator)>, ParseError>;

    /// Reads Section 8 (the end marker); `None` means end of input was reached.
    fn read_sect8(&mut self) -> Result<Option<()>, ParseError>;

    /// Reads the raw bytes of the section described by `sect`.
    fn read_sect_as_slice(&mut self, sect: &SectionInfo) -> Result<Vec<u8>, ParseError>;
    /// Reads a section header as `(section length, section number)`; `None`
    /// means end of input was reached.
    fn read_sect_header(&mut self) -> Result<Option<SectHeader>, ParseError>;
    /// Reads (or skips) the section body that follows `header` and decodes it.
    fn read_sect_payload(&mut self, header: &SectHeader) -> Result<SectionBody, ParseError>;
    /// Reads the `size`-byte body of Section 6 (the bit-map section).
    fn read_sect6_payload(&mut self, size: usize) -> Result<SectionBody, ParseError>;
    /// Moves past the `size`-byte body of Section 7 without decoding it.
    fn skip_sect7_payload(&mut self, size: usize) -> Result<SectionBody, ParseError>;
    /// Reads exactly `size` bytes from the current position.
    fn read_slice_without_offset_check(&mut self, size: usize) -> Result<Box<[u8]>, ParseError>;
}
188
/// [`Grib2Read`] implementation over any readable, seekable byte source.
///
/// A thin wrapper: [`Read`] and [`Seek`] calls are forwarded to the wrapped
/// value unchanged.
pub struct SeekableGrib2Reader<R> {
    reader: R,
}

impl<R> SeekableGrib2Reader<R> {
    /// Wraps `r`; nothing is read or moved, so parsing starts at `r`'s current
    /// position.
    pub fn new(r: R) -> Self {
        SeekableGrib2Reader { reader: r }
    }
}

impl<R: Read> Read for SeekableGrib2Reader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let inner = &mut self.reader;
        inner.read(buf)
    }

    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
        Read::read_exact(&mut self.reader, buf)
    }
}

impl<S: Seek> Seek for SeekableGrib2Reader<S> {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        Seek::seek(&mut self.reader, pos)
    }
}
214
215macro_rules! check_size {
216 ($size:expr, $expected_size:expr) => {{
217 if $size == 0 {
218 return Ok(None);
219 }
220 if $size != $expected_size {
221 return Err(io::Error::new(
222 io::ErrorKind::UnexpectedEof,
223 "failed to fill whole buffer",
224 )
225 .into());
226 }
227 }};
228}
229
230impl<R: Read + Seek> Grib2Read for SeekableGrib2Reader<R> {
231 fn read_sect0(&mut self) -> Result<Option<(usize, Indicator)>, ParseError> {
232 let mut buf = [0; 4096];
233 let mut offset = 0;
234
235 loop {
236 let size = self.read(&mut buf[..])?;
237 if size < SECT0_IS_SIZE {
238 return Ok(None);
239 }
240 let next_offset = size - SECT0_IS_SIZE + 1;
241 for pos in 0..next_offset {
242 if &buf[pos..pos + SECT0_IS_MAGIC_SIZE] == SECT0_IS_MAGIC {
243 offset += pos;
244 self.seek(SeekFrom::Current(
245 (pos + SECT0_IS_SIZE) as i64 - size as i64,
246 ))?;
247
248 let indicator = Indicator::from_slice(&buf[pos..pos + SECT0_IS_SIZE])?;
249 return Ok(Some((offset, indicator)));
250 }
251 }
252 self.seek(SeekFrom::Current(next_offset as i64 - size as i64))?;
253 offset += next_offset;
254 }
255 }
256
257 fn read_sect8(&mut self) -> Result<Option<()>, ParseError> {
258 let mut buf = [0; SECT8_ES_SIZE];
259 let size = self.read(&mut buf[..])?;
260 check_size!(size, buf.len());
261
262 if buf[..] != SECT8_ES_MAGIC[..] {
263 return Err(ParseError::EndSectionMismatch);
264 }
265
266 Ok(Some(()))
267 }
268
269 fn read_sect_as_slice(&mut self, sect: &SectionInfo) -> Result<Vec<u8>, ParseError> {
270 self.seek(SeekFrom::Start(sect.offset as u64))?;
271
272 let mut buf = vec![0; sect.size];
273 self.read_exact(buf.as_mut_slice())?;
274
275 Ok(buf)
276 }
277
278 fn read_sect_header(&mut self) -> Result<Option<SectHeader>, ParseError> {
279 let mut buf = [0; SECT_HEADER_SIZE];
280 let size = self.read(&mut buf[..])?;
281 check_size!(size, buf.len());
282
283 let sect_size = read_as!(u32, buf, 0) as usize;
284 let sect_num = buf[4];
285
286 Ok(Some((sect_size, sect_num)))
287 }
288
289 fn read_sect_payload(&mut self, header: &SectHeader) -> Result<SectionBody, ParseError> {
290 let (size, num) = header;
291 let body_size = size - SECT_HEADER_SIZE;
292 let body = match num {
293 1 => SectionBody::Section1(Identification::from_payload(
294 self.read_slice_without_offset_check(body_size)?,
295 )?),
296 2 => SectionBody::Section2(LocalUse::from_payload(
297 self.read_slice_without_offset_check(body_size)?,
298 )),
299 3 => SectionBody::Section3(GridDefinition::from_payload(
300 self.read_slice_without_offset_check(body_size)?,
301 )?),
302 4 => SectionBody::Section4(ProdDefinition::from_payload(
303 self.read_slice_without_offset_check(body_size)?,
304 )?),
305 5 => SectionBody::Section5(ReprDefinition::from_payload(
306 self.read_slice_without_offset_check(body_size)?,
307 )?),
308 6 => self.read_sect6_payload(body_size)?,
309 7 => self.skip_sect7_payload(body_size)?,
310 _ => return Err(ParseError::UnknownSectionNumber(*num)),
311 };
312
313 Ok(body)
314 }
315
316 fn read_sect6_payload(&mut self, body_size: usize) -> Result<SectionBody, ParseError> {
317 let mut buf = [0; 1]; self.read_exact(&mut buf[..])?;
319
320 let len_extra = body_size - buf.len();
321 if len_extra > 0 {
322 let mut buf = vec![0; len_extra];
323 self.read_exact(&mut buf[..])?;
324 }
325
326 Ok(SectionBody::Section6(BitMap {
327 bitmap_indicator: buf[0],
328 }))
329 }
330
331 fn skip_sect7_payload(&mut self, body_size: usize) -> Result<SectionBody, ParseError> {
332 self.seek(SeekFrom::Current(body_size as i64))?;
333
334 Ok(SectionBody::Section7)
335 }
336
337 fn read_slice_without_offset_check(&mut self, size: usize) -> Result<Box<[u8]>, ParseError> {
338 let mut buf = vec![0; size];
339 self.read_exact(&mut buf[..])?;
340 Ok(buf.into_boxed_slice())
341 }
342}
343
/// Section header as returned by `read_sect_header`: `(section length in bytes,
/// including this header; section number)`.
type SectHeader = (usize, u8);
345
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Sample file holding a single GRIB2 message.
    const SAMPLE_PATH: &str =
        "testdata/icon_global_icosahedral_single-level_2021112018_000_TOT_PREC.grib2";

    /// Total length in bytes of the sample message.
    const SAMPLE_MESSAGE_LEN: usize = 193;

    /// `(section number, offset, size)` of every section in the sample message,
    /// with offsets relative to the start of the message.
    const SAMPLE_LAYOUT: [(u8, usize, usize); 9] = [
        (0, 0, 16),
        (1, 16, 21),
        (2, 37, 27),
        (3, 64, 35),
        (4, 99, 58),
        (5, 157, 21),
        (6, 178, 6),
        (7, 184, 5),
        (8, 189, 4),
    ];

    /// What the tests compare per stream item: `(num, offset, size)` or the error.
    type SectSummary = Result<(u8, usize, usize), ParseError>;

    /// Loads the whole sample file into memory.
    fn read_sample() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let mut reader = std::io::BufReader::new(std::fs::File::open(SAMPLE_PATH)?);
        let mut bytes = Vec::new();
        reader.read_to_end(&mut bytes)?;
        Ok(bytes)
    }

    /// Expected summaries for one copy of the sample message starting at `base`.
    fn sample_sections(base: usize) -> Vec<SectSummary> {
        SAMPLE_LAYOUT
            .iter()
            .map(|&(num, offset, size)| Ok((num, base + offset, size)))
            .collect()
    }

    /// The error reported when the input ends in the middle of a section.
    fn truncated_error() -> SectSummary {
        Err(ParseError::ReadError(
            "failed to fill whole buffer".to_owned(),
        ))
    }

    /// Streams sections out of `source` and summarizes at most `limit` items.
    fn summarize<S: Read + Seek>(source: S, limit: usize) -> Vec<SectSummary> {
        Grib2SectionStream::new(SeekableGrib2Reader::new(source))
            .take(limit)
            .map(|result| result.map(|sect| (sect.num, sect.offset, sect.size)))
            .collect()
    }

    #[test]
    fn read_one_grib2_message() -> Result<(), Box<dyn std::error::Error>> {
        let f = std::io::BufReader::new(std::fs::File::open(SAMPLE_PATH)?);

        assert_eq!(summarize(f, 10), sample_sections(0));

        Ok(())
    }

    #[test]
    fn read_multiple_grib2_messages() -> Result<(), Box<dyn std::error::Error>> {
        let data = read_sample()?.repeat(2);

        let mut expected = sample_sections(0);
        expected.extend(sample_sections(SAMPLE_MESSAGE_LEN));
        assert_eq!(summarize(Cursor::new(data), 19), expected);

        Ok(())
    }

    #[test]
    fn read_grib2_message_with_incomplete_section_0() -> Result<(), Box<dyn std::error::Error>> {
        let mut data = read_sample()?;
        data.extend_from_slice(b"extra");

        assert_eq!(summarize(Cursor::new(data), 10), sample_sections(0));

        Ok(())
    }

    #[test]
    fn read_grib2_message_with_incomplete_section_1() -> Result<(), Box<dyn std::error::Error>> {
        let mut data = read_sample()?;
        // Start a second message: its full Section 0 plus one byte of Section 1.
        let partial_message = data[..(SECT0_IS_SIZE + 1)].to_vec();
        data.extend(partial_message);

        let mut expected = sample_sections(0);
        expected.push(Ok((0, SAMPLE_MESSAGE_LEN, 16)));
        expected.push(truncated_error());
        assert_eq!(summarize(Cursor::new(data), 19), expected);

        Ok(())
    }

    #[test]
    fn read_grib2_message_with_incomplete_section_8() -> Result<(), Box<dyn std::error::Error>> {
        let mut data = read_sample()?.repeat(2);
        // Drop the final byte so the second message's end section is truncated.
        data.pop();

        let mut expected = sample_sections(0);
        expected.extend(sample_sections(SAMPLE_MESSAGE_LEN).into_iter().take(8));
        expected.push(truncated_error());
        assert_eq!(summarize(Cursor::new(data), 19), expected);

        Ok(())
    }

    /// Returns `header` followed by the bytes of the sample file.
    fn create_grib2_message_starting_from_non_zero_position(
        header: &[u8],
    ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let mut buf = header.to_vec();
        buf.extend(read_sample()?);
        Ok(buf)
    }

    #[test]
    fn read_grib2_message_starting_from_non_zero_position_after_seeking(
    ) -> Result<(), Box<dyn std::error::Error>> {
        let header_bytes_skipped = b"HEADER TO BE SKIPPED\n";
        let buf = create_grib2_message_starting_from_non_zero_position(header_bytes_skipped)?;

        let mut f = Cursor::new(buf);
        f.seek(SeekFrom::Current(header_bytes_skipped.len() as i64))?;

        assert_eq!(
            summarize(f, 10),
            sample_sections(header_bytes_skipped.len())
        );

        Ok(())
    }

    // Each case prepends `$header` and expects the message to be found at
    // `$base_offset`; the sizes exercise the chunk boundaries of `read_sect0`.
    macro_rules! test_reading_message_starting_from_non_zero_position {
        ($(($name:ident, $header:expr, $base_offset:expr),)*) => ($(
            #[test]
            fn $name() -> Result<(), Box<dyn std::error::Error>> {
                let header_bytes_skipped = $header;
                let buf = create_grib2_message_starting_from_non_zero_position(&header_bytes_skipped)?;

                assert_eq!(summarize(Cursor::new(buf), 10), sample_sections($base_offset));

                Ok(())
            }
        )*);
    }

    test_reading_message_starting_from_non_zero_position! {
        (reading_message_using_read_sect0_0th_iteration, [0; 16], 16),
        (reading_message_using_end_of_read_sect0_0th_iteration, [0; 4096 - 16], 4096 - 16),
        (reading_message_using_read_sect0_0th_and_1st_iterations, [0; 4096 - 15], 4096 - 15),
        (reading_message_using_read_sect0_1st_iteration, [0; 4096], 4096),
    }
}
634}