我正在尝试以编程方式构建代理,并且一直在关注这个示例。当我查看在faust -A <> agents
脚本开始时生成的主题和代理时,我可以看到它们。代理名称类似于{topic}_agent
示例。所有这些代理分别接触到一些 rest api。这是代码。
def create_agent(next_topic, model_metadata: Dict):
"""
creation of a single agent.
`start_topic`: str
Just a string that you can use in other functions
to figure out how messages in that topic can be
transformed
`next_topic`: faust.topics.Topic
A faust `app.topic` instance
"""
async def agent(stream):
"""Call domino model and return response
"""
async for message in stream:
domino_req = {"data": message.asdict()}
app.logger.info(domino_req)
response = requests.post(
model_metadata['url'],
auth=(
model_metadata['auth'],
model_metadata['auth'],
),
json=domino_req,
)
if response.status_code == 200:
answer = response.json()
await next_topic.send(answer['result'])
else:
app.logger.error(response.reason)
# app.logger.info(f"NEW Agent Created: ## Agent - {consumer} ##")
return agent
def agents(registry_response):
"""
configuration of multiple agents
"""
agents = []
for key, value in registry_response.items():
""" `topic`: app.topic instance """
agents.append(
# `topic.start`: str
# `topic.next`: faust.topics.Topic
(create_agent(next_topic= all_responses, model_metadata = value),key)
)
return agents
def attach_agent(agent, topic):
""" Attach the agent to the Faust app """
# `topic.faust`: faust.topics.Topic
# it is equivalent to `app.topic(topic.start)`
print("hello")
app.agent(channel=app.topic(topic), name=f"{topic}_agent")(agent)
# new_agent.start()
# app.logger.info("hello")
# app.logger.info(new_agent)
# app.logger.info(new_agent.info())
# app.logger.info(app.agents)
# @app.task
# async def get_model_registry():
# """
# Create topics and agents for the initial set of models present in the registry
# """
app.logger.info('APP STARTED')
app.logger.info('Fetching Models from Model Registry')
#TODO: Call the Model Registry and process it
#Just mocking the registry
registry_response = initial_model_registry_metadata
app.logger.info(f'Number of Models Found {len(registry_response)}')
for agent,topic in agents(registry_response):
attach_agent(agent, topic)
@app.page('/model/{key_ai_model}')
class frontdoor(View):
async def get(self, request: Request) -> Response:
return self.json({'key_ai_model': key_ai_model})
async def post(self, request: Request, key_ai_model: str) -> Response:
request_id = str(uuid.uuid4())
src = await request.json()
msg = GdeltRequest(**src)
app.logger.info(msg)
await gdelt_agent.cast(msg, key=request_id)
return self.json({'request_id': request_id, 'key_ai_model': key_ai_model})
@app.agent(all_responses)
async def print_responses(stream):
async for message in stream:
print(message)
但是,当我发布一些东西时,我收到了一个错误 - NameError: name 'gdelt_agent' is not defined
。这里的任何帮助将不胜感激。