Backpressure: контроль потока при медленном consumer
Условие задачи
Реализуй систему из producer'а и consumer'а. Producer каждую миллисекунду генерирует событие и отправляет в канал. Consumer обрабатывает событие за 100мс. Нужно избежать переполнения канала и неконтролируемого роста памяти. Реализуй backpressure через буферизованный канал или drop-логику.
package main
import (
"fmt"
"time"
)
func producer(ch chan<- int) {
// TODO: генерируй данные быстро
}
func consumer(ch <-chan int) {
// TODO: потребляй медленно
}
func main() {
ch := make(chan int, 10)
go producer(ch)
go consumer(ch)
// TODO: пусть система поработает несколько секунд
time.Sleep(3 * time.Second)
}
package main
import (
"fmt"
"time"
)
func producer(ch chan<- int) {
// TODO: генерируй данные быстро
}
func consumer(ch <-chan int) {
// TODO: потребляй медленно
}
func main() {
ch := make(chan int, 10)
go producer(ch)
go consumer(ch)
// TODO: пусть система поработает несколько секунд
time.Sleep(3 * time.Second)
}
Подсказка
- Если буфер заполнен, продюсер может блокироваться. - Можно реализовать логику дропа: если буфер полон — пропускаем отправку. - Также можно добавить метрики: сколько дропов произошло.
Решение
Backpressure позволяет сбалансировать нагрузку между продюсером и потребителем и избежать перегрузки системы.
package main
import (
"fmt"
"time"
)
func producer(ch chan<- int) {
i := 0
for {
select {
case ch <- i:
i++
default:
// буфер переполнен — дропаем
fmt.Println("Dropped")
}
time.Sleep(1 * time.Millisecond)
}
}
func consumer(ch <-chan int) {
for x := range ch {
fmt.Println("Consumed:", x)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
ch := make(chan int, 10)
go producer(ch)
go consumer(ch)
time.Sleep(3 * time.Second)
}
package main
import (
"fmt"
"time"
)
func producer(ch chan<- int) {
i := 0
for {
select {
case ch <- i:
i++
default:
// буфер переполнен — дропаем
fmt.Println("Dropped")
}
time.Sleep(1 * time.Millisecond)
}
}
func consumer(ch <-chan int) {
for x := range ch {
fmt.Println("Consumed:", x)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
ch := make(chan int, 10)
go producer(ch)
go consumer(ch)
time.Sleep(3 * time.Second)
}