0

@Ashkan 我看到了你对那个问题的回答(How to write Get_state() return based in multi-agent based on agent-id?

你给出了一些示例代码: def get_state(self):

    agent_state_dict = {}
    i = 0
    for intersection, edges in self.scenario.get_node_mapping():
        i = i + 1
        agent_id = self.agent_name_prefix + str(i) # self.agent_name_prefix is defined as string "intersection"

        speeds = []
        dist_to_intersec = []
        traffic_light_states = []

        ..... code .....

        # construct the state (observation) for each agent
        observation = np.array(
            np.concatenate([
                speeds, dist_to_intersec, traffic_light_states  

        # each intersection is an agent, so we will make a dictionary that maps form "self.agent_name_prefix+'i'" to the state of that agent.
        agent_state_dict.update({agent_id: observation})

    return agent_state_dict

我对您的代码有一些疑问:

  1. 在'for'循环中,你使用了一次交集和边,没有其他的交集和边的用法,这里的交集和边的作用是什么?
  2. 在 dist_to_intesec[] 上,基于 green_wave_env.py,它会返回所有车辆到所有路口的距离,而不是返回车辆到特殊/个人路口的距离,我不太了解你的 dist_to_intersec[],你能解释一下?

  3. 如何查看get_state()的数据,比如我要获取dist_to_intersec的数据。

在我的项目中:基于网格网络,我想在一个水平时间获得一个交叉路口的圆圈(例如,该圆圈的半径为 100m,交叉路口是中心点)上有多少车辆。所以你的回复对我有很大帮助。@阿什坎

4

1 回答 1

1

在那篇文章中,我刚刚发布了代码的骨架以便更好地理解,完整的代码没有显示在那里。

以下是您的问题的答案:

  1. intersection稍后用于获取交叉路口交通信号灯的self.k.traffic_light.get_state(intersection)状态。edges用作输入以observed_vehicle_ids = self.k_closest_to_intersection_edge(edges, self.num_closest_vehicles_onbound)获取指定边缘上的汽车。

  2. dist_to_intersec是一个变量,它将简单地存储观察到的车辆的距离(不是所有车辆到所有交叉口的距离),存储在observed_vehicle_ids

for veh_id in observed_vehicle_ids:
    if veh_id == 0:
        dist_to_intersec.append(-1)
        speeds.append(-1)
    else:
        dist_to_intersec.append(
            (self.k.scenario.edge_length(
                self.k.vehicle.get_edge(veh_id))
                - self.k.vehicle.get_position(veh_id)) / max_dist
        )
        speeds.append(
            self.k.vehicle.get_speed(veh_id) / max_speed
        )
  1. 您可以打印状态。

要在交叉路口的每个边缘获得固定数量的车辆,您可以尝试以下操作:

def k_closest_to_intersection_edge(self, edges, k):
    """
    Return the veh_id of the 4*k closest vehicles to an intersection for
    each edge (k closest vehicles on each edge). 
    """
    if k < 0:
        raise IndexError("k must be greater than 0")
    ids = []

    def sort_lambda(veh_id):
        return self.get_distance_to_intersection(veh_id)

    for edge in edges:
        vehicles = self.k.vehicle.get_ids_by_edge(edge)
        veh_ids_per_bound = sorted(
            vehicles,
            key=sort_lambda
        )
        if len(veh_ids_per_bound) >= k: # we have more than k vehicles, and we need to cut
            ids += veh_ids_per_bound[:k]
        else: # we have less than k vehicles, and we need to pad
            padding = k - len(veh_ids_per_bound)
            ids += (veh_ids_per_bound + [0]*padding)

    return ids
于 2019-08-14T22:35:39.297 回答