Python library for parsing and generating NMEA 0183 protocol messages used in GPS and marine navigation systems
77
Stream processing capabilities allow handling continuous NMEA data from files, serial ports, and other streaming sources with flexible error handling and parsing options.
class NMEAStreamReader:
    """Reads NMEA sentences from a stream with configurable error handling."""

    def __init__(self, stream=None, errors: str = 'raise'):
        """
        Create NMEAStreamReader object.

        Args:
            stream: File-like object with readline() method (optional).
                May be omitted when data is fed manually through next(data).
            errors: Error handling behavior:
                - 'raise': Raise exception on parse errors (default)
                - 'yield': Yield ParseError objects as stream elements
                - 'ignore': Skip parse errors silently
        """

    def next(self, data: str = None) -> Iterator[NMEASentence]:
        """
        Parse data and yield NMEA sentence objects.

        Args:
            data: Optional string data to parse. If None, reads from stream.

        Yields:
            NMEASentence objects or ParseError objects (if errors='yield')
        """

    def __iter__(self) -> Iterator[List[NMEASentence]]:
        """Iterator protocol support for use in for loops.

        Each step of the loop produces one batch of sentences, which is
        itself iterated to reach the individual messages.
        """

    def __next__(self) -> List[NMEASentence]:
        """Python 3 iterator protocol."""


import pynmea2
import io

# Stream processing with string data
nmea_data = """$GPGGA,184353.07,1929.045,S,02410.506,E,1,04,2.6,100.00,M,-33.9,M,,0000*6D
$GPRMC,184353.07,A,1929.045,S,02410.506,E,0.13,309.62,120598,,A*70
invalid sentence
$GPGLL,1929.045,S,02410.506,E,184353.07,A,A*4C"""

# Wrap the text in a file-like object so the reader has a readline() to call
stream = io.StringIO(nmea_data)
# errors='ignore' skips the "invalid sentence" line instead of raising
reader = pynmea2.NMEAStreamReader(stream, errors='ignore')

# The outer loop yields batches; the inner loop yields the parsed messages
for batch in reader:
    for msg in batch:
        print(f"{type(msg).__name__}: {msg}")
# Manual data feeding
# No stream is supplied, so the text to parse is handed to next() directly
reader = pynmea2.NMEAStreamReader(errors='yield')
for sentence in reader.next("$GPGGA,184353.07,1929.045,S,02410.506,E,1,04,2.6,100.00,M,-33.9,M,,0000*6D\n"):
    # With errors='yield', parse failures arrive as ParseError items
    if isinstance(sentence, pynmea2.ParseError):
        print(f"Parse error: {sentence}")
    else:
        print(f"Parsed: {sentence}")
# File processing with error handling
with open('nmea_log.txt', 'r') as f:
    reader = pynmea2.NMEAStreamReader(f, errors='raise')
    try:
        for batch in reader:
            for msg in batch:
                # getattr with a default covers sentences without position fields
                print(f"Position: {getattr(msg, 'latitude', 'N/A')}, {getattr(msg, 'longitude', 'N/A')}")
    except pynmea2.ParseError as e:
        # errors='raise' surfaces the first bad sentence as an exception
        print(f"Failed to parse: {e}")


class NMEAFile:
    """File reader for NMEA sentences with file-like interface."""

    def __init__(self, f, *args, **kwargs):
        """
        Open NMEA file for reading.

        Args:
            f: File path string or file-like object
            *args, **kwargs: Additional arguments passed to open() if f is a path
        """

    def open(self, fp: str, mode: str = 'r'):
        """Open file at specified path."""

    def close(self):
        """Close the file."""

    def __iter__(self) -> Iterator[NMEASentence]:
        """Iterate through file yielding NMEA sentences."""

    def next(self) -> NMEASentence:
        """Read next NMEA sentence from file."""

    def readline(self) -> NMEASentence:
        """Read next NMEA sentence (alias for next())."""

    def read(self) -> List[NMEASentence]:
        """Read all sentences as list."""

    def parse(self, s: str) -> NMEASentence:
        """Parse single sentence string."""

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""


