Каналы
Каналы (channels) - инструмент коммуникации между горутинами. Для определения канала применяется ключевое слово chan var intChannel chan int
type hchan struct {
qcount uint // количество элементов в буфере
dataqsiz uint // размерность буфера для буферизированного канала
buf unsafe.Pointer // ссылка на буфер с данными, записанными в канал, реализованный с помощью структуры данных “кольцевой буфер”
elemsize uint16 // размер одного элемента в канале
closed uint32 // закрыт или открыт канал в данный момент(из за работы с atomic не bool а uint32)
elemtype *_type // ссылка на тип данных в канале
sendx uint // индекс смещения записи - в какой элемент буфера будет следующая запись
recvx uint // индекс смещения чтения - с какого элемента буфера будет следующее чтение
recvq waitq // очередь заблокированных горутин(sudog), ожидающие чтения
sendq waitq // очердь заблокированных горутин(sudog), ожидающие записи
lock mutex // мьютекс, используемый для операций, изменяющих состояние канала
}
Как видно канал содержит в себе очереди(waitq
горутин(на чтение recvq
и запись sendq
)
waitq
- это структура, которая содержит ссылки на первый и последний элемент списка из sudog
горутин
type waitq struct {
first *sudog
last *sudog
}
sudog
представляет заблокированную горутину, ожидающую чтения или записи
type sudog struct {
g *g // ссылка на горутину
elem unsafe.Pointer // данные для записи
// ...
}
В общем случае, горутина захватывает lock mutex
, когда совершает какое-либо действие с каналом, кроме случаев lock-free проверок при неблокирующих вызовах(например при использовании select
). Closed — это флаг, который устанавливается в 1, если канал закрыт, и в 0, если не закрыт.
Канал может быть синхронным (небуферизированным) или асинхронным (буферезированным).
Синхронные(небуферизированные) каналы
package main
func main() {
ch := make(chan bool)
go func() {
ch <- true
}()
<-ch
}
Сразу после создания канал выглядит вот так:
Go не выделяет буфер для синхронных каналов, поэтому указатель на буфер равен nil и dataqsiz
равен нулю. Допустим, что первым действием будет чтение из канала (обратный пример, когда вначале идёт запись, будет рассмотрен в примере с буферизированным каналами). Вначале, текущая горутина произведёт некоторые проверки, такие как:
- закрыт ли канал
- буферизирован он или нет
- содержит ли гоуртины в send-очереди.
В нашем примере у канала нет ни буфера, ни ожидающих отправки горутин, поэтому горутина добавит сама себя в recvq
и заблокируется. На этом шаге наш канал будет выглядеть следующим образом:
Теперь у нас осталась только одна работающая горутина, которая пытается записать данные в канал. Все проверки повторяются снова, и когда горутина проверяет recvq
очередь, она находит ожидающую чтение горутину, удаляет её из очереди, записывает данные в её стек и снимает блокировку. Это единственное место во всём рантайме Go, когда одна горутина пишет напрямую в стек другой горутины. После этого шага, канал выглядит точно так же, как сразу после инициализации. Обе горутины завершаются и программа завершается.
Буферезированные каналы
package main
func main() {
ch := make(chan bool, 1)
ch <- true
go func() {
<-ch
}()
ch <- true
}
Допустим, что два значения были записаны в канал, и после этого один из элементов вычитан. И первым шагом идёт создание канала, который будет выглядеть вот так:
Разница в сравнении с синхронным каналом в том, что тут Go выделяет буфер и устанавливает значение dataqsiz
в единицу.
Следующим шагом будет отправка первого значения в канал. Чтобы сделать это, горутина сначала производит несколько проверок: пуста ли очередь recvq
, пуст ли буфер, достаточно ли места в буфере.
В нашем случае в буфере достаточно места и в очереди ожидания чтения нет горутин, поэтому горутина просто записывает элемент в буфер, увеличивает значение qcount
и продолжает исполнение далее. Канал в этот момент выглядит так:
На следующем шаге, горутина main отправляет следующее значение в канал. Когда буфер полон, буферизированный канал будет вести себя точно так же, как синхронный (небуферизированный) канал, тоесть горутина добавит себя в очередь ожидания и заблокируется, в результате чего, канал будет выглядеть следующим образом:
Сейчас горутина main заблокирована и Go запустил одну анонимную горутину, которая пытается прочесть значение из канала. И вот тут начинается хитрая часть. Go гарантирует, что канал работает по принципу FIFO очереди (спецификация), но горутина не может просто взять значение из буфера и продолжить исполнение. В этом случае горутина main заблокируется навсегда. Для решения этой ситуации, текущая горутина читает данные из буфера, затем добавляет значение из заблокированной горутины в буфер, разблокирует ожидающую горутину и удаляет её из очереди ожидания. (В случае же, если нет ожидающих горутину, она просто читает первое значение из буфера)
Select
Select
позволяет вам ждать нескольких операций на каналах
select {
case <-ch:
foo()
default:
bar()
}
-
Элементы(scase) внутри select сортируются в случайном порядке(перемешиваются):
-
Каждый из каналов блокируется мьютексом.
-
Происходит последовательная попытка взаимодействия (запись или чтение) с каналами, перечисленными внутри оператора select. При наличии секции default, чтение и запись происходят в неблокирующем режиме(об этом далее).
-
В случае, если ни один из каналов недоступен для взаимодействия, и секция default отсутствует, то текущая горутина переходит в состояние waiting до тех пор, пока какой-то из каналов не станет доступен.
-
С каналов снимается блокировка мьютексом.
Go запускает функцию со следующей сигнатурой:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... }
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... }
Здесь нас интересует параметр block. При наличии секции default в операторе выбора select, функции chansend и chanrecv вызываются с параметром block равным false, в итоге функции осуществляют быстрый возврат в случае, если записать в канал или прочитать из канала без ожидания не удалось
Закрытие канала
- выполняется проверка, что канал инициализирован(panic в случае, если канал не инициализирован);
- захватывается блокировка мьютекса;
- выполняется проверка, что канал не закрыт (panic в случае, если канал уже закрыт);
- значение поля closed канала (в структуре hchan) выставляется в true;
- все горутины, ожидающие чтения, получают default value в зависимости от типа данных в канале;
- все горутины, ожидающие записи, получают panic;
- мьютекс канала разблокируется;
- заблокированные горутины — разблокируются.
При чтении можно проверить закрыт канал или нет двумя способами:
val, ok :=<- someChan:
if !ok {}// канал закрыт
for val := range someChan {
// получено сообщение
}
// канал закрыт
При записи, нужно во первых обернуть в recover. Во вторых рекомендуется сначала "убить", писателей, а потом уже закрывать канал. Также One general principle of using Go channels is don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders. In other words, we should only close a channel in a sender goroutine if the sender is the only sender of the channel.
It is worth collecting the channel axioms in one post:
- a send on a
nil
channel blocks forever (Spec: Send statements) - a receive from a
nil
channel blocks forever (Spec: Receive operator) - a send to a closed channel panics (Spec: Send statements)
- a receive from a closed channel returns the zero value immediately (Spec: Receive operator)
Дополнительно: