我正在尝试从 pulsar 读取数据并在 Python 中使用 asyncio 和 asyncpg 写入 postgres 表。
import asyncio
import asyncpg
import requests
import nest_asyncio
nest_asyncio.apply()
class Connection:
loop = asyncio.get_event_loop()
@classmethod
async def connect_persist_postgresql(cls, data):
connection = await asyncpg.connect(
user='postgres,
password='password_postgres',
database='postgres',
host='10.xxx.xxx.xx',
port=5432
)
query = '''INSERT INTO {}.{} (
uid,
col1,
col2,
col3,
col4
)
VALUES (
$1,
$2,
$3,
$4,
$5,
)'''
if isinstance(data, list):
for event in data:
await connection.execute(
query,
'schema',
'table_test',
event['uid'],
event['col1'],
event['col2'],
event['col3'],
event['col4'],
)
else:
await connection.execute(
query,
'schema',
'table_test',
data[0]['uid'],
data[0]['col1'],
data[0]['col2'],
data[0]['col3'],
data[0]['col4']
)
await connection.close()
然后我作为消费者连接到 pulsar,获取数据并尝试使用下一行写入 postgresql 表:def run_consumer():
client = pulsar.Client(
'pulsar://10.xxx.xx.xx:6650')
consumer = client.subscribe('test-topic-pulsar-postgresql',
'test-subs')
while True:
msg = consumer.receive()
unpacked_list = []
loop = asyncio.get_event_loop()
try:
my_new_string_value = msg.data()
unpacked_list.append(my_new_string_value)
loop.run_until_complete(connect_persist_postgresql(unpacked_list))
except Exception as a:
print(a)
但我收到一个错误:This event loop is already running
欢迎任何提示,在此先感谢。