0

我正在使用 boost 库来开发异步 udp 通信。在接收方接收到的数据正在被另一个线程处理。然后我的问题是,当我在另一个线程而不是接收器线程中读取接收到的数据时,它会给出修改后的数据或更新的数据,这不是应该的数据。我的代码在发送方和接收方处理无符号字符缓冲区数组。原因是我需要将无符号字符缓冲区视为数据包,例如缓冲区 [2] = Engine_start_ID

  /* global buffer to store the incomming data 
  unsigned char received_buffer[200];
  /*
     global buffer accessed by another thread
      which contains copy the received_buffer
  */
   unsigned char read_hmi_buffer[200];
   boost::mutex hmi_buffer_copy_mutex;


       void udpComm::start_async_receive() {
             udp_socket.async_receive_from(
             boost::asio::buffer(received_buffer, max_length), remote_endpoint,
             boost::bind(&udpComm::handle_receive_from, this,
             boost::asio::placeholders::error,
             boost::asio::placeholders::bytes_transferred));
      }
   /* the data received is stored in the unsigned char received_buffer data buffer*/

     void udpComm::handle_receive_from(const boost::system::error_code& error,
        size_t bytes_recvd) {
        if (!error && bytes_recvd > 0) {
            received_bytes = bytes_recvd;
                    hmi_buffer_copy_mutex.lock();
             memcpy(&read_hmi_buffer[0], &received_buffer[0], received_bytes);
                     hmi_buffer_copy_mutex.unlock();

                     /*data received here is correct  'cus i printed in the console
                      checked it
                     */
                    cout<<(int)read_hmi_buffer[2]<<endl;
       }

             start_async_receive();

       }
       /*  io_service  is running in a thread 
         */
       void udpComm::run_io_service() {
       udp_io_service.run();
        usleep(1000000);
         }

以上代码是运行一个线程的异步udp通信

/* 我的第二个线程函数是 */

        void  thread_write_to_datalink()
         {    hmi_buffer_copy_mutex.lock();
           /* here is my problem begins*/
              cout<<(int)read_hmi_buffer[2]<<endl;
             hmi_buffer_copy_mutex.unlock();

           /* all data are already changed */

              serial.write_to_serial(read_hmi_buffer, 6);

         }
         /* threads from my main function
          are as below */


           int main() {
                 receive_from_hmi.start_async_receive();
              boost::thread thread_receive_from_hmi(&udpComm::run_io_service,
                &receive_from_hmi);
              boost::thread thread_serial(&thread_write_to_datalink);   
              thread_serial.join();
              thread_receive_from_hmi.join();
                 return 0;
        } 

/* Serial_manager 类包含用于从串口写入和读取的函数*/

  #include <iostream>
            #include <boost/thread.hpp>
            #include <boost/asio.hpp>
            #include <boost/date_time/posix_time/posix_time.hpp>


            using namespace boost::asio;

            class Serial_manager {
            public:

                Serial_manager(boost::asio::io_service &serial_io_service,char *dev_name);
                void open_serial_port();
                void write_to_serial(void *data, int size);
                size_t read_from_serial(void *data, int size);
                void handle_serial_exception(std::exception &ex);
                virtual ~Serial_manager();
                void setDeviceName(char* deviceName);

            protected:
                io_service &port_io_service;
                serial_port datalink_serial_port;
                bool serial_port_open;
                char *device_name;

            };

            void Serial_manager::setDeviceName(char* deviceName) {
                device_name = deviceName;
            }
            Serial_manager::Serial_manager(boost::asio::io_service &serial_io_service,char *dev_name):
                port_io_service(serial_io_service),
                datalink_serial_port(serial_io_service) {
                device_name = dev_name;
                serial_port_open = false;
                open_serial_port();
            }

            void Serial_manager::open_serial_port() {
                bool temp_port_status = false;
                bool serial_port_msg_printed = false;
                do {
                    try {
                        datalink_serial_port.open(device_name);
                        temp_port_status = true;
                    } catch (std::exception &ex) {
                        if (!serial_port_msg_printed) {
                            std::cout << "Exception-check the serial port device "
                                    << ex.what() << std::endl;
                            serial_port_msg_printed = true;
                        }
                        datalink_serial_port.close();
                        temp_port_status = false;
                    }

                } while (!temp_port_status);
                serial_port_open = temp_port_status;
                std::cout <<std::endl <<"serial port device opened successfully"<<std::endl;
                datalink_serial_port.set_option(serial_port_base::baud_rate(115200));
                datalink_serial_port.set_option(
                        serial_port_base::flow_control(
                                serial_port_base::flow_control::none));
                datalink_serial_port.set_option(
                        serial_port_base::parity(serial_port_base::parity::none));
                datalink_serial_port.set_option(
                        serial_port_base::stop_bits(serial_port_base::stop_bits::one));
                datalink_serial_port.set_option(serial_port_base::character_size(8));

            }

            void Serial_manager::write_to_serial(void *data, int size) {
                boost::asio::write(datalink_serial_port, boost::asio::buffer(data, size));
            }

            size_t Serial_manager::read_from_serial(void *data, int size) {
                return boost::asio::read(datalink_serial_port, boost::asio::buffer(data, size));
            }
            void Serial_manager::handle_serial_exception(std::exception& ex) {
                std::cout << "Exception-- " << ex.what() << std::endl;
                std::cout << "Cannot access data-link, check the serial connection"
                        << std::endl;
                datalink_serial_port.close();
                open_serial_port();
            }

            Serial_manager::~Serial_manager() {
                // TODO Auto-generated destructor stub
            }

