Каналы

Каналы (channels) - инструмент коммуникации между горутинами. Для определения канала применяется ключевое слово chan var intChannel chan int

type hchan struct {
   qcount   uint           // количество элементов в буфере
   dataqsiz uint           // размерность буфера для буферизированного канала
   buf      unsafe.Pointer //  ссылка на буфер с данными, записанными в канал, реализованный с помощью структуры данных “кольцевой буфер”
   elemsize uint16 // размер одного элемента в канале
   closed   uint32 // закрыт или открыт канал в данный момент(из за работы с atomic не bool а uint32)
   elemtype *_type // ссылка на тип данных в канале
   sendx    uint   // индекс смещения записи - в какой  элемент буфера будет следующая запись
   recvx    uint   // индекс смещения чтения - с какого элемента буфера будет следующее чтение
   recvq    waitq  // очередь заблокированных горутин(sudog), ожидающие чтения
   sendq    waitq  // очердь заблокированных горутин(sudog), ожидающие записи
   lock mutex  // мьютекс, используемый для операций, изменяющих состояние канала
}

Как видно канал содержит в себе очереди(waitq горутин(на чтение recvq и запись sendq)

waitq - это структура, которая содержит ссылки на первый и последний элемент списка из sudog горутин

type waitq struct {
	first *sudog
	last  *sudog
}

sudog представляет заблокированную горутину, ожидающую чтения или записи


type sudog struct {
    g *g                    // ссылка на горутину
    elem     unsafe.Pointer // данные для записи
    // ...
}

В общем случае, горутина захватывает lock mutex, когда совершает какое-либо действие с каналом, кроме случаев lock-free проверок при неблокирующих вызовах(например при использовании select). Closed — это флаг, который устанавливается в 1, если канал закрыт, и в 0, если не закрыт.

Канал может быть синхронным (небуферизированным) или асинхронным (буферезированным).

Синхронные(небуферизированные) каналы

package main

func main() {
    ch := make(chan bool)
    go func() {
        ch <- true
    }()
    <-ch
}

Сразу после создания канал выглядит вот так:

img

Go не выделяет буфер для синхронных каналов, поэтому указатель на буфер равен nil и dataqsizравен нулю. Допустим, что первым действием будет чтение из канала (обратный пример, когда вначале идёт запись, будет рассмотрен в примере с буферизированным каналами). Вначале, текущая горутина произведёт некоторые проверки, такие как:

  • закрыт ли канал
  • буферизирован он или нет
  • содержит ли гоуртины в send-очереди.

В нашем примере у канала нет ни буфера, ни ожидающих отправки горутин, поэтому горутина добавит сама себя в recvq и заблокируется. На этом шаге наш канал будет выглядеть следующим образом:

img

Теперь у нас осталась только одна работающая горутина, которая пытается записать данные в канал. Все проверки повторяются снова, и когда горутина проверяет recvq очередь, она находит ожидающую чтение горутину, удаляет её из очереди, записывает данные в её стек и снимает блокировку. Это единственное место во всём рантайме Go, когда одна горутина пишет напрямую в стек другой горутины. После этого шага, канал выглядит точно так же, как сразу после инициализации. Обе горутины завершаются и программа завершается.

Буферезированные каналы

package main

func main() {
    ch := make(chan bool, 1)
    ch <- true
    go func() {
        <-ch
    }()
    ch <- true
}

Допустим, что два значения были записаны в канал, и после этого один из элементов вычитан. И первым шагом идёт создание канала, который будет выглядеть вот так:

img

Разница в сравнении с синхронным каналом в том, что тут Go выделяет буфер и устанавливает значение dataqsiz в единицу.

Следующим шагом будет отправка первого значения в канал. Чтобы сделать это, горутина сначала производит несколько проверок: пуста ли очередь recvq, пуст ли буфер, достаточно ли места в буфере.

В нашем случае в буфере достаточно места и в очереди ожидания чтения нет горутин, поэтому горутина просто записывает элемент в буфер, увеличивает значение qcount и продолжает исполнение далее. Канал в этот момент выглядит так:

img

На следующем шаге, горутина main отправляет следующее значение в канал. Когда буфер полон, буферизированный канал будет вести себя точно так же, как синхронный (небуферизированный) канал, тоесть горутина добавит себя в очередь ожидания и заблокируется, в результате чего, канал будет выглядеть следующим образом:

img

Сейчас горутина main заблокирована и Go запустил одну анонимную горутину, которая пытается прочесть значение из канала. И вот тут начинается хитрая часть. Go гарантирует, что канал работает по принципу FIFO очереди (спецификация), но горутина не может просто взять значение из буфера и продолжить исполнение. В этом случае горутина main заблокируется навсегда. Для решения этой ситуации, текущая горутина читает данные из буфера, затем добавляет значение из заблокированной горутины в буфер, разблокирует ожидающую горутину и удаляет её из очереди ожидания. (В случае же, если нет ожидающих горутину, она просто читает первое значение из буфера)

img

Select

Select позволяет вам ждать нескольких операций на каналах

select {
    case <-ch:
    foo()
    default:
    bar()
}
  1. Элементы(scase) внутри select сортируются в случайном порядке(перемешиваются):

  2. Каждый из каналов блокируется мьютексом.

  3. Происходит последовательная попытка взаимодействия (запись или чтение) с каналами, перечисленными внутри оператора select. При наличии секции default, чтение и запись происходят в неблокирующем режиме(об этом далее).

  4. В случае, если ни один из каналов недоступен для взаимодействия, и секция default отсутствует, то текущая горутина переходит в состояние waiting до тех пор, пока какой-то из каналов не станет доступен.

  5. С каналов снимается блокировка мьютексом.

Go запускает функцию со следующей сигнатурой:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... }
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... }

Здесь нас интересует параметр block. При наличии секции default в операторе выбора select, функции chansend и chanrecv вызываются с параметром block равным false, в итоге функции осуществляют быстрый возврат в случае, если записать в канал или прочитать из канала без ожидания не удалось

go_chan

Закрытие канала

  1. выполняется проверка, что канал инициализирован(panic в случае, если канал не инициализирован);
  2. захватывается блокировка мьютекса;
  3. выполняется проверка, что канал не закрыт (panic в случае, если канал уже закрыт);
  4. значение поля closed канала (в структуре hchan) выставляется в true;
  5. все горутины, ожидающие чтения, получают default value в зависимости от типа данных в канале;
  6. все горутины, ожидающие записи, получают panic;
  7. мьютекс канала разблокируется;
  8. заблокированные горутины — разблокируются.

При чтении можно проверить закрыт канал или нет двумя способами:

val, ok :=<- someChan:
if !ok  {}// канал закрыт
for val := range someChan {
    // получено сообщение
}
// канал закрыт

При записи, нужно во первых обернуть в recover. Во вторых рекомендуется сначала "убить", писателей, а потом уже закрывать канал. Также One general principle of using Go channels is don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders. In other words, we should only close a channel in a sender goroutine if the sender is the only sender of the channel.

It is worth collecting the channel axioms in one post:

Дополнительно: