我正在使用 boost 库来开发异步 udp 通信。在接收方接收到的数据正在被另一个线程处理。然后我的问题是,当我在另一个线程而不是接收器线程中读取接收到的数据时,它会给出修改后的数据或更新的数据,这不是应该的数据。我的代码在发送方和接收方处理无符号字符缓冲区数组。原因是我需要将无符号字符缓冲区视为数据包,例如缓冲区 [2] = Engine_start_ID
/* global buffer to store the incomming data
unsigned char received_buffer[200];
/*
global buffer accessed by another thread
which contains copy the received_buffer
*/
unsigned char read_hmi_buffer[200];
boost::mutex hmi_buffer_copy_mutex;
void udpComm::start_async_receive() {
udp_socket.async_receive_from(
boost::asio::buffer(received_buffer, max_length), remote_endpoint,
boost::bind(&udpComm::handle_receive_from, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
/* the data received is stored in the unsigned char received_buffer data buffer*/
void udpComm::handle_receive_from(const boost::system::error_code& error,
size_t bytes_recvd) {
if (!error && bytes_recvd > 0) {
received_bytes = bytes_recvd;
hmi_buffer_copy_mutex.lock();
memcpy(&read_hmi_buffer[0], &received_buffer[0], received_bytes);
hmi_buffer_copy_mutex.unlock();
/*data received here is correct 'cus i printed in the console
checked it
*/
cout<<(int)read_hmi_buffer[2]<<endl;
}
start_async_receive();
}
/* io_service is running in a thread
*/
void udpComm::run_io_service() {
udp_io_service.run();
usleep(1000000);
}
以上代码是运行一个线程的异步udp通信
/* 我的第二个线程函数是 */
void thread_write_to_datalink()
{ hmi_buffer_copy_mutex.lock();
/* here is my problem begins*/
cout<<(int)read_hmi_buffer[2]<<endl;
hmi_buffer_copy_mutex.unlock();
/* all data are already changed */
serial.write_to_serial(read_hmi_buffer, 6);
}
/* threads from my main function
are as below */
int main() {
receive_from_hmi.start_async_receive();
boost::thread thread_receive_from_hmi(&udpComm::run_io_service,
&receive_from_hmi);
boost::thread thread_serial(&thread_write_to_datalink);
thread_serial.join();
thread_receive_from_hmi.join();
return 0;
}
/* Serial_manager 类包含用于从串口写入和读取的函数*/
#include <iostream>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace boost::asio;
class Serial_manager {
public:
Serial_manager(boost::asio::io_service &serial_io_service,char *dev_name);
void open_serial_port();
void write_to_serial(void *data, int size);
size_t read_from_serial(void *data, int size);
void handle_serial_exception(std::exception &ex);
virtual ~Serial_manager();
void setDeviceName(char* deviceName);
protected:
io_service &port_io_service;
serial_port datalink_serial_port;
bool serial_port_open;
char *device_name;
};
void Serial_manager::setDeviceName(char* deviceName) {
device_name = deviceName;
}
Serial_manager::Serial_manager(boost::asio::io_service &serial_io_service,char *dev_name):
port_io_service(serial_io_service),
datalink_serial_port(serial_io_service) {
device_name = dev_name;
serial_port_open = false;
open_serial_port();
}
void Serial_manager::open_serial_port() {
bool temp_port_status = false;
bool serial_port_msg_printed = false;
do {
try {
datalink_serial_port.open(device_name);
temp_port_status = true;
} catch (std::exception &ex) {
if (!serial_port_msg_printed) {
std::cout << "Exception-check the serial port device "
<< ex.what() << std::endl;
serial_port_msg_printed = true;
}
datalink_serial_port.close();
temp_port_status = false;
}
} while (!temp_port_status);
serial_port_open = temp_port_status;
std::cout <<std::endl <<"serial port device opened successfully"<<std::endl;
datalink_serial_port.set_option(serial_port_base::baud_rate(115200));
datalink_serial_port.set_option(
serial_port_base::flow_control(
serial_port_base::flow_control::none));
datalink_serial_port.set_option(
serial_port_base::parity(serial_port_base::parity::none));
datalink_serial_port.set_option(
serial_port_base::stop_bits(serial_port_base::stop_bits::one));
datalink_serial_port.set_option(serial_port_base::character_size(8));
}
void Serial_manager::write_to_serial(void *data, int size) {
boost::asio::write(datalink_serial_port, boost::asio::buffer(data, size));
}
size_t Serial_manager::read_from_serial(void *data, int size) {
return boost::asio::read(datalink_serial_port, boost::asio::buffer(data, size));
}
void Serial_manager::handle_serial_exception(std::exception& ex) {
std::cout << "Exception-- " << ex.what() << std::endl;
std::cout << "Cannot access data-link, check the serial connection"
<< std::endl;
datalink_serial_port.close();
open_serial_port();
}
Serial_manager::~Serial_manager() {
// TODO Auto-generated destructor stub
}
我认为我的问题领域是线程同步和通知,如果你能帮助我,我会很高兴。你不应该担心发送者它工作得很好,因为我已经检查过它在接收器线程接收到数据。我希望你能理解我的问题。
编辑:这是修改。我的整个想法是为手动飞行控制开发一个模拟,所以根据我的设计,我有通过 udp 通信发送命令的客户端应用程序。在接收端打算使用 3 个线程。一个线程接收来自棒的输入,即void start_hotas()第二个线程是从发送者(客户端)接收命令的线程:void udpComm::run_io_service()和第三个是 void thread_write_to_datalink()。
/* a thread that listens for input from sticks*/
void start_hotas() {
Hotas_manager hotasobj;
__s16 event_value; /* value */
__u8 event_number; /* axis/button number */
while (1) {
hotasobj.readData_from_hotas();
event_number = hotasobj.getJoystickEvent().number;
event_value = hotasobj.getJoystickEvent().value;
if (hotasobj.isAxisPressed()) {
if (event_number == 0) {
aileron = (float) event_value / 32767;
} else if (event_number == 1) {
elevator = -(float) event_value / 32767;
} else if (event_number == 2) {
rudder = (float) event_value / 32767;
} else if (event_number == 3) {
brake_left = (float) (32767 - event_value) / 65534;
} else if (event_number == 4) {
} else if (event_number == 6) {
} else if (event_number == 10) {
} else if (event_number == 11) {
} else if (event_number == 12) {
}
} else if (hotasobj.isButtonPressed()) {
}
usleep(1000);
}
}
/*
* Hotas.h
*
* Created on: Jan 31, 2013
* Author: metec
*/
#define JOY_DEV "/dev/input/js0"
#include <iostream>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <linux/joystick.h>
bool message_printed = false;
bool message2_printed = false;
class Hotas_manager {
public:
Hotas_manager();
virtual ~Hotas_manager();
void open_hotas_device();
/*
*
* read from hotas input
* used to the updated event data and status of the joystick from the
* the file.
*
*/
void readData_from_hotas();
js_event getJoystickEvent() {
return joystick_event;
}
int getNumOfAxis() {
return num_of_axis;
}
int getNumOfButtons() {
return num_of_buttons;
}
bool isAxisPressed() {
return axis_pressed;
}
bool isButtonPressed() {
return button_pressed;
}
int* getAxis() {
return axis;
}
char* getButton() {
return button;
}
private:
int fd;
js_event joystick_event;
bool hotas_connected;
int num_of_axis;
int num_of_buttons;
int version;
char devName[80];
/*
* the the variables below indicates
* the state of the joystick.
*/
int axis[30];
char button[30];
bool button_pressed;
bool axis_pressed;
};
Hotas_manager::Hotas_manager() {
// TODO Auto-generated constructor stub
hotas_connected = false;
open_hotas_device();
std::cout << "joystick device detected" << std::endl;
}
Hotas_manager::~Hotas_manager() {
// TODO Auto-generated destructor stub
}
void Hotas_manager::open_hotas_device() {
bool file_open_error_printed = false;
while (!hotas_connected) {
if ((fd = open(JOY_DEV, O_RDONLY)) > 0) {
ioctl(fd, JSIOCGAXES, num_of_axis);
ioctl(fd, JSIOCGBUTTONS, num_of_buttons);
ioctl(fd, JSIOCGVERSION, version);
ioctl(fd, JSIOCGNAME(80), devName);
/*
* NON BLOCKING MODE
*/
ioctl(fd, F_SETFL, O_NONBLOCK);
hotas_connected = true;
} else {
if (!file_open_error_printed) {
std::cout << "hotas device not detected. check "
"whether it is "
"plugged" << std::endl;
file_open_error_printed = true;
}
close(fd);
hotas_connected = false;
}
}
}
void Hotas_manager::readData_from_hotas() {
int result;
result = read(fd, &joystick_event, sizeof(struct js_event));
if (result > 0) {
switch (joystick_event.type & ~JS_EVENT_INIT) {
case JS_EVENT_AXIS:
axis[joystick_event.number] = joystick_event.value;
axis_pressed = true;
button_pressed = false;
break;
case JS_EVENT_BUTTON:
button[joystick_event.number] = joystick_event.value;
button_pressed = true;
axis_pressed = false;
break;
}
message2_printed = false;
message_printed = false;
} else {
if (!message_printed) {
std::cout << "problem in reading the stick file" << std::endl;
message_printed = true;
}
hotas_connected = false;
open_hotas_device();
if (!message2_printed) {
std::cout << "stick re-connected" << std::endl;
message2_printed = true;
}
}
}
I updated the main function to run 3 threads .
int main() {
boost::asio::io_service receive_from_hmi_io;
udpComm receive_from_hmi(receive_from_hmi_io, 6012);
receive_from_hmi.setRemoteEndpoint("127.0.0.1", 6011);
receive_from_hmi.start_async_receive();
boost::thread thread_receive_from_hmi(&udpComm::run_io_service,
&receive_from_hmi);
boost::thread thread_serial(&thread_write_to_datalink);
boost::thread thread_hotas(&start_hotas);
thread_hotas.join();
thread_serial.join();
thread_receive_from_hmi.join();
return 0;
}
void thread_write_to_datalink()还写入来自 hotas_manager(joysticks) 的数据。
void thread_write_to_datalink() {
/*
* boost serial communication
*/
boost::asio::io_service serial_port_io;
Serial_manager serial(serial_port_io, (char*) "/dev/ttyUSB0");
cout << "aileron " << "throttle " << "elevator " << endl;
while (1) {
// commands from udp communication
serial.write_to_serial(read_hmi_buffer, 6);
// data come from joystick inputs
//cout << aileron<<" "<<throttle<<" "<<elevator<< endl;
memcpy(&buffer_manual_flightcontrol[4], &aileron, 4);
memcpy(&buffer_manual_flightcontrol[8], &throttle, 4);
memcpy(&buffer_manual_flightcontrol[12], &elevator, 4);
unsigned char temp;
try {
serial.write_to_serial(buffer_manual_flightcontrol, 32);
//serial.write_to_serial(buffer_manual_flightcontrol, 32);
} catch (std::exception& exp) {
serial.handle_serial_exception(exp);
}
try {
serial.write_to_serial(buffer_payloadcontrol, 20);
} catch (std::exception& exp) {
serial.handle_serial_exception(exp);
}
usleep(100000);
}
}
我的问题是如何更好地设计来同步这 3 个线程。如果您的回答说 您不需要使用 3 个线程,我需要您告诉我如何使用。