0

我有一个 python 脚本,它将:

  1. 遍历给定目录中的 rosbags
  2. 每个非图像消息直接复制到新包中,每个图像消息将被压缩并添加到新包中

我决定制作这个脚本的新版本以加快速度。我的新脚本可以:

  1. 遍历给定目录中的 rosbags
  2. 每条非图片消息直接复制到新包中
  3. 每个图像消息都添加到队列中
  4. 处理完包中的所有消息后,我创建从队列中读取、压缩图像并将其添加到完成列表的进程。
  5. 循环完成列表以将压缩图像添加到新包中

我创建的代码有一些问题。

  1. 只有一个 CPU 似乎处于 100% 使用率。
  2. 当它移动到下一个 rosbag 时,内存使用率仍然很高。好像有些东西没有被删除。

我不相信需要 ROS 知识来解决这个问题。我是否正确使用了多处理库?

import glob
import os
import argparse
from datetime import datetime
import numpy as np
import cv2
import sys
import copy

import rosbag
import rospy
from sensor_msgs.msg import CompressedImage
from cv_bridge import CvBridge
from multiprocessing import Process, Manager, Queue, cpu_count, queues

# Image constants
IMG_W = 1280
IMG_H = 1024
NUM_CHANNEL = 3

bridge = CvBridge()

parser = argparse.ArgumentParser(
    description='Creates a bag containing the image data')

parser.add_argument(
    'input_dirs',
    nargs='+',
    help='Input and Output directory for bag files.')

args = parser.parse_args()

queue = Queue()
manager = Manager()
converted_imgs = manager.list()

bag_path=''
out_path=''

if len(args.input_dirs) == 2:
    bag_path = args.input_dirs[0]
    out_path = args.input_dirs[1]
    os.chdir(bag_path)
else:
    print('Please enter the input and output bag path.')
    exit()

bag_names = []
# Add all bags in bag_path to bag_names
for file in sorted(os.listdir(bag_path)):
    if file.endswith(".bag"):
        bag_names.append(file)

camera_image_topics = {
    '/cam_a/image_raw': '/cam_a/compressed',
    '/cam_b/image_raw': '/cam_b/compressed',
    '/cam_c/image_raw': '/cam_c/compressed'
}

# Display progress function
# Prints out the current progress on the same line
def display_progress(progress):
    sys.stdout.write("\r" + 'PROGRESS ' + format(progress, '.2f') + '%')
    sys.stdout.flush()

# Convert each Image to CompressedImage
def create_msg(img_total):
    global queue
    global converted_imgs

    while not queue.empty():
        topic, raw_data, t = queue.get()

        # Display progress
        progress = ((img_total - queue.qsize()) * 100.00) / img_total
        display_progress(progress)

        # Create CompressedImage msg and compress the image
        cmprs_msg = CompressedImage()
        cmprs_msg.format = "png"
        cmprs_msg.data = np.array(cv2.imencode('.png', raw_data, [int(cv2.IMWRITE_PNG_COMPRESSION), 3])[1]).tostring()
        cmprs_msg.header.stamp = t

        # Add message to be written to the outbag
        converted_imgs.append((topic, cmprs_msg))
    return

