- reactor.go
/*
* Author, Copyright: Oleg Borodin <onborodin@gmail.com>
*
*/
package main
import (
"fmt"
"time"
"math/rand"
)
type Message struct {
Id int64
Subject string
Body string
}
type Report struct {
WorkerId int64
MessageId int64
Err error
}
type Worker struct {
Id int64
Mailbox chan Message
Reports chan Report
}
func NewWorker(id int64, reports chan Report) *Worker {
mailbox := make(chan Message, 10)
return &Worker {
Id: id,
Mailbox: mailbox,
Reports: reports,
}
}
type Reactor struct {
Mailbox chan Message
Reports chan Report
Workers []*Worker
}
func NewReactor() *Reactor {
mailbox := make(chan Message, 10)
reports := make(chan Report, 10)
// Create worrker array
workers := make([]*Worker, 0)
for i := 0; i < 10; i++ {
workers = append(workers, NewWorker(int64(i), reports))
}
return &Reactor {
Mailbox: mailbox,
Reports: reports,
Workers: workers,
}
}
func (this *Reactor) Loop() {
// Start worker threads
for _, item := range this.Workers {
go item.Loop()
}
for {
select {
case message := <- this.Mailbox:
fmt.Println("new job", message.Id)
// Simple random scheduler
workerId := rand.Intn(10)
this.Workers[workerId].Mailbox <- message
case report := <- this.Reports:
fmt.Println("new report", report.MessageId, "from worker", report.WorkerId)
//case <-time.After(time.Second * 1):
// fmt.Println("reactor timeout")
}
}
}
func (this *Worker) Loop() {
for {
select {
case message := <- this.Mailbox:
fmt.Println("worker", this.Id, "have new work", message.Id)
// Hard work
time.Sleep(rand.Intn(400) * time.Millisecond)
// Report result
this.Reports <- Report{ WorkerId: this.Id, MessageId: message.Id, Err: nil }
//case <-time.After(time.Second * 1):
// fmt.Println("worker timeout")
}
}
}
func main() {
reactor := NewReactor()
// Start reactor
go reactor.Loop()
// Make incoming messages
for {
id := time.Now().UnixNano()
message := Message{
Id: id,
Subject: "foo",
Body: "bar",
}
time.Sleep(100 * time.Millisecond)
fmt.Println("new message:", message.Id)
reactor.Mailbox <- message
}
}
Output
$ go run reactor2.go
new message: 1610667202836643810
new job 1610667202836643810
worker 1 have new work 1610667202836643810
new message: 1610667202943911256
new job 1610667202943911256
worker 7 have new work 1610667202943911256
new report 1610667202943911256 from worker 7
new message: 1610667203051048937
new job 1610667203051048937
worker 1 have new work 1610667203051048937
new report 1610667202836643810 from worker 1
new message: 1610667203154040232
new job 1610667203154040232
worker 5 have new work 1610667203154040232
new message: 1610667203256222056
new report 1610667203051048937 from worker 1
new job 1610667203256222056
worker 6 have new work 1610667203256222056
new report 1610667203154040232 from worker 5
new message: 1610667203361862402
new report 1610667203256222056 from worker 6
new job 1610667203361862402
worker 4 have new work 1610667203361862402
new message: 1610667203462710013
new job 1610667203462710013
worker 2 have new work 1610667203462710013
new report 1610667203361862402 from worker 4
new message: 1610667203564491658
new job 1610667203564491658
worker 8 have new work 1610667203564491658
new report 1610667203564491658 from worker 8
new message: 1610667203666838902
new job 1610667203666838902
worker 1 have new work 1610667203666838902
new report 1610667203462710013 from worker 2
new message: 1610667203769782557
new job 1610667203769782557
worker 7 have new work 1610667203769782557
new message: 1610667203870944714
new job 1610667203870944714
worker 5 have new work 1610667203870944714
new report 1610667203666838902 from worker 1
new message: 1610667203973248182
new job 1610667203973248182
worker 8 have new work 1610667203973248182
new message: 1610667204075115775
new job 1610667204075115775
worker 7 have new work 1610667204075115775
new report 1610667203769782557 from worker 7
new report 1610667203870944714 from worker 5
new message: 1610667204176946959
new job 1610667204176946959
new report 1610667203973248182 from worker 8
new message: 1610667204278434592
new job 1610667204278434592
worker 8 have new work 1610667204278434592
new message: 1610667204379519344
new job 1610667204379519344
worker 5 have new work 1610667204379519344
worker 7 have new work 1610667204176946959
new report 1610667204075115775 from worker 7
new report 1610667204176946959 from worker 7
new message: 1610667204481507375
new job 1610667204481507375
worker 7 have new work 1610667204481507375
new report 1610667204481507375 from worker 7
new message: 1610667204582071843
new job 1610667204582071843
worker 9 have new work 1610667204582071843
new report 1610667204278434592 from worker 8
new message: 1610667204683128531
new job 1610667204683128531
worker 7 have new work 1610667204683128531
new report 1610667204379519344 from worker 5
new report 1610667204582071843 from worker 9
new message: 1610667204783957724
new job 1610667204783957724
worker 5 have new work 1610667204783957724
new message: 1610667204884887144
new job 1610667204884887144
worker 3 have new work 1610667204884887144
new report 1610667204683128531 from worker 7
new message: 1610667204985404459
new job 1610667204985404459
worker 4 have new work 1610667204985404459
new report 1610667204783957724 from worker 5
new message: 1610667205085962637
new job 1610667205085962637
new report 1610667204985404459 from worker 4
worker 3 have new work 1610667205085962637
new report 1610667204884887144 from worker 3
new message: 1610667205186408166
new job 1610667205186408166
worker 8 have new work 1610667205186408166
new message: 1610667205286996043
new job 1610667205286996043
worker 9 have new work 1610667205286996043
new report 1610667205085962637 from worker 3
new message: 1610667205387453459
new job 1610667205387453459
worker 7 have new work 1610667205387453459
new report 1610667205286996043 from worker 9
new message: 1610667205487725718
new job 1610667205487725718
worker 9 have new work 1610667205487725718
new report 1610667205387453459 from worker 7
new report 1610667205186408166 from worker 8
new message: 1610667205588102438
new job 1610667205588102438
worker 0 have new work 1610667205588102438
new report 1610667205487725718 from worker 9
new message: 1610667205688668394
new job 1610667205688668394
worker 8 have new work 1610667205688668394
new message: 1610667205788911406
new job 1610667205788911406
worker 3 have new work 1610667205788911406
new report 1610667205688668394 from worker 8
^Csignal: interrupt