CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pynmea2

Python library for parsing and generating NMEA 0183 protocol messages used in GPS and marine navigation systems

77

1.11x
Overview
Eval results
Files

stream-processing.mddocs/

Stream Processing

pynmea2's stream-processing classes handle continuous NMEA data from files, serial ports, and other streaming sources, with flexible error handling and parsing options.

NMEAStreamReader

class NMEAStreamReader:
    """Reads NMEA sentences from a stream with configurable error handling.

    The reader can either pull lines from a file-like ``stream`` given at
    construction time, or be fed text manually through ``next(data)`` when
    no stream is supplied. How parse failures are reported is selected once,
    through the ``errors`` argument of the constructor.
    """

    def __init__(self, stream=None, errors: str = 'raise'):
        """
        Create NMEAStreamReader object.

        Args:
            stream: File-like object with readline() method (optional).
                Leave it out to feed text manually through next(data).
            errors: Error handling behavior:
                - 'raise': Raise exception on parse errors (default)
                - 'yield': Yield ParseError objects as stream elements
                - 'ignore': Skip parse errors silently
        """

    def next(self, data: str = None) -> Iterator[NMEASentence]:
        """
        Parse data and yield NMEA sentence objects.

        Args:
            data: Optional string data to parse. If None, reads from stream.

        Yields:
            NMEASentence objects or ParseError objects (if errors='yield')

        Raises:
            ParseError: On a sentence that cannot be parsed, when the reader
                was created with errors='raise'.
        """

    def __iter__(self) -> Iterator[List[NMEASentence]]:
        """Iterator protocol support for use in for loops.

        Each item produced by the iteration is a batch of parsed sentences,
        which is why callers use a nested loop
        (``for batch in reader: for msg in batch``).
        """

    def __next__(self) -> List[NMEASentence]:
        """Python 3 iterator protocol.

        Returns:
            The next batch of parsed sentences. With errors='yield' a batch
            may also contain ParseError objects.
        """

Usage Examples

import pynmea2
import io

# --- Example 1: stream processing with string data -------------------------
# The third line is deliberately malformed to show what errors='ignore' does.
# NOTE(review): the final sentence has no trailing newline — confirm the
# reader still emits it once the stream is exhausted.
nmea_data = """$GPGGA,184353.07,1929.045,S,02410.506,E,1,04,2.6,100.00,M,-33.9,M,,0000*6D
$GPRMC,184353.07,A,1929.045,S,02410.506,E,0.13,309.62,120598,,A*70
invalid sentence
$GPGLL,1929.045,S,02410.506,E,184353.07,A,A*4C"""

stream = io.StringIO(nmea_data)
reader = pynmea2.NMEAStreamReader(stream, errors='ignore')

# Iterating the reader produces batches; each batch holds parsed sentences.
for batch in reader:
    for msg in batch:
        print(f"{type(msg).__name__}: {msg}")

# --- Example 2: manual data feeding ----------------------------------------
# No stream is passed to the constructor; text is handed to next() directly.
# With errors='yield', parse failures arrive in-band as ParseError objects.
reader = pynmea2.NMEAStreamReader(errors='yield')
for sentence in reader.next("$GPGGA,184353.07,1929.045,S,02410.506,E,1,04,2.6,100.00,M,-33.9,M,,0000*6D\n"):
    if isinstance(sentence, pynmea2.ParseError):
        print(f"Parse error: {sentence}")
    else:
        print(f"Parsed: {sentence}")

# --- Example 3: file processing with error handling ------------------------
# The encoding is given explicitly so the example does not depend on the
# platform's default text encoding.
with open('nmea_log.txt', 'r', encoding='utf-8') as f:
    reader = pynmea2.NMEAStreamReader(f, errors='raise')
    try:
        for batch in reader:
            for msg in batch:
                # Not every sentence type carries a position, hence getattr().
                print(f"Position: {getattr(msg, 'latitude', 'N/A')}, {getattr(msg, 'longitude', 'N/A')}")
    except pynmea2.ParseError as e:
        # errors='raise' stops the iteration at the first unparsable sentence.
        print(f"Failed to parse: {e}")

NMEAFile

class NMEAFile:
    """File reader for NMEA sentences with file-like interface.

    Instances can be iterated to obtain one parsed sentence at a time and
    can be used in a ``with`` statement.
    """

    def __init__(self, f, *args, **kwargs):
        """
        Open NMEA file for reading.

        Args:
            f: File path string or file-like object
            *args, **kwargs: Additional arguments passed to open() if f is a path
                (for example ``mode='r'`` or ``encoding='utf-8'``)
        """

    def open(self, fp: str, mode: str = 'r'):
        """Open file at specified path.

        Args:
            fp: Path of the file to open.
            mode: File mode; defaults to 'r'.
        """

    def close(self):
        """Close the file."""

    def __iter__(self) -> Iterator[NMEASentence]:
        """Iterate through file yielding NMEA sentences."""

    def next(self) -> NMEASentence:
        """Read next NMEA sentence from file.

        The line-by-line usage example stops on StopIteration, so callers
        looping over next() manually should expect that exception at the
        end of the file.
        """

    def readline(self) -> NMEASentence:
        """Read next NMEA sentence (alias for next())."""

    def read(self) -> List[NMEASentence]:
        """Read all sentences as list."""

    def parse(self, s: str) -> NMEASentence:
        """Parse single sentence string.

        Args:
            s: Text of one NMEA sentence.

        Returns:
            The parsed NMEASentence.
        """

    def __enter__(self):
        """Context manager entry.

        The object bound by ``with ... as`` is iterated for sentences in the
        usage examples — presumably this returns the NMEAFile itself
        (TODO confirm).
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit.

        Presumably closes the underlying file — TODO confirm against the
        implementation.
        """

