5

我正在尝试使用 cx_freeze 编译一些 Python 3.3 代码。编译后生成的 test.exe 文件会不断创建新的程序实例(数量似乎没有上限),导致我的 Windows 7 系统变得不稳定。直接在 Python 中运行时,程序按预期工作;但一旦编译成可执行文件就会出现问题。这是我在主脚本中的导入:

import sys
from multiprocessing import Pool, Queue
from threading import Thread
from time import sleep, time
from inspect import getmembers
from PyQt5 import QtWidgets, QtCore, QtGui
from main_ui import Ui_MainWindow           # Generated UI from pyuic, imports 
                                            # QtWidgets, QtCore, and QtGui
from devices import Device1, Device2        # Both are serial.Serial objects

setup.py脚本:

import sys
from cx_Freeze import setup, Executable

# Name passed to setup() as the distribution name.
product_name = 'Product Name'

# (source, destination) pair handed to cx_Freeze's 'include_files' option so the Qt
# "windows" platform plugin is copied next to the frozen executable.
# Raw strings keep every backslash literal: sequences such as "\P" or "\q" are invalid
# escapes in a normal string literal and only work by accident.
path_platforms = (r"C:\Python33\Lib\site-packages\PyQt5\plugins\platforms\qwindows.dll",
                  r"platforms\qwindows.dll")

# Modules that must be bundled even if the import scanner does not find them.
includes = ['PyQt5.QtWidgets', 'PyQt5.QtCore', 'PyQt5.QtGui']
include_files = [path_platforms]
# Modules that are deliberately left out of the build.
excludes = ['_gtkagg', '_tkagg', 'bsddb', 'curses', 'email', 'pywin.debugger',
            'pywin.debugger.dbgcon', 'pywin.dialogs', 'tcl',
            'Tkconstants', 'Tkinter']
packages = ['os']
path = []

bdist_msi_options = {'add_to_path':   False}

build_exe_options = {'includes':      includes,
                     'include_files': include_files,
                     'excludes':      excludes,
                     'packages':      packages,
                     'path':          path,
                     'silent':        True}

# On Windows use the GUI base so no console window is attached to the application.
base = None
if sys.platform == 'win32':
    base = 'Win32GUI'

exe = Executable(script='main.pyw',
                 base=base,
                 targetName='test.exe')

setup(name=product_name,
      version='1.0',
      description='The Test Program',
      executables=[exe],
      options={'bdist_msi': bdist_msi_options, 'build_exe': build_exe_options})

当我运行 python setup.py build 时,会出现以下错误:

Missing modules:
? System imported from serial.serialcli
? TERMIOS imported from serial.serialposix
? __main__ imported from bdb
? _gestalt imported from platform
? _posixsubprocess imported from subprocess
? clr imported from serial.serialcli

尽管有这些错误,它仍然会生成一个 test.exe 文件。当我执行它时,它会打开看似无限数量的窗口,唯一能让它停下来的办法是强制重启计算机。再次说明:主脚本在 Python 下运行良好,只有编译后才会失败。任何帮助将不胜感激!

编辑:根据要求,这是我的主要脚本:

import sys
from multiprocessing import Pool, Queue, freeze_support
from threading import Thread
from time import sleep, time
from inspect import getmembers
from PyQt5 import QtWidgets, QtCore, QtGui
from main_ui import Ui_MainWindow           # Generated by pyuic
import parts                                # Imports time.sleep, datetime.datetime, and threading.Thread
from devices import GwPowerSupply, DataQ    # Imports time.sleep and serial.Serial
# GwPowerSupply is a serial.Serial object to handle communications with a GwInstek PSP-603
# DataQ is also a serial.Serial object to handle communications with a DataQ-155


def file_logger(message):
    """
    Append one line of the form ``<timestamp>: <message>`` to ``log.txt``.

    The timestamp is the value returned by ``time()``. Logging is switched by the
    local ``enabled`` flag, which is currently always on.
    """
    enabled = True
    if not enabled:
        return
    with open('log.txt', 'a') as log_file:
        entry = '{}: {}\n'.format(time(), message)
        log_file.write(entry)