我认为我的问题领域是线程同步和通知,如果你能帮助我,我会很高兴。你不应该担心发送者它工作得很好,因为我已经检查过它在接收器线程接收到数据。我希望你能理解我的问题。

编辑:这是修改。我的整个想法是为手动飞行控制开发一个模拟,所以根据我的设计,我有通过 udp 通信发送命令的客户端应用程序。在接收端打算使用 3 个线程。一个线程接收来自棒的输入,即void start_hotas()第二个线程是从发送者(客户端)接收命令的线程:void udpComm::run_io_service()和第三个是 void thread_write_to_datalink()

              /* a thread that listens for input from sticks*/
            void start_hotas() {
        Hotas_manager hotasobj;
        __s16 event_value; /* value */
        __u8 event_number; /* axis/button number */
        while (1) {
            hotasobj.readData_from_hotas();


            event_number = hotasobj.getJoystickEvent().number;
            event_value = hotasobj.getJoystickEvent().value;
            if (hotasobj.isAxisPressed()) {
                if (event_number == 0) {
                    aileron = (float) event_value / 32767;

                } else if (event_number == 1) {
                    elevator = -(float) event_value / 32767;

                } else if (event_number == 2) {
                    rudder = (float) event_value / 32767;

                } else if (event_number == 3) {
                    brake_left = (float) (32767 - event_value) /    65534;


                } else if (event_number == 4) {


                } else if (event_number == 6) {


                } else if (event_number == 10) {


                } else if (event_number == 11) {

                } else if (event_number == 12) {



                }
            } else if (hotasobj.isButtonPressed()) {



            }
            usleep(1000);

        }

    }


            /*
     * Hotas.h
     *
     *  Created on: Jan 31, 2013
     *      Author: metec
     */

    #define JOY_DEV  "/dev/input/js0"
    #include <iostream>
    #include <boost/thread.hpp>
    #include <boost/asio.hpp>
    #include <boost/date_time/posix_time/posix_time.hpp>
    #include <linux/joystick.h>

    bool message_printed = false;
    bool message2_printed = false;

    class Hotas_manager {
    public:
        Hotas_manager();
        virtual ~Hotas_manager();
        void open_hotas_device();

        /*
         *
         * read from hotas input
         * used to the updated event data and status of the joystick from the
         * the file.
         *
         */
        void readData_from_hotas();

        js_event getJoystickEvent() {
            return joystick_event;
        }

        int getNumOfAxis() {
            return num_of_axis;
        }

        int getNumOfButtons() {
            return num_of_buttons;
        }

        bool isAxisPressed() {
            return axis_pressed;
        }

        bool isButtonPressed() {
            return button_pressed;
        }

        int* getAxis() {
            return axis;
        }

        char* getButton() {
            return button;
        }

    private:
        int fd;
        js_event joystick_event;
        bool hotas_connected;
        int num_of_axis;
        int num_of_buttons;
        int version;
        char devName[80];

        /*
         * the the variables below indicates
         * the state of the joystick.
         */
        int axis[30];
        char button[30];
        bool button_pressed;
        bool axis_pressed;
    };

    Hotas_manager::Hotas_manager() {
        // TODO Auto-generated constructor stub
        hotas_connected = false;
        open_hotas_device();
        std::cout << "joystick device detected" << std::endl;

    }

    Hotas_manager::~Hotas_manager() {
        // TODO Auto-generated destructor stub

    }

    void Hotas_manager::open_hotas_device() {
        bool file_open_error_printed = false;
        while (!hotas_connected) {
            if ((fd = open(JOY_DEV, O_RDONLY)) > 0) {
                ioctl(fd, JSIOCGAXES, num_of_axis);
                ioctl(fd, JSIOCGBUTTONS, num_of_buttons);
                ioctl(fd, JSIOCGVERSION, version);
                ioctl(fd, JSIOCGNAME(80), devName);
                /*
                 * NON BLOCKING MODE
                 */
                ioctl(fd, F_SETFL, O_NONBLOCK);
                hotas_connected = true;
            } else {
                if (!file_open_error_printed) {
                    std::cout << "hotas device not detected. check "
                            "whether it is "
                            "plugged" << std::endl;
                    file_open_error_printed = true;
                }
                close(fd);
                hotas_connected = false;
            }
        }

    }

    void Hotas_manager::readData_from_hotas() {
        int result;
        result = read(fd, &joystick_event, sizeof(struct js_event));
        if (result > 0) {
            switch (joystick_event.type & ~JS_EVENT_INIT) {
            case JS_EVENT_AXIS:
                axis[joystick_event.number] = joystick_event.value;
                axis_pressed = true;
                button_pressed = false;
                break;
            case JS_EVENT_BUTTON:
                button[joystick_event.number] = joystick_event.value;
                button_pressed = true;
                axis_pressed = false;
                break;
            }
            message2_printed = false;
            message_printed = false;

        } else {
            if (!message_printed) {
                std::cout << "problem in reading the stick file" << std::endl;
                message_printed = true;
            }
            hotas_connected = false;
            open_hotas_device();
            if (!message2_printed) {
                std::cout << "stick re-connected" << std::endl;
                message2_printed = true;
            }

        }

    }



