MultiWebcams

De Wiki levelKro

Guide pour créer un affichage multiple de caméra depuis un serveur de partage, comme FFServer.

cam.py

import cv2
import pygame
import threading
import time
import json
import sys
from datetime import datetime

# Chargement de la configuration
def load_config():
    try:
        with open("config.json", "r") as f:
            return json.load(f)
    except FileNotFoundError:
        return {"streams": [False,False,False,False,False,False,False,False]}

config = load_config()
STREAM_URLS = [f"{config['url']}{i}" for i in range(1, 9)]
active_streams = config.get("streams", [True] * 8)

debug_mode = "--debug" in sys.argv

def debug_log(message):
    if debug_mode:
        print(message)

class VideoStreamApp:
    def __init__(self):
        pygame.init()
        info = pygame.display.Info()
        self.screen_width, self.screen_height = info.current_w, info.current_h
        debug_log(f"Résolution détectée : {self.screen_width}x{self.screen_height}")
        self.screen = pygame.display.set_mode((self.screen_width, self.screen_height), pygame.NOFRAME)
        pygame.display.set_caption("Surveillance Grid")
        self.running = True
        self.frames = [None] * 8
        self.status = ["Démarrage" for _ in range(8)]
        self.threads = []
        self.start_streams()
    
    def start_streams(self):
        for idx, active in enumerate(active_streams):
            if active:
                self.status[idx] = f"Connexion à la caméra {idx+1}..."
                debug_log(f"Démarrage du thread pour la caméra {idx+1}")
                thread = threading.Thread(target=self.stream_video, args=(idx,))
                thread.daemon = True
                thread.start()
                self.threads.append(thread)
            else:
                self.status[idx] = f"Caméra {idx+1} désactivé"  # Afficher "Désactivé" pour les caméras désactivées
    
    def stream_video(self, index):
        url = STREAM_URLS[index]
        cap = None
        retry_interval = int(config['retry'])  # Intervalle de reconnexion en secondes
        previously_connected = False  # Variable pour suivre l'état de la connexion

        while self.running:
            if not active_streams[index]:  # Si la caméra est désactivée, ne tente pas de se connecter
                time.sleep(1)
                continue
            
            if cap is None or not cap.isOpened():
                self.status[index] = f"Connexion à la caméra {index+1}..."
                debug_log(f"[Camera {index+1}] Tentative de connexion à {url}")
                cap = cv2.VideoCapture(url)
                cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) 
                time.sleep(int(config['timeout']))

            if not cap.isOpened():
                debug_log(f"[Camera {index+1}] Impossible d'ouvrir le flux, reconnexion dans {retry_interval}s")
                self.status[index] = f"Attente de reconnexion à la caméra {index+1}..."
                self.frames[index] = None  # Retirer l'image pour laisser place au message de statut
                cap.release()  # Libération de l'objet cap
                cap = None  # Réinitialisation de cap
                previously_connected = False  # Variable pour suivre l'état de la connexion
                time.sleep(retry_interval)  # Reconnexion après l'intervalle

            else:
                #ret, frame = cap.read()
                for _ in range(1):  # Lire et ignorer les 5 dernières images pour vider le buffer
                    cap.grab()
                ret, frame = cap.read()  # Lire seulement la dernière frame après vidage du buffer

                if ret:
                    # Connexion réussie, on affiche uniquement une fois
                    if not previously_connected:
                        self.status[index] = f"Connecté à la caméra {index+1}"
                        debug_log(f"[Camera {index+1}] Connexion au flux réussie")
                        previously_connected = True

                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frame = cv2.resize(frame, (self.screen_width // 3, self.screen_height // 3))
                    frame = pygame.surfarray.make_surface(frame.swapaxes(0, 1))
                    self.frames[index] = frame
                    self.status[index] = ""
                else:
                    debug_log(f"[Camera {index+1}] Échec de réception du flux, reconnexion dans {retry_interval}s")
                    self.status[index] = f"Attente de reconnexion à la caméra {index+1}..."
                    self.frames[index] = None  # Retirer l'image pour laisser place au message de statut
                    cap.release()  # Libération de l'objet cap en cas d'erreur
                    cap = None  # Réinitialisation de cap
                    previously_connected = False  # Variable pour suivre l'état de la connexion
                    time.sleep(retry_interval)  # Reconnexion après l'intervalle
        
        if cap:
            cap.release()
    
    def run(self):
        debug_log("Boucle principale en cours")
        font = pygame.font.SysFont('Courier', int(config['font_camera']))  # Police monospace (compatible Win/Linux)
        font_time = pygame.font.SysFont('Courier', int(config['font_time']))  # Police plus grande pour l'heure et la date
        font_date = pygame.font.SysFont('Courier', int(config['font_date']))  # Police plus grande pour l'heure et la date
        cell_width = self.screen_width // 3
        cell_height = self.screen_height // 3
        
        while self.running:
            self.screen.fill((0, 0, 0))
            for event in pygame.event.get():
                if event.type == pygame.QUIT or (event.type == pygame.KEYDOWN and event.key in [pygame.K_ESCAPE, pygame.K_q]):
                    self.running = False
            # Affichage des caméras
            for idx in range(8):
                x, y = (idx % 3) * cell_width, (idx // 3) * cell_height
                if self.frames[idx]:
                    self.screen.blit(self.frames[idx], (x, y))
                else:
                    # Centrage du texte
                    text_surface = font.render(self.status[idx], True, (255, 255, 255))
                    text_rect = text_surface.get_rect(center=(x + cell_width // 2, y + cell_height // 2))
                    self.screen.blit(text_surface, text_rect)
            
            # Affichage de l'heure et de la date dans la 9e case (dernière case)
            x, y = (8 % 3) * cell_width, (8 // 3) * cell_height  # Calcul des coordonnées de la 9e case
            current_time = datetime.now().strftime("%H:%M:%S")
            current_date = datetime.now().strftime("%d/%m/%Y")
            
            # Affichage de l'heure (plus grand)
            time_surface = font_time.render(current_time, True, (255, 255, 255))
            time_rect = time_surface.get_rect(center=(x + cell_width // 2, y + cell_height // 3))
            self.screen.blit(time_surface, time_rect)
            
            # Affichage de la date (plus grand)
            date_surface = font_date.render(current_date, True, (255, 255, 255))
            date_rect = date_surface.get_rect(center=(x + cell_width // 2, y + 2 * cell_height // 3))
            self.screen.blit(date_surface, date_rect)
            
            pygame.display.flip()
            
            for event in pygame.event.get():
                if event.type == pygame.QUIT or (event.type == pygame.KEYDOWN and event.key in [pygame.K_ESCAPE, pygame.K_q]):
                    self.running = False
        
        pygame.quit()
        debug_log("Application fermée.")
        
if __name__ == "__main__":
    debug_log("Démarrage de l'application")
    app = VideoStreamApp()
    app.run()


config.json

{
    "streams": [true, true, true, true, true, true, false, true],
	"url": "http://192.168.0.255:8090/camera",
	"timeout": "1",
	"retry": "5",
	"font_camera": "16",
	"font_time": "90",
	"font_date": "70"
}