import pynmea2
# Read NMEA file with context manager
with pynmea2.NMEAFile('gps_log.nmea') as nmea_file:
    for sentence in nmea_file:
        # Only print sentences that expose a latitude attribute
        if hasattr(sentence, 'latitude'):
            print(f"Position: {sentence.latitude}, {sentence.longitude}")
# Read all sentences at once
nmea_file = pynmea2.NMEAFile('gps_log.nmea')
try:
    all_sentences = nmea_file.read()
finally:
    # Close the file even if read() raises, as the line-by-line example does
    nmea_file.close()

print(f"Total sentences: {len(all_sentences)}")
# Keep only the GGA sentences from the full list
gga_sentences = [s for s in all_sentences if isinstance(s, pynmea2.GGA)]
print(f"GGA sentences: {len(gga_sentences)}")
# Read file line by line
nmea_file = pynmea2.NMEAFile('gps_log.nmea')
try:
    # Loop until next() signals the end of the file with StopIteration
    while True:
        sentence = nmea_file.next()
        print(sentence)
except StopIteration:
    pass
finally:
    # Always release the file handle
    nmea_file.close()
# Open file with custom parameters
# The extra arguments are passed on to open() because the first one is a path
nmea_file = pynmea2.NMEAFile('gps_log.nmea', mode='r', encoding='utf-8')


import pynmea2
import serial
import io

# Process NMEA data from serial GPS device
ser = serial.Serial('/dev/ttyUSB0', 4800, timeout=5.0)
# Wrap the serial port in a text layer so the reader can call readline() on it
sio = io.TextIOWrapper(io.BufferedRWPair(ser, ser))
reader = pynmea2.NMEAStreamReader(sio, errors='ignore')

try:
    for batch in reader:
        for msg in batch:
            if isinstance(msg, pynmea2.GGA) and msg.is_valid:
                print(f"GPS Fix: {msg.latitude:.6f}, {msg.longitude:.6f}")
                print(f"Altitude: {msg.altitude}m, Satellites: {msg.num_sats}")
            elif isinstance(msg, pynmea2.RMC) and msg.is_valid:
                print(f"Speed: {msg.spd_over_grnd} knots, Course: {msg.true_course}°")
except KeyboardInterrupt:
    # Ctrl-C is the intended way to stop the monitoring loop
    print("Stopping GPS monitoring")
finally:
    # Release the serial port however the loop ended
    ser.close()


reader = pynmea2.NMEAStreamReader(stream, errors='raise')
try:
    for batch in reader:
        for msg in batch:
            process_message(msg)
except pynmea2.ParseError as e:
    print(f"Parse failed: {e}")
    # Handle error and potentially continue


reader = pynmea2.NMEAStreamReader(stream, errors='yield')
for batch in reader:
    for item in batch:
        # In 'yield' mode a batch can mix parsed sentences and ParseError objects
        if isinstance(item, pynmea2.ParseError):
            print(f"Skipping bad sentence: {item}")
        else:
            process_message(item)


reader = pynmea2.NMEAStreamReader(stream, errors='ignore')
for batch in reader:
    for msg in batch:
        # Only valid sentences are yielded
        process_message(msg)


import pynmea2
# Efficient processing of large files
def process_large_nmea_file(filename):
    """Stream an NMEA file and hand valid GGA positions off in batches.

    Args:
        filename: Path of the NMEA file, passed to pynmea2.NMEAFile.

    Returns:
        The number of valid GGA sentences encountered.
    """
    gga_count = 0
    positions = []

    with pynmea2.NMEAFile(filename) as f:
        for sentence in f:
            if isinstance(sentence, pynmea2.GGA) and sentence.is_valid:
                gga_count += 1
                positions.append((sentence.latitude, sentence.longitude))

                # Process in batches to manage memory
                if len(positions) >= 1000:
                    process_position_batch(positions)
                    # clear() reuses the same list object, so the batch
                    # handler must not keep a reference to it
                    positions.clear()

    # Process remaining positions
    if positions:
        process_position_batch(positions)

    return gga_count
def process_position_batch(positions):
    # Batch processing logic
    pass

Install with Tessl CLI

npx tessl i tessl/pypi-pynmea2

docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10