1

我正在尝试以编程方式构建代理,并且一直在关注这个示例。当我查看在faust -A <> agents脚本开始时生成的主题和代理时,我可以看到它们。代理名称类似于{topic}_agent示例。所有这些代理分别接触到一些 rest api。这是代码。

def create_agent(next_topic, model_metadata: Dict):
    """ 
         creation of a single agent.
         `start_topic`:  str
             Just a string that you can use in other functions
             to figure out how messages in that topic can be
             transformed
         `next_topic`:  faust.topics.Topic
             A faust `app.topic` instance
    """
    async def agent(stream):
        """Call domino model and return response
        """
        async for message in stream:
            domino_req = {"data": message.asdict()}
            app.logger.info(domino_req)
            response = requests.post(
                model_metadata['url'],
                auth=(
                    model_metadata['auth'],
                    model_metadata['auth'],
                ),
                json=domino_req,
            )
            if response.status_code == 200:
                answer = response.json()
                await next_topic.send(answer['result'])
            else:
                app.logger.error(response.reason)
    # app.logger.info(f"NEW Agent Created: ## Agent - {consumer} ##")
    return agent

def agents(registry_response):
    """
        configuration of multiple agents
    """
    agents = []
    for key, value in registry_response.items():
        """ `topic`:  app.topic instance """
        agents.append(
            # `topic.start`: str
            # `topic.next`: faust.topics.Topic
            (create_agent(next_topic= all_responses, model_metadata = value),key)
        )
    return agents

def attach_agent(agent, topic):
    """ Attach the agent to the Faust app """
    # `topic.faust`: faust.topics.Topic
    # it is equivalent to `app.topic(topic.start)`
    print("hello")
    app.agent(channel=app.topic(topic), name=f"{topic}_agent")(agent)
    # new_agent.start()
    # app.logger.info("hello")
    # app.logger.info(new_agent)
    # app.logger.info(new_agent.info())
    # app.logger.info(app.agents)

# @app.task
# async def get_model_registry():
#     """
#     Create topics and agents for the initial set of models present in the registry
#     """
app.logger.info('APP STARTED')
app.logger.info('Fetching Models from Model Registry')
#TODO: Call the Model Registry and process it
#Just mocking the registry 
registry_response = initial_model_registry_metadata
app.logger.info(f'Number of Models Found {len(registry_response)}')
for agent,topic in agents(registry_response):
    attach_agent(agent, topic)

@app.page('/model/{key_ai_model}')
class frontdoor(View):
    async def get(self, request: Request) -> Response:
        return self.json({'key_ai_model': key_ai_model})
    async def post(self, request: Request, key_ai_model: str) -> Response:
        request_id = str(uuid.uuid4())
        src = await request.json()
        msg = GdeltRequest(**src)
        app.logger.info(msg)
        await gdelt_agent.cast(msg, key=request_id)
        return self.json({'request_id': request_id, 'key_ai_model': key_ai_model})

@app.agent(all_responses)
async def print_responses(stream):
    async for message in stream:
        print(message)

但是,当我发布一些东西时,我收到了一个错误 - NameError: name 'gdelt_agent' is not defined。这里的任何帮助将不胜感激。

4

1 回答 1

0

我已经能够通过将代理分配为字典中的值并稍后使用它将消息投射到代理来解决这个问题。

agents_dict = {}
topics_dict ={}
def attach_agent(agent, topic):
    """ Attach the agent to the Faust app """
    # `topic.faust`: faust.topics.Topic
    # it is equivalent to `app.topic(topic.start)`
    print("hello")
    topics_dict[topic] = app.topic(topic)
    agents_dict[topic] = app.agent(channel=topics_dict[topic], name=f"{topic}_agent")(agent)
    agents_dict[topic].start()

谢谢大家。

于 2021-02-25T21:03:52.830 回答