for i in range(len(bag_names)):
    print('Reading bag ' + bag_names[i])

    with rosbag.Bag(out_path + 'compressed_' + bag_names[i], 'w') as outbag:
        count = 0.0  # Init as float
        msg_total = rosbag.Bag(bag_names[i]).get_message_count()

        print('Copying non image ros messages into outbag,')
        print('also adding image messages to the queue')
        # print('Copying msgs and compressing images into outbag.')
        for topic, msg, t in rosbag.Bag(bag_names[i]).read_messages():
            if topic not in camera_image_topics:
                outbag.write(topic, msg, t)
            else:
                # Convert image data to numpy matrix and add to queue
                cv2_matrix = bridge.imgmsg_to_cv2(msg, msg.encoding)
                raw_data = np.asarray(cv2_matrix).reshape((IMG_H, IMG_W, NUM_CHANNEL))
                queue.put((camera_image_topics[topic], raw_data, t))

            # Increase count of finished messages
            count += 1

            # Display progress
            progress = (count/msg_total) * 100
            display_progress(progress)
        # Finished
        print(' [DONE]')

        img_total = queue.qsize()

        print('Compressing images within the queue')
        processes = []
        for cpu_num in xrange(cpu_count()):
            p = Process(target=create_msg, args=(img_total,))
            processes.append(p)
        for p in processes:
            p.start()

        # Finished
        for p in processes:
            p.join()
        print(' [DONE]')

        print('Adding images to the outbag')
        for topic, msg in converted_imgs:
            outbag.write(topic, msg, msg.header.stamp)

更新 1:按要求提供 MVP。它似乎使用所有核心......

from multiprocessing import Process, Manager, Queue, cpu_count, queues

queue = Queue()
manager = Manager()
converted_imgs = manager.list()


# Convert each Image to CompressedImage
def create_msg(img_total):
    global queue
    global converted_imgs

    while not queue.empty():
        x = queue.get()

        # Add message to be written to the outbag
        converted_imgs.append(x ** 2)
    return

print('Add to queue')
for x in range(0, 100000):
    queue.put(10)

img_total = queue.qsize()

print('Calculate square of each queue member')
processes = []
for cpu_num in xrange(cpu_count()):
    p = Process(target=create_msg, args=(img_total,))
    processes.append(p)
for p in processes:
    p.start()

# Finished
for p in processes:
    p.join()
print(' [DONE]')

print('Adding images to the outbag')
for x in converted_imgs:
    print(x)

更新2:我认为我写的代码是正确的。正如罗斯在评论中所说,它很可能在队列中序列化。我添加了评论以确认他写的内容。看起来只使用了两个进程,然后降为一个。我认为对象太大,无法使程序具有多个进程。

