import carla
import random
import time

print("CARLA library imported successfully")

# Connect to the CARLA server.
client = carla.Client('localhost', 2000)
# Set the timeout BEFORE loading a map: load_world is a blocking network
# call, and it would otherwise run with the client's default timeout.
client.set_timeout(10.0)
# client.load_world('Town10HD')
client.load_world('Town03')
# client.load_world('Town05')
world = client.get_world()


def enforce_stop(world):
    """Brake every vehicle that is moving while next to a stop sign.

    A vehicle is flagged when it is closer than 2 metres to any
    ``traffic.stop`` actor and its speed is at least 0.1 m/s. For each
    flagged vehicle an infraction message is printed once and full brake
    is applied.

    Args:
        world: The ``carla.World`` whose vehicles and stop signs are checked.
    """
    actors = world.get_actors()
    # The stop-sign list does not depend on the vehicle: query it once.
    stop_signs = actors.filter("traffic.stop")

    for vehicle in actors.filter('vehicle.*'):
        velocity = vehicle.get_velocity()
        speed = (velocity.x**2 + velocity.y**2 + velocity.z**2) ** 0.5
        # A vehicle that is (almost) at a standstill is respecting the stop.
        if speed < 0.1:
            continue

        location = vehicle.get_location()
        near_stop_sign = any(
            location.distance(stop.get_location()) < 2.0 for stop in stop_signs
        )
        if near_stop_sign:
            print(f"🚹 Infraction détectée : Le véhicule {vehicle.id} n'a pas respecté le STOP !")
            # Force the vehicle to stop.
            vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=1.0))


def manage_traffic():
    """Brake vehicles near red lights, then toggle every traffic light.

    For each traffic light in the module-level ``world``: every vehicle
    closer than 10 metres to a light that is currently red receives a
    full-brake control. Afterwards the light is switched to green if it was
    red, and to red otherwise.
    """
    actors = world.get_actors()
    traffic_lights = actors.filter('traffic.traffic_light')
    vehicles = actors.filter('vehicle.*')

    for traffic_light in traffic_lights:
        light_location = traffic_light.get_location()
        is_red = traffic_light.state == carla.TrafficLightState.Red

        if is_red:
            for vehicle in vehicles:
                if vehicle.get_location().distance(light_location) < 10:
                    # Brake because the nearby light is red.
                    vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=1.0))

        # Alternate the light: red becomes green, anything else becomes red.
        if is_red:
            traffic_light.set_state(carla.TrafficLightState.Green)
        else:
            traffic_light.set_state(carla.TrafficLightState.Red)


map_name = world.get_map().name
print("Loaded map name:", map_name)

# Define uncontrolled intersections (add more coordinates if needed)
INTERSECTION_ZONES = [
    carla.Location(x=100, y=200, z=0),
    carla.Location(x=-50, y=30, z=0),
]

# Accumulated waiting time (simulation seconds) per vehicle id. Kept in a
# dict so the value persists between calls regardless of which actor handle
# the world query returns.
_intersection_wait_times = {}


def get_vehicle_priority(vehicle):
    """Return the absolute yaw of the vehicle's transform, in degrees.

    Callers use the value as an orientation-based priority score.
    """
    return abs(vehicle.get_transform().rotation.yaw)


def handle_simultaneous_arrival(vehicles):
    """Ask each vehicle to wait for a random delay between 0.5 and 1.5.

    NOTE(review): ``wait`` is not a method of ``carla.Actor``; this
    presumably expects a project wrapper object that provides it. Confirm
    against the callers.
    """
    for v in vehicles:
        delay = random.uniform(0.5, 1.5)
        v.wait(delay)


