RabbitMQ’s Topic (topic)

#Connection

import (
    "context"
    "fmt"
    "log"
    "time"
    "strings"
    
    "github.com/streadway/amqp"
)
var addr = "amqp://admin:admin@rabbitmq1:5672/"
var (
    connect func(addr string) (*amqp.Connection, error) 
    handle func (addr string, ch chan *amqp.Error, conn *amqp.Connection) 
)

connect = func(addr string) (*amqp.Connection, error) {
    conn, err := amqp.Dial(addr);
    connErrorChan := conn.NotifyClose(make(chan *amqp.Error))
    go handle(addr, connErrorChan, conn)
    return conn, err
}

handle = func(addr string, ch chan *amqp.Error, conn *amqp.Connection) {
    err := <- ch
    fmt.Println("connection lost... ")
    t1 := time.Now()
    for {
        c, err := connect(addr)
        if err != nil {
          break
        }
        fmt.Printf("reconnected in %s...\n", time.Since(t1))
        conn = c
    }
}
conn, err := connect(addr)
if err != nil {
  log.Fatal(err)
}

#Code

type Queue struct {
    Name       string 
    Durable    bool
    Exclusive  bool
    AutoDelete bool
    NoWait     bool
    Args       amqp.Table
}
type Exchange struct {
    Name       string
    Type       string
    Durable    bool
    AutoDelete bool
    Internal   bool
    NoWait     bool 
    Args       amqp.Table
}
type PublishOptions struct {
    Exchange  string
    Routing   string
    Mandatory bool
    Immediate bool
}
type ConsumeOptions struct {
    Queue     string
    Name      string
    NoLocal   bool     
    NoAck     bool      
    Exclusive bool
    NoWait    bool
    Args      amqp.Table
}
func QueueDeclare(ch *amqp.Channel, q Queue) (amqp.Queue, error) {
    if q.Args == nil {
        q.Args = amqp.Table{}
    } 
    
    return ch.QueueDeclare(q.Name, q.Durable, q.AutoDelete, q.Exclusive, q.NoWait, q.Args)
}
func DeleteQueue(ch *amqp.Channel, name string) (string, error) {
    cnt, err := ch.QueueDelete(name, false, false, true)
    return fmt.Sprintf("Deleted %d messages", cnt), err
}
func CreateExchange(ch *amqp.Channel, e Exchange) error {
    if e.Args == nil {
        e.Args = amqp.Table{}
    } 
    
    return ch.ExchangeDeclare(e.Name, e.Type, e.Durable, e.AutoDelete, e.Internal, e.NoWait, e.Args)
}
func Publish(ch *amqp.Channel, p PublishOptions, m amqp.Publishing) error {
    return ch.Publish(p.Exchange, p.Routing, p.Mandatory, p.Immediate, m)
}
func Consume(ch *amqp.Channel, c ConsumeOptions) (<-chan amqp.Delivery, error) {
    if c.Args == nil {
        c.Args = amqp.Table{}
    } 
    
    return ch.Consume(c.Queue, c.Name, c.NoAck, c.Exclusive, c.NoLocal, c.NoWait, c.Args)
}
import "math/rand"
import "context"

func Consumer(done chan struct{}, consumer ConsumeOptions, messages <-chan amqp.Delivery) {
    fmt.Printf("Start Consumer: %s\n", consumer.Name)
    defer fmt.Printf("Exiting consumer: %s\n", consumer.Name)
 
    for {
        // random deley helps to switch consumers to other goroutines
        time.Sleep(time.Duration(rand.Int63n(10)) * time.Millisecond)
            
        select {
            case m, ok := <-messages: 
            if !ok {
                continue
            }   
            
            if len(m.Body) == 0 {
                time.Sleep(time.Second)
                continue
            } 
            
            fmt.Printf("%s > %s\n", consumer.Name, string(m.Body))
            
            
            case <-done:
                return
            default:
        }       
    } 
}

#Example

done := make(chan struct{})

nameOfExchange := "fizbuzreal"

ch, err := conn.Channel()
if err != nil {
    log.Fatal("foo", err)
}

