#!/usr/bin/env python3
"""
Lab Weather Tracker using Pimoroni Enviro Plus sensors

Monitors temperature, humidity, pressure, and light levels.

Readings are printed to stdout and appended to one CSV file per day under
DATA_DIR. The BME280 is required for a reading to be produced; the LTR559 is
optional, and rows logged without it leave the light/proximity columns empty.
"""

import csv
import os
import time
from datetime import datetime

# The driver imports are guarded so that a missing library is reported through
# the same "Failed to initialize" path as a missing sensor, instead of an
# import-time traceback. The names stay defined (as None) either way.
try:
    from smbus2 import SMBus
except ImportError:
    SMBus = None

try:
    from bme280 import BME280
except ImportError:
    try:
        from pimoroni_bme280 import BME280
    except ImportError:
        BME280 = None

try:
    from ltr559 import LTR559
except ImportError:
    LTR559 = None

# Configuration
DATA_DIR = "data"
LOG_INTERVAL = 300  # 5 minutes in seconds
TEMP_COMP_FACTOR = 2.25  # Tuning factor for temperature compensation
CPU_TEMP_SAMPLES = 5  # Number of CPU readings averaged for compensation

# Fixed column order for every CSV row. Keeping it constant means rows that
# lack the LTR559 values still line up with the header.
CSV_FIELDS = (
    'timestamp',
    'temperature_raw_c',
    'temperature_c',
    'pressure_hpa',
    'humidity_percent',
    'light_lux',
    'proximity',
)

# Initialize sensors
bus = None
bme280 = None
try:
    if SMBus is None or BME280 is None:
        raise ImportError(
            "smbus2 and a BME280 driver (bme280 or pimoroni_bme280) are required"
        )
    bus = SMBus(1)
    bme280 = BME280(i2c_dev=bus)
    print("✓ BME280 sensor initialized")
except Exception as e:
    print(f"✗ Failed to initialize BME280: {e}")
    bme280 = None

ltr559 = None
try:
    if LTR559 is not None:
        ltr559 = LTR559()
    else:
        # Fallback kept from the original import logic: use the ltr559 module
        # object itself (presumably for releases that expose get_lux() /
        # get_proximity() at module level -- confirm against the installed
        # library version).
        import ltr559
    print("✓ LTR559 sensor initialized")
except Exception as e:
    print(f"✗ Failed to initialize LTR559: {e}")
    ltr559 = None

# CPU temperature tracking for compensation (most recent readings last)
cpu_temps = []


def get_cpu_temperature():
    """Get the temperature of the CPU for compensation.

    Returns:
        The value read from /sys/class/thermal/thermal_zone0/temp divided by
        1000, as a float, or None if the file cannot be read or parsed.
    """
    try:
        with open("/sys/class/thermal/thermal_zone0/temp", "r") as f:
            return int(f.read()) / 1000.0
    except (OSError, ValueError) as e:
        print(f"Warning: Could not read CPU temperature: {e}")
        return None


def ensure_data_directory():
    """Create data directory if it doesn't exist."""
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(DATA_DIR, exist_ok=True)


def get_csv_filename():
    """Generate CSV filename with current date.

    Returns:
        Path of the form DATA_DIR/weather_log_YYYY-MM-DD.csv for today's
        local date.
    """
    date_str = datetime.now().strftime("%Y-%m-%d")
    return os.path.join(DATA_DIR, f"weather_log_{date_str}.csv")


def _compensate_temperature(raw_temp):
    """Return raw_temp adjusted using the averaged CPU temperature.

    Falls back to raw_temp unchanged when the CPU temperature is unavailable.
    """
    cpu_temp = get_cpu_temperature()
    if cpu_temp is None:
        return raw_temp

    # Smooth out with some averaging to decrease jitter. The list is trimmed
    # in place so the module-level name never needs rebinding.
    cpu_temps.append(cpu_temp)
    del cpu_temps[:-CPU_TEMP_SAMPLES]
    avg_cpu_temp = sum(cpu_temps) / len(cpu_temps)

    return raw_temp - ((avg_cpu_temp - raw_temp) / TEMP_COMP_FACTOR)