Copying non image ros messages into outbag,
also adding image messages to the queue
PROGRESS 100.00% [DONE]
Compressing images within the queue
Process 0
Process: 0 Queue size: 1472
Process 1
Process 2
Process: 2 Queue size: 1471
Process 3
Process: 0 Queue size: 1470
Process: 2 Queue size: 1469
Process: 0 Queue size: 1468
Process: 2 Queue size: 1467
Process: 0 Queue size: 1466
Process: 2 Queue size: 1465
Process: 0 Queue size: 1464
Process: 2 Queue size: 1463
Process: 0 Queue size: 1462
Process: 2 Queue size: 1461
Process: 0 Queue size: 1460
Process: 2 Queue size: 1459
Process: 0 Queue size: 1458
Process: 2 Queue size: 1457
Process: 0 Queue size: 1456
Process: 2 Queue size: 1455
Process: 0 Queue size: 1454
Process: 2 Queue size: 1453
Process: 0 Queue size: 1452
Process: 2 Queue size: 1451
Process: 0 Queue size: 1450
Process: 2 Queue size: 1449
Process: 0 Queue size: 1448
Process: 2 Queue size: 1447
Process: 0 Queue size: 1446
Process: 2 Queue size: 1445
Process: 0 Queue size: 1444
Process: 2 Queue size: 1443
Process: 0 Queue size: 1442
Process: 2 Queue size: 1441
Process: 0 Queue size: 1440
Process: 2 Queue size: 1439
Process: 0 Queue size: 1438
Process: 2 Queue size: 1437
Process: 0 Queue size: 1436
Process: 2 Queue size: 1435
Process: 0 Queue size: 1434
Process: 2 Queue size: 1433
Process: 0 Queue size: 1432
Process: 2 Queue size: 1431
Process: 0 Queue size: 1430
Process: 2 Queue size: 1429
Process: 0 Queue size: 1428
Process: 2 Queue size: 1427
Process: 0 Queue size: 1426
Process: 2 Queue size: 1425
Process: 0 Queue size: 1424
Process: 2 Queue size: 1423
Process: 0 Queue size: 1422
Process: 2 Queue size: 1421
Process: 0 Queue size: 1420
Process: 2 Queue size: 1419
Process: 0 Queue size: 1418
Process: 2 Queue size: 1417
Process: 0 Queue size: 1416
Process: 2 Queue size: 1415
Process: 0 Queue size: 1414
Process: 2 Queue size: 1413
Process: 0 Queue size: 1412
Process: 2 Queue size: 1411
Process: 0 Queue size: 1410
Process: 2 Queue size: 1409
Process: 0 Queue size: 1408
Process: 2 Queue size: 1407
Process: 0 Queue size: 1406
Process: 2 Queue size: 1405
Process: 0 Queue size: 1404
Process: 2 Queue size: 1403
Process: 0 Queue size: 1402
Process: 2 Queue size: 1401
Process: 0 Queue size: 1400
Process: 2 Queue size: 1399
Process: 0 Queue size: 1398
Process: 2 Queue size: 1397
Process: 0 Queue size: 1396
Process: 2 Queue size: 1395
Process: 0 Queue size: 1394
Process: 2 Queue size: 1393
Process: 0 Queue size: 1392
Process: 2 Queue size: 1391
Process: 0 Queue size: 1390
Process: 2 Queue size: 1389
Process: 0 Queue size: 1388
Process: 2 Queue size: 1387
Process: 0 Queue size: 1386
Process: 2 Queue size: 1385
Process: 0 Queue size: 1384
Process: 2 Queue size: 1383
Process: 0 Queue size: 1382
Process: 2 Queue size: 1381
Process: 0 Queue size: 1380
Process: 2 Queue size: 1379
Process: 0 Queue size: 1378
Process: 2 Queue size: 1377
Process: 0 Queue size: 1376
Process: 2 Queue size: 1375
Process: 0 Queue size: 1374
Process: 2 Queue size: 1373
Process: 0 Queue size: 1372
Process: 2 Queue size: 1371
Process: 0 Queue size: 1370
Process: 2 Queue size: 1369
Process: 0 Queue size: 1368
Process: 2 Queue size: 1367
Process: 0 Queue size: 1366
Process: 2 Queue size: 1365
Process: 0 Queue size: 1364
Process: 2 Queue size: 1363
Process: 0 Queue size: 1362
Process: 2 Queue size: 1361
Process: 0 Queue size: 1360
Process: 2 Queue size: 1359
Process: 0 Queue size: 1358
Process: 2 Queue size: 1357
Process: 0 Queue size: 1356
Process: 2 Queue size: 1355
Process: 0 Queue size: 1354
Process: 2 Queue size: 1353
Process: 0 Queue size: 1352
Process: 2 Queue size: 1351
Process: 0 Queue size: 1350
Process: 2 Queue size: 1349
Process: 0 Queue size: 1348
Process: 2 Queue size: 1347
Process: 0 Queue size: 1346
Process: 2 Queue size: 1345
Process: 0 Queue size: 1344
Process: 2 Queue size: 1343
Process: 0 Queue size: 1342
Process: 2 Queue size: 1341
Process: 0 Queue size: 1340
Process: 2 Queue size: 1339
Process: 0 Queue size: 1338
Process: 2 Queue size: 1337
Process: 0 Queue size: 1336
Process: 2 Queue size: 1335
Process: 0 Queue size: 1334
Process: 2 Queue size: 1333
Process: 0 Queue size: 1332
Process: 2 Queue size: 1331
Process: 0 Queue size: 1330
Process: 2 Queue size: 1329
Process: 0 Queue size: 1328
Process: 2 Queue size: 1327
Process: 0 Queue size: 1326
Process: 2 Queue size: 1325
Process: 0 Queue size: 1324
Process: 2 Queue size: 1323
Process: 0 Queue size: 1322
Process: 2 Queue size: 1321
Process: 0 Queue size: 1320
Process: 2 Queue size: 1319
Process: 0 Queue size: 1318
Process: 2 Queue size: 1317
Process: 0 Queue size: 1316
Process: 2 Queue size: 1315
Process: 0 Queue size: 1314
Process: 2 Queue size: 1313
Process: 0 Queue size: 1312
Process: 2 Queue size: 1311
Process: 0 Queue size: 1310
Process: 2 Queue size: 1309
Process: 0 Queue size: 1308
Process: 2 Queue size: 1307
Process: 0 Queue size: 1306
Process: 2 Queue size: 1305
Process: 0 Queue size: 1304
Process: 2 Queue size: 1303
Process: 0 Queue size: 1302
Process: 2 Queue size: 1301
Process: 0 Queue size: 1300
Process: 2 Queue size: 1299
Process: 0 Queue size: 1298
Process: 2 Queue size: 1297
Process: 0 Queue size: 1296
Process: 2 Queue size: 1295
Process: 0 Queue size: 1294
Process: 2 Queue size: 1293
Process: 0 Queue size: 1292
Process: 2 Queue size: 1291
Process: 0 Queue size: 1290
Process: 2 Queue size: 1289
Process: 0 Queue size: 1288
Process: 2 Queue size: 1287
Process: 0 Queue size: 1286
Process: 2 Queue size: 1285
Process: 0 Queue size: 1284
Process: 2 Queue size: 1283
Process: 0 Queue size: 1282
Process: 2 Queue size: 1281
Process: 0 Queue size: 1280
Process: 2 Queue size: 1279
Process: 0 Queue size: 1278
Process: 2 Queue size: 1277
Process: 0 Queue size: 1276
Process: 2 Queue size: 1275
Process: 0 Queue size: 1274
Process: 2 Queue size: 1273
Process: 0 Queue size: 1272
Process: 2 Queue size: 1271
Process: 0 Queue size: 1270
Process: 2 Queue size: 1269
Process: 0 Queue size: 1268
Process: 2 Queue size: 1267
Process: 0 Queue size: 1266
Process: 2 Queue size: 1265
Process: 0 Queue size: 1264
Process: 2 Queue size: 1263
Process: 0 Queue size: 1262
Process: 2 Queue size: 1261
Process: 0 Queue size: 1260
Process: 2 Queue size: 1259
Process: 0 Queue size: 1258
Process: 2 Queue size: 1257
Process: 0 Queue size: 1256
Process: 2 Queue size: 1255
Process: 0 Queue size: 1254
Process: 2 Queue size: 1253
Process: 0 Queue size: 1252
Process: 2 Queue size: 1251
Process: 0 Queue size: 1250
Process: 2 Queue size: 1249
Process: 0 Queue size: 1248
Process: 2 Queue size: 1247
Process: 0 Queue size: 1246
Process: 2 Queue size: 1245
Process: 0 Queue size: 1244
Process: 2 Queue size: 1243
Process: 0 Queue size: 1242
Process: 2 Queue size: 1241
Process: 0 Queue size: 1240
Process: 2 Queue size: 1239
Process: 0 Queue size: 1238
Process: 2 Queue size: 1237
Process: 0 Queue size: 1236
Process: 2 Queue size: 1235
Process: 0 Queue size: 1234
Process: 2 Queue size: 1233
Process: 0 Queue size: 1232
Process: 2 Queue size: 1231
Process: 0 Queue size: 1230
Process: 2 Queue size: 1229
Process: 0 Queue size: 1228
Process: 2 Queue size: 1227
Process: 0 Queue size: 1226
Process: 2 Queue size: 1225
Process: 0 Queue size: 1224
Process: 2 Queue size: 1223
Process: 0 Queue size: 1222
Process: 2 Queue size: 1221
Process: 0 Queue size: 1220
Process: 2 Queue size: 1219
Process: 0 Queue size: 1218
Process: 2 Queue size: 1217
Process: 0 Queue size: 1216
Process: 2 Queue size: 1215
Process: 0 Queue size: 1214
Process: 2 Queue size: 1213
Process: 0 Queue size: 1212
Process: 2 Queue size: 1211
Process: 0 Queue size: 1210
Process: 2 Queue size: 1209
Process: 0 Queue size: 1208
Process: 2 Queue size: 1207
Process: 0 Queue size: 1206
Process: 2 Queue size: 1205
Process: 0 Queue size: 1204
Process: 2 Queue size: 1203
Process: 0 Queue size: 1202
Process: 2 Queue size: 1201
Process: 0 Queue size: 1200
Process: 2 Queue size: 1199
Process: 0 Queue size: 1198
Process: 2 Queue size: 1197
Process: 0 Queue size: 1196
Process: 2 Queue size: 1195
Process: 0 Queue size: 1194
Process: 2 Queue size: 1193
Process: 0 Queue size: 1192
Process: 2 Queue size: 1191
Process: 0 Queue size: 1190
Process: 2 Queue size: 1189
Process: 0 Queue size: 1188
Process: 2 Queue size: 1187
Process: 0 Queue size: 1186
Process: 2 Queue size: 1185
Process: 0 Queue size: 1184
Process: 2 Queue size: 1183
Process: 0 Queue size: 1182
Process: 2 Queue size: 1181
Process: 0 Queue size: 1180
Process: 2 Queue size: 1179
Process: 0 Queue size: 1178
Process: 2 Queue size: 1177
Process: 0 Queue size: 1176
Process: 2 Queue size: 1175
Process: 0 Queue size: 1174
Process: 2 Queue size: 1173
Process: 0 Queue size: 1172
Process: 2 Queue size: 1171
Process: 0 Queue size: 1170
Process: 2 Queue size: 1169
Process: 0 Queue size: 1168
Process: 2 Queue size: 1167
Process: 0 Queue size: 1166
Process: 2 Queue size: 1165
Process: 0 Queue size: 1164
Process: 2 Queue size: 1163
Process: 0 Queue size: 1162
Process: 2 Queue size: 1161
Process: 0 Queue size: 1160
Process: 2 Queue size: 1159
Process: 0 Queue size: 1158
Process: 2 Queue size: 1157
Process: 0 Queue size: 1156
Process: 2 Queue size: 1155
Process: 0 Queue size: 1154
Process: 2 Queue size: 1153
Process: 0 Queue size: 1152
Process: 2 Queue size: 1151
Process: 0 Queue size: 1150
Process: 2 Queue size: 1149
Process: 0 Queue size: 1148
Process: 2 Queue size: 1147
Process: 0 Queue size: 1146
Process: 2 Queue size: 1145
Process: 0 Queue size: 1144
Process: 2 Queue size: 1143
Process: 0 Queue size: 1142
Process: 2 Queue size: 1141
Process: 0 Queue size: 1140
Process: 2 Queue size: 1139
Process: 0 Queue size: 1138
Process: 2 Queue size: 1137
Process: 0 Queue size: 1136
Process: 2 Queue size: 1135
Process: 0 Queue size: 1134
Process: 2 Queue size: 1133
Process: 0 Queue size: 1132
Process: 2 Queue size: 1131
Process: 0 Queue size: 1130
Process: 2 Queue size: 1129
Process: 0 Queue size: 1128
Process: 2 Queue size: 1127
Process: 0 Queue size: 1126
Process: 2 Queue size: 1125
Process: 0 Queue size: 1124
Process: 2 Queue size: 1123
Process: 0 Queue size: 1122
Process: 2 Queue size: 1121
Process: 0 Queue size: 1120
Process: 2 Queue size: 1119
Process: 0 Queue size: 1118
Process: 2 Queue size: 1117
Process: 0 Queue size: 1116
Process: 2 Queue size: 1115
Process: 0 Queue size: 1114
Process: 2 Queue size: 1113
Process: 0 Queue size: 1112
Process: 2 Queue size: 1111
Process: 0 Queue size: 1110
Process: 2 Queue size: 1109
Process: 0 Queue size: 1108
Process: 2 Queue size: 1107
Process: 0 Queue size: 1106
Process: 2 Queue size: 1105
Process: 0 Queue size: 1104
Process: 2 Queue size: 1103
Process: 0 Queue size: 1102
Process: 2 Queue size: 1101
Process: 0 Queue size: 1100
Process: 2 Queue size: 1099
Process: 0 Queue size: 1098
Process: 2 Queue size: 1097
Process: 0 Queue size: 1096
Process: 2 Queue size: 1095
Process: 0 Queue size: 1094
Process: 2 Queue size: 1093
Process: 0 Queue size: 1092
Process: 2 Queue size: 1091
Process: 0 Queue size: 1090
Process: 2 Queue size: 1089
Process: 0 Queue size: 1088
Process: 2 Queue size: 1087
Process: 0 Queue size: 1086
Process: 2 Queue size: 1085
Process: 0 Queue size: 1084
Process: 2 Queue size: 1083
Process: 0 Queue size: 1082
Process: 2 Queue size: 1081
Process: 0 Queue size: 1080
Process: 2 Queue size: 1079
Process: 0 Queue size: 1078
Process: 2 Queue size: 1077
Process: 0 Queue size: 1076
Process: 2 Queue size: 1075
Process: 0 Queue size: 1074
Process: 2 Queue size: 1073
Process: 0 Queue size: 1072
Process: 2 Queue size: 1071
Process: 0 Queue size: 1070
Process: 2 Queue size: 1069
Process: 0 Queue size: 1068
Process: 2 Queue size: 1067
Process: 0 Queue size: 1066
Process: 2 Queue size: 1065
Process: 0 Queue size: 1064
Process: 2 Queue size: 1063
Process: 0 Queue size: 1062
Process: 2 Queue size: 1061
Process: 0 Queue size: 1060
Process: 2 Queue size: 1059
Process: 0 Queue size: 1058
Process: 2 Queue size: 1057
Process: 0 Queue size: 1056
Process: 2 Queue size: 1055
Process: 0 Queue size: 1054
Process: 2 Queue size: 1053
Process: 0 Queue size: 1052
Process: 2 Queue size: 1051
Process: 2 Queue size: 1050
Process: 2 Queue size: 1049
Process: 2 Queue size: 1048
Process: 2 Queue size: 1047
Process: 2 Queue size: 1046
Process: 2 Queue size: 1045
Process: 2 Queue size: 1044
Process: 2 Queue size: 1043
Process: 2 Queue size: 1042
Process: 2 Queue size: 1041
Process: 2 Queue size: 1040
Process: 2 Queue size: 1039
Process: 2 Queue size: 1038
Process: 2 Queue size: 1037
Process: 2 Queue size: 1036
Process: 2 Queue size: 1035
Process: 2 Queue size: 1034
Process: 2 Queue size: 1033
Process: 2 Queue size: 1032
Process: 2 Queue size: 1031
Process: 2 Queue size: 1030
Process: 2 Queue size: 1029
Process: 2 Queue size: 1028
Process: 2 Queue size: 1027
Process: 2 Queue size: 1026
Process: 2 Queue size: 1025
Process: 2 Queue size: 1024
Process: 2 Queue size: 1023
Process: 2 Queue size: 1022
Process: 2 Queue size: 1021
Process: 2 Queue size: 1020
Process: 2 Queue size: 1019
Process: 2 Queue size: 1018
Process: 2 Queue size: 1017
Process: 2 Queue size: 1016
Process: 2 Queue size: 1015
Process: 2 Queue size: 1014
Process: 2 Queue size: 1013
Process: 2 Queue size: 1012
Process: 2 Queue size: 1011
Process: 2 Queue size: 1010
Process: 2 Queue size: 1009
Process: 2 Queue size: 1008
Process: 2 Queue size: 1007
Process: 2 Queue size: 1006
Process: 2 Queue size: 1005
Process: 2 Queue size: 1004
Process: 2 Queue size: 1003
Process: 2 Queue size: 1002
Process: 2 Queue size: 1001
Process: 2 Queue size: 1000
Process: 2 Queue size: 999
Process: 2 Queue size: 998
Process: 2 Queue size: 997
Process: 2 Queue size: 996
Process: 2 Queue size: 995
Process: 2 Queue size: 994
Process: 2 Queue size: 993
Process: 2 Queue size: 992
Process: 2 Queue size: 991
Process: 2 Queue size: 990
Process: 2 Queue size: 989
Process: 2 Queue size: 988
Process: 2 Queue size: 987
Process: 2 Queue size: 986
Process: 2 Queue size: 985
Process: 2 Queue size: 984
Process: 2 Queue size: 983
Process: 2 Queue size: 982
Process: 2 Queue size: 981
Process: 2 Queue size: 980
Process: 2 Queue size: 979
Process: 2 Queue size: 978
Process: 2 Queue size: 977
Process: 2 Queue size: 976
Process: 2 Queue size: 975
Process: 2 Queue size: 974
Process: 2 Queue size: 973
Process: 2 Queue size: 972
Process: 2 Queue size: 971
Process: 2 Queue size: 970
Process: 2 Queue size: 969
Process: 2 Queue size: 968
Process: 2 Queue size: 967
Process: 2 Queue size: 966
Process: 2 Queue size: 965
Process: 2 Queue size: 964
Process: 2 Queue size: 963
Process: 2 Queue size: 962
Process: 2 Queue size: 961
Process: 2 Queue size: 960
Process: 2 Queue size: 959
Process: 2 Queue size: 958
Process: 2 Queue size: 957
Process: 2 Queue size: 956
Process: 2 Queue size: 955
Process: 2 Queue size: 954
Process: 2 Queue size: 953
...
Process: 2 Queue size: 16
Process: 2 Queue size: 15
Process: 2 Queue size: 14
Process: 2 Queue size: 13
Process: 2 Queue size: 12
Process: 2 Queue size: 11
Process: 2 Queue size: 10
Process: 2 Queue size: 9
Process: 2 Queue size: 8
Process: 2 Queue size: 7
Process: 2 Queue size: 6
Process: 2 Queue size: 5
Process: 2 Queue size: 4
Process: 2 Queue size: 3
Process: 2 Queue size: 2
Process: 2 Queue size: 1
Process: 2 Queue size: 0
 [DONE]
4

1 回答 1

1

I'm not particularly familiar with how global, queue and processes interact, but it looks like that may be your bottleneck. Try having your processes return values to the main thread via pipe!

# Convert each Image to CompressedImage  
def create_msg(img_total, pipe):
# You won't need these:
# global queue
# global converted_imgs

    while not YOUR_COUNTER():
        x = queue.get()
        # Add message to be written to the outbag
        pipe.send(x ** 2)
    return

# create a list to keep all processes
processes = []
# create a list to keep connections
parent_connections = []
for j in range(cpu_count()):
    # create a pipe for communication
    parent_conn, child_conn = Pipe()
    parent_connections.append(parent_conn)
    process_args = (YOUR_ARGS, child_conn)
    # create the process, pass instance and connection
    process = Process(target=create_msg, args=(process_args,))
    processes.append(process)
    # start all processes
for process in processes:
    process.start()

# make sure that all processes have finished
for process in processes:
    process.join()
for parent_connection in parent_connections:
    while parent_connection.poll():
        recv = parent_connection.recv()
        # Redefine this list somewhere else to use
        converted_imgs.append(recv)
于 2018-04-09T01:25:19.847 回答