1. Python 字節碼解只在字節碼指令之間的綫程之間切換
  2. 全局解釋器鎖(GIL)一次只允許單個綫程執行
  3. 許多操作轉換為單個 bytecode 來執行

#

可以使用 dis 模塊來檢查操作是否編譯為單個字節碼指令

>>> def update_dict():
...     d['a'] = 1
...
>>> import dis
>>> dis.dis(update_dict)
  3           0 LOAD_CONST               1 (1)      # 將 co_consts [1] 壓入堆,也就是 1
              3 LOAD_GLOBAL              0 (d)      # 在全局中加載 d 符號,即 co_names [0], 壓入堆棧上
              6 LOAD_CONST               2 ('a')    # 將 co_consts [2] 壓入堆,也就是 a 符號
              9 STORE_SUBSCR                        # single bytecode instruction
             10 LOAD_CONST               0 (None)
             13 RETURN_VALUE

# Python’s swap is not atomic

在 PyMongo 的連接池中,確保并發的問題之一,如果一個綫程正在重置連接池,而另一個綫程正在使用該池,如何防止它們相互踩踏

class Pool(object):
    def __init__(self):
        self.sockets = set()
    def reset(self):
        # Close sockets before deleting them
        sockets, self.sockets = self.sockets, set()
        for sock_info in sockets: sock_info.close()

這裏有可能會有競爭條件,作者認爲 sockets, self.sockets = self.sockets, set () 是 atomic 的,即第一個進入的綫程 reset () 將 self.sockets 替換為一個空集,但其實并非如此,在測試時,偶爾會發生 racing,測試啓動了 40 個并發綫程,每個綫程查詢 MongoDB,調用 reset (),然後再次查詢 MongoDB

# 測試失敗原因
test_disconnect (test.test_pooling.TestPooling) ... Exception in thread Thread-45:
Traceback (most recent call last):
 < ... snip ... >
 File "pymongo/pool.py", line 159, in reset
   for sock_info in sockets: sock_info.close()
RuntimeError: Set changed size during iteration

作者認爲交換兩個對象的賦值操作時 atomic 的,但實際上它需要 6 個字節碼指令

sockets, self.sockets = self.sockets, set()

bytecode 為

0 LOAD_FAST                0 (self)
            3 LOAD_ATTR                0 (sockets)
            6 LOAD_GLOBAL              1 (set)
            9 CALL_FUNCTION            0
           12 ROT_TWO          <- this is the swap
           13 STORE_FAST               1 (sockets)
           16 LOAD_FAST                0 (self)
           19 STORE_ATTR               0 (sockets)

假設綫程 1(T1)正在執行這個函數,T1 將 self.sockets 和空集加載到它的堆棧上并交換它們,在它執行到 STORE_ATTR(self.sockets 實際被替換)時,它被 T2 打斷執行,T2 運行連接池代碼的其他部分,如:

def return_socket(self, sock_info):
    self.sockets.add(sock_info)

bytecode 為

24 LOAD_FAST                0 (self)
           27 LOAD_ATTR                1 (sockets)
           30 LOAD_ATTR                3 (add)
           33 LOAD_FAST                1 (sock_info)
           36 CALL_FUNCTION            1

假設 T2 執行到 LOAD_ATTR 1 字節碼,現在它的堆棧上有 self.sockets,但它被 T1 的 reset () 打斷執行,T1 將 self.sockets 替換為空集,但是,T1 的 “舊” 套接字集合和 T2 的 self.sockets 是同一組,T1 開始遍歷舊的套接字集合,并關閉它們

for sock_info in sockets: sock_info.close()

但是,它再次被 T2 打斷執行了,它確實通過 self.sockets.add (sock_info) 將集合的大小增加了,下次恢復 T1 時,它會嘗試繼續迭代,并引發了 “Set changed size during iteration” exception

你可能會想到,實際上兩個 Python 綫程不會如此頻繁的相互中斷,Python 解釋器在考慮切換綫程之前一次執行 100 個字節碼,在 T1 中是重複調用了 socket.close (),查看 socketmodule.c 的源代碼

static PyObject *
sock_close(PySocketSockObject *s, PyObject *Py_UNUSED(ignored))
{
    SOCKET_T fd;
    int res;
    fd = s->sock_fd;
    if (fd != INVALID_SOCKET) {
        s->sock_fd = INVALID_SOCKET;
        /* We do not want to retry upon EINTR: see
           http://lwn.net/Articles/576478/ and
           http://linux.derkeiler.com/Mailing-Lists/Kernel/2005-09/3000.html
           for more details. */
        Py_BEGIN_ALLOW_THREADS
        res = SOCKETCLOSE(fd);
        Py_END_ALLOW_THREADS
        /* bpo-30319: The peer can already have closed the connection.
           Python ignores ECONNRESET on close(). */
        if (res < 0 && errno != ECONNRESET) {
            return s->errorhandler();
        }
    }
    Py_RETURN_NONE;
}

Py_BEGIN_ALLOW_THREADS 宏會釋放 GIL 并通過 Py_END_ALLOW_THREADS 宏等待重新獲取它,在 Python 多綫程中,釋放 GIL 很可能會導致另一個正在等待 GIL 的綫程立即獲取它,所以在循環中調用 socket.close (),會使該綫程不斷被中斷,return_socket () 中的某個綫程獲得對集合的引用并對其進行修改,與 reset () 中的其他綫程交錯獲得同一集合,并對其進行迭代

David Beazley 關於 GIL 的演講,https://pyvideo.org/chipy/mindblowing-python-gil.html

解決方案也很明顯了,在 Python 層面加鎖,可以保證完整的運行字節碼

class Pool(object):
    def __init__(self):
        self.sockets = set()
        self.lock = threading.Lock()
    def reset(self):
        self.lock.acquire()
        try:
            # Close sockets before deleting them
            sockets, self.sockets = self.sockets, set()
        finally:
            self.lock.release()
        # Now only this thread can have a reference to this set of sockets
        for sock_info in sockets: sock_info.close()
   def return_socket(self, sock_info):
        self.lock.acquire()
        try:
            self.sockets.add(sock_info)
        finally:
            self.lock.release()

Python 中單字節碼指令是 atomic 的,如果使用這種原子性來避免互斥鎖,那麽代碼不僅更快、更簡單,還可以避免死鎖的風險

并非所有看起來都是 atomic 的,有需要時,可以使用 dis 模塊來檢查字節碼

原文章:https://emptysqua.re/blog/pythons-swap-is-not-atomic/

Edited on Views times

Give me a cup of [coffee]~( ̄▽ ̄)~*

小芳芳 WeChat Pay

WeChat Pay

小芳芳 Alipay

Alipay