Backpressure: контроль потока при медленном consumer

К задачам
Сложная
Concurrency

Условие задачи

Реализуй систему из producer'а и consumer'а. Producer каждую миллисекунду генерирует событие и отправляет в канал. Consumer обрабатывает событие за 100мс. Нужно избежать переполнения канала и неконтролируемого роста памяти. Реализуй backpressure через буферизованный канал или drop-логику.

package main

import (
    "fmt"
    "time"
)

func producer(ch chan<- int) {
    // TODO: генерируй данные быстро
}

func consumer(ch <-chan int) {
    // TODO: потребляй медленно
}

func main() {
    ch := make(chan int, 10)

    go producer(ch)
    go consumer(ch)

    // TODO: пусть система поработает несколько секунд
    time.Sleep(3 * time.Second)
}
package main

import (
    "fmt"
    "time"
)

func producer(ch chan<- int) {
    // TODO: генерируй данные быстро
}

func consumer(ch <-chan int) {
    // TODO: потребляй медленно
}

func main() {
    ch := make(chan int, 10)

    go producer(ch)
    go consumer(ch)

    // TODO: пусть система поработает несколько секунд
    time.Sleep(3 * time.Second)
}

Подсказка

- Если буфер заполнен, продюсер может блокироваться. - Можно реализовать логику дропа: если буфер полон — пропускаем отправку. - Также можно добавить метрики: сколько дропов произошло.

Решение

Backpressure позволяет сбалансировать нагрузку между продюсером и потребителем и избежать перегрузки системы.

package main

import (
    "fmt"
    "time"
)

func producer(ch chan<- int) {
    i := 0
    for {
        select {
        case ch <- i:
            i++
        default:
            // буфер переполнен — дропаем
            fmt.Println("Dropped")
        }
        time.Sleep(1 * time.Millisecond)
    }
}

func consumer(ch <-chan int) {
    for x := range ch {
        fmt.Println("Consumed:", x)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    ch := make(chan int, 10)

    go producer(ch)
    go consumer(ch)

    time.Sleep(3 * time.Second)
}
package main

import (
    "fmt"
    "time"
)

func producer(ch chan<- int) {
    i := 0
    for {
        select {
        case ch <- i:
            i++
        default:
            // буфер переполнен — дропаем
            fmt.Println("Dropped")
        }
        time.Sleep(1 * time.Millisecond)
    }
}

func consumer(ch <-chan int) {
    for x := range ch {
        fmt.Println("Consumed:", x)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    ch := make(chan int, 10)

    go producer(ch)
    go consumer(ch)

    time.Sleep(3 * time.Second)
}