Python虚拟机使用一个全局解释器锁(Global Interpreter Lock,GIL)来互斥线程对虚拟机的使用。

GIL与线程调度

在一个线程拥有了GIL,获得解释器的访问权之后,其他线程必须等待它释放解释器的访问权。

Python的多线程在于拥有一套线程调度机制,默认执行100条指令后启动线程调度机制挂起线程,选择处于等待的下一个线程时,借用底层操作系统提供的线程调度机制决定下一个进入解释器的线程。

Python中的Thread

Python提供两个多线程机制接口:

  • thread
  • threading

thread 模块提供方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//threadmodule.c
static PyMethodDef thread_methods[] = {
{"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS,
start_new_doc},
{"start_new", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS,
start_new_doc},
{"allocate_lock", (PyCFunction)thread_PyThread_allocate_lock,
METH_NOARGS, allocate_doc},
{"allocate", (PyCFunction)thread_PyThread_allocate_lock,
METH_NOARGS, allocate_doc},
{"exit_thread", (PyCFunction)thread_PyThread_exit_thread,
METH_NOARGS, exit_doc},
{"exit", (PyCFunction)thread_PyThread_exit_thread,
METH_NOARGS, exit_doc},
{"interrupt_main", (PyCFunction)thread_PyThread_interrupt_main,
METH_NOARGS, interrupt_doc},
{"get_ident", (PyCFunction)thread_get_ident,
METH_NOARGS, get_ident_doc},
{"_count", (PyCFunction)thread__count,
METH_NOARGS, _count_doc},
{"stack_size", (PyCFunction)thread_stack_size,
METH_VARARGS,
stack_size_doc},
{NULL, NULL} /* sentinel */
};


Python线程创建

通过start_new_thread创建一个新线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
PyObject *func, *args, *keyw = NULL;
struct bootstate *boot;
long ident;
if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
&func, &args, &keyw))
return NULL;
if (!PyCallable_Check(func)) {
PyErr_SetString(PyExc_TypeError,
"first arg must be callable");
return NULL;
}
if (!PyTuple_Check(args)) {
PyErr_SetString(PyExc_TypeError,
"2nd arg must be a tuple");
return NULL;
}
if (keyw != NULL && !PyDict_Check(keyw)) {
PyErr_SetString(PyExc_TypeError,
"optional 3rd arg must be a dictionary");
return NULL;
}
//1. 创建并初始化bootstate结构, 在boot中保存关于线程的一切信息,
boot = PyMem_NEW(struct bootstate, 1);
if (boot == NULL)
return PyErr_NoMemory();
boot->interp = PyThreadState_GET()->interp; //保存PyInterpreterState对象
boot->func = func;
boot->args = args;
boot->keyw = keyw;
boot->tstate = _PyThreadState_Prealloc(boot->interp);
if (boot->tstate == NULL) {
PyMem_DEL(boot);
return PyErr_NoMemory();
}
Py_INCREF(func);
Py_INCREF(args);
Py_XINCREF(keyw);
//2. 初始化多线程环境
PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
//3. 创建操作系统原生线程
ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
if (ident == -1) {
PyErr_SetString(ThreadError, "can't start new thread");
Py_DECREF(func);
Py_DECREF(args);
Py_XDECREF(keyw);
PyThreadState_Clear(boot->tstate);
PyMem_DEL(boot);
return NULL;
}
return PyInt_FromLong(ident);
}
  • 在Python虚拟机启动时,多线程机制并没有被激活,它只支持单线程,调用thread.start_new_thread,明确指示Python虚拟机创建新的线程,Python就能意识到用户需要多线程的支持,Python虚拟机会自动建立多线程机制需要的数据机构、环境以及GIL。

建立多线程环境

多线程环境的建立,主要就是创建GIL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//pythread.h
typedef void *PyThread_type_lock;
//ceval.c
static PyThread_type_lock interpreter_lock = 0; /* This is the GIL */
static PyThread_type_lock pending_lock = 0; /* for pending calls */
static long main_thread = 0;
void
PyEval_InitThreads(void)
{
if (interpreter_lock)
return;
interpreter_lock = PyThread_allocate_lock(); //创建GIL(PNRMUTEX aLock)thread_nt.h
PyThread_acquire_lock(interpreter_lock, 1);
main_thread = PyThread_get_thread_ident();
}
PyThread_type_lock
PyThread_allocate_lock(void)
{
PNRMUTEX aLock;
dprintf(("PyThread_allocate_lock called\n"));
if (!initialized)
PyThread_init_thread();
aLock = AllocNonRecursiveMutex() ;
dprintf(("%ld: PyThread_allocate_lock() -> %p\n", PyThread_get_thread_ident(), aLock));
return (PyThread_type_lock) aLock;
}
//thread_nt.c
typedef struct NRMUTEX {
LONG owned ;
DWORD thread_id ;
HANDLE hevent ; //event内核对象
} NRMUTEX, *PNRMUTEX ;

  • 在PyEval_InitThreads通过PyThread_allocate_lock成功地创建了GIL之后,当前线程就开始遵循Python的多线程机制的规则;调用任何Python C API之前,必须首先获得GIL。因此PyEval_InitThreads紧接着通过PyThread_acquire_lock尝试获得GIL。最后调用PyThead_get_thread_ident(),获得当前Python主线程id,并将其赋给main_thread。