def manage_uncontrolled_intersections():
    """Apply a simple yield rule near the ``INTERSECTION_ZONES`` points.

    For every vehicle of the module-level ``world`` that is closer than
    4 metres to an intersection point:

    * if other vehicles are within 10 metres of that point, the vehicle
      brakes moderately when the closest of them is moving, and
      accelerates gently otherwise;
    * if no other vehicle is around, the vehicle accumulates waiting time
      (simulation delta seconds) and is released with more throttle once
      it has waited more than one second.

    The waiting time of a vehicle is cleared as soon as it is no longer
    inside any intersection zone.
    """
    vehicles = list(world.get_actors().filter('vehicle.*'))
    delta_seconds = world.get_snapshot().timestamp.delta_seconds

    for vehicle in vehicles:
        vehicle_location = vehicle.get_location()
        inside_a_zone = False

        for intersection in INTERSECTION_ZONES:
            if vehicle_location.distance(intersection) >= 4:
                continue
            inside_a_zone = True

            nearby_vehicles = [
                v for v in vehicles
                if v.id != vehicle.id
                and v.get_location().distance(intersection) < 10
            ]

            if nearby_vehicles:
                closest_vehicle = min(
                    nearby_vehicles,
                    key=lambda v: v.get_location().distance(vehicle_location),
                )
                # The closest vehicle is moving, i.e. already engaged in the
                # intersection: this vehicle must yield.
                if closest_vehicle.get_velocity().length() > 0:
                    print(f"Vehicle {vehicle.id} is waiting at the intersection to yield the right of way.")
                    # Moderate braking to avoid a harsh stop.
                    vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.5))
                else:
                    print(f"Vehicle {vehicle.id} can cross the intersection.")
                    # Gentle acceleration to cross.
                    vehicle.apply_control(carla.VehicleControl(throttle=0.3, brake=0.0))
            else:
                # Waiting time prevents vehicles from blocking each other.
                waited = _intersection_wait_times.get(vehicle.id, 0.0)
                if waited > 1:
                    print(f"Vehicle {vehicle.id} has waited too long, it crosses.")
                    vehicle.apply_control(carla.VehicleControl(throttle=0.5, brake=0.0))
                else:
                    waited += delta_seconds
                    _intersection_wait_times[vehicle.id] = waited
                    print(f"Vehicle {vehicle.id} is waiting for {waited:.1f} sec.")

        if not inside_a_zone:
            # The vehicle left the intersections: start from zero next time.
            _intersection_wait_times.pop(vehicle.id, None)


collision_status = {}  # Collision status between vehicles


def has_collision(vehicle1, vehicle2):
    """Return True when the two vehicles are closer than 2 metres.

    This is a proximity heuristic, not a physics collision query.
    """
    distance = vehicle1.get_location().distance(vehicle2.get_location())
    return distance < 2  # Heuristic minimum distance treated as a collision


def _shift_along_x(vehicle, offset):
    """Teleport the vehicle by ``offset`` metres along the world X axis."""
    location = vehicle.get_location()
    location.x += offset
    vehicle.set_location(location)


def _set_forward_speed(vehicle, speed_kmh):
    """Set the vehicle's target velocity to ``speed_kmh`` along its heading."""
    forward = vehicle.get_transform().get_forward_vector()
    speed_ms = speed_kmh / 3.6
    vehicle.set_target_velocity(
        carla.Vector3D(forward.x * speed_ms, forward.y * speed_ms, forward.z * speed_ms)
    )


