forked from zhuli90799/Fahrsimulator
503 lines
18 KiB
Python
503 lines
18 KiB
Python
import carla
|
|
import random
|
|
import time
|
|
|
|
|
|
print("CARLA library imported successfully")
|
|
|
|
|
|
|
|
#connect to the client
|
|
client = carla.Client('localhost',2000 )
|
|
#client.load_world('Town10HD')
|
|
client.load_world('Town03')
|
|
#client.load_world('Town05')
|
|
client.set_timeout(10.0)
|
|
world = client.get_world()
|
|
|
|
|
|
|
|
|
|
# Funktion that forces vehicles to stop when the light is red
|
|
def enforce_stop(world):
|
|
vehicles = world.get_actors().filter('vehicle.*')
|
|
|
|
for vehicle in vehicles:
|
|
if vehicle is None:
|
|
continue
|
|
|
|
velocity = vehicle.get_velocity()
|
|
speed = (velocity.x**2 + velocity.y**2 + velocity.z**2) ** 0.5
|
|
|
|
# recovering stop signs
|
|
stop_signs = world.get_actors().filter("traffic.stop")
|
|
#loop through all stop signs
|
|
for stop in stop_signs:
|
|
distance = vehicle.get_location().distance(stop.get_location())
|
|
|
|
# if the vehicle is less than 2 metres from the stop sign and almost at a standstill
|
|
if distance < 2.0 and speed < 0.1:# collision and motionless
|
|
print(f"🚨 Infraction détectée : Le véhicule {vehicle.id} n'a pas respecté le STOP !")
|
|
# apply the brakes to simulate the infringement
|
|
vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=1.0))
|
|
|
|
|
|
|
|
# Handle traffic lights and stop signs
|
|
def manage_traffic():
|
|
traffic_lights = world.get_actors().filter('traffic.traffic_light')
|
|
vehicles = world.get_actors().filter('vehicle.*')
|
|
|
|
for traffic_light in traffic_lights:
|
|
for vehicle in vehicles:
|
|
distance = vehicle.get_location().distance(traffic_light.get_location())
|
|
|
|
# Check if vehicle is near the red light
|
|
if distance < 10 and traffic_light.state == carla.TrafficLightState.Red:
|
|
vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=1.0)) # Brake if red light
|
|
|
|
# Alternate traffic lights synchronously
|
|
if traffic_light.state == carla.TrafficLightState.Red:
|
|
traffic_light.set_state(carla.TrafficLightState.Green)
|
|
else:
|
|
traffic_light.set_state(carla.TrafficLightState.Red)
|
|
|
|
|
|
map_name = world.get_map().name
|
|
print("Loaded map name:", map_name)
|
|
|
|
|
|
|
|
# Define uncontrolled intersections (add more coordinates if needed)
|
|
INTERSECTION_ZONES = [
|
|
carla.Location(x=100, y=200, z=0),
|
|
carla.Location(x=-50, y=30, z=0),
|
|
]
|
|
|
|
# Define priority based on vehicle orientation, add a function that analyzes the vehicle's movement angle
|
|
def get_vehicle_priority(vehicle):
|
|
""" Determines priority based on vehicle orientation."""
|
|
return abs(vehicle.get_transform().rotation.yaw) # Direction angle
|
|
|
|
def handle_simultaneous_arrival(vehicles):
|
|
for v in vehicles:
|
|
delay = random.uniform(0.5 , 1.5)
|
|
v.wait(delay)
|
|
|
|
|
|
|
|
# Manage uncontrolled intersections
|
|
def manage_uncontrolled_intersections():
|
|
vehicles = get_vehicles()
|
|
|
|
for vehicle in vehicles:
|
|
vehicle_location = vehicle.get_location()
|
|
|
|
for intersection in INTERSECTION_ZONES:
|
|
distance = vehicle_location.distance(intersection)
|
|
|
|
# If a vehicle approaches an intersection without a traffic light
|
|
if distance < 4:
|
|
nearby_vehicles = [
|
|
v for v in vehicles if v.id != vehicle.id and v.get_location().distance(intersection) < 10
|
|
]
|
|
|
|
if nearby_vehicles: # Sort nearby vehicles by priority (distance, speed, direction, orientation)
|
|
closest_vehicle = min(nearby_vehicles, key=lambda v: v.get_location().distance(vehicle.get_location()))
|
|
|
|
# Check if the closest vehicle is engaged in the intersection
|
|
if closest_vehicle.get_speed().length() > 0:
|
|
# The vehicle is engaged, so the other must yield
|
|
print(f"Vehicle {vehicle.id} is waiting at the intersection to yield the right of way.")
|
|
# Apply moderate braking to avoid harsh braking
|
|
vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.5))
|
|
else:
|
|
print(f"Vehicle {vehicle.id} can cross the intersection.")
|
|
# Allow gentle acceleration to cross
|
|
vehicle.apply_control(carla.VehicleControl(throttle=0.3, brake=0.0))
|
|
else:
|
|
# Add a waiting time to prevent blockages
|
|
vehicle.waiting_time = getattr(vehicle, "waiting_time", 0)
|
|
|
|
if vehicle.waiting_time > 1: # If the vehicle has waited too long
|
|
print(f"Vehicle {vehicle.id} has waited too long, it crosses.")
|
|
vehicle.apply_control(carla.VehicleControl(throttle=0.5, brake=0.0))
|
|
else:
|
|
vehicle.waiting_time += world.get_snapshot().timestamp.delta_seconds
|
|
print(f"Vehicle {vehicle.id} is waiting for {vehicle.waiting_time:.1f} sec.")
|
|
|
|
|
|
|
|
|
|
|
|
collision_status = {} # Statut de collision entre véhicules
|
|
|
|
def has_collision(vehicle1, vehicle2):
|
|
"""
|
|
Vérifie si deux véhicules sont en collision. Cela suppose une détection de collision basée sur la proximité.
|
|
"""
|
|
distance = vehicle1.get_location().distance(vehicle2.get_location())
|
|
return distance < 2 # Hypothétique, distance minimale pour considérer une collision
|
|
|
|
def reset_collision(vehicle1, vehicle2):
|
|
"""
|
|
Après une collision, attend un délai et réajuste les véhicules.
|
|
Si cela n'est pas possible, retire un véhicule pour dégager la route.
|
|
"""
|
|
print(f"Collision entre {vehicle1.id} et {vehicle2.id} détectée. Tentative de dégagement des véhicules.")
|
|
|
|
# Attente de quelques secondes avant de dégager les véhicules (ex : 3 secondes)
|
|
time.sleep(3) # Attente de 3 secondes pour simuler un délai de dégagement
|
|
|
|
# Tentative de réajustement de la position des véhicules (ex : déplacement de 2 mètres)
|
|
vehicle1.set_position(vehicle1.get_location().x + 2, vehicle1.get_location().y) # Déplacer légèrement à droite
|
|
vehicle2.set_position(vehicle2.get_location().x - 2, vehicle2.get_location().y) # Déplacer légèrement à gauche
|
|
|
|
# Réinitialisation des vitesses des véhicules
|
|
vehicle1.set_speed(5) # Réduire la vitesse à 5 km/h
|
|
vehicle2.set_speed(5) # Réduire la vitesse à 5 km/h
|
|
|
|
# Après réajustement, vérifier si la collision persiste
|
|
if has_collision(vehicle1, vehicle2):
|
|
# Si la collision persiste, retirer un des véhicules bloqués
|
|
print(f"Le véhicule {vehicle2.id} ne peut pas se dégager, suppression du véhicule.")
|
|
vehicle2.destroy() # Retirer le véhicule bloqué de la simulation
|
|
return True # Collision persistante, dégagement du véhicule
|
|
|
|
# Sinon, reprendre la circulation
|
|
print(f"Les véhicules {vehicle1.id} et {vehicle2.id} ont été dégagés et circulent à nouveau.")
|
|
vehicle1.set_speed(10) # Vitesse normale
|
|
vehicle2.set_speed(10) # Vitesse normale
|
|
|
|
return False # Pas de besoin de retirer un véhicule
|
|
|
|
def check_and_remove_collisions(vehicles):
|
|
"""
|
|
Vérifie les collisions et applique le dégagement des véhicules bloqués.
|
|
Si les véhicules ne peuvent pas être dégagés, retire un des véhicules en collision.
|
|
"""
|
|
for i, vehicle1 in enumerate(vehicles):
|
|
for vehicle2 in vehicles[i + 1:]:
|
|
if has_collision(vehicle1, vehicle2):
|
|
# Si une collision est détectée, réinitialiser les véhicules ou retirer un véhicule
|
|
if reset_collision(vehicle1, vehicle2):
|
|
# Si un véhicule a été supprimé, on s'assure que le processus continue sans le véhicule supprimé
|
|
vehicles.remove(vehicle2)
|
|
break # On arrête de vérifier après une collision
|
|
|
|
# Exemple d'utilisation dans la logique d'un point de spawn
|
|
def is_spawn_point_occupied(spawn_point, vehicles, max_density=2, remove_collisions=False):
|
|
blocked_vehicles = []
|
|
|
|
for vehicle in vehicles:
|
|
if vehicle.get_location().distance(spawn_point.location) < 20:
|
|
if vehicle.get_speed() < 2: # Si le véhicule est lent ou arrêté
|
|
blocked_vehicles.append(vehicle)
|
|
|
|
if len(blocked_vehicles) >= max_density:
|
|
if remove_collisions:
|
|
# Vérifie les collisions et applique le dégagement ou suppression de véhicules
|
|
check_and_remove_collisions(vehicles)
|
|
|
|
for vehicle in blocked_vehicles:
|
|
print(f"Suppression du véhicule bloqué {vehicle.id} après collision.")
|
|
vehicle.destroy() # Suppression ou gestion du véhicule bloqué
|
|
|
|
return True # Spawn occupé
|
|
|
|
return False # Spawn libre
|
|
|
|
|
|
|
|
def communicate_with_other_vehicle(vehicle):
|
|
ohter_vehicles= get_nearby_vehicles(vehicle,radius=5.0)# récupération des véhicules voisins dans un rayon de 5.0 mètres
|
|
|
|
#vérification si les véhicules sont en trajectoire de collisin
|
|
for other in other_vehicles:
|
|
if is_on_collision_course(vehicle, other):
|
|
if vehicle.id < other.id: # Comparer les ID pour décider qui passe
|
|
vehicle.move()
|
|
other.stop()
|
|
else:
|
|
vehicle.stop()
|
|
other.move()
|
|
|
|
# detection la ligne d'arret en avance
|
|
|
|
STOP_DISTANCE = 10 # Distance avant la ligne d'arrêt
|
|
|
|
def detect_stop_line(vehicle,stop_line_position):
|
|
stop_line = get_nearest_stop_line(vehicle,stop_line_position)
|
|
if stop_line and vehicle.distance_to(stop_line) <= STOP_DISTANCE:
|
|
return True # Le véhicule doit s'arrêter
|
|
return False # Le véhicule peut continuer
|
|
|
|
|
|
|
|
|
|
|
|
def slow_down_before_stop(vehicle):
|
|
if detect_stop_line(vehicle):
|
|
vehicle.set_speed(vehicle.speed * 0.5) # Ralentir à 50% de la vitesse
|
|
if vehicle.speed < 1.0: # Si la vitesse est très faible, arrêter totalement
|
|
vehicle.stop()
|
|
|
|
|
|
|
|
def ensure_correct_stop(vehicle):
|
|
if vehicle.distance_to(get_nearest_stop_line(vehicle)) < STOP_DISTANCE - 0.5:
|
|
if is_space_behind(vehicle):
|
|
vehicle.move_backward(1.0) # Reculer légèrement si possible
|
|
else:
|
|
vehicle.wait(1) # Attendre quelques secondes avant de repartir
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Classe représentant un camion
|
|
class Truck:
|
|
def __init__(self, vehicle):
|
|
self.vehicle = vehicle
|
|
self.speed = vehicle.get_velocity().length()
|
|
self.steering_angle = vehicle.get_control().steer
|
|
|
|
def update(self):
|
|
self.speed = self.vehicle.get_velocity().length()
|
|
self.steering_angle = self.vehicle.get_control().steer
|
|
# Fonction pur enregister la distance détectée par le capteur
|
|
def callback(event,distance_data):
|
|
if event:
|
|
distance_data.append(event.distance)
|
|
|
|
# Fonction pour obtenir l'espace libre à gauche et à droite via un capteur RayCast
|
|
def get_clearance(vehicle, side, world):
|
|
blueprint = world.get_blueprint_library().find('sensor.other.ray_cast')
|
|
|
|
# Position du capteur (décalé à gauche ou à droite du camion)
|
|
sensor_location = carla.Location(x=2.5, y=-1 if side == "left" else 1, z=1.5)
|
|
sensor_rotation = carla.Rotation(yaw=90 if side == "left" else -90) # Rayon vers l'extérieur
|
|
|
|
sensor_transform = carla.Transform(sensor_location, sensor_rotation)
|
|
|
|
# Création et attachement du capteur au camion
|
|
sensor = world.spawn_actor(blueprint, sensor_transform, attach_to=vehicle)
|
|
|
|
# Fonction pour récupérer la distance détectée
|
|
distance_data = []
|
|
|
|
# écoute des événements de distance avec la fonction callback
|
|
sensor.listen(lambda event:callback(event,distance_data))
|
|
|
|
# Attendre un instant pour capter les données
|
|
time.sleep(0.1)
|
|
|
|
|
|
# Suppression du capteur
|
|
sensor.destroy()
|
|
|
|
# Vérification et affichage du message
|
|
if distance_data:
|
|
min_distance = min(distance_data)
|
|
print(f"📏 Distance détectée ({side}) : {min_distance:.2f} mètres")
|
|
return min_distance
|
|
else:
|
|
print(f"❌ Aucune distance détectée sur le côté {side}")
|
|
return 5.0 # Valeur par défaut si aucune détection
|
|
|
|
return min(distance_data) if distance_data else 5.0 # Valeur par défaut si rien n'est détecté
|
|
|
|
# Fonction pour ajuster la direction du camion en fonction des obstacles
|
|
def adjust_truck_steering(truck, world):
|
|
truck.update()
|
|
|
|
truck_width = 3.0 # Largeur estimée du camion
|
|
|
|
# Récupération de l'espace libre
|
|
left_clearance = get_clearance(truck.vehicle, "left", world)
|
|
right_clearance = get_clearance(truck.vehicle, "right", world)
|
|
front_clearance = get_clearance(truck.vehicle, "front", world)
|
|
|
|
|
|
control = truck.vehicle.get_control()
|
|
|
|
# Ajuster la direction pour éviter les obstacles
|
|
if left_clearance < truck_width:
|
|
control.steer -= 0.2 # Tourner légèrement à droite
|
|
elif right_clearance < truck_width:
|
|
control.steer += 0.2 # Tourner légèrement à gauche
|
|
|
|
# Si l'espace est vraiment restreint, ajuster davantage
|
|
if left_clearance < truck_width * 0.7:
|
|
control.steer -= 0.2 # Tourner plus à droite si l'espace est très serré
|
|
elif right_clearance < truck_width * 0.7:
|
|
control.steer += 0.2 # Tourner plus à gauche si l'espace est très serré
|
|
|
|
# Ajuster la vitesse en fonction de la distance devant
|
|
if front_clearance < 3.0: # Si un véhicule est trop proche devant
|
|
control.brake = 0.5 # Appliquer le freinage
|
|
control.throttle = 0.0 # Réduire la vitesse à zéro pour éviter une collision
|
|
|
|
|
|
|
|
# fonction pour freiner
|
|
distance_ahead = get_clearance(truck.vehicle,"front",world)
|
|
|
|
if distance_ahead < 3.0: # si un vehicule est trop proche devant
|
|
control.brake=0.5 # Appliquer le freinage
|
|
|
|
# Réduire la vitesse si le virage est serré
|
|
if abs(control.steer) > 0.2: # Éviter un virage trop brusque
|
|
control.throttle = min(control.throttle,0.4) # reduire la vitesse
|
|
#control.throttle *= 0.5 # Réduction de 20% de l'accélération
|
|
|
|
# Appliquer le nouveau contrôle au camion
|
|
truck.vehicle.apply_control(control)
|
|
|
|
|
|
|
|
|
|
# Filtrer les camions dans la simulation
|
|
vehicles = world.get_actors().filter('vehicle.*')
|
|
|
|
trucks = [Truck(vehicle) for vehicle in vehicles if "truck" in vehicle.type_id]
|
|
|
|
while True:
|
|
for truck in trucks:
|
|
adjust_truck_steering(truck, world)
|
|
time.sleep(0.08) # Mettre à jour toutes les 0.08secondes
|
|
|
|
|
|
|
|
|
|
|
|
# Pour suivre le temps d'arrêt des véhicules
|
|
|
|
stuck_vehicles_time = {} # Stocke le temps d'arrêt des véhicules
|
|
|
|
def is_vehicle_blocked(vehicle, max_stop_time=30):
|
|
"""
|
|
Vérifie si un véhicule est bloqué en fonction de son temps d'arrêt et de son accélération.
|
|
- max_stop_time : Temps (en secondes) avant de considérer un véhicule comme bloqué.
|
|
"""
|
|
global stuck_vehicles_time
|
|
current_time = time.time()
|
|
vehicle_id = vehicle.id
|
|
|
|
# Vérifier si le véhicule est lent ou arrêté
|
|
if vehicle.get_speed() < 2:
|
|
if vehicle_id not in stuck_vehicles_time:
|
|
stuck_vehicles_time[vehicle_id] = current_time # Commencer le suivi du temps d'arrêt
|
|
else:
|
|
time_stopped = current_time - stuck_vehicles_time[vehicle_id]
|
|
if time_stopped > max_stop_time and vehicle.get_acceleration() < 0.1:
|
|
return True # Véhicule bloqué après 30 sec d'arrêt
|
|
else:
|
|
# Si le véhicule bouge à nouveau, on le retire des bloqués
|
|
if vehicle_id in stuck_vehicles_time:
|
|
del stuck_vehicles_time[vehicle_id]
|
|
|
|
return False # Véhicule non bloqué
|
|
|
|
def is_spawn_point_occupied(spawn_point, vehicles, max_density=2, remove_stuck_vehicles=False):
|
|
blocked_vehicles = []
|
|
|
|
for vehicle in vehicles:
|
|
if vehicle.get_location().distance(spawn_point.location) < 20:
|
|
if is_vehicle_blocked(vehicle):
|
|
blocked_vehicles.append(vehicle)
|
|
|
|
if len(blocked_vehicles) >= max_density:
|
|
if remove_stuck_vehicles:
|
|
for vehicle in blocked_vehicles:
|
|
print(f"Suppression du véhicule bloqué {vehicle.id} après 30 sec d'arrêt.")
|
|
vehicle.destroy()
|
|
del stuck_vehicles_time[vehicle.id] # Retirer de la liste des bloqués
|
|
return True # Spawn bloqué
|
|
|
|
return False # Spawn libre
|
|
|
|
|
|
|
|
|
|
|
|
vehicle_blueprints = world.get_blueprint_library().filter('vehicle*')
|
|
spawn_points = world.get_map().get_spawn_points()
|
|
# Spawn 30 vehicles randomly distributed throughout the map
|
|
# for each spawn point, we choose a random vehicle from the blueprint library
|
|
vehicles = []
|
|
for i in range(0,30):
|
|
vehicles.append(world.try_spawn_actor(random.choice(vehicle_blueprints), random.choice(spawn_points)))
|
|
|
|
blueprint_library = world.get_blueprint_library()
|
|
print("Available vehicule models:", blueprint_library.filter('vehicle.*'))
|
|
|
|
|
|
print("Number of spawn points:", len(spawn_points))
|
|
|
|
|
|
|
|
# Function to try spawning the vehicle at multiple spawn points
|
|
def try_spawn_vehicle(vehicle_blueprints, spawn_points):
|
|
for _ in range(40): # Try up to 10 spawn points
|
|
spawn_point = random.choice(spawn_points)
|
|
vehicle = world.try_spawn_actor(random.choice(vehicle_blueprints), spawn_point)
|
|
if vehicle is not None:
|
|
return vehicle
|
|
return None # Return None if all attempts fail
|
|
|
|
# Example usage in the main code
|
|
vehicle = try_spawn_vehicle(vehicle_blueprints, spawn_points)
|
|
if vehicle is not None:
|
|
vehicles.append(vehicle)
|
|
print(f"Vehicle {vehicle.id} created at {vehicle.get_location()}")
|
|
vehicle.set_autopilot(True)
|
|
else:
|
|
print(f"Failed to create vehicle after several attempts.")
|
|
|
|
|
|
for vehicle in world.get_actors().filter('*vehicle*'):
|
|
if vehicle.attributes['role_name'] != 'ego':
|
|
vehicle.set_autopilot(True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
world.set_weather(carla.WeatherParameters.HardRainNoon)
|
|
world.set_weather(carla.WeatherParameters.ClearNoon)
|
|
world.set_weather(carla.WeatherParameters.HardRainNight)
|
|
world.set_weather(carla.WeatherParameters.CloudySunset) # Set weather to CloudySunset
|
|
world.set_weather(carla.WeatherParameters.WetNoon) # Set weather to wet
|
|
world.set_weather(carla.WeatherParameters.CloudyNoon)
|
|
world.set_weather(carla.WeatherParameters.SoftRainNight)
|
|
world.set_weather(carla.WeatherParameters.WetCloudySunset)
|
|
print(dir(carla.WeatherParameters))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|