def compute():
    """
    A function, designed as an independent process, to gather data from the DataQ and Power Supply
    input queues, convert to human values, and output as a single queue

    Items read from ``compute.input_queue`` are dispatched on their length:

    * four values: DataQ voltages (upstream pressure, downstream pressure, high flow, low flow)
    * two values: power supply voltage and current
    * one value: the leakage offset

    After every recognised item a snapshot of all current values is put on
    ``compute.output_queue``. ``None`` items are skipped. An item of any other length
    (for example the empty list the producers send when they stop) ends the loop.

    ``compute.input_queue`` and ``compute.output_queue`` are function attributes that must be
    set before the function is called.
    """
    compute.running = True
    compute.paused = False
    # The initial dict to pass on to the queue
    data_dict = {'upstream': 0, 'downstream': 0, 'high_flow': 0, 'low_flow': 0, 'voltage': 0, 'current': 0, 'offset': 0}
    while compute.running:
        if compute.paused or compute.input_queue.empty():
            # Nothing to do: yield the CPU briefly instead of spinning at full load
            sleep(.001)
            continue
        # Get the raw voltage data and convert to pressure/flow
        analog_input = compute.input_queue.get()
        file_logger('Compute received {}'.format(analog_input))
        if analog_input is None:
            continue
        # Four items comes from the DataQ for pressures and flow
        if len(analog_input) == 4:
            # Pressure Transducers are both 1-11V, 0-500 PSI
            if isinstance(analog_input[0], (float, int)):
                data_dict['upstream'] = (analog_input[0]-1) * 50
            if isinstance(analog_input[1], (float, int)):
                data_dict['downstream'] = (analog_input[1]-1) * 50
            # High Flow is 0-5V, 0-1000 Liters/min
            # NOTE(review): 1 L/min is 0.0353147 ft^3/min; .035147 looks like a dropped digit - confirm
            if isinstance(analog_input[2], (float, int)):
                data_dict['high_flow'] = (analog_input[2]*200) * .035147     # Convert SLM to SCFM
            # Low Flow is 0-5V, 0-5 Liters/min
            if isinstance(analog_input[3], (float, int)):
                data_dict['low_flow'] = analog_input[3] * 1000               # Convert SLM to SCCM
        # Two items are from the power supply for voltage and current
        elif len(analog_input) == 2:
            if isinstance(analog_input[0], (float, int)):
                data_dict['voltage'] = analog_input[0]
            if isinstance(analog_input[1], (float, int)):
                data_dict['current'] = analog_input[1]
        # A single item is the offset from the Valve program
        elif len(analog_input) == 1:
            data_dict['offset'] = analog_input[0]
        else:
            return
        # Put a snapshot, not the live dict: a multiprocessing queue pickles the object later
        # in a background thread, by which time this loop may already have changed it
        compute.output_queue.put(dict(data_dict))
        file_logger('Compute put out {}'.format(data_dict))


