From 944502f4f61f5f6a774d832c484b8480344c295b Mon Sep 17 00:00:00 2001 From: Liyuan Zhu Date: Tue, 25 Feb 2025 09:05:47 +0000 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivefahrsimulator_Nna.py | 502 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 502 insertions(+) create mode 100644 drivefahrsimulator_Nna.py diff --git a/drivefahrsimulator_Nna.py b/drivefahrsimulator_Nna.py new file mode 100644 index 0000000..798040d --- /dev/null +++ b/drivefahrsimulator_Nna.py @@ -0,0 +1,502 @@ +import carla +import random +import time + + +print("CARLA library imported successfully") + + + +#connect to the client +client = carla.Client('localhost',2000 ) +#client.load_world('Town10HD') +client.load_world('Town03') +#client.load_world('Town05') +client.set_timeout(10.0) +world = client.get_world() + + + + +# Funktion that forces vehicles to stop when the light is red +def enforce_stop(world): + vehicles = world.get_actors().filter('vehicle.*') + + for vehicle in vehicles: + if vehicle is None: + continue + + velocity = vehicle.get_velocity() + speed = (velocity.x**2 + velocity.y**2 + velocity.z**2) ** 0.5 + + # recovering stop signs + stop_signs = world.get_actors().filter("traffic.stop") + #loop through all stop signs + for stop in stop_signs: + distance = vehicle.get_location().distance(stop.get_location()) + + # if the vehicle is less than 2 metres from the stop sign and almost at a standstill + if distance < 2.0 and speed < 0.1:# collision and motionless + print(f"🚹 Infraction dĂ©tectĂ©e : Le vĂ©hicule {vehicle.id} n'a pas respectĂ© le STOP !") + # apply the brakes to simulate the infringement + vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=1.0)) + + + +# Handle traffic lights and stop signs +def manage_traffic(): + traffic_lights = world.get_actors().filter('traffic.traffic_light') + vehicles = world.get_actors().filter('vehicle.*') + + for traffic_light in traffic_lights: + for vehicle in vehicles: + distance = vehicle.get_location().distance(traffic_light.get_location()) + + # Check if vehicle is near the red light + if distance < 10 and traffic_light.state == carla.TrafficLightState.Red: + vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=1.0)) # Brake if red light + + # Alternate traffic lights synchronously + if traffic_light.state == carla.TrafficLightState.Red: + traffic_light.set_state(carla.TrafficLightState.Green) + else: + traffic_light.set_state(carla.TrafficLightState.Red) + + +map_name = world.get_map().name +print("Loaded map name:", map_name) + + + +# Define uncontrolled intersections (add more coordinates if needed) +INTERSECTION_ZONES = [ + carla.Location(x=100, y=200, z=0), + carla.Location(x=-50, y=30, z=0), +] + +# Define priority based on vehicle orientation, add a function that analyzes the vehicle's movement angle +def get_vehicle_priority(vehicle): + """ Determines priority based on vehicle orientation.""" + return abs(vehicle.get_transform().rotation.yaw) # Direction angle + +def handle_simultaneous_arrival(vehicles): + for v in vehicles: + delay = random.uniform(0.5 , 1.5) + v.wait(delay) + + + +# Manage uncontrolled intersections +def manage_uncontrolled_intersections(): + vehicles = get_vehicles() + + for vehicle in vehicles: + vehicle_location = vehicle.get_location() + + for intersection in INTERSECTION_ZONES: + distance = vehicle_location.distance(intersection) + + # If a vehicle approaches an intersection without a traffic light + if distance < 4: + nearby_vehicles = [ + v for v in vehicles if v.id != vehicle.id and v.get_location().distance(intersection) < 10 + ] + + if nearby_vehicles: # Sort nearby vehicles by priority (distance, speed, direction, orientation) + closest_vehicle = min(nearby_vehicles, key=lambda v: v.get_location().distance(vehicle.get_location())) + + # Check if the closest vehicle is engaged in the intersection + if closest_vehicle.get_speed().length() > 0: + # The vehicle is engaged, so the other must yield + print(f"Vehicle {vehicle.id} is waiting at the intersection to yield the right of way.") + # Apply moderate braking to avoid harsh braking + vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.5)) + else: + print(f"Vehicle {vehicle.id} can cross the intersection.") + # Allow gentle acceleration to cross + vehicle.apply_control(carla.VehicleControl(throttle=0.3, brake=0.0)) + else: + # Add a waiting time to prevent blockages + vehicle.waiting_time = getattr(vehicle, "waiting_time", 0) + + if vehicle.waiting_time > 1: # If the vehicle has waited too long + print(f"Vehicle {vehicle.id} has waited too long, it crosses.") + vehicle.apply_control(carla.VehicleControl(throttle=0.5, brake=0.0)) + else: + vehicle.waiting_time += world.get_snapshot().timestamp.delta_seconds + print(f"Vehicle {vehicle.id} is waiting for {vehicle.waiting_time:.1f} sec.") + + + + + +collision_status = {} # Statut de collision entre vĂ©hicules + +def has_collision(vehicle1, vehicle2): + """ + VĂ©rifie si deux vĂ©hicules sont en collision. Cela suppose une dĂ©tection de collision basĂ©e sur la proximitĂ©. + """ + distance = vehicle1.get_location().distance(vehicle2.get_location()) + return distance < 2 # HypothĂ©tique, distance minimale pour considĂ©rer une collision + +def reset_collision(vehicle1, vehicle2): + """ + AprĂšs une collision, attend un dĂ©lai et rĂ©ajuste les vĂ©hicules. + Si cela n'est pas possible, retire un vĂ©hicule pour dĂ©gager la route. + """ + print(f"Collision entre {vehicle1.id} et {vehicle2.id} dĂ©tectĂ©e. Tentative de dĂ©gagement des vĂ©hicules.") + + # Attente de quelques secondes avant de dĂ©gager les vĂ©hicules (ex : 3 secondes) + time.sleep(3) # Attente de 3 secondes pour simuler un dĂ©lai de dĂ©gagement + + # Tentative de rĂ©ajustement de la position des vĂ©hicules (ex : dĂ©placement de 2 mĂštres) + vehicle1.set_position(vehicle1.get_location().x + 2, vehicle1.get_location().y) # DĂ©placer lĂ©gĂšrement Ă  droite + vehicle2.set_position(vehicle2.get_location().x - 2, vehicle2.get_location().y) # DĂ©placer lĂ©gĂšrement Ă  gauche + + # RĂ©initialisation des vitesses des vĂ©hicules + vehicle1.set_speed(5) # RĂ©duire la vitesse Ă  5 km/h + vehicle2.set_speed(5) # RĂ©duire la vitesse Ă  5 km/h + + # AprĂšs rĂ©ajustement, vĂ©rifier si la collision persiste + if has_collision(vehicle1, vehicle2): + # Si la collision persiste, retirer un des vĂ©hicules bloquĂ©s + print(f"Le vĂ©hicule {vehicle2.id} ne peut pas se dĂ©gager, suppression du vĂ©hicule.") + vehicle2.destroy() # Retirer le vĂ©hicule bloquĂ© de la simulation + return True # Collision persistante, dĂ©gagement du vĂ©hicule + + # Sinon, reprendre la circulation + print(f"Les vĂ©hicules {vehicle1.id} et {vehicle2.id} ont Ă©tĂ© dĂ©gagĂ©s et circulent Ă  nouveau.") + vehicle1.set_speed(10) # Vitesse normale + vehicle2.set_speed(10) # Vitesse normale + + return False # Pas de besoin de retirer un vĂ©hicule + +def check_and_remove_collisions(vehicles): + """ + VĂ©rifie les collisions et applique le dĂ©gagement des vĂ©hicules bloquĂ©s. + Si les vĂ©hicules ne peuvent pas ĂȘtre dĂ©gagĂ©s, retire un des vĂ©hicules en collision. + """ + for i, vehicle1 in enumerate(vehicles): + for vehicle2 in vehicles[i + 1:]: + if has_collision(vehicle1, vehicle2): + # Si une collision est dĂ©tectĂ©e, rĂ©initialiser les vĂ©hicules ou retirer un vĂ©hicule + if reset_collision(vehicle1, vehicle2): + # Si un vĂ©hicule a Ă©tĂ© supprimĂ©, on s'assure que le processus continue sans le vĂ©hicule supprimĂ© + vehicles.remove(vehicle2) + break # On arrĂȘte de vĂ©rifier aprĂšs une collision + +# Exemple d'utilisation dans la logique d'un point de spawn +def is_spawn_point_occupied(spawn_point, vehicles, max_density=2, remove_collisions=False): + blocked_vehicles = [] + + for vehicle in vehicles: + if vehicle.get_location().distance(spawn_point.location) < 20: + if vehicle.get_speed() < 2: # Si le vĂ©hicule est lent ou arrĂȘtĂ© + blocked_vehicles.append(vehicle) + + if len(blocked_vehicles) >= max_density: + if remove_collisions: + # VĂ©rifie les collisions et applique le dĂ©gagement ou suppression de vĂ©hicules + check_and_remove_collisions(vehicles) + + for vehicle in blocked_vehicles: + print(f"Suppression du vĂ©hicule bloquĂ© {vehicle.id} aprĂšs collision.") + vehicle.destroy() # Suppression ou gestion du vĂ©hicule bloquĂ© + + return True # Spawn occupĂ© + + return False # Spawn libre + + + +def communicate_with_other_vehicle(vehicle): + ohter_vehicles= get_nearby_vehicles(vehicle,radius=5.0)# rĂ©cupĂ©ration des vĂ©hicules voisins dans un rayon de 5.0 mĂštres + + #vĂ©rification si les vĂ©hicules sont en trajectoire de collisin + for other in other_vehicles: + if is_on_collision_course(vehicle, other): + if vehicle.id < other.id: # Comparer les ID pour dĂ©cider qui passe + vehicle.move() + other.stop() + else: + vehicle.stop() + other.move() + +# detection la ligne d'arret en avance + +STOP_DISTANCE = 10 # Distance avant la ligne d'arrĂȘt + +def detect_stop_line(vehicle,stop_line_position): + stop_line = get_nearest_stop_line(vehicle,stop_line_position) + if stop_line and vehicle.distance_to(stop_line) <= STOP_DISTANCE: + return True # Le vĂ©hicule doit s'arrĂȘter + return False # Le vĂ©hicule peut continuer + + + + + +def slow_down_before_stop(vehicle): + if detect_stop_line(vehicle): + vehicle.set_speed(vehicle.speed * 0.5) # Ralentir Ă  50% de la vitesse + if vehicle.speed < 1.0: # Si la vitesse est trĂšs faible, arrĂȘter totalement + vehicle.stop() + + + +def ensure_correct_stop(vehicle): + if vehicle.distance_to(get_nearest_stop_line(vehicle)) < STOP_DISTANCE - 0.5: + if is_space_behind(vehicle): + vehicle.move_backward(1.0) # Reculer lĂ©gĂšrement si possible + else: + vehicle.wait(1) # Attendre quelques secondes avant de repartir + + + + + + + +# Classe reprĂ©sentant un camion +class Truck: + def __init__(self, vehicle): + self.vehicle = vehicle + self.speed = vehicle.get_velocity().length() + self.steering_angle = vehicle.get_control().steer + + def update(self): + self.speed = self.vehicle.get_velocity().length() + self.steering_angle = self.vehicle.get_control().steer +# Fonction pur enregister la distance dĂ©tectĂ©e par le capteur +def callback(event,distance_data): + if event: + distance_data.append(event.distance) + +# Fonction pour obtenir l'espace libre Ă  gauche et Ă  droite via un capteur RayCast +def get_clearance(vehicle, side, world): + blueprint = world.get_blueprint_library().find('sensor.other.ray_cast') + + # Position du capteur (dĂ©calĂ© Ă  gauche ou Ă  droite du camion) + sensor_location = carla.Location(x=2.5, y=-1 if side == "left" else 1, z=1.5) + sensor_rotation = carla.Rotation(yaw=90 if side == "left" else -90) # Rayon vers l'extĂ©rieur + + sensor_transform = carla.Transform(sensor_location, sensor_rotation) + + # CrĂ©ation et attachement du capteur au camion + sensor = world.spawn_actor(blueprint, sensor_transform, attach_to=vehicle) + + # Fonction pour rĂ©cupĂ©rer la distance dĂ©tectĂ©e + distance_data = [] + + # Ă©coute des Ă©vĂ©nements de distance avec la fonction callback + sensor.listen(lambda event:callback(event,distance_data)) + + # Attendre un instant pour capter les donnĂ©es + time.sleep(0.1) + + + # Suppression du capteur + sensor.destroy() + + # VĂ©rification et affichage du message + if distance_data: + min_distance = min(distance_data) + print(f"📏 Distance dĂ©tectĂ©e ({side}) : {min_distance:.2f} mĂštres") + return min_distance + else: + print(f"❌ Aucune distance dĂ©tectĂ©e sur le cĂŽtĂ© {side}") + return 5.0 # Valeur par dĂ©faut si aucune dĂ©tection + + return min(distance_data) if distance_data else 5.0 # Valeur par dĂ©faut si rien n'est dĂ©tectĂ© + +# Fonction pour ajuster la direction du camion en fonction des obstacles +def adjust_truck_steering(truck, world): + truck.update() + + truck_width = 3.0 # Largeur estimĂ©e du camion + + # RĂ©cupĂ©ration de l'espace libre + left_clearance = get_clearance(truck.vehicle, "left", world) + right_clearance = get_clearance(truck.vehicle, "right", world) + front_clearance = get_clearance(truck.vehicle, "front", world) + + + control = truck.vehicle.get_control() + + # Ajuster la direction pour Ă©viter les obstacles + if left_clearance < truck_width: + control.steer -= 0.2 # Tourner lĂ©gĂšrement Ă  droite + elif right_clearance < truck_width: + control.steer += 0.2 # Tourner lĂ©gĂšrement Ă  gauche + + # Si l'espace est vraiment restreint, ajuster davantage + if left_clearance < truck_width * 0.7: + control.steer -= 0.2 # Tourner plus Ă  droite si l'espace est trĂšs serrĂ© + elif right_clearance < truck_width * 0.7: + control.steer += 0.2 # Tourner plus Ă  gauche si l'espace est trĂšs serrĂ© + + # Ajuster la vitesse en fonction de la distance devant + if front_clearance < 3.0: # Si un vĂ©hicule est trop proche devant + control.brake = 0.5 # Appliquer le freinage + control.throttle = 0.0 # RĂ©duire la vitesse Ă  zĂ©ro pour Ă©viter une collision + + + + # fonction pour freiner + distance_ahead = get_clearance(truck.vehicle,"front",world) + + if distance_ahead < 3.0: # si un vehicule est trop proche devant + control.brake=0.5 # Appliquer le freinage + + # RĂ©duire la vitesse si le virage est serrĂ© + if abs(control.steer) > 0.2: # Éviter un virage trop brusque + control.throttle = min(control.throttle,0.4) # reduire la vitesse + #control.throttle *= 0.5 # RĂ©duction de 20% de l'accĂ©lĂ©ration + + # Appliquer le nouveau contrĂŽle au camion + truck.vehicle.apply_control(control) + + + + + # Filtrer les camions dans la simulation + vehicles = world.get_actors().filter('vehicle.*') + + trucks = [Truck(vehicle) for vehicle in vehicles if "truck" in vehicle.type_id] + + while True: + for truck in trucks: + adjust_truck_steering(truck, world) + time.sleep(0.08) # Mettre Ă  jour toutes les 0.08secondes + + + + + +# Pour suivre le temps d'arrĂȘt des vĂ©hicules + +stuck_vehicles_time = {} # Stocke le temps d'arrĂȘt des vĂ©hicules + +def is_vehicle_blocked(vehicle, max_stop_time=30): + """ + VĂ©rifie si un vĂ©hicule est bloquĂ© en fonction de son temps d'arrĂȘt et de son accĂ©lĂ©ration. + - max_stop_time : Temps (en secondes) avant de considĂ©rer un vĂ©hicule comme bloquĂ©. + """ + global stuck_vehicles_time + current_time = time.time() + vehicle_id = vehicle.id + + # VĂ©rifier si le vĂ©hicule est lent ou arrĂȘtĂ© + if vehicle.get_speed() < 2: + if vehicle_id not in stuck_vehicles_time: + stuck_vehicles_time[vehicle_id] = current_time # Commencer le suivi du temps d'arrĂȘt + else: + time_stopped = current_time - stuck_vehicles_time[vehicle_id] + if time_stopped > max_stop_time and vehicle.get_acceleration() < 0.1: + return True # VĂ©hicule bloquĂ© aprĂšs 30 sec d'arrĂȘt + else: + # Si le vĂ©hicule bouge Ă  nouveau, on le retire des bloquĂ©s + if vehicle_id in stuck_vehicles_time: + del stuck_vehicles_time[vehicle_id] + + return False # VĂ©hicule non bloquĂ© + +def is_spawn_point_occupied(spawn_point, vehicles, max_density=2, remove_stuck_vehicles=False): + blocked_vehicles = [] + + for vehicle in vehicles: + if vehicle.get_location().distance(spawn_point.location) < 20: + if is_vehicle_blocked(vehicle): + blocked_vehicles.append(vehicle) + + if len(blocked_vehicles) >= max_density: + if remove_stuck_vehicles: + for vehicle in blocked_vehicles: + print(f"Suppression du vĂ©hicule bloquĂ© {vehicle.id} aprĂšs 30 sec d'arrĂȘt.") + vehicle.destroy() + del stuck_vehicles_time[vehicle.id] # Retirer de la liste des bloquĂ©s + return True # Spawn bloquĂ© + + return False # Spawn libre + + + + + +vehicle_blueprints = world.get_blueprint_library().filter('vehicle*') +spawn_points = world.get_map().get_spawn_points() +# Spawn 30 vehicles randomly distributed throughout the map +# for each spawn point, we choose a random vehicle from the blueprint library +vehicles = [] +for i in range(0,30): + vehicles.append(world.try_spawn_actor(random.choice(vehicle_blueprints), random.choice(spawn_points))) + +blueprint_library = world.get_blueprint_library() +print("Available vehicule models:", blueprint_library.filter('vehicle.*')) + + +print("Number of spawn points:", len(spawn_points)) + + + +# Function to try spawning the vehicle at multiple spawn points +def try_spawn_vehicle(vehicle_blueprints, spawn_points): + for _ in range(40): # Try up to 10 spawn points + spawn_point = random.choice(spawn_points) + vehicle = world.try_spawn_actor(random.choice(vehicle_blueprints), spawn_point) + if vehicle is not None: + return vehicle + return None # Return None if all attempts fail + +# Example usage in the main code +vehicle = try_spawn_vehicle(vehicle_blueprints, spawn_points) +if vehicle is not None: + vehicles.append(vehicle) + print(f"Vehicle {vehicle.id} created at {vehicle.get_location()}") + vehicle.set_autopilot(True) +else: + print(f"Failed to create vehicle after several attempts.") + + +for vehicle in world.get_actors().filter('*vehicle*'): + if vehicle.attributes['role_name'] != 'ego': + vehicle.set_autopilot(True) + + + + + + +world.set_weather(carla.WeatherParameters.HardRainNoon) +world.set_weather(carla.WeatherParameters.ClearNoon) +world.set_weather(carla.WeatherParameters.HardRainNight) +world.set_weather(carla.WeatherParameters.CloudySunset) # Set weather to CloudySunset +world.set_weather(carla.WeatherParameters.WetNoon) # Set weather to wet +world.set_weather(carla.WeatherParameters.CloudyNoon) +world.set_weather(carla.WeatherParameters.SoftRainNight) +world.set_weather(carla.WeatherParameters.WetCloudySunset) +print(dir(carla.WeatherParameters)) + + + + + + + + + + + + + + + + + + + + + + + +