创建线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
HANDLE hThread;
unsigned threadID;
callobj *obj;
dprintf(("%ld: PyThread_start_new_thread called\n",
PyThread_get_thread_ident()));
if (!initialized)
PyThread_init_thread();
obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj));
if (!obj)
return -1;
obj->func = func;
obj->arg = arg;
#if defined(MS_WINCE)
hThread = CreateThread(NULL,
Py_SAFE_DOWNCAST(_pythread_stacksize, Py_ssize_t, SIZE_T),
bootstrap, obj, 0, &threadID);
#else
hThread = (HANDLE)_beginthreadex(0,
Py_SAFE_DOWNCAST(_pythread_stacksize,
Py_ssize_t, unsigned int),
bootstrap, obj,
0, &threadID);
#endif
if (hThread == 0) {
#if defined(MS_WINCE)
/* Save error in variable, to prevent PyThread_get_thread_ident
from clobbering it. */
unsigned e = GetLastError();
dprintf(("%ld: PyThread_start_new_thread failed, win32 error code %u\n",
PyThread_get_thread_ident(), e));
#else
/* I've seen errno == EAGAIN here, which means "there are
* too many threads".
*/
int e = errno;
dprintf(("%ld: PyThread_start_new_thread failed, errno %d\n",
PyThread_get_thread_ident(), e));
#endif
threadID = (unsigned)-1;
HeapFree(GetProcessHeap(), 0, obj);
}
else {
dprintf(("%ld: PyThread_start_new_thread succeeded: %p\n",
PyThread_get_thread_ident(), (void*)hThread));
CloseHandle(hThread);
}
return (long) threadID;
}
  • 主线程为执行程序时操作系统创建,主线程调用PyThread_start_new_thread创建子线程(CreateThread)。Win32下thread的API:_beginThread完成线程创建,bootstrap调用子线程定义(如函数),_beginThreadz在子线程中发生,顺利返回后,主线程挂起,等待obj.done。
  • 主线程调用用PyThread_start_new_thread需要返回子线程的线程id,这个id只有在子线程被激活后在子线程获取,一旦子线程设置obj->id,就会设法唤醒主线程。主线程获得id后继续执行之后的字节码,主线程掌握GIL,子线程进入等待GIL状态,等待线程调度。
  • 子线程创建自身线程状态对象后, 通过_PyGILState_NotrThreadState将这个对象放入线程状态对象链表。当前活动的Python线程不一定是获得了GIL的线程,主线程和子线程都是win32原生线程,操作系统可能在主线程和子线程之间切换。当所有的线程都完成了初始化之后,操作系统线程调度和Python线程调度才会统一,Python线程调度迫使当前活动线程释放GIL,并通知所有等待GIL event内核对象的线程,触发操作系统线程调度。

Python线程调度

标准调度

Python的线程之间切换由线程调度机制掌握,python线程调度机制内建在python解释器的核心PyEval_EvalFrameEx中。

主线程先获得GIL, 并执行PyEval_EvalFrameEx函数代码, 这是子线程在t_bootstrap中调用PyEval_AcquireThread, 通过调用PyThread_acquire_lock申请GIL, 但由于GIL被主线程调用, 子线程被挂起. 主线程不断执行字节码, _Py_Ticker不断减一, 当减到0, 将当前维护线程状态置NULL, 然后释放GIL,此时子线程被操作系统的线程调度唤醒, 从而进入PyEval_EvalFrameEx. 对于主线程虽然失去了GIL, 但是没被挂起, 所以可以被再次切换为活动线程, 再次申请GIL, 由于被子线程占有, 主线程将自身挂起.

阻塞调度

线程A通过某些操作(如等待输入), 将自身阻塞, python应将等待GIL的线程B唤醒。

Python子线程的销毁

主线程销毁必须要销毁python的运行时环境, 子线程的销毁不需要进行这些动作,对线程状态对象中维护的东西进行引用计数的维护,随后Python释放GIL。

Python线程的用户级互斥和同步

Python的线程在GIL的控制之下,线程之间,对整个Python解释器,对Python提供的C API的访问,都是互斥的,这可以看作是Python内核级互斥机制。但这种互斥我们不能控制的,我们还需要另一种可控的互斥机制————用户级互斥。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//threadmodule.c
/* Lock objects */
typedef struct {
PyObject_HEAD
PyThread_type_lock lock_lock; //Event内核对象
PyObject *in_weakreflist;
} lockobject;
static PyObject *
thread_PyThread_allocate_lock(PyObject *self)
{
return (PyObject *) newlockobject();
}
static lockobject *
newlockobject(void)
{
lockobject *self;
self = PyObject_New(lockobject, &Locktype);
if (self == NULL)
return NULL;
self->lock_lock = PyThread_allocate_lock();
self->in_weakreflist = NULL;
if (self->lock_lock == NULL) {
Py_DECREF(self);
PyErr_SetString(ThreadError, "can't allocate lock");
return NULL;
}
return self;
}
  • 对thread.allocate的调用仅仅通过newlockobject创建了一个lockobject对象,Python整个用户级线程同步机制就在这个对象基础上实现。

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import threading
import time
count = 0
def reader():
global count
while True:
lock1.acquire()
count += 1
if count == 1:
lock2.acquire()
lock1.release()
print 'reading...'
time.sleep(2)
lock1.acquire()
count -= 1
if count == 0:
lock2.release()
lock1.release()
def writer():
while True:
lock2.acquire()
print 'writing...'
time.sleep(2)
lock2.release()
lock1 = threading.Lock() # 变量count锁
lock2 = threading.Lock() # 写锁
t1 = threading.Thread(target=reader)
t2 = threading.Thread(target=reader)
t3 = threading.Thread(target=writer)
t4 = threading.Thread(target=writer)
t1.start()
t2.start()
t3.start()
t4.start()