def data_q_producer():
    """
    Worker loop, meant to run as its own process, that owns the DataQ on COM4.

    While the device is neither paused nor stopped, each pass reads one sample and puts it
    on ``data_q_producer.response_queue`` (response mode) or ``data_q_producer.queue``
    (normal mode). Commands arriving on ``data_q_producer.output`` are then handled:

    * ``'stop'``: zero the outputs, stop and close the device, put ``[]`` on the data
      queue and return
    * ``'start resp'`` / ``'stop resp'``: switch response mode on / off
    * a float: forwarded on the data queue as a one-element list (the leakage offset)
    * a list: its first four items are written to the digital outputs
    """
    # Initialize COM port
    daq = DataQ('COM4')
    daq.start()
    # Continuously gather data
    while True:
        if not (daq.paused or daq.stopped):
            # Gather data and put to queue, either for response or normal
            file_logger('Getting Data from DataQ')
            if daq.response:
                data = daq.get_response_data()
                data_q_producer.response_queue.put(data)
            else:
                data = daq.get_data()
                data_q_producer.queue.put(data)
            file_logger('Got {} from DataQ'.format(data))

        # Nothing queued for the device: go straight to the next acquisition pass
        if data_q_producer.output.empty():
            continue

        # A command was received, such as to energize a relay
        command = data_q_producer.output.get()
        file_logger('Sending {} to DataQ'.format(command))
        if isinstance(command, str):
            # Strings are to stop, run response, etc.
            if command == 'stop':
                daq.set_output(0, 0, 0, 0)
                daq.stop()
                daq.close()
                data_q_producer.queue.put([])
                return
            if command == 'start resp':
                daq.response = True
                daq.pause()
                daq.start_resp()
                daq.start()
            elif command == 'stop resp':
                print('Stopping Response Test')
                daq.pause()
                daq.setup()
                daq.start()
                daq.response = False
        elif isinstance(command, float):
            # A single float is the new leakage offset
            data_q_producer.queue.put([command])
        elif isinstance(command, list):
            # A list is to set the digital outputs
            daq.set_output(command[0], command[1], command[2], command[3])


def pps_producer():
    """
    Worker loop, meant to run as its own process, that owns the power supply on COM1.

    Each pass reads the voltage and current and puts them on ``pps_producer.queue`` as
    ``[voltage, current]``. A command waiting on ``pps_producer.output`` is then handled:

    * a bool switches the relay on or off
    * the string ``'stop'`` opens the relay, closes the port, puts ``[]`` on the data
      queue and returns
    * anything else is treated as a pair and passed to ``set_value``
    """
    # Initialize COM port
    supply = GwPowerSupply('COM1')
    supply.set_relay(True)
    # Continuously gather voltage and current readings
    while True:
        file_logger('Getting Data from Power Supply')
        voltage = supply.get_value('V')
        current = supply.get_value('A')
        file_logger('Got {}V, {}A from power supply'.format(voltage, current))
        pps_producer.queue.put([voltage, current])

        # No pending command: take the next reading
        if pps_producer.output.empty():
            continue

        command = pps_producer.output.get()
        file_logger('Got {} for Power Supply'.format(command))
        if isinstance(command, bool):
            # Bool is to set the relay on or off
            supply.set_relay(command)
            continue
        if isinstance(command, str) and command == 'stop':
            # Stop the power supply (set the relay to Off) and end the process
            supply.set_relay(False)
            supply.close()
            pps_producer.queue.put([])
            return
        # Anything else is changing a power supply output setting
        supply.set_value(command[0], command[1])


def pool_init(input_queue, output_queue, data_q_out, pps_out, response_queue):
    """
    Pool initializer: attach the shared queues to the worker functions as attributes.

    ``compute`` receives the input and output queues; ``data_q_producer`` and
    ``pps_producer`` both feed ``input_queue`` and read commands from ``data_q_out`` and
    ``pps_out`` respectively; ``data_q_producer`` also gets ``response_queue``.
    see http://stackoverflow.com/a/3843313/852994 for more details
    """
    bindings = (
        (compute, 'output_queue', output_queue),
        (compute, 'input_queue', input_queue),
        (data_q_producer, 'queue', input_queue),
        (data_q_producer, 'output', data_q_out),
        (data_q_producer, 'response_queue', response_queue),
        (pps_producer, 'queue', input_queue),
        (pps_producer, 'output', pps_out),
    )
    for worker, attribute, shared_queue in bindings:
        setattr(worker, attribute, shared_queue)