Usage Examples

import pynmea2

# Read NMEA file with context manager
with pynmea2.NMEAFile('gps_log.nmea') as nmea_file:
    for sentence in nmea_file:
        # Only some sentence types expose a position.
        if hasattr(sentence, 'latitude'):
            print(f"Position: {sentence.latitude}, {sentence.longitude}")

# Read all sentences at once.
# The context manager closes the file even if read() raises.
with pynmea2.NMEAFile('gps_log.nmea') as nmea_file:
    all_sentences = nmea_file.read()

print(f"Total sentences: {len(all_sentences)}")
gga_sentences = [s for s in all_sentences if isinstance(s, pynmea2.GGA)]
print(f"GGA sentences: {len(gga_sentences)}")

# Read file line by line.
# next() is called manually here, so the end of the file shows up as
# StopIteration; the finally clause closes the file on every exit path.
nmea_file = pynmea2.NMEAFile('gps_log.nmea')
try:
    while True:
        sentence = nmea_file.next()
        print(sentence)
except StopIteration:
    pass
finally:
    nmea_file.close()

# Open file with custom parameters.
# Extra arguments are forwarded to open(); the file is again closed by the
# context manager instead of being left open.
with pynmea2.NMEAFile('gps_log.nmea', mode='r', encoding='utf-8') as nmea_file:
    utf8_sentences = nmea_file.read()

Serial Port Processing

import pynmea2
import serial
import io

# Process NMEA data from serial GPS device
ser = serial.Serial('/dev/ttyUSB0', 4800, timeout=5.0)

try:
    # Everything after the port is opened lives inside the try block, so the
    # port is closed even if building the text wrapper or the reader fails.
    # The reader needs a text stream with readline(), hence the wrapper
    # around the raw serial port.
    sio = io.TextIOWrapper(io.BufferedRWPair(ser, ser))
    reader = pynmea2.NMEAStreamReader(sio, errors='ignore')

    for batch in reader:
        for msg in batch:
            if isinstance(msg, pynmea2.GGA) and msg.is_valid:
                print(f"GPS Fix: {msg.latitude:.6f}, {msg.longitude:.6f}")
                print(f"Altitude: {msg.altitude}m, Satellites: {msg.num_sats}")
            elif isinstance(msg, pynmea2.RMC) and msg.is_valid:
                print(f"Speed: {msg.spd_over_grnd} knots, Course: {msg.true_course}°")

except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this otherwise endless loop.
    print("Stopping GPS monitoring")
finally:
    ser.close()

Error Handling Strategies

Raise Errors (Default)

# errors='raise' (the default): a sentence that fails to parse raises
# ParseError out of the iteration.
reader = pynmea2.NMEAStreamReader(stream, errors='raise')
try:
    for batch in reader:
        for msg in batch:
            process_message(msg)
except pynmea2.ParseError as e:
    print(f"Parse failed: {e}")
    # The exception has already ended the loop above. To keep reading,
    # start a new loop over the reader from here.
    # NOTE(review): confirm the reader can be resumed after a ParseError.

Yield Errors

# errors='yield': parse failures are delivered in-band as ParseError objects,
# so each element has to be checked before it is used.
reader = pynmea2.NMEAStreamReader(stream, errors='yield')
for batch in reader:
    for entry in batch:
        if not isinstance(entry, pynmea2.ParseError):
            process_message(entry)
            continue
        print(f"Skipping bad sentence: {entry}")

Ignore Errors

# errors='ignore': unparsable sentences are dropped by the reader.
reader = pynmea2.NMEAStreamReader(stream, errors='ignore')
for sentences in reader:
    # Only valid sentences are yielded
    for sentence in sentences:
        process_message(sentence)

Performance Considerations

import pynmea2

# Efficient processing of large files
def process_large_nmea_file(filename, batch_size=1000):
    """Count the valid GGA fixes in an NMEA file, processing positions in batches.

    The file is read one sentence at a time. The (latitude, longitude) pair
    of every valid GGA sentence is collected, and each time ``batch_size``
    pairs have accumulated they are handed to process_position_batch(), so
    memory use stays bounded regardless of file size.

    Args:
        filename: Path of the NMEA log; passed to pynmea2.NMEAFile.
        batch_size: Number of positions to accumulate before each call to
            process_position_batch(). Defaults to 1000.

    Returns:
        The number of valid GGA sentences found in the file.

    Raises:
        ValueError: If batch_size is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1 (was: {batch_size!r})")

    gga_count = 0
    positions = []

    with pynmea2.NMEAFile(filename) as f:
        for sentence in f:
            # Only valid GGA fixes contribute a position.
            if not (isinstance(sentence, pynmea2.GGA) and sentence.is_valid):
                continue

            gga_count += 1
            positions.append((sentence.latitude, sentence.longitude))

            # Process in batches to manage memory
            if len(positions) >= batch_size:
                process_position_batch(positions)
                # Start a fresh list instead of clearing the old one: the
                # batch processor may still hold a reference to the list it
                # was given, and clearing it would empty that batch too.
                positions = []

    # Process remaining positions
    if positions:
        process_position_batch(positions)

    return gga_count

def process_position_batch(positions):
    """Placeholder for the batch processing logic.

    Args:
        positions: List of (latitude, longitude) tuples collected by the
            caller.

    Returns:
        None. The placeholder does nothing with the batch.
    """
    return None

Install with Tessl CLI

npx tessl i tessl/pypi-pynmea2

docs

core-parsing.md

depth-sonar.md

gps-positioning.md

index.md

navigation-course.md

proprietary-sentences.md

stream-processing.md

utilities.md

wind-weather.md

tile.json