I updated the main function to run 3 threads .




    int main() {


                boost::asio::io_service receive_from_hmi_io;
                udpComm receive_from_hmi(receive_from_hmi_io, 6012);
                receive_from_hmi.setRemoteEndpoint("127.0.0.1", 6011);
                receive_from_hmi.start_async_receive();
                boost::thread thread_receive_from_hmi(&udpComm::run_io_service,
                        &receive_from_hmi);
                boost::thread thread_serial(&thread_write_to_datalink);
                boost::thread thread_hotas(&start_hotas);
                thread_hotas.join();
                thread_serial.join();
                thread_receive_from_hmi.join();
                return 0;
                 }

void thread_write_to_datalink()还写入来自 hotas_manager(joysticks) 的数据。

            void thread_write_to_datalink() {

                /*
                 * boost serial  communication
                 */
                boost::asio::io_service serial_port_io;
                Serial_manager serial(serial_port_io, (char*) "/dev/ttyUSB0");   


                cout << "aileron  " << "throttle   " << "elevator   " << endl;
                while (1) { 

                        // commands from udp communication      

                      serial.write_to_serial(read_hmi_buffer, 6);

                    // data come from joystick inputs

                    //cout << aileron<<"    "<<throttle<<"    "<<elevator<< endl;
                    memcpy(&buffer_manual_flightcontrol[4], &aileron, 4);
                    memcpy(&buffer_manual_flightcontrol[8], &throttle, 4);
                    memcpy(&buffer_manual_flightcontrol[12], &elevator, 4);

                    unsigned char temp;


                    try {
                        serial.write_to_serial(buffer_manual_flightcontrol, 32);
                        //serial.write_to_serial(buffer_manual_flightcontrol, 32);
                    } catch (std::exception& exp) {
                        serial.handle_serial_exception(exp);
                    }

                    try {
                        serial.write_to_serial(buffer_payloadcontrol, 20);
                    } catch (std::exception& exp) {
                        serial.handle_serial_exception(exp);
                    }

                    usleep(100000);

                }

            }

我的问题是如何更好地设计来同步这 3 个线程。如果您的回答说 您不需要使用 3 个线程,我需要您告诉我如何使用。

4

2 回答 2

0

您需要使您的访问read_hmi_buffer 同步。

因此,每当一段代码在该缓冲区中读取或写入时,您需要一个互斥锁(std::mutex,或 windows 等效项)来锁定。pthread_mutex_t

有关该概念的一些解释以及其他教程的链接,请参阅此问题

于 2013-03-08T16:09:00.800 回答
0

让我们从多线程中备份一点,您的程序混合了同步和异步操作。你不需要这样做,因为它只会引起混乱。您可以将从 UDP 套接字读取的缓冲区异步写入串行端口。这一切都可以通过运行 的单个线程来实现io_service,从而消除了任何并发问题。

您将需要添加缓冲区管理以使从套接字读取的数据async_write在串行端口的生命周期内保持在范围内,以异步 UDP 服务器为例。还要研究文档,特别是对缓冲区寿命的要求async_write

缓冲区

包含要写入的数据的一个或多个缓冲区。尽管缓冲区对象可以根据需要被复制,但底层内存块的所有权由调用者保留,它必须保证它们在调用处理程序之前保持有效。

一旦你完成了那个设计,那么你就可以转向更高级的技术,例如线程池或多个io_services。

于 2013-03-12T02:20:27.573 回答