Drivingsimulator/Spwan_Fahrzeuge _Nna.py

499 lines
19 KiB
Python
Raw Normal View History

import carla
import random
import time
print("CARLA library imported successfully")
#connect to the client
client = carla.Client('localhost',2000 )
#client.load_world('Town10HD')
client.load_world('Town03')
#client.load_world('Town05')
client.set_timeout(10.0)
world = client.get_world()
# Funktion that forces vehicles to stop when the light is red
def enforce_stop(world):
vehicles = world.get_actors().filter('vehicle.*')
for vehicle in vehicles:
if vehicle is None:
continue
velocity = vehicle.get_velocity()
speed = (velocity.x**2 + velocity.y**2 + velocity.z**2) ** 0.5
# recovering stop signs
stop_signs = world.get_actors().filter("traffic.stop")
#loop through all stop signs
for stop in stop_signs:
distance = vehicle.get_location().distance(stop.get_location())
# if the vehicle is less than 2 metres from the stop sign and almost at a standstill
if distance < 2.0 and speed < 0.1:# collision and motionless
print(f"🚨 Infraction détectée : Le véhicule {vehicle.id} n'a pas respecté le STOP !")
# apply the brakes to simulate the infringement
vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=1.0))
# Handle traffic lights and stop signs
def manage_traffic():
traffic_lights = world.get_actors().filter('traffic.traffic_light')
vehicles = world.get_actors().filter('vehicle.*')
for traffic_light in traffic_lights:
for vehicle in vehicles:
distance = vehicle.get_location().distance(traffic_light.get_location())
# Check if vehicle is near the red light
if distance < 10 and traffic_light.state == carla.TrafficLightState.Red:
vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=1.0)) # Brake if red light
# Alternate traffic lights synchronously
if traffic_light.state == carla.TrafficLightState.Red:
traffic_light.set_state(carla.TrafficLightState.Green)
else:
traffic_light.set_state(carla.TrafficLightState.Red)
map_name = world.get_map().name
print("Loaded map name:", map_name)
# Define uncontrolled intersections (add more coordinates if needed)
INTERSECTION_ZONES = [
carla.Location(x=100, y=200, z=0),
carla.Location(x=-50, y=30, z=0),
]
# Define priority based on vehicle orientation, add a function that analyzes the vehicle's movement angle
def get_vehicle_priority(vehicle):
""" Determines priority based on vehicle orientation."""
return abs(vehicle.get_transform().rotation.yaw) # Direction angle
def handle_simultaneous_arrival(vehicles):
for v in vehicles:
delay = random.uniform(0.5 , 1.5)
v.wait(delay)
# Manage uncontrolled intersections
def manage_uncontrolled_intersections():
vehicles = get_vehicles()
for vehicle in vehicles:
vehicle_location = vehicle.get_location()
for intersection in INTERSECTION_ZONES:
distance = vehicle_location.distance(intersection)
# If a vehicle approaches an intersection without a traffic light
if distance < 4:
nearby_vehicles = [
v for v in vehicles if v.id != vehicle.id and v.get_location().distance(intersection) < 10
]
if nearby_vehicles: # Sort nearby vehicles by priority (distance, speed, direction, orientation)
closest_vehicle = min(nearby_vehicles, key=lambda v: v.get_location().distance(vehicle.get_location()))
# Check if the closest vehicle is engaged in the intersection
if closest_vehicle.get_speed().length() > 0:
# The vehicle is engaged, so the other must yield
print(f"Vehicle {vehicle.id} is waiting at the intersection to yield the right of way.")
# Apply moderate braking to avoid harsh braking
vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.5))
else:
print(f"Vehicle {vehicle.id} can cross the intersection.")
# Allow gentle acceleration to cross
vehicle.apply_control(carla.VehicleControl(throttle=0.3, brake=0.0))
else:
# Add a waiting time to prevent blockages
vehicle.waiting_time = getattr(vehicle, "waiting_time", 0)
if vehicle.waiting_time > 1: # If the vehicle has waited too long
print(f"Vehicle {vehicle.id} has waited too long, it crosses.")
vehicle.apply_control(carla.VehicleControl(throttle=0.5, brake=0.0))
else:
vehicle.waiting_time += world.get_snapshot().timestamp.delta_seconds
print(f"Vehicle {vehicle.id} is waiting for {vehicle.waiting_time:.1f} sec.")
#The Code try to adjusts the position of two colliding vehicles to separate them.
#if the adjustment fails, the vehicle that cannot free itself is removed from the
#wold
collision_status = {} # Statut de collision entre véhicules
def has_collision(vehicle1, vehicle2):
"""
Vérifie si deux véhicules sont en collision. Cela suppose une détection de collision basée sur la proximité.
"""
distance = vehicle1.get_location().distance(vehicle2.get_location())
return distance < 2 # Hypothétique, distance minimale pour considérer une collision
def reset_collision(vehicle1, vehicle2):
"""
Après une collision, attend un délai et réajuste les véhicules.
Si cela n'est pas possible, retire un véhicule pour dégager la route.
"""
print(f"Collision entre {vehicle1.id} et {vehicle2.id} détectée. Tentative de dégagement des véhicules.")
# Attente de quelques secondes avant de dégager les véhicules (ex : 3 secondes)
time.sleep(3) # Attente de 3 secondes pour simuler un délai de dégagement
# Tentative de réajustement de la position des véhicules (ex : déplacement de 2 mètres)
vehicle1.set_position(vehicle1.get_location().x + 2, vehicle1.get_location().y) # Déplacer légèrement à droite
vehicle2.set_position(vehicle2.get_location().x - 2, vehicle2.get_location().y) # Déplacer légèrement à gauche
# Réinitialisation des vitesses des véhicules
vehicle1.set_speed(5) # Réduire la vitesse à 5 km/h
vehicle2.set_speed(5) # Réduire la vitesse à 5 km/h
# Après réajustement, vérifier si la collision persiste
if has_collision(vehicle1, vehicle2):
# Si la collision persiste, retirer un des véhicules bloqués
print(f"Le véhicule {vehicle2.id} ne peut pas se dégager, suppression du véhicule.")
vehicle2.destroy() # Retirer le véhicule bloqué de la simulation
return True # Collision persistante, dégagement du véhicule
# Sinon, reprendre la circulation
print(f"Les véhicules {vehicle1.id} et {vehicle2.id} ont été dégagés et circulent à nouveau.")
vehicle1.set_speed(10) # Vitesse normale
vehicle2.set_speed(10) # Vitesse normale
return False # Pas de besoin de retirer un véhicule
def check_and_remove_collisions(vehicles):
"""
Vérifie les collisions et applique le dégagement des véhicules bloqués.
Si les véhicules ne peuvent pas être dégagés, retire un des véhicules en collision.
"""
for i, vehicle1 in enumerate(vehicles):
for vehicle2 in vehicles[i + 1:]:
if has_collision(vehicle1, vehicle2):
# Si une collision est détectée, réinitialiser les véhicules ou retirer un véhicule
if reset_collision(vehicle1, vehicle2):
# Si un véhicule a été supprimé, on s'assure que le processus continue sans le véhicule supprimé
vehicles.remove(vehicle2)
break # On arrête de vérifier après une collision
# Exemple d'utilisation dans la logique d'un point de spawn
def is_spawn_point_occupied(spawn_point, vehicles, max_density=2, remove_collisions=False):
blocked_vehicles = []
for vehicle in vehicles:
if vehicle.get_location().distance(spawn_point.location) < 20:
if vehicle.get_speed() < 2: # Si le véhicule est lent ou arrêté
blocked_vehicles.append(vehicle)
if len(blocked_vehicles) >= max_density:
if remove_collisions:
# Vérifie les collisions et applique le dégagement ou suppression de véhicules
check_and_remove_collisions(vehicles)
for vehicle in blocked_vehicles:
print(f"Suppression du véhicule bloqué {vehicle.id} après collision.")
vehicle.destroy() # Suppression ou gestion du véhicule bloqué
return True # Spawn occupé
return False # Spawn libre
def communicate_with_other_vehicle(vehicle):
ohter_vehicles= get_nearby_vehicles(vehicle,radius=5.0)# récupération des véhicules voisins dans un rayon de 5.0 mètres
#vérification si les véhicules sont en trajectoire de collisin
for other in other_vehicles:
if is_on_collision_course(vehicle, other):
if vehicle.id < other.id: # Comparer les ID pour décider qui passe
vehicle.move()
other.stop()
else:
vehicle.stop()
other.move()
# detection la ligne d'arret en avance
STOP_DISTANCE = 10 # Distance avant la ligne d'arrêt
def detect_stop_line(vehicle,stop_line_position):
stop_line = get_nearest_stop_line(vehicle,stop_line_position)
if stop_line and vehicle.distance_to(stop_line) <= STOP_DISTANCE:
return True # Le véhicule doit s'arrêter
return False # Le véhicule peut continuer
def slow_down_before_stop(vehicle):
if detect_stop_line(vehicle):
vehicle.set_speed(vehicle.speed * 0.5) # Ralentir à 50% de la vitesse
if vehicle.speed < 1.0: # Si la vitesse est très faible, arrêter totalement
vehicle.stop()
def ensure_correct_stop(vehicle):
if vehicle.distance_to(get_nearest_stop_line(vehicle)) < STOP_DISTANCE - 0.5:
if is_space_behind(vehicle):
vehicle.move_backward(1.0) # Reculer légèrement si possible
else:
vehicle.wait(1) # Attendre quelques secondes avant de repartir
#The code adjust the position of ambulance
# Classe représentant un camion
class Truck:
def __init__(self, vehicle):
self.vehicle = vehicle
self.speed = vehicle.get_velocity().length()
self.steering_angle = vehicle.get_control().steer
def update(self):
self.speed = self.vehicle.get_velocity().length()
self.steering_angle = self.vehicle.get_control().steer
# Fonction pur enregister la distance détectée par le capteur
def callback(event,distance_data):
if event:
distance_data.append(event.distance)
# Fonction pour obtenir l'espace libre à gauche et à droite via un capteur RayCast
def get_clearance(vehicle, side, world):
blueprint = world.get_blueprint_library().find('sensor.other.ray_cast')
# Position du capteur (décalé à gauche ou à droite du camion)
sensor_location = carla.Location(x=2.5, y=-1 if side == "left" else 1, z=1.5)
sensor_rotation = carla.Rotation(yaw=90 if side == "left" else -90) # Rayon vers l'extérieur
sensor_transform = carla.Transform(sensor_location, sensor_rotation)
# Création et attachement du capteur au camion
sensor = world.spawn_actor(blueprint, sensor_transform, attach_to=vehicle)
# Fonction pour récupérer la distance détectée
distance_data = []
# écoute des événements de distance avec la fonction callback
sensor.listen(lambda event:callback(event,distance_data))
# Attendre un instant pour capter les données
time.sleep(0.1)
# Suppression du capteur
sensor.destroy()
# Vérification et affichage du message
if distance_data:
min_distance = min(distance_data)
print(f"📏 Distance détectée ({side}) : {min_distance:.2f} mètres")
return min_distance
else:
print(f"❌ Aucune distance détectée sur le côté {side}")
return 5.0 # Valeur par défaut si aucune détection
return min(distance_data) if distance_data else 5.0 # Valeur par défaut si rien n'est détecté
# Fonction pour ajuster la direction du camion en fonction des obstacles
def adjust_truck_steering(truck, world):
truck.update()
truck_width = 3.0 # Largeur estimée du camion
# Récupération de l'espace libre
left_clearance = get_clearance(truck.vehicle, "left", world)
right_clearance = get_clearance(truck.vehicle, "right", world)
front_clearance = get_clearance(truck.vehicle, "front", world)
control = truck.vehicle.get_control()
# Ajuster la direction pour éviter les obstacles
if left_clearance < truck_width:
control.steer -= 0.2 # Tourner légèrement à droite
elif right_clearance < truck_width:
control.steer += 0.2 # Tourner légèrement à gauche
# Si l'espace est vraiment restreint, ajuster davantage
if left_clearance < truck_width * 0.7:
control.steer -= 0.2 # Tourner plus à droite si l'espace est très serré
elif right_clearance < truck_width * 0.7:
control.steer += 0.2 # Tourner plus à gauche si l'espace est très serré
# Ajuster la vitesse en fonction de la distance devant
if front_clearance < 3.0: # Si un véhicule est trop proche devant
control.brake = 0.5 # Appliquer le freinage
control.throttle = 0.0 # Réduire la vitesse à zéro pour éviter une collision
# fonction pour freiner
distance_ahead = get_clearance(truck.vehicle,"front",world)
if distance_ahead < 3.0: # si un vehicule est trop proche devant
control.brake=0.5 # Appliquer le freinage
# Réduire la vitesse si le virage est serré
if abs(control.steer) > 0.2: # Éviter un virage trop brusque
control.throttle = min(control.throttle,0.4) # reduire la vitesse
#control.throttle *= 0.5 # Réduction de 20% de l'accélération
# Appliquer le nouveau contrôle au camion
truck.vehicle.apply_control(control)
# Filtrer les camions dans la simulation
vehicles = world.get_actors().filter('vehicle.*')
trucks = [Truck(vehicle) for vehicle in vehicles if "truck" in vehicle.type_id]
while True:
for truck in trucks:
adjust_truck_steering(truck, world)
time.sleep(0.08) # Mettre à jour toutes les 0.08secondes
#The code based of the speed and stop_time of vehicles
# check if vehicles has 0.0 m/s speed and
# stop_time ist more than 30 seconds
# that means the vehicle is blocked and
# will be removed
stuck_vehicles_time = {} # Stocke le temps d'arrêt des véhicules
def is_vehicle_blocked(vehicle, max_stop_time=30):
"""
Vérifie si un véhicule est bloqué en fonction de son temps d'arrêt et de son accélération.
- max_stop_time : Temps (en secondes) avant de considérer un véhicule comme bloqué.
"""
global stuck_vehicles_time
current_time = time.time()
vehicle_id = vehicle.id
# Vérifier si le véhicule est lent ou arrêté
if vehicle.get_speed() < 2:
if vehicle_id not in stuck_vehicles_time:
stuck_vehicles_time[vehicle_id] = current_time # Commencer le suivi du temps d'arrêt
else:
time_stopped = current_time - stuck_vehicles_time[vehicle_id]
if time_stopped > max_stop_time and vehicle.get_acceleration() < 0.1:
return True # Véhicule bloqué après 30 sec d'arrêt
else:
# Si le véhicule bouge à nouveau, on le retire des bloqués
if vehicle_id in stuck_vehicles_time:
del stuck_vehicles_time[vehicle_id]
return False # Véhicule non bloqué
def is_spawn_point_occupied(spawn_point, vehicles, max_density=2, remove_stuck_vehicles=False):
blocked_vehicles = []
for vehicle in vehicles:
if vehicle.get_location().distance(spawn_point.location) < 20:
if is_vehicle_blocked(vehicle):
blocked_vehicles.append(vehicle)
if len(blocked_vehicles) >= max_density:
if remove_stuck_vehicles:
for vehicle in blocked_vehicles:
print(f"Suppression du véhicule bloqué {vehicle.id} après 30 sec d'arrêt.")
vehicle.destroy()
del stuck_vehicles_time[vehicle.id] # Retirer de la liste des bloqués
return True # Spawn bloqué
return False # Spawn libre
# get the vehicle blueprints and spwan point
vehicle_blueprints = world.get_blueprint_library().filter('vehicle*')
spawn_points = world.get_map().get_spawn_points()
# Spawn 30 vehicles randomly distributed throughout the map
# for each spawn point, we choose a random vehicle from the blueprint library
vehicles = []
for i in range(0,30):
vehicles.append(world.try_spawn_actor(random.choice(vehicle_blueprints), random.choice(spawn_points)))
#display the available vehicles
blueprint_library = world.get_blueprint_library()
print("Available vehicule models:", blueprint_library.filter('vehicle.*'))
#display the Number of spwan points
print("Number of spawn points:", len(spawn_points))
# Function to try spawning the vehicle at multiple spawn points
def try_spawn_vehicle(vehicle_blueprints, spawn_points):
for _ in range(40): # Try up to 10 spawn points
spawn_point = random.choice(spawn_points)
vehicle = world.try_spawn_actor(random.choice(vehicle_blueprints), spawn_point)
if vehicle is not None:
return vehicle
return None # Return None if all attempts fail
# Example usage in the main code
vehicle = try_spawn_vehicle(vehicle_blueprints, spawn_points)
if vehicle is not None:
vehicles.append(vehicle)
print(f"Vehicle {vehicle.id} created at {vehicle.get_location()}")
vehicle.set_autopilot(True)
else:
print(f"Failed to create vehicle after several attempts.")
#The vehicles start the control system
for vehicle in world.get_actors().filter('*vehicle*'):
if vehicle.attributes['role_name'] != 'ego':
vehicle.set_autopilot(True)
#How to set weather and display available carla.weather.parameters
world.set_weather(carla.WeatherParameters.HardRainNoon)
world.set_weather(carla.WeatherParameters.ClearNoon)
world.set_weather(carla.WeatherParameters.HardRainNight)
world.set_weather(carla.WeatherParameters.CloudySunset) # Set weather to CloudySunset
world.set_weather(carla.WeatherParameters.WetNoon) # Set weather to wet
world.set_weather(carla.WeatherParameters.CloudyNoon)
world.set_weather(carla.WeatherParameters.SoftRainNight)
world.set_weather(carla.WeatherParameters.WetCloudySunset)
print(dir(carla.WeatherParameters))