class MainGui(QtWidgets.QMainWindow):
    """
    The Main interface builder for the program

    Creates the pyuic-generated UI, the inter-process queues and a pool of four worker
    processes, then refreshes the on-screen readings and the log from those queues on a
    25 ms timer.
    """
    def __init__(self):
        """
        Build the window, the leakage offset lookup, the queues, the refresh timer and the
        process pool, and submit the three worker functions to the pool.
        """
        # Initialize MainGui and create the window
        super(MainGui, self).__init__()
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)

        # The current valve part being tested
        self.valve = None
        # Disables the 'Energize' button when running ATP
        self.auto_mode = False

        # The current measured leakage offset based on the current run's test
        self.measured_offset = 0
        # The leakage offset table based on initial testing
        # @TODO: retest offsets and go to 450 PSI
        self.offset_table = ((-50, 30), (0, 31), (50, 44), (100, 37), (150, 41), (200, 44),
                             (250, 49), (300, 54), (350, 63), (400, 72), (450, 81))
        # A table of calculated leakage offsets to give single-incremental points based on the
        # above tested values: linear interpolation between each pair of neighbouring table
        # rows, one entry per integer step from the first row's x up to (not including) the
        # last row's x
        self.calculated_offsets = []
        for (x1, y1), (x2, y2) in zip(self.offset_table, self.offset_table[1:]):
            for x in range(x1, x2):
                y = ((x-x1) * (y2-y1)) / (x2-x1) + y1
                self.calculated_offsets.append(y)

        # Connect UI clicks and presses to commands
        self.ui.btn_all.clicked.connect(lambda: self.select_all_tests(True))
        self.ui.btn_none.clicked.connect(lambda: self.select_all_tests(False))
        self.ui.comboBox.currentTextChanged.connect(self.select_part)
        self.ui.btn_energize.clicked.connect(self.energize)
        self.ui.btn_start.clicked.connect(self.start_tests)
        self.ui.btn_skip.clicked.connect(self.skip_press)

        # Select the initial part
        self.select_part()

        # Initialize queues
        self.input_queue = Queue(10)
        self.output_queue = Queue(10)
        self.data_q_out = Queue(10)
        self.pps_out = Queue(10)
        self.response_queue = Queue(400)
        self.test_queue = Queue(5)
        self.log_queue = Queue(10)

        # Initialize timer to update on-screen values
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.update_data)
        self.timer.start(25)

        # Initialize process pool
        self.pool = Pool(processes=4, initializer=pool_init,
                         initargs=(self.input_queue, self.output_queue, self.data_q_out,
                                   self.pps_out, self.response_queue))

        # Place the data producing functions into the process pool
        self.pool.apply_async(func=data_q_producer)
        self.pool.apply_async(func=compute)
        self.pool.apply_async(func=pps_producer)

    def closeEvent(self, *args, **kwargs):
        """Stop the timer and ask the running test and both device workers to stop."""
        # Verify COM ports are closed properly before exiting
        file_logger('Attempting Exit')
        self.timer.stop()
        self.test_queue.put('ESC')
        self.data_q_out.put('stop')
        self.pps_out.put('stop')
        # Give the workers a moment to act on the stop commands
        sleep(.5)
        file_logger('Exited')

    def keyPressEvent(self, event):
        """
        Translate key presses into test queue items: Return sends the number typed in the
        line edit, Backspace sends 'ESC' and S sends 'SKIP'.
        """
        key = event.key()
        file_logger('Keypress Event: {}'.format(key))
        # Capture different key presses for different functions
        if key == QtCore.Qt.Key_Return:
            text = self.ui.lineEdit.text()
            try:
                value = float(text)
            except ValueError:
                # Empty or non-numeric entry: there is no valid number to hand to the test
                file_logger('Ignored non-numeric entry: {!r}'.format(text))
            else:
                self.test_queue.put(value)
        elif key == QtCore.Qt.Key_Backspace:
            self.test_queue.put('ESC')
        elif key == QtCore.Qt.Key_S:
            self.test_queue.put('SKIP')

    def skip_press(self):
        """Handle the Skip button by queueing 'SKIP' for the running test."""
        file_logger('Skip press Event')
        self.test_queue.put('SKIP')

    def print_to_log(self, text):
        """Append ``text`` to the on-screen log and move the cursor to the end."""
        # Enter a line into the log with auto-scrolling
        self.ui.log_output.append(text)
        cursor = self.ui.log_output.textCursor()
        QtGui.QTextCursor.movePosition(cursor, QtGui.QTextCursor.End)
        self.ui.log_output.setTextCursor(cursor)

    def update_data(self):
        """
        Timer slot: show at most one new reading from the output queue and print at most
        one pending entry from the log queue.
        """
        # Update status boxes
        if not self.output_queue.empty():
            file_logger('Update Interface Event')
            data_dict = self.output_queue.get()
            # Before calculating corrected leakage, get the offset
            self.measured_offset = data_dict['offset']
            # Modify low flow with offset
            # NOTE(review): calculated_offsets[0] belongs to the first table pressure (-50), so
            # indexing by int(upstream) looks shifted by 50, negative readings wrap around and
            # readings past the table raise IndexError - confirm the intended lookup
            data_dict['low_flow'] -= self.measured_offset - self.calculated_offsets[int(data_dict['upstream'])]
            # Update the status on the UI
            self.ui.upstream_pressure.setText('{:.1f}'.format(data_dict['upstream']))
            self.ui.downstream_pressure.setText('{:.1f}'.format(data_dict['downstream']))
            self.ui.flow_sensor.setText('{:.2f}'.format(data_dict['high_flow']))
            self.ui.leakage_sensor.setText('{:.0f}'.format(data_dict['low_flow']))
            self.ui.voltage.setText('{:.2f}'.format(data_dict['voltage']))
            self.ui.current.setText('{:.3f}'.format(data_dict['current']))
            # Pass the values on to the test queue so the ATP process can use them
            self.test_queue.put(data_dict)
            # Drop the oldest item once the queue is full so the next put cannot block
            if self.test_queue.full():
                self.test_queue.get()
            file_logger('Updated Interface')
        # Update log
        if not self.log_queue.empty():
            text = self.log_queue.get()
            file_logger('Printing to log: {}'.format(text))
            # For the countdown timer, delete the previous line, but not the first count!
            if isinstance(text, int) and text != 1:
                cursor = self.ui.log_output.textCursor()
                QtGui.QTextCursor.movePosition(cursor, QtGui.QTextCursor.End, QtGui.QTextCursor.MoveAnchor)
                QtGui.QTextCursor.movePosition(cursor, QtGui.QTextCursor.StartOfLine, QtGui.QTextCursor.KeepAnchor)
                QtGui.QTextCursor.removeSelectedText(cursor)
                # Delete last newline character so the number doesn't print on the next line
                QtGui.QTextCursor.deletePreviousChar(cursor)
            self.print_to_log(str(text))
            file_logger('Printed to log: {}'.format(text))

    def select_all_tests(self, state=True):
        """Select every entry of the test list, or deselect them all if ``state`` is False."""
        for i in range(len(self.ui.listWidget)):
            self.ui.listWidget.item(i).setSelected(state)

    def select_part(self):
        """
        Load the part chosen in the combo box: find the first member of ``parts`` whose name
        contains 'Part' and whose ``part_name`` contains the combo box text, fill the test
        list from its procedure and set up the coil check boxes.
        """
        # Update test list with a new part every time the combo box is changed
        part_name = self.ui.comboBox.currentText()
        for name, obj in getmembers(parts):
            # Get the objects only labeled as 'Part'
            if 'Part' in name:
                # Get the object with a part name that corresponds to the selected part
                if part_name in obj().part_name:
                    self.valve = obj()
                    # Clear out the current contents of the test list
                    self.select_all_tests(False)
                    self.ui.listWidget.clear()
                    # Update test list with new tests
                    for test in self.valve.procedure:
                        self.ui.listWidget.addItem(test[0])
                    # Pre-select all tests
                    self.select_all_tests()
                    # Set Coils up properly; if there is only one coil in the unit, disable the second coil
                    self.ui.coil_1.setChecked(True)
                    if self.valve.coils < 2:
                        self.ui.coil_2.setChecked(False)
                        self.ui.coil_2.setEnabled(False)
                    else:
                        self.ui.coil_2.setEnabled(True)
                        self.ui.coil_2.setChecked(True)
                    return

    def energize(self):
        """
        Energize button handler: send the coil states (or all-off) to the DataQ worker.
        Does nothing while ``auto_mode`` is set.
        """
        # Energize function for the energize button, but only if not running any test!
        if self.auto_mode:
            return
        if self.ui.btn_energize.isChecked():
            coil1 = int(self.ui.coil_1.checkState() / 2)
            coil2 = int(self.ui.coil_2.checkState() / 2)
            self.data_q_out.put([coil1, coil2, 2, 2])
        else:
            self.data_q_out.put([0, 0, 2, 2])

    def start_tests(self):
        """Clear the on-screen log and run ``run_tests`` in a daemon thread."""
        file_logger('Starting Tests')
        # Starts the testing thread
        self.ui.log_output.setHtml('')
        t = Thread(target=self.run_tests)
        t.daemon = True
        t.start()

    def run_tests(self):
        """
        Run every selected test of the current part's procedure in order.

        Each procedure entry is ``(name, callable)``. The callable is invoked with the log,
        test, power supply, DataQ and response queues. An escape ends the whole run; a skip
        or a normal finish moves on to the next test. Tests that are listed but not selected
        are reported as 'Skipping'.
        """
        # NOTE(review): this method runs in the thread created by start_tests yet touches
        # widgets directly; Qt widgets are meant to be used from the GUI thread only - consider
        # signalling the main thread instead
        # Don't let the user try to start while running nor change the part number mid-test!
        self.ui.btn_start.setEnabled(False)
        self.ui.comboBox.setEnabled(False)
        line = '-----------------------------------------------'
        for test in self.valve.procedure:
            # Verify the test is selected to run by iterating through all the test items in
            # the test list and, if matching the current test name, verify the checked state
            for i in range(len(self.ui.listWidget)):
                if test[0] == self.ui.listWidget.item(i).text() and self.ui.listWidget.item(i).isSelected():
                    file_logger('Testing {}'.format(test[0]))
                    self.log_queue.put('<b>{1}\r\nRunning {0}\r\n{1}</b> '.format(test[0], line))
                    test[1](self.log_queue, self.test_queue, self.pps_out, self.data_q_out, self.response_queue)
                    # Tell the user of an escape or a skip
                    if self.valve.escape:
                        file_logger('Escaped')
                        self.log_queue.put('<b><font color="blue">Escaped</b></font> ')
                        self.ui.btn_start.setEnabled(True)
                        self.ui.comboBox.setEnabled(True)
                        self.valve.escape = False
                        # If escaping, break out of all loops
                        return
                    elif self.valve.skip:
                        file_logger('Skipped')
                        self.log_queue.put('<b><font color="orange">Skipped</b></font> ')
                        self.valve.skip = False
                    else:
                        file_logger('Test Successful')
                    # Once the test is found, break out of the test name matching loop
                    break
                # If the test is not selected, notify user by displaying 'Skipping'
                elif test[0] == self.ui.listWidget.item(i).text():
                    self.log_queue.put('<b>{1}</b>\r\nSkipping {0}'.format(test[0], line))
                    break
        # Re-enable starting tests and selecting part numbers
        self.ui.btn_start.setEnabled(True)
        self.ui.comboBox.setEnabled(True)


if __name__ == '__main__':
    # Must be the first statement here: in a frozen Windows executable every child process
    # re-runs this module, and freeze_support() is what lets multiprocessing take over in the
    # child instead of starting the GUI again
    freeze_support()

    file_logger('####### NEW RUN #######\n')
    app = QtWidgets.QApplication(sys.argv)
    window = MainGui()
    window.show()
    # exec_() blocks until the event loop ends, so log the end marker only after it returns
    exit_code = app.exec_()
    file_logger('####### END RUN #######\n')
    sys.exit(exit_code)
4

1 回答 1

2

您需要在主脚本(main)的最开始处先加入以下代码:

from multiprocessing import freeze_support

if __name__ == '__main__':
    # Call this straight after the __main__ guard, before anything else runs, so that the
    # child processes of a frozen Windows executable do not start the whole program again
    freeze_support()

请参阅此 stackoverflow帖子

于 2020-10-09T03:15:28.453 回答