From be8ba11da7919ba5b073679a071949818986ee6f Mon Sep 17 00:00:00 2001 From: mbtech Date: Sun, 14 Dec 2025 20:50:16 +0000 Subject: [PATCH] Update epaper-pihole.py --- epaper-pihole.py | 612 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 612 insertions(+) diff --git a/epaper-pihole.py b/epaper-pihole.py index e69de29..4b3c885 100644 --- a/epaper-pihole.py +++ b/epaper-pihole.py @@ -0,0 +1,612 @@ +#!/usr/bin/env python3 +""" +Pi-hole E-Paper Display - Binglab Style +======================================= +Displays Pi-hole and system statistics on a Waveshare 2.7" E-Paper display +with a professional 6-block grid layout (3x2) showing: +- Ads blocked today +- DNS queries today +- Ad blocking percentage +- Number of devices protected +- CPU usage +- System uptime +""" + +import subprocess +import json +import time +import sys +import os +from PIL import Image, ImageDraw, ImageFont +import traceback +from datetime import datetime + +# ============================================================================ +# CONFIGURATION +# ============================================================================ + +# ----------------------------------------------------------------------------- +# Data Source Configuration +# ----------------------------------------------------------------------------- +USE_CLI = True # True = use pihole command, False = use HTTP API + +# CLI Mode Settings +USE_SSH = False # Set True to fetch stats from remote Pi-hole via SSH +SSH_HOST = "pi@192.168.10.44" # SSH connection string (user@host) + +# HTTP API Mode Settings +PIHOLE_HOST = "192.168.10.44" # Pi-hole server IP address +PIHOLE_API_KEY = "" # Optional API key for authenticated requests +USE_NEW_API = False # Reserved for future Pi-hole API versions + +# ----------------------------------------------------------------------------- +# Display Hardware Settings +# ----------------------------------------------------------------------------- +DISPLAY_WIDTH = 264 # E-Paper display width in pixels +DISPLAY_HEIGHT = 176 # E-Paper display height in pixels +REFRESH_INTERVAL = 300 # Time between updates in seconds (5 minutes) + +# ----------------------------------------------------------------------------- +# File Path Configuration +# ----------------------------------------------------------------------------- +WAVESHARE_LIB_PATH = '/home/mbtech/pihole-epaper/waveshare_epd' # Path to Waveshare library +PREVIEW_IMAGE_PATH = '/home/mbtech/pihole_display.png' # Path for preview image output + +# ----------------------------------------------------------------------------- +# Branding and Customization +# ----------------------------------------------------------------------------- +DISPLAY_TITLE = "Binglab Sinkhole" # Display header title - customize this! + +# ----------------------------------------------------------------------------- +# E-Paper Library Import +# ----------------------------------------------------------------------------- +# Attempt to import the Waveshare E-Paper library +# If not found, the script will run in preview mode (saves PNG files instead) +try: + sys.path.append(WAVESHARE_LIB_PATH) + from waveshare_epd import epd2in7b_V2 + EPD_AVAILABLE = True +except ImportError: + print("Warning: Waveshare e-Paper library not found. Running in preview mode.") + EPD_AVAILABLE = False + +# ============================================================================ +# DATA COLLECTION FUNCTIONS +# ============================================================================ + +def run_pihole_command(command): + """ + Execute a Pi-hole CLI command either locally or via SSH + + Args: + command (str): The command to execute + + Returns: + str: Command output if successful, None otherwise + """ + try: + # Execute via SSH if configured + if USE_SSH: + ssh_cmd = ['ssh', SSH_HOST, command] + result = subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=10) + # Execute locally + else: + result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=10) + + # Return output if command succeeded + if result.returncode == 0: + return result.stdout.strip() + return None + except Exception as e: + print(f"Error running command: {e}") + return None + +def get_pihole_stats_cli(): + """ + Fetch Pi-hole statistics using the CLI method + Attempts newer API format first, falls back to legacy format + + Returns: + dict: Statistics dictionary with keys: + - dns_queries_today: Total DNS queries + - ads_blocked_today: Number of blocked queries + - ads_percentage_today: Percentage of queries blocked + - unique_clients: Number of active devices + None if data fetch fails + """ + try: + # Try newer Pi-hole API format first (v6.0+) + output = run_pihole_command('sudo pihole api stats/summary') + + if output: + try: + data = json.loads(output) + # Check if response has new API structure + if 'queries' in data and isinstance(data['queries'], dict): + return { + 'dns_queries_today': data.get('queries', {}).get('total', 0), + 'ads_blocked_today': data.get('queries', {}).get('blocked', 0), + 'ads_percentage_today': data.get('queries', {}).get('percent_blocked', 0), + 'unique_clients': data.get('clients', {}).get('active', 0), + } + except (json.JSONDecodeError, KeyError, TypeError): + # If parsing fails, try legacy format below + pass + + # Fallback to legacy Pi-hole format (v5.x and earlier) + output = run_pihole_command('sudo pihole -c -j') + if output: + data = json.loads(output) + return { + 'dns_queries_today': data.get('dns_queries_today', 0), + 'ads_blocked_today': data.get('ads_blocked_today', 0), + 'ads_percentage_today': data.get('ads_percentage_today', 0), + 'unique_clients': data.get('unique_clients', 0), + } + return None + except Exception as e: + print(f"Error getting CLI stats: {e}") + return None + +def get_pihole_stats_http(): + """ + Fetch Pi-hole statistics using the HTTP API method + + Returns: + dict: Statistics dictionary with keys: + - dns_queries_today: Total DNS queries + - ads_blocked_today: Number of blocked queries + - ads_percentage_today: Percentage of queries blocked + - unique_clients: Number of active devices + None if data fetch fails + """ + import requests + try: + # Build API endpoint URL + url = f"http://{PIHOLE_HOST}/admin/api.php" + params = {'summaryRaw': ''} + + # Add authentication if API key is provided + if PIHOLE_API_KEY: + params['auth'] = PIHOLE_API_KEY + + # Fetch data from Pi-hole web API + response = requests.get(url, params=params, timeout=10) + response.raise_for_status() + data = response.json() + + # Extract and return statistics + return { + 'dns_queries_today': data.get('dns_queries_today', 0), + 'ads_blocked_today': data.get('ads_blocked_today', 0), + 'ads_percentage_today': data.get('ads_percentage_today', 0), + 'unique_clients': data.get('unique_clients', 0), + } + except Exception as e: + print(f"Error getting HTTP stats: {e}") + return None + +def get_pihole_stats(): + """ + Fetch Pi-hole statistics using the configured method (CLI or HTTP) + + Returns: + dict: Pi-hole statistics or None if fetch fails + """ + if USE_CLI: + return get_pihole_stats_cli() + else: + return get_pihole_stats_http() + +def get_system_stats(): + """ + Fetch system statistics (CPU usage and uptime) + Works on both local and SSH configurations + + Returns: + dict: System statistics with keys: + - cpu_usage: CPU usage percentage (float) + - uptime: System uptime as formatted string (e.g., "5d 12h") + None if data fetch fails + """ + try: + stats = {} + + # Get CPU usage + if USE_SSH: + # Remote system via SSH + cpu_output = run_pihole_command("top -bn1 | grep 'Cpu(s)' | awk '{print $2}' | cut -d'%' -f1") + if cpu_output: + stats['cpu_usage'] = float(cpu_output) + else: + stats['cpu_usage'] = 0.0 + else: + # Local system - use psutil if available, fallback to top command + try: + import psutil + stats['cpu_usage'] = psutil.cpu_percent(interval=1) + except ImportError: + # Fallback to top command + cpu_output = run_pihole_command("top -bn1 | grep 'Cpu(s)' | awk '{print $2}' | cut -d'%' -f1") + if cpu_output: + stats['cpu_usage'] = float(cpu_output) + else: + stats['cpu_usage'] = 0.0 + + # Get uptime - format as "Xd Xh" or "Xh Xm" depending on duration + uptime_output = run_pihole_command("uptime -p") + if uptime_output: + # Parse "up X days, Y hours" format + uptime_str = uptime_output.replace('up ', '') + # Simplify the format for display + parts = [] + if 'day' in uptime_str: + days = uptime_str.split('day')[0].strip().split()[-1] + parts.append(f"{days}d") + if 'hour' in uptime_str: + hours = uptime_str.split('hour')[0].strip().split()[-1] + parts.append(f"{hours}h") + if 'minute' in uptime_str and 'day' not in uptime_str and 'hour' not in uptime_str: + minutes = uptime_str.split('minute')[0].strip().split()[-1] + parts.append(f"{minutes}m") + + stats['uptime'] = ' '.join(parts[:2]) if parts else 'N/A' + else: + stats['uptime'] = 'N/A' + + return stats + except Exception as e: + print(f"Error getting system stats: {e}") + return {'cpu_usage': 0.0, 'uptime': 'N/A'} + +# ============================================================================ +# DISPLAY LAYOUT FUNCTIONS +# ============================================================================ + +def find_fonts(): + """ + Automatically discover and select the best available TrueType fonts + Searches system font directories and prioritizes high-quality fonts + + Returns: + dict: Dictionary with 'bold' and 'regular' font paths + Priority order: DejaVu > Liberation > FreeSans + """ + import os + + # Common font directory locations across different systems + font_dirs = [ + '/usr/share/fonts/', + '/usr/local/share/fonts/', + '/home/pi/.fonts/', + '/home/mbtech/.fonts/', + os.path.expanduser('~/.fonts/'), + ] + + # Scan all font directories for TTF files + found_fonts = [] + for font_dir in font_dirs: + if os.path.exists(font_dir): + for root, dirs, files in os.walk(font_dir): + for file in files: + if file.endswith(('.ttf', '.TTF')): + found_fonts.append(os.path.join(root, file)) + + fonts = {} + + # Priority 1: Look for DejaVu fonts (best quality, widely available) + for font in found_fonts: + if 'DejaVuSans-Bold.ttf' in font or 'DejaVu-Sans-Bold.ttf' in font: + fonts['bold'] = font + if 'DejaVuSans.ttf' in font and 'Bold' not in font: + fonts['regular'] = font + + # Priority 2: Fallback to Liberation fonts + if 'bold' not in fonts: + for font in found_fonts: + if 'LiberationSans-Bold' in font or 'Liberation-Sans-Bold' in font: + fonts['bold'] = font + break + + if 'regular' not in fonts: + for font in found_fonts: + if ('LiberationSans.ttf' in font or 'Liberation-Sans.ttf' in font) and 'Bold' not in font: + fonts['regular'] = font + break + + # Priority 3: Fallback to FreeSans + if 'bold' not in fonts: + for font in found_fonts: + if 'FreeSansBold' in font or 'FreeSans-Bold' in font: + fonts['bold'] = font + break + + if 'regular' not in fonts: + for font in found_fonts: + if 'FreeSans.ttf' in font and 'Bold' not in font: + fonts['regular'] = font + break + + return fonts + +def create_display_image(stats, system_stats): + """ + Generate the E-Paper display image with Pi-hole and system statistics + Creates a 6-block grid layout (3x2) with header section + + Args: + stats (dict): Pi-hole statistics dictionary or None + system_stats (dict): System statistics (CPU, uptime) or None + + Returns: + PIL.Image: Generated display image (1-bit black/white) + """ + # Create blank white image (1-bit mode for e-paper) + image = Image.new('1', (DISPLAY_WIDTH, DISPLAY_HEIGHT), 255) + draw = ImageDraw.Draw(image) + + # Discover and load system fonts + fonts = find_fonts() + + # Load TrueType fonts with different sizes for hierarchy + try: + if 'bold' in fonts and 'regular' in fonts: + # Best case: Both bold and regular fonts available + print(f"Using fonts: {os.path.basename(fonts['bold'])}, {os.path.basename(fonts['regular'])}") + font_title = ImageFont.truetype(fonts['bold'], 20) # Header title + font_huge = ImageFont.truetype(fonts['bold'], 28) # Main statistics + font_large = ImageFont.truetype(fonts['bold'], 22) # Secondary stats + font_small = ImageFont.truetype(fonts['regular'], 12) # Labels + elif 'bold' in fonts: + # Use bold for everything + print(f"Using font: {os.path.basename(fonts['bold'])}") + font_title = ImageFont.truetype(fonts['bold'], 20) + font_huge = ImageFont.truetype(fonts['bold'], 28) + font_large = ImageFont.truetype(fonts['bold'], 22) + font_small = ImageFont.truetype(fonts['bold'], 12) + elif 'regular' in fonts: + # Use regular for everything + print(f"Using font: {os.path.basename(fonts['regular'])}") + font_title = ImageFont.truetype(fonts['regular'], 20) + font_huge = ImageFont.truetype(fonts['regular'], 28) + font_large = ImageFont.truetype(fonts['regular'], 22) + font_small = ImageFont.truetype(fonts['regular'], 12) + else: + raise Exception("No TrueType fonts found") + except Exception as e: + # Fallback to default bitmap font (very small) + print(f"Font loading error: {e}") + print("Using default font (will be smaller)") + font_title = ImageFont.load_default() + font_huge = ImageFont.load_default() + font_large = ImageFont.load_default() + font_small = ImageFont.load_default() + + # Handle case where no stats are available + if stats is None: + draw.text((10, 70), "No Pi-hole Data", font=font_large, fill=0) + return image + + # ------------------------------------------------------------------------- + # HEADER SECTION (Top banner with logo, title, date, status) + # ------------------------------------------------------------------------- + # Draw header container box + draw.rectangle((0, 0, DISPLAY_WIDTH, 50), outline=0, width=2) + + # Draw logo (decorative circle with arc design) + draw.ellipse((10, 10, 35, 35), outline=0, width=2) + draw.arc((15, 15, 30, 30), 0, 270, fill=0, width=2) + + # Display title text + draw.text((45, 8), DISPLAY_TITLE, font=font_title, fill=0) + + # Display current date + date_str = datetime.now().strftime("%A %B %-d" if sys.platform != "win32" else "%A %B %d") + draw.text((45, 30), date_str, font=font_small, fill=0) + + # Status indicator (filled circle in top right) + draw.ellipse((240, 15, 254, 29), fill=0) + draw.ellipse((242, 17, 252, 27), fill=255) + + # ------------------------------------------------------------------------- + # GRID LAYOUT (6 blocks - 3 columns x 2 rows) + # ------------------------------------------------------------------------- + # Calculate block dimensions + block_width = DISPLAY_WIDTH // 3 # 88 pixels per column + row_height = (DISPLAY_HEIGHT - 50) // 2 # Two rows below header + + # Vertical dividers (create 3 columns) + draw.line((block_width, 50, block_width, DISPLAY_HEIGHT), fill=0, width=2) + draw.line((block_width * 2, 50, block_width * 2, DISPLAY_HEIGHT), fill=0, width=2) + # Horizontal divider (create 2 rows) + draw.line((0, 50 + row_height, DISPLAY_WIDTH, 50 + row_height), fill=0, width=2) + + # Helper function to draw centered text in a block + def draw_block(col, row, value, label, value_font): + """Draw a centered statistic block""" + # Calculate block center position + x_center = block_width * col + (block_width // 2) + y_start = 50 + (row_height * row) + + # Draw value (centered) + bbox = draw.textbbox((0, 0), str(value), font=value_font) + text_width = bbox[2] - bbox[0] + x_pos = x_center - (text_width // 2) + y_pos = y_start + 15 + draw.text((x_pos, y_pos), str(value), font=value_font, fill=0) + + # Draw label (centered, below value) + bbox = draw.textbbox((0, 0), label, font=font_small) + text_width = bbox[2] - bbox[0] + x_pos = x_center - (text_width // 2) + y_pos = y_start + row_height - 25 + draw.text((x_pos, y_pos), label, font=font_small, fill=0) + + # ------------------------------------------------------------------------- + # TOP ROW (Row 0) + # ------------------------------------------------------------------------- + + # Block 1 (Top Left): Ads Blocked + blocked = stats['ads_blocked_today'] + blocked_str = f"{blocked:,}" if blocked < 100000 else f"{blocked//1000}k" + draw_block(0, 0, blocked_str, "Blocked", font_large) + + # Block 2 (Top Center): DNS Queries + queries = stats['dns_queries_today'] + queries_str = f"{queries:,}" if queries < 100000 else f"{queries//1000}k" + draw_block(1, 0, queries_str, "Queries", font_large) + + # Block 3 (Top Right): Block Percentage + percent = stats['ads_percentage_today'] + percent_str = f"{percent:.1f}%" + draw_block(2, 0, percent_str, "Percent", font_large) + + # ------------------------------------------------------------------------- + # BOTTOM ROW (Row 1) + # ------------------------------------------------------------------------- + + # Block 4 (Bottom Left): Devices + devices = stats['unique_clients'] + draw_block(0, 1, str(devices), "Devices", font_large) + + # Block 5 (Bottom Center): CPU Usage + if system_stats: + cpu = system_stats['cpu_usage'] + cpu_str = f"{cpu:.1f}%" + else: + cpu_str = "N/A" + draw_block(1, 1, cpu_str, "CPU", font_large) + + # Block 6 (Bottom Right): Uptime + if system_stats: + uptime_str = system_stats['uptime'] + else: + uptime_str = "N/A" + draw_block(2, 1, uptime_str, "Uptime", font_large) + + return image + +def display_on_epaper(image, clear_first=True): + """ + Send the generated image to the physical E-Paper display + Handles both 2-color and 3-color display variants + + Args: + image (PIL.Image): The image to display + clear_first (bool): Whether to clear the display before updating + """ + try: + # Initialize the E-Paper display + epd = epd2in7b_V2.EPD() + epd.init() + + # Clear display on first run to remove any artifacts + if clear_first: + print("Clearing display...") + try: + epd.Clear() + except TypeError: + # Fallback method for displays without Clear() function + white = Image.new('1', (264, 176), 255) + buf = epd.getbuffer(white) + try: + epd.display(buf) + except TypeError: + # 3-color display needs red buffer + red_buf = [0xFF] * (264 * 176 // 8) + epd.display(buf, red_buf) + time.sleep(1) + + # Send image to display + print("Updating display...") + buffer = epd.getbuffer(image) + try: + epd.display(buffer) + except TypeError: + # 3-color display variant requires separate red channel buffer + red_buffer = [0xFF] * (264 * 176 // 8) + epd.display(buffer, red_buffer) + + # Allow time for display refresh, then put display to sleep + time.sleep(2) + epd.sleep() + print("✓ Display updated") + + except Exception as e: + print(f"Error: {e}") + traceback.print_exc() + +def save_preview(image): + """ + Save the generated image as a PNG file for preview/testing + Used when E-Paper hardware is not available + + Args: + image (PIL.Image): The image to save + """ + image.save(PREVIEW_IMAGE_PATH) + print(f"Preview saved to {PREVIEW_IMAGE_PATH}") + +# ============================================================================ +# MAIN PROGRAM LOOP +# ============================================================================ + +def main(): + """ + Main program loop + Fetches Pi-hole and system stats, generates display image, and updates the display + Runs continuously with configurable refresh interval + """ + print("Pi-hole E-Paper Display - Binglab Style (6-Block Layout)") + print("=" * 60) + + # Track if this is the first display update (to trigger full clear) + first_run = True + + # Main loop - runs indefinitely until interrupted + while True: + try: + # Fetch current Pi-hole statistics + print("\nFetching Pi-hole stats...") + stats = get_pihole_stats() + + # Fetch system statistics + print("Fetching system stats...") + system_stats = get_system_stats() + + # Display fetched stats in console + if stats: + print(f" Queries: {stats['dns_queries_today']:,}") + print(f" Blocked: {stats['ads_blocked_today']:,} ({stats['ads_percentage_today']:.2f}%)") + print(f" Devices: {stats['unique_clients']}") + if system_stats: + print(f" CPU: {system_stats['cpu_usage']:.1f}%") + print(f" Uptime: {system_stats['uptime']}") + + # Generate display image from stats + image = create_display_image(stats, system_stats) + + # Update physical display or save preview + if EPD_AVAILABLE: + display_on_epaper(image, clear_first=first_run) + first_run = False # Only clear on first update + else: + save_preview(image) + + # Wait for next update cycle + print(f"\nNext update in {REFRESH_INTERVAL} seconds...") + time.sleep(REFRESH_INTERVAL) + + except KeyboardInterrupt: + # Handle Ctrl+C gracefully + print("\nExiting...") + sys.exit(0) + except Exception as e: + # Log any unexpected errors and continue + print(f"Error: {e}") + traceback.print_exc() + time.sleep(60) # Wait 1 minute before retry + +# Program entry point +if __name__ == "__main__": + main()