def reset_collision(vehicle1, vehicle2):
    """Try to separate two colliding vehicles; destroy one if that fails.

    The function blocks for 3 seconds, moves the vehicles 2 metres apart
    along the world X axis (in opposite directions) and sets both to
    5 km/h. If they are still within collision distance afterwards,
    ``vehicle2`` is destroyed.

    Returns:
        True if ``vehicle2`` was destroyed, False if both vehicles were
        separated and set back to 10 km/h.
    """
    print(f"Collision entre {vehicle1.id} et {vehicle2.id} détectée. Tentative de dégagement des véhicules.")

    # Blocking delay that simulates the time needed to clear the vehicles.
    time.sleep(3)

    # Move the vehicles apart. carla.Actor exposes set_location /
    # set_target_velocity, not set_position / set_speed.
    _shift_along_x(vehicle1, 2)
    _shift_along_x(vehicle2, -2)

    # Restart both vehicles slowly.
    _set_forward_speed(vehicle1, 5)
    _set_forward_speed(vehicle2, 5)

    # If the collision persists after the adjustment, remove one vehicle.
    if has_collision(vehicle1, vehicle2):
        print(f"Le véhicule {vehicle2.id} ne peut pas se dégager, suppression du véhicule.")
        vehicle2.destroy()
        return True

    # Otherwise let traffic resume at normal speed.
    print(f"Les véhicules {vehicle1.id} et {vehicle2.id} ont été dégagés et circulent à nouveau.")
    _set_forward_speed(vehicle1, 10)
    _set_forward_speed(vehicle2, 10)
    return False


def check_and_remove_collisions(vehicles):
    """Resolve pairwise collisions among ``vehicles``.

    Each vehicle is compared with the vehicles that follow it in the list.
    On the first detected collision for a vehicle, ``reset_collision`` is
    called; if it destroyed the second vehicle, that vehicle is removed
    from ``vehicles`` (the caller's list is modified in place).

    Args:
        vehicles: A mutable list of vehicle actors.
    """
    # Iterate over a snapshot so that removing from `vehicles` is safe.
    snapshot = list(vehicles)
    removed_ids = set()

    for i, vehicle1 in enumerate(snapshot):
        if vehicle1.id in removed_ids:
            continue
        for vehicle2 in snapshot[i + 1:]:
            if vehicle2.id in removed_ids:
                continue
            if has_collision(vehicle1, vehicle2):
                if reset_collision(vehicle1, vehicle2):
                    # vehicle2 was destroyed: drop it from the caller's list.
                    removed_ids.add(vehicle2.id)
                    vehicles.remove(vehicle2)
                break  # Stop checking vehicle1 after one collision


# Example use in spawn-point logic.
# NOTE(review): a function with the same name is defined again later in this
# file; at import time that later definition replaces this one.
def is_spawn_point_occupied(spawn_point, vehicles, max_density=2, remove_collisions=False):
    """Tell whether slow vehicles are crowding a spawn point.

    A vehicle counts as blocking when it is within 20 metres of
    ``spawn_point`` and slower than 2 m/s.

    Args:
        spawn_point: Transform-like object with a ``location`` attribute.
        vehicles: Vehicle actors to inspect.
        max_density: Number of blocking vehicles from which the spawn point
            is considered occupied.
        remove_collisions: When True and the point is occupied, resolve
            collisions among ``vehicles`` and destroy the blocking vehicles.

    Returns:
        True if the spawn point is occupied, False otherwise.
    """
    blocked_vehicles = [
        vehicle for vehicle in vehicles
        if vehicle.get_location().distance(spawn_point.location) < 20
        and vehicle.get_velocity().length() < 2  # slow or stopped
    ]

    if len(blocked_vehicles) >= max_density:
        if remove_collisions:
            check_and_remove_collisions(vehicles)
            for vehicle in blocked_vehicles:
                # The collision pass may already have destroyed this actor.
                if not vehicle.is_alive:
                    continue
                print(f"Suppression du véhicule bloqué {vehicle.id} après collision.")
                vehicle.destroy()
        return True  # Spawn point occupied
    return False  # Spawn point free


def communicate_with_other_vehicle(vehicle):
    """Decide which of two vehicles on a collision course may proceed.

    For every vehicle within a 5 metre radius that is on a collision
    course, the one with the lower id moves and the other stops.

    NOTE(review): ``get_nearby_vehicles``, ``is_on_collision_course`` and
    the ``move`` / ``stop`` methods are not defined in the visible part of
    this file and are not part of the carla actor API; confirm where they
    come from.
    """
    # Neighbouring vehicles within a 5.0 metre radius.
    other_vehicles = get_nearby_vehicles(vehicle, radius=5.0)
    for other in other_vehicles:
        if is_on_collision_course(vehicle, other):
            # Compare ids to decide who goes first.
            if vehicle.id < other.id:
                vehicle.move()
                other.stop()
            else:
                vehicle.stop()
                other.move()


