first commit

This commit is contained in:
mvbingham
2025-12-03 20:10:56 -05:00
commit 128f20501d
6 changed files with 1554 additions and 0 deletions

196
weather_tracker.py Normal file
View File

@@ -0,0 +1,196 @@
#!/usr/bin/env python3
"""
Lab Weather Tracker using Pimoroni Enviro Plus sensors
Monitors temperature, humidity, pressure, and light levels
"""
import time
import csv
import os
from datetime import datetime
from smbus2 import SMBus
try:
from bme280 import BME280
except ImportError:
from pimoroni_bme280 import BME280
try:
from ltr559 import LTR559
ltr559 = LTR559()
except ImportError:
import ltr559
# Configuration
DATA_DIR = "data"
LOG_INTERVAL = 300 # 5 minutes in seconds
TEMP_COMP_FACTOR = 2.25 # Tuning factor for temperature compensation
# Initialize sensors
try:
bus = SMBus(1)
bme280 = BME280(i2c_dev=bus)
print("✓ BME280 sensor initialized")
except Exception as e:
print(f"✗ Failed to initialize BME280: {e}")
bme280 = None
try:
if not isinstance(ltr559, LTR559):
# Fallback initialization
from ltr559 import LTR559
ltr559 = LTR559()
print("✓ LTR559 sensor initialized")
except Exception as e:
print(f"✗ Failed to initialize LTR559: {e}")
ltr559 = None
# CPU temperature tracking for compensation
cpu_temps = []
def get_cpu_temperature():
"""Get the temperature of the CPU for compensation"""
try:
with open("/sys/class/thermal/thermal_zone0/temp", "r") as f:
temp = f.read()
temp = int(temp) / 1000.0
return temp
except Exception as e:
print(f"Warning: Could not read CPU temperature: {e}")
return None
def ensure_data_directory():
"""Create data directory if it doesn't exist"""
if not os.path.exists(DATA_DIR):
os.makedirs(DATA_DIR)
def get_csv_filename():
"""Generate CSV filename with current date"""
date_str = datetime.now().strftime("%Y-%m-%d")
return os.path.join(DATA_DIR, f"weather_log_{date_str}.csv")
def read_sensors():
"""Read all sensor data and return as dictionary"""
global cpu_temps
try:
data = {
'timestamp': datetime.now().isoformat(),
}
# Read BME280 sensors (temperature, pressure, humidity)
if bme280:
try:
raw_temp = bme280.get_temperature()
pressure = bme280.get_pressure()
humidity = bme280.get_humidity()
# Get CPU temperature and calculate compensated temperature
cpu_temp = get_cpu_temperature()
comp_temp = raw_temp # Default to raw if compensation fails
if cpu_temp is not None:
# Smooth out with some averaging to decrease jitter
cpu_temps.append(cpu_temp)
if len(cpu_temps) > 5:
cpu_temps = cpu_temps[-5:] # Keep only last 5 readings
if len(cpu_temps) > 0:
avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps))
# Compensate temperature using CPU temp
comp_temp = raw_temp - ((avg_cpu_temp - raw_temp) / TEMP_COMP_FACTOR)
data.update({
'temperature_raw_c': round(raw_temp, 2),
'temperature_c': round(comp_temp, 2),
'pressure_hpa': round(pressure, 2),
'humidity_percent': round(humidity, 2),
})
except Exception as e:
print(f"Error reading BME280: {e}")
return None
else:
print("Error: BME280 sensor not available")
return None
# Read LTR559 sensors (light and proximity)
if ltr559:
try:
lux = ltr559.get_lux()
proximity = ltr559.get_proximity()
data.update({
'light_lux': round(lux, 2),
'proximity': round(proximity, 2),
})
except Exception as e:
print(f"Error reading LTR559: {e}")
# Still return partial data
return data
except Exception as e:
print(f"Error reading sensors: {e}")
return None
def log_to_csv(data):
"""Append sensor data to CSV file"""
filename = get_csv_filename()
file_exists = os.path.isfile(filename)
try:
with open(filename, 'a', newline='') as csvfile:
fieldnames = data.keys()
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if not file_exists:
writer.writeheader()
writer.writerow(data)
print(f"✓ Data logged to {filename}")
except Exception as e:
print(f"Error writing to CSV: {e}")
def display_data(data):
"""Display current sensor readings"""
print("\n" + "="*50)
print(f"Lab Weather Report - {data['timestamp']}")
print("="*50)
if 'temperature_raw_c' in data:
print(f"Temperature (raw): {data['temperature_raw_c']}°C")
print(f"Temperature (compensated): {data['temperature_c']}°C")
print(f"Pressure: {data['pressure_hpa']} hPa")
if 'humidity_percent' in data:
print(f"Humidity: {data['humidity_percent']}%")
print(f"Light Level: {data['light_lux']} lux")
if 'proximity' in data:
print(f"Proximity: {data['proximity']}")
print("="*50 + "\n")
def main():
"""Main monitoring loop"""
print("Lab Weather Tracker Starting...")
print(f"Logging interval: {LOG_INTERVAL} seconds ({LOG_INTERVAL/60} minutes)")
ensure_data_directory()
try:
while True:
data = read_sensors()
if data:
display_data(data)
log_to_csv(data)
print(f"Next reading in {LOG_INTERVAL} seconds...")
time.sleep(LOG_INTERVAL)
except KeyboardInterrupt:
print("\n\nShutting down gracefully...")
print("Weather tracking stopped.")
except Exception as e:
print(f"\nUnexpected error: {e}")
raise
if __name__ == "__main__":
main()