0

我的 Python 扩展模块遇到了不可预知的行为,该模块包装了一个 C++ 库,该库启动一个新的 pthread,并在完成一些工作后生成回调回调用者。我已将其大大简化为一个仍然演示此问题的简单示例。以下有时会生成一个Fatal Python error: PyEval_SaveThread: NULL tstate,通常相当快。有时它在 SIGSEGV 上tupledealoc。偶尔会出现这种死锁。我不知道为什么。有没有人有任何想法?

这是我的python测试代码

import mymod
from time import sleep
from random import randrange

def my_cb1(s):
    print("Python cb %s" % (s));

for x in range(1,1000):
    num_cb = randrange(5) + 1
    print("Starting %d" % mymod.doit(my_cb1, "myid" + str(x), num_cb))

while True:
    sleep(1)

扩展模块是:

#include <pthread.h>

#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <stddef.h>

#include <iostream>
#include <map>
#include <deque>
#include <mutex>
#include <functional>
#include <thread>

static std::map<std::string, PyObject *> cb_map;
static std::mutex map_mtx;

struct fake_cb_info
{
  fake_cb_info() = delete;
  fake_cb_info(const unsigned long &num_cb, const std::string &id) :
    num_cb(num_cb), id(id)
  {
  }
  const unsigned long num_cb;
  const std::string id;
};
static std::deque<struct fake_cb_info> deq;
static std::mutex deq_mtx;

static bool is_worker_thread_running = false;
static std::thread worker_thread;

typedef std::function<void(const std::string &id, const std::string &s)> doit_cb_t;
static void internal_cb(const std::string &id, const std::string &s)
{
  std::scoped_lock<std::mutex> lk(map_mtx);

  if (0 != cb_map.count(id))
  {
      PyGILState_STATE gstate;
      gstate = PyGILState_Ensure();

      PyObject *arglist = Py_BuildValue("(s)", s.c_str());
      PyObject *result = PyObject_CallObject(cb_map.at(id), arglist);
      Py_DECREF(arglist);

      if (NULL == result)
      {
          if (NULL == PyErr_Occurred())
          {
            std::cerr << "Unknown error occurred in C callback" << std::endl;
          }
          else
          {
              PyErr_Print();
          }
      }
      else
      {
        Py_DECREF(result);
      }

      PyGILState_Release(gstate);
  }
  else
  {
    std::cerr << "Unknown callback id " << id << std::endl;
  }
}

void static worker()
{
  size_t x = 0;
  while(true)
  {
    std::scoped_lock<std::mutex> lk(deq_mtx);
    if (deq.size() == 0)
    {
      usleep(1000);
      continue;
    }

    auto info = deq.front();
    deq.pop_front();
    for (unsigned long i=0; i<info.num_cb; i++)
    {
      internal_cb(info.id, std::to_string(x++));
    }
  }
}

PyObject * _wrap_doit(void *self, PyObject *args, PyObject *kwargs)
{
    PyObject *py_retval;
    PyThreadState *py_thread_state = NULL;
    PyObject *cb;
    const char *id = NULL;
    Py_ssize_t id_len;
    std::string id_std;
    unsigned long num_callbacks;
    const char *keywords[] = {"cb_func", "id", "num_cb", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, (char *) "Os#k", (char **) keywords, &cb, &id, &id_len, &num_callbacks))
    {
        abort();
    }
    if (!PyCallable_Check(cb))
    {
        abort();
    }

    id_std = std::string(id, id_len);

    {
      std::scoped_lock<std::mutex> lk(map_mtx);
      if (0 == cb_map.count(id_std))
      {
        Py_INCREF(cb);
        cb_map.insert(std::make_pair(id_std, cb));

        // N.B. The corresponding Py_DECREF for the callback function PyObject
        // is intentionally not here. It is in another extension module method
        // that is not listed here (just trying to keep this example as small
        // and lean as possible)
      }
      else
      {
        std::cerr << "Only one callback for ID!" << std::endl;
        abort();
      }
    }

    if (PyEval_ThreadsInitialized ())
    {
      std::cout << "Saving thread" << std::endl;
      py_thread_state = PyEval_SaveThread();
    }

    {
      // Stash away the info so that we will know how many callbacks to
      // generate and sleep a bit. This is to simulate a real external library
      // doing work which will, in turn, generate callbacks
      struct fake_cb_info info(num_callbacks, id_std);
      std::scoped_lock<std::mutex> lk(deq_mtx);
      deq.push_back(info);

      if (!is_worker_thread_running)
      {
        std::cout << "@@@@ Creating a new thread\n";
        worker_thread = std::thread(&worker);
        pthread_setname_np(worker_thread.native_handle(), "worker_thread");
        worker_thread.detach();
        is_worker_thread_running = true;
      }

      usleep(10000);
    }

    if (py_thread_state)
    {
      std::cout << "Restoring thread" << std::endl;
      PyEval_RestoreThread(py_thread_state);
    }

    py_retval = Py_BuildValue((char *) "k", num_callbacks);
    return py_retval;
}

static PyMethodDef mymod_functions[] = {
    {
      (char *) "doit",
      (PyCFunction) _wrap_doit,
      METH_KEYWORDS | METH_VARARGS,
      "Generate requested number of multi-threaded callbacks.\n doit(callback_fn, id, num_callbacks)"
    },
    {NULL, NULL, 0, NULL}
};

static struct PyModuleDef moduledef = {
    PyModuleDef_HEAD_INIT,
    "mymod",
    "pthread test module",
    -1,
    mymod_functions,
};

#define MOD_ERROR NULL
#define MOD_INIT(name) PyObject* PyInit_##name(void)
#define MOD_RETURN(val) val

#if defined(__cplusplus)
extern "C"
#endif

#if defined(__GNUC__) && __GNUC__ >= 4
__attribute__ ((visibility("default")))
#endif


MOD_INIT(mymod)
{
    PyObject *m = PyModule_Create(&moduledef);
    if (m == NULL) {
        return MOD_ERROR;
    }
    return MOD_RETURN(m);
}


如果您想跳过setup.py编译扩展模块的步骤,这是我用来构建它的 shell 脚本

#!/bin/bash -eux

obj='test_python_cmodule.o'
lib='mymod.cpython-36m-x86_64-linux-gnu.so'
if [ -f $obj ]; then
  rm $obj
fi
if [ -f $lib ]; then
  rm $lib
fi

g++ -DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -fPIC -I.  -I/usr/include/python3.6m -c test_python_cmodule.cpp -o $obj -std=c++17 -Wno-unused-variable -g -O0

x86_64-linux-gnu-gcc -pthread -shared -Wl,-O1 -Wl,-Bsymbolic-functions -Wl,-Bsymbolic-functions -Wl,-z,relro -Wl,-Bsymbolic-functions -Wl,-z,relro -g -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 $obj -lpthread -o $lib -lstdc++
4

1 回答 1

0

观察到该问题仅发生在 Python 3.6 而不是较新版本的情况导致了解决方案(发布在 comp.lang.python 上),即在这些较旧版本的 Python 中,PyEval_InitThreads()必须在主线程中至少显式调用一次。在 Python3.7 中,它由 Python 自动调用,实际函数在 Python 3.9 中已弃用,预计在 Python 3.11 中从公共 API 中完全删除

于 2020-10-28T17:03:21.223 回答