# Detect the stop line ahead of time
STOP_DISTANCE = 10  # Distance before the stop line
def detect_stop_line(vehicle, stop_line_position=None):
    """Return True when the vehicle is within ``STOP_DISTANCE`` of a stop line.

    Args:
        vehicle: Object providing ``distance_to``.
        stop_line_position: Optional hint forwarded to
            ``get_nearest_stop_line``. When omitted, the helper is called
            with the vehicle only.

    NOTE(review): ``get_nearest_stop_line`` and ``vehicle.distance_to`` are
    not defined in the visible part of this file; confirm their contract.
    """
    if stop_line_position is None:
        stop_line = get_nearest_stop_line(vehicle)
    else:
        stop_line = get_nearest_stop_line(vehicle, stop_line_position)
    # True: the vehicle must stop. False: it may continue.
    return bool(stop_line and vehicle.distance_to(stop_line) <= STOP_DISTANCE)


def slow_down_before_stop(vehicle):
    """Halve the vehicle's speed near a stop line and stop it when slow.

    NOTE(review): relies on ``vehicle.speed``, ``set_speed`` and ``stop``,
    which are not part of the carla actor API; presumably a project
    wrapper. Confirm against the callers.
    """
    if detect_stop_line(vehicle):
        vehicle.set_speed(vehicle.speed * 0.5)  # Slow down to 50% of the speed
        if vehicle.speed < 1.0:  # Very low speed: stop completely
            vehicle.stop()


def ensure_correct_stop(vehicle):
    """Correct a vehicle that came closer than allowed to the stop line.

    When the distance to the nearest stop line is below
    ``STOP_DISTANCE - 0.5`` the vehicle reverses by 1.0 if there is space
    behind it, and otherwise waits.

    NOTE(review): ``get_nearest_stop_line``, ``is_space_behind`` and the
    ``distance_to`` / ``move_backward`` / ``wait`` methods are not defined
    in the visible part of this file; confirm their contract.
    """
    stop_line = get_nearest_stop_line(vehicle)
    if not stop_line:
        # No stop line found: nothing to correct (same guard as
        # detect_stop_line).
        return
    if vehicle.distance_to(stop_line) < STOP_DISTANCE - 0.5:
        if is_space_behind(vehicle):
            vehicle.move_backward(1.0)  # Back up slightly if possible
        else:
            vehicle.wait(1)  # Wait before moving off again


class Truck:
    """Wrapper around a truck actor caching its speed and steering angle."""

    def __init__(self, vehicle):
        self.vehicle = vehicle
        self.speed = vehicle.get_velocity().length()
        self.steering_angle = vehicle.get_control().steer

    def update(self):
        """Refresh the cached speed and steering angle from the actor."""
        self.speed = self.vehicle.get_velocity().length()
        self.steering_angle = self.vehicle.get_control().steer


def callback(event, distance_data):
    """Append the distance reported by a sensor event to ``distance_data``."""
    if event:
        distance_data.append(event.distance)