def read_sensors():
    """Read all sensor data and return as dictionary.

    Returns:
        A dict with 'timestamp', 'temperature_raw_c', 'temperature_c',
        'pressure_hpa' and 'humidity_percent', plus 'light_lux' and
        'proximity' when the LTR559 is available and readable. Returns None
        when the BME280 is unavailable or reading it fails.
    """
    if bme280 is None:
        print("Error: BME280 sensor not available")
        return None

    data = {'timestamp': datetime.now().isoformat()}

    # Read BME280 sensors (temperature, pressure, humidity)
    try:
        raw_temp = bme280.get_temperature()
        pressure = bme280.get_pressure()
        humidity = bme280.get_humidity()
        comp_temp = _compensate_temperature(raw_temp)

        data.update({
            'temperature_raw_c': round(raw_temp, 2),
            'temperature_c': round(comp_temp, 2),
            'pressure_hpa': round(pressure, 2),
            'humidity_percent': round(humidity, 2),
        })
    except Exception as e:
        # Driver exception types are not known here; any failure means no
        # usable reading for this cycle.
        print(f"Error reading BME280: {e}")
        return None

    # Read LTR559 sensors (light and proximity)
    if ltr559 is not None:
        try:
            lux = ltr559.get_lux()
            proximity = ltr559.get_proximity()
            data.update({
                'light_lux': round(lux, 2),
                'proximity': round(proximity, 2),
            })
        except Exception as e:
            # Still return partial data
            print(f"Error reading LTR559: {e}")

    return data


def log_to_csv(data, filename=None):
    """Append sensor data to CSV file.

    Args:
        data: Mapping of column name to value, as returned by read_sensors().
            Columns in CSV_FIELDS that are missing are written as empty
            cells; keys not in CSV_FIELDS are appended after the standard
            columns.
        filename: Target CSV path. Defaults to today's file from
            get_csv_filename().

    Errors are reported on stdout rather than raised, so a failed write does
    not stop the monitoring loop.
    """
    if filename is None:
        filename = get_csv_filename()

    try:
        extra_fields = [key for key in data if key not in CSV_FIELDS]
        fieldnames = list(CSV_FIELDS) + extra_fields

        # A missing *or empty* file still needs its header row.
        needs_header = (
            not os.path.isfile(filename) or os.path.getsize(filename) == 0
        )

        with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames, restval='')
            if needs_header:
                writer.writeheader()
            writer.writerow(data)

        print(f"✓ Data logged to {filename}")
    except Exception as e:
        # Deliberately broad: logging is best-effort.
        print(f"Error writing to CSV: {e}")


def display_data(data):
    """Display current sensor readings.

    Args:
        data: Mapping as returned by read_sensors(). 'timestamp' is required;
            every other reading is printed only if present, so partial data
            is displayed without error.
    """
    lines = (
        ('temperature_raw_c', "Temperature (raw): {}°C"),
        ('temperature_c', "Temperature (compensated): {}°C"),
        ('pressure_hpa', "Pressure: {} hPa"),
        ('humidity_percent', "Humidity: {}%"),
        ('light_lux', "Light Level: {} lux"),
        ('proximity', "Proximity: {}"),
    )

    print("\n" + "=" * 50)
    print(f"Lab Weather Report - {data['timestamp']}")
    print("=" * 50)

    for key, template in lines:
        if key in data:
            print(template.format(data[key]))

    print("=" * 50 + "\n")


def main():
    """Main monitoring loop.

    Reads, displays and logs the sensors every LOG_INTERVAL seconds until
    interrupted with Ctrl-C. Unexpected exceptions are reported and re-raised.
    """
    print("Lab Weather Tracker Starting...")
    print(f"Logging interval: {LOG_INTERVAL} seconds ({LOG_INTERVAL/60} minutes)")

    ensure_data_directory()

    try:
        while True:
            data = read_sensors()

            if data:
                display_data(data)
                log_to_csv(data)

            print(f"Next reading in {LOG_INTERVAL} seconds...")
            time.sleep(LOG_INTERVAL)

    except KeyboardInterrupt:
        print("\n\nShutting down gracefully...")
        print("Weather tracking stopped.")
    except Exception as e:
        print(f"\nUnexpected error: {e}")
        raise


if __name__ == "__main__":
    main()