exchangeOptions := Exchange{
    Name: nameOfExchange,
    Type: "topic",
    Durable: true,
}
if err := CreateExchange(ch, exchangeOptions); err != nil {
    log.Fatal("Exchange Not Created", err)
}
// --- Queues Createion --------------------------------------------------------------
var queues = []string{}
var fizbuz = map[int]string{ 1:"all.#", 3:"*.fiz", 5:"*.buz", 15:"*.fiz.buz"}
for k, routingKey := range fizbuz {
    queue, err := QueueDeclare(ch, Queue{Exclusive:true})
    if err != nil {
        log.Fatal("bar", err)
    }

    if err = ch.QueueBind(queue.Name, routingKey, nameOfExchange, false, nil); err != nil {
        log.Fatal("ch.QueueBind", err)
    }
    
    queues = append(queues, queue.Name)
}
for i := 1; i <= 15; i++ {
    names := []string{"all"}
    if i % 3 == 0 {
        names = append(names, "fiz")
    }
    
    if i % 5 == 0{
        names = append(names, "buz")
    }
    key := strings.Join(names, ".")
     
    Publish(ch, PublishOptions{
            Routing: key,
            Exchange: nameOfExchange,
    }, amqp.Publishing{
        Body: []byte(fmt.Sprintf("Event (%s) #%d", key, i)),
            DeliveryMode: 2, 
    })
}
// --- Queues Consuming --------------------------------------------------------------
for i := range queues {
    cons := ConsumeOptions{
        Name:  fmt.Sprintf("consumer: %d", i),
        NoAck: true,
        Queue: queues[i],
    }
    chMsg, err := Consume(ch, cons)
    fmt.Printf("Consumer, %v\n", err)

    go Consumer(done, cons, chMsg)
}

time.Sleep(time.Second)

> Consumer, <nil>
> Start Consumer: consumer: 0
> consumer: 0 > Event (all) #1
> Consumer, <nil>
> consumer: 0 > Event (all) #2
> Start Consumer: consumer: 1
> Consumer, <nil>
> consumer: 0 > Event (all.fiz) #3
> Start Consumer: consumer: 2
> Consumer, <nil>
> Start Consumer: consumer: 3
> consumer: 3 > Event (all.fiz.buz) #15
> consumer: 1 > Event (all.fiz) #3
> consumer: 0 > Event (all) #4
> consumer: 2 > Event (all.buz) #5
> consumer: 1 > Event (all.fiz) #6
> consumer: 2 > Event (all.buz) #10
> consumer: 0 > Event (all.buz) #5
> consumer: 0 > Event (all.fiz) #6
> consumer: 1 > Event (all.fiz) #9
> consumer: 0 > Event (all) #7
> consumer: 0 > Event (all) #8
> consumer: 1 > Event (all.fiz) #12
> consumer: 0 > Event (all.fiz) #9
> consumer: 0 > Event (all.buz) #10
> consumer: 0 > Event (all) #11
> consumer: 0 > Event (all.fiz) #12
> consumer: 0 > Event (all) #13
> consumer: 0 > Event (all) #14
> consumer: 0 > Event (all.fiz.buz) #15
// --- Cleanup --------------------------------------------------------------
time.AfterFunc(6*time.Second, func(){
    fmt.Println("Cleanup...")
    close(done) 
    
    for i := range queues{
        info, err := ch.QueueInspect(queues[i])
        fmt.Printf("Queue To Be deleted: %#v, %v\n", info, err)
        DeleteQueue(ch, queues[i])
    }
    
    ch.Close()
})

<-done
// one more pause to get messages back to jupyter
time.Sleep(time.Second)

> Cleanup...
> Queue To Be deleted: amqp.Queue{Name:"amq.gen-FI-iFVqHj860A0tMxwXixQ", Messages:0, Consumers:1}, <nil>
> Exiting consumer: consumer: 0
> Queue To Be deleted: amqp.Queue{Name:"amq.gen-L2RVq5C208qDOIMYSwGbKA", Messages:0, Consumers:1}, <nil>
> Exiting consumer: consumer: 3
> Exiting consumer: consumer: 2
> Queue To Be deleted: amqp.Queue{Name:"amq.gen-J8ghh9AR6HnWzfjxaGtYVw", Messages:0, Consumers:1}, <nil>
> Exiting consumer: consumer: 1
> Queue To Be deleted: amqp.Queue{Name:"amq.gen-Law9XxOLd9bGcgb6uS5x-g", Messages:0, Consumers:1}, <nil>