def get_clearance(vehicle, side, world):
    """Measure the free distance on one side of a vehicle.

    A temporary ``sensor.other.obstacle`` sensor is attached to the
    vehicle, listened to for 0.1 s (blocking) and then destroyed.

    Args:
        vehicle: Actor the sensor is attached to.
        side: ``"left"``, ``"right"`` or ``"front"``.
        world: The ``carla.World`` used to spawn the sensor.

    Returns:
        The smallest distance reported while listening, or 5.0 when the
        sensor reported nothing.

    Raises:
        ValueError: If ``side`` is not one of the supported values.
    """
    # Sensor mount relative to the vehicle: (lateral offset y, yaw).
    # In CARLA's frame +y is the vehicle's right and a positive yaw turns
    # right, so the left-hand trace needs a negative yaw to point outwards.
    mounts = {
        "left": (-1.0, -90.0),
        "right": (1.0, 90.0),
        "front": (0.0, 0.0),
    }
    if side not in mounts:
        raise ValueError(f"Unknown side {side!r}; expected 'left', 'right' or 'front'")
    lateral_offset, yaw = mounts[side]

    # The obstacle sensor is the CARLA sensor whose events carry `distance`.
    blueprint = world.get_blueprint_library().find('sensor.other.obstacle')
    sensor_transform = carla.Transform(
        carla.Location(x=2.5, y=lateral_offset, z=1.5),
        carla.Rotation(yaw=yaw),
    )

    # Create the sensor and attach it to the vehicle.
    sensor = world.spawn_actor(blueprint, sensor_transform, attach_to=vehicle)
    distance_data = []
    try:
        sensor.listen(lambda event: callback(event, distance_data))
        # Give the sensor a moment to deliver data.
        time.sleep(0.1)
        sensor.stop()
    finally:
        # Always remove the temporary sensor, even if listening failed.
        sensor.destroy()

    if distance_data:
        min_distance = min(distance_data)
        print(f"📏 Distance détectée ({side}) : {min_distance:.2f} mètres")
        return min_distance

    print(f"❌ Aucune distance détectée sur le côté {side}")
    return 5.0  # Default value when nothing was detected


def adjust_truck_steering(truck, world):
    """Steer a truck away from side obstacles and brake for front obstacles.

    Args:
        truck: ``Truck`` wrapper whose vehicle is controlled.
        world: The ``carla.World`` used for the clearance sensors.
    """
    truck.update()

    truck_width = 3.0  # Estimated truck width

    # Free space around the truck (each call blocks briefly).
    left_clearance = get_clearance(truck.vehicle, "left", world)
    right_clearance = get_clearance(truck.vehicle, "right", world)
    front_clearance = get_clearance(truck.vehicle, "front", world)

    control = truck.vehicle.get_control()

    # Steer away from the obstructed side. Positive steer turns right,
    # negative steer turns left.
    if left_clearance < truck_width:
        control.steer += 0.2  # Turn slightly right
    elif right_clearance < truck_width:
        control.steer -= 0.2  # Turn slightly left

    # When space is really tight, steer further.
    if left_clearance < truck_width * 0.7:
        control.steer += 0.2  # Turn more to the right
    elif right_clearance < truck_width * 0.7:
        control.steer -= 0.2  # Turn more to the left

    # The value accumulates across calls; keep it in the valid range.
    control.steer = max(-1.0, min(1.0, control.steer))

    # Something is too close ahead: brake and cut the throttle.
    if front_clearance < 3.0:
        control.brake = 0.5
        control.throttle = 0.0

    # Limit the throttle in tight turns.
    if abs(control.steer) > 0.2:
        control.throttle = min(control.throttle, 0.4)

    # Apply the new control to the truck.
    truck.vehicle.apply_control(control)


# Tracks when each slow vehicle was first seen stopped (wall-clock seconds),
# keyed by vehicle id.
stuck_vehicles_time = {}


def is_vehicle_blocked(vehicle, max_stop_time=30):
    """Tell whether a vehicle has been stopped for too long.

    Args:
        vehicle: Vehicle actor to check.
        max_stop_time: Seconds of continuous slow movement after which the
            vehicle is considered blocked.

    Returns:
        True if the vehicle has been slower than 2 m/s for more than
        ``max_stop_time`` seconds and its acceleration magnitude is below
        0.1; False otherwise.
    """
    current_time = time.time()
    vehicle_id = vehicle.id

    if vehicle.get_velocity().length() < 2:
        if vehicle_id not in stuck_vehicles_time:
            # Start tracking the stop time.
            stuck_vehicles_time[vehicle_id] = current_time
        else:
            time_stopped = current_time - stuck_vehicles_time[vehicle_id]
            # get_acceleration() returns a vector; compare its magnitude.
            if time_stopped > max_stop_time and vehicle.get_acceleration().length() < 0.1:
                return True
    else:
        # The vehicle is moving again: forget its stop time.
        stuck_vehicles_time.pop(vehicle_id, None)

    return False


def is_spawn_point_occupied(spawn_point, vehicles, max_density=2, remove_stuck_vehicles=False):
    """Tell whether blocked vehicles are crowding a spawn point.

    A vehicle counts when it is within 20 metres of ``spawn_point`` and
    ``is_vehicle_blocked`` reports it as blocked.

    Args:
        spawn_point: Transform-like object with a ``location`` attribute.
        vehicles: Vehicle actors to inspect.
        max_density: Number of blocked vehicles from which the spawn point
            is considered occupied.
        remove_stuck_vehicles: When True and the point is occupied, destroy
            the blocked vehicles and stop tracking them.

    Returns:
        True if the spawn point is blocked, False if it is free.
    """
    blocked_vehicles = [
        vehicle for vehicle in vehicles
        if vehicle.get_location().distance(spawn_point.location) < 20
        and is_vehicle_blocked(vehicle)
    ]

    if len(blocked_vehicles) >= max_density:
        if remove_stuck_vehicles:
            for vehicle in blocked_vehicles:
                print(f"Suppression du véhicule bloqué {vehicle.id} après 30 sec d'arrêt.")
                vehicle.destroy()
                stuck_vehicles_time.pop(vehicle.id, None)
        return True
    return False


vehicle_blueprints = world.get_blueprint_library().filter('vehicle*')
spawn_points = world.get_map().get_spawn_points()

# Spawn up to 30 vehicles at random spawn points, each with a random
# blueprint. try_spawn_actor returns None when the spot is taken, so only
# successful spawns are kept.
vehicles = []
for _ in range(30):
    spawned = world.try_spawn_actor(
        random.choice(vehicle_blueprints), random.choice(spawn_points)
    )
    if spawned is not None:
        vehicles.append(spawned)

blueprint_library = world.get_blueprint_library()
print("Available vehicule models:", blueprint_library.filter('vehicle.*'))
print("Number of spawn points:", len(spawn_points))


def try_spawn_vehicle(vehicle_blueprints, spawn_points, max_attempts=40):
    """Try to spawn one random vehicle, retrying at random spawn points.

    Args:
        vehicle_blueprints: Blueprints to choose from.
        spawn_points: Candidate spawn transforms.
        max_attempts: Maximum number of spawn attempts.

    Returns:
        The spawned vehicle actor, or None if every attempt failed or no
        spawn point is available.
    """
    if not spawn_points:
        return None
    for _ in range(max_attempts):
        spawn_point = random.choice(spawn_points)
        vehicle = world.try_spawn_actor(random.choice(vehicle_blueprints), spawn_point)
        if vehicle is not None:
            return vehicle
    return None


# Example usage in the main code
vehicle = try_spawn_vehicle(vehicle_blueprints, spawn_points)

if vehicle is not None:
    vehicles.append(vehicle)
    print(f"Vehicle {vehicle.id} created at {vehicle.get_location()}")
    vehicle.set_autopilot(True)
else:
    print("Failed to create vehicle after several attempts.")

for vehicle in world.get_actors().filter('*vehicle*'):
    if vehicle.attributes['role_name'] != 'ego':
        vehicle.set_autopilot(True)

# Collect the trucks in the simulation. This runs after the spawning code
# because the control loop below never returns.
trucks = [
    Truck(actor)
    for actor in world.get_actors().filter('vehicle.*')
    if "truck" in actor.type_id
]

try:
    while True:
        for truck in trucks:
            adjust_truck_steering(truck, world)
        time.sleep(0.08)  # Update every 0.08 seconds
except KeyboardInterrupt:
    # Ctrl+C ends the control loop cleanly.
    pass