Queue #

The INFINI Framework provides a pluggable queue abstraction for asynchronous message passing between components. Queues decouple producers from consumers, enabling reliable data pipelines, buffering, and background processing. The framework ships with four backend implementations (disk-based persistence, in-memory, Kafka, and Redis), all accessed through a unified API.

Queue Interfaces #

The queue system defines three levels of capability through separate interfaces in core/queue/.

QueueAPI #

QueueAPI is the base interface that every queue backend must implement. It provides lifecycle management and basic push operations.

type QueueAPI interface {
    Init(cfg *QueueConfig) error
    Close(queueID string) error
    Push(queueID string, data []byte) error
    Destroy(queueID string) error
}
| Method | Description |
| --- | --- |
| Init(cfg *QueueConfig) error | Initializes a queue from the given configuration. Creates underlying storage or connections as needed. |
| Close(queueID string) error | Closes the queue, flushing any pending data. The queue can be reopened later. |
| Push(queueID string, data []byte) error | Pushes a raw byte message onto the queue. |
| Destroy(queueID string) error | Permanently destroys the queue and all its data. |
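
To make the contract concrete, here is a minimal sketch of a backend that satisfies QueueAPI. It is not one of the framework's backends: sliceBackend, newSliceBackend, and size are hypothetical names, QueueConfig is a trimmed local stand-in, and rejecting pushes to uninitialized queues is an assumption about reasonable behavior rather than documented framework semantics.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// QueueConfig is a trimmed local stand-in for the framework's struct.
type QueueConfig struct {
	ID   string
	Name string
}

// QueueAPI mirrors the interface shown above.
type QueueAPI interface {
	Init(cfg *QueueConfig) error
	Close(queueID string) error
	Push(queueID string, data []byte) error
	Destroy(queueID string) error
}

// sliceBackend is a hypothetical backend that keeps each queue in a slice.
type sliceBackend struct {
	mu     sync.Mutex
	queues map[string][][]byte
}

func newSliceBackend() *sliceBackend {
	return &sliceBackend{queues: make(map[string][][]byte)}
}

// Init registers the queue ID so later pushes are accepted.
func (b *sliceBackend) Init(cfg *QueueConfig) error {
	if cfg == nil || cfg.ID == "" {
		return errors.New("queue config needs an ID")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.queues[cfg.ID]; !ok {
		b.queues[cfg.ID] = [][]byte{}
	}
	return nil
}

// Close has nothing to flush for an in-memory slice.
func (b *sliceBackend) Close(queueID string) error { return nil }

// Push appends a copy of data so callers can reuse their buffer.
func (b *sliceBackend) Push(queueID string, data []byte) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	msgs, ok := b.queues[queueID]
	if !ok {
		return fmt.Errorf("queue %q is not initialized", queueID)
	}
	b.queues[queueID] = append(msgs, append([]byte(nil), data...))
	return nil
}

// Destroy drops the queue and everything stored in it.
func (b *sliceBackend) Destroy(queueID string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.queues, queueID)
	return nil
}

// size reports how many messages a queue holds (helper for the demo).
func (b *sliceBackend) size(queueID string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.queues[queueID])
}

// Compile-time check that sliceBackend satisfies QueueAPI.
var _ QueueAPI = (*sliceBackend)(nil)

func main() {
	b := newSliceBackend()
	if err := b.Init(&QueueConfig{ID: "q1", Name: "demo"}); err != nil {
		panic(err)
	}
	_ = b.Push("q1", []byte("hello"))
	fmt.Println("messages in q1:", b.size("q1"))
	_ = b.Destroy("q1")
	fmt.Println("push after destroy:", b.Push("q1", []byte("x")))
}
```

The compile-time assertion (`var _ QueueAPI = ...`) is a common Go idiom for catching a missing method early when writing a new backend.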

SimpleQueueAPI #

SimpleQueueAPI extends QueueAPI with synchronous pop and depth inspection. Use this for straightforward single-consumer scenarios.

type SimpleQueueAPI interface {
    QueueAPI
    Pop(queueID string, timeout time.Duration) (data []byte, err error)
    Depth(queueID string) int64
}
| Method | Description |
| --- | --- |
| Pop(queueID string, timeout time.Duration) ([]byte, error) | Removes and returns the next message from the queue. Blocks up to timeout if the queue is empty. |
| Depth(queueID string) int64 | Returns the number of messages currently in the queue. |

AdvancedQueueAPI #

AdvancedQueueAPI extends QueueAPI with producer/consumer lifecycle management, enabling multi-consumer patterns, offset tracking, and consumer groups.

type AdvancedQueueAPI interface {
    QueueAPI
    AcquireConsumer(k *QueueConfig, consumer *ConsumerConfig, clientID string) (ConsumerAPI, error)
    ReleaseConsumer(k *QueueConfig, consumer *ConsumerConfig, clientID string) error
    AcquireProducer(k *QueueConfig) (ProducerAPI, error)
    ReleaseProducer(k *QueueConfig) error
    GetQueues() []QueueConfig
}
| Method | Description |
| --- | --- |
| AcquireConsumer(k, consumer, clientID) | Creates or retrieves a consumer bound to the specified queue and consumer group. The clientID identifies this particular consumer instance. |
| ReleaseConsumer(k, consumer, clientID) | Releases the consumer, freeing any held resources and allowing rebalancing. |
| AcquireProducer(k) | Creates or retrieves a producer for the specified queue. |
| ReleaseProducer(k) | Releases the producer and its resources. |
| GetQueues() []QueueConfig | Returns the configurations of all queues managed by this backend. |
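
The "creates or retrieves" wording suggests that a backend caches handles. One way this could work is sketched below, assuming handles are keyed by queue ID, consumer group, consumer name, and clientID. The consumerPool type, the key layout, and the error on releasing an unknown handle are all hypothetical; real backends may organize this differently.

```go
package main

import (
	"fmt"
	"sync"
)

// Trimmed local stand-ins for the framework's config structs.
type QueueConfig struct{ ID string }
type ConsumerConfig struct{ Group, Name string }

// consumerHandle is a hypothetical placeholder for a real ConsumerAPI value.
type consumerHandle struct{ key string }

// consumerPool caches one handle per (queue, group, name, clientID) tuple.
type consumerPool struct {
	mu      sync.Mutex
	handles map[string]*consumerHandle
}

func newConsumerPool() *consumerPool {
	return &consumerPool{handles: make(map[string]*consumerHandle)}
}

func consumerKey(q *QueueConfig, c *ConsumerConfig, clientID string) string {
	return q.ID + "/" + c.Group + "/" + c.Name + "/" + clientID
}

// AcquireConsumer returns the cached handle or creates a new one.
func (p *consumerPool) AcquireConsumer(q *QueueConfig, c *ConsumerConfig, clientID string) (*consumerHandle, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	key := consumerKey(q, c, clientID)
	if h, ok := p.handles[key]; ok {
		return h, nil
	}
	h := &consumerHandle{key: key}
	p.handles[key] = h
	return h, nil
}

// ReleaseConsumer forgets the handle so the next acquire builds a fresh one.
func (p *consumerPool) ReleaseConsumer(q *QueueConfig, c *ConsumerConfig, clientID string) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	key := consumerKey(q, c, clientID)
	if _, ok := p.handles[key]; !ok {
		return fmt.Errorf("no consumer held for %s", key)
	}
	delete(p.handles, key)
	return nil
}

func main() {
	pool := newConsumerPool()
	q := &QueueConfig{ID: "q1"}
	c := &ConsumerConfig{Group: "g1", Name: "worker-1"}

	a, _ := pool.AcquireConsumer(q, c, "client-001")
	b, _ := pool.AcquireConsumer(q, c, "client-001")
	fmt.Println("same handle reused:", a == b)

	_ = pool.ReleaseConsumer(q, c, "client-001")
	fmt.Println("second release:", pool.ReleaseConsumer(q, c, "client-001"))
}
```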

Producer and Consumer APIs #

Producers and consumers are stateful handles returned by the AdvancedQueueAPI. They provide a focused interface for writing and reading messages.

ProducerAPI #

type ProducerAPI interface {
    Push(data []byte) error
    Close() error
}
| Method | Description |
| --- | --- |
| Push(data []byte) error | Sends a message to the queue this producer is bound to. |
| Close() error | Closes the producer and releases its resources. |

ConsumerAPI #

type ConsumerAPI interface {
    FetchMessages(ctx context.Context, numOfMessages int) (messages []Message, timeout bool, err error)
    CommitOffset(offset Offset) error
    Close() error
}
| Method | Description |
| --- | --- |
| FetchMessages(ctx, numOfMessages) | Fetches up to numOfMessages from the queue. Returns the messages, a flag indicating whether the call timed out, and any error. The ctx parameter supports cancellation. |
| CommitOffset(offset Offset) error | Commits the consumer’s read offset, marking messages as processed. This prevents redelivery after restart. |
| Close() error | Closes the consumer and releases its resources. |

Queue Configuration #

Queues are configured using the QueueConfig struct. Configuration can come from YAML files or be constructed programmatically.

type QueueConfig struct {
    Source   string                 `config:"source" json:"source,omitempty"`
    ID       string                 `config:"id"     json:"id,omitempty"`
    Name     string                 `config:"name"   json:"name,omitempty"`
    Group    string                 `config:"group"  json:"group,omitempty"`
    Type     string                 `config:"type"   json:"type,omitempty"`
    Labels   map[string]interface{} `config:"label"  json:"label,omitempty"`
}
| Field | Type | Description |
| --- | --- | --- |
| Source | string | Identifies the origin or owning component of the queue. |
| ID | string | Unique identifier for the queue instance. Auto-generated if not provided. |
| Name | string | Human-readable name for the queue. Used in configuration references and logging. |
| Group | string | Logical grouping for related queues. Useful for organizing queues by function or tenant. |
| Type | string | Selects the queue backend implementation (e.g., disk, memory, kafka, redis). |
| Labels | map[string]interface{} | Arbitrary key-value metadata attached to the queue for filtering and categorization. |
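
Note that the Labels field is tagged label (singular) for both config and JSON, and every JSON tag carries omitempty. The following sketch copies the struct locally and marshals it with the standard library to show the effect of those tags; the encode helper is hypothetical and only wraps json.Marshal.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// QueueConfig is a local copy of the struct shown above.
type QueueConfig struct {
	Source string                 `config:"source" json:"source,omitempty"`
	ID     string                 `config:"id"     json:"id,omitempty"`
	Name   string                 `config:"name"   json:"name,omitempty"`
	Group  string                 `config:"group"  json:"group,omitempty"`
	Type   string                 `config:"type"   json:"type,omitempty"`
	Labels map[string]interface{} `config:"label"  json:"label,omitempty"`
}

// encode renders a config as JSON so the tag behavior is visible.
func encode(cfg QueueConfig) (string, error) {
	b, err := json.Marshal(cfg)
	return string(b), err
}

func main() {
	out, err := encode(QueueConfig{
		Name:   "my_queue",
		Type:   "disk",
		Labels: map[string]interface{}{"type": "logs"},
	})
	if err != nil {
		panic(err)
	}
	// Empty fields (Source, ID, Group) are omitted; Labels appears as "label".
	fmt.Println(out) // {"name":"my_queue","type":"disk","label":{"type":"logs"}}
}
```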

YAML Configuration #

Queues are defined under the queue section in the application’s YAML configuration file:

queue:
  - name: "my_queue"
    type: "disk"
    label:
      type: "logs"

  - name: "fast_queue"
    type: "memory"
    label:
      type: "metrics"

Registering Queue Backends #

Queue backends register themselves using Register or RegisterDefaultHandler. Registration typically happens inside a Go init() function so that importing the package is sufficient to activate the backend.

// Register a named queue backend (signature, as exposed by package queue)
func Register(name string, h QueueAPI)

// Register the default handler used when no type is specified
func RegisterDefaultHandler(h QueueAPI)

Registration Pattern #

package my_queue

import "infini.sh/framework/core/queue"

func init() {
    handler := &MyQueueBackend{}
    queue.Register("my_backend", handler)
}

Once registered, the backend can be selected by setting type: "my_backend" in the queue configuration.
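
How a type value might be resolved to a backend can be sketched with a small registry. This is not the framework's implementation: handlerFor, namedBackend, and the trimmed one-method QueueAPI are hypothetical, and returning an error for an unknown type is an assumption. Only the fallback to the default handler when no type is given follows from the description above.

```go
package main

import (
	"fmt"
	"sync"
)

// QueueAPI is trimmed to one method to keep the sketch short.
type QueueAPI interface {
	Push(queueID string, data []byte) error
}

var (
	mu             sync.RWMutex
	handlers       = map[string]QueueAPI{}
	defaultHandler QueueAPI
)

// Register stores a backend under a type name.
func Register(name string, h QueueAPI) {
	mu.Lock()
	defer mu.Unlock()
	handlers[name] = h
}

// RegisterDefaultHandler sets the backend used when no type is specified.
func RegisterDefaultHandler(h QueueAPI) {
	mu.Lock()
	defer mu.Unlock()
	defaultHandler = h
}

// handlerFor resolves a queue type to a backend (hypothetical helper).
func handlerFor(queueType string) (QueueAPI, error) {
	mu.RLock()
	defer mu.RUnlock()
	if queueType == "" {
		if defaultHandler == nil {
			return nil, fmt.Errorf("no default queue handler registered")
		}
		return defaultHandler, nil
	}
	h, ok := handlers[queueType]
	if !ok {
		return nil, fmt.Errorf("unknown queue type %q", queueType)
	}
	return h, nil
}

// namedBackend is a do-nothing backend used only to tell handlers apart.
type namedBackend struct{ name string }

func (b *namedBackend) Push(queueID string, data []byte) error { return nil }

func main() {
	disk := &namedBackend{name: "disk"}
	Register("disk", disk)
	Register("my_backend", &namedBackend{name: "my_backend"})
	RegisterDefaultHandler(disk)

	for _, t := range []string{"my_backend", "", "missing"} {
		h, err := handlerFor(t)
		if err != nil {
			fmt.Printf("type %q: %v\n", t, err)
			continue
		}
		fmt.Printf("type %q resolved to %s\n", t, h.(*namedBackend).name)
	}
}
```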

Producing Messages #

The framework provides package-level functions for producing messages without managing producer lifecycles manually.

Simple Push #

For one-off or low-frequency writes, initialize the queue with IniQueue, then call the package-level Push function directly:

cfg := &queue.QueueConfig{
    Name: "my_queue",
}

// Initialize the queue
err := queue.IniQueue(cfg)
if err != nil {
    log.Error("failed to initialize queue:", err)
    return
}

// Push a message
err = queue.Push(cfg, []byte("hello world"))
if err != nil {
    log.Error("failed to push message:", err)
}

Using a Producer #

For high-throughput scenarios, acquire a dedicated producer to amortize connection and buffer overhead:

cfg := &queue.QueueConfig{
    Name: "my_queue",
}

// Acquire a producer
producer, err := queue.AcquireProducer(cfg)
if err != nil {
    log.Error("failed to acquire producer:", err)
    return
}
defer queue.ReleaseProducer(cfg)

// Push multiple messages
for i := 0; i < 1000; i++ {
    data := []byte(fmt.Sprintf("message-%d", i))
    if err := producer.Push(data); err != nil {
        log.Error("push failed:", err)
        break
    }
}

Consuming Messages #

Consumers read messages from a queue with offset tracking, supporting at-least-once delivery semantics.

cfg := &queue.QueueConfig{
    Name: "my_queue",
}

consumerCfg := &queue.ConsumerConfig{
    Group: "my_consumer_group",
    Name:  "worker-1",
}

// Acquire a consumer
consumer, err := queue.AcquireConsumer(cfg, consumerCfg, "client-001")
if err != nil {
    log.Error("failed to acquire consumer:", err)
    return
}
defer queue.ReleaseConsumer(cfg, consumerCfg, "client-001")

// Fetch and process messages
ctx := context.Background()
for {
    messages, timeout, err := consumer.FetchMessages(ctx, 100)
    if err != nil {
        log.Error("fetch failed:", err)
        break
    }

    if timeout || len(messages) == 0 {
        continue
    }

    for _, msg := range messages {
        // Process the message
        processMessage(msg)
    }

    // Commit the offset of the last message
    lastMsg := messages[len(messages)-1]
    if err := consumer.CommitOffset(lastMsg.Offset); err != nil {
        log.Error("commit failed:", err)
    }
}

Checking Consumer Lag #

Use HasLag to determine whether a queue has unconsumed messages:

cfg := &queue.QueueConfig{
    Name: "my_queue",
}

if queue.HasLag(cfg) {
    log.Info("queue has pending messages")
}

Queue Backends #

The framework ships with four queue backend implementations. Each backend is activated by importing its package.

Disk Queue (Default) #

The disk queue provides persistent, file-based message storage. Messages survive application restarts, making it suitable for reliable data pipelines.

Activation:

import _ "infini.sh/framework/modules/queue/disk_queue"

Configuration:

queue:
  - name: "persistent_queue"
    type: "disk"

disk_queue:
  max_msg_size: 20485760          # Maximum message size in bytes (default ~20MB)
  max_bytes_per_file: 209715200   # Maximum segment file size (default ~200MB)
  sync_every_records: 10000       # Sync to disk every N records
  retention:
    max_num_of_local_files: 20    # Maximum number of segment files to retain
| Parameter | Type | Description |
| --- | --- | --- |
| max_msg_size | int | Maximum allowed size for a single message in bytes. Messages exceeding this limit are rejected. |
| max_bytes_per_file | int | Maximum size of each segment file on disk. When reached, a new segment file is created. |
| sync_every_records | int | Number of records to write before forcing a disk sync. Lower values increase durability at the cost of throughput. |
| retention.max_num_of_local_files | int | Maximum number of segment files to keep. Older segments are removed when this limit is exceeded. |
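
For capacity planning, the segment size and retention count can be multiplied to get a rough ceiling on retained data per queue. The sketch below assumes the retention limit applies to full-size segment files and ignores any metadata or files not yet eligible for cleanup, so treat the result as an estimate rather than a guarantee. The diskQueueSettings type and retainedBytesUpperBound helper are hypothetical.

```go
package main

import "fmt"

// diskQueueSettings holds the two values from the configuration above.
type diskQueueSettings struct {
	MaxBytesPerFile    int64 // max_bytes_per_file
	MaxNumOfLocalFiles int64 // retention.max_num_of_local_files
}

// retainedBytesUpperBound estimates the most data kept in retained segments.
func retainedBytesUpperBound(s diskQueueSettings) int64 {
	return s.MaxBytesPerFile * s.MaxNumOfLocalFiles
}

func main() {
	s := diskQueueSettings{MaxBytesPerFile: 209715200, MaxNumOfLocalFiles: 20}
	total := retainedBytesUpperBound(s)
	fmt.Printf("%d bytes (%.2f GiB)\n", total, float64(total)/(1<<30))
}
```

With the values shown above, the estimate is 4194304000 bytes, or roughly 3.9 GiB per queue.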

Memory Queue #

The memory queue stores messages in RAM for maximum throughput. Messages are lost on application restart. Use this for transient data, caching, or scenarios where speed matters more than durability.

Activation:

import _ "infini.sh/framework/modules/queue/mem_queue"

Configuration:

queue:
  - name: "fast_queue"
    type: "memory"

Kafka Queue #

The Kafka backend delegates to an external Apache Kafka cluster. Use this when you need distributed messaging, replication, and integration with the broader Kafka ecosystem.

Activation:

import _ "infini.sh/framework/modules/queue/kafka_queue"

Configuration:

queue:
  - name: "distributed_queue"
    type: "kafka"

Redis Queue #

The Redis backend uses Redis as the message broker. Use this for lightweight distributed queuing when a Redis instance is already available.

Activation:

import _ "infini.sh/framework/modules/queue/redis"

Configuration:

queue:
  - name: "redis_queue"
    type: "redis"

Complete Example #

Below is a complete example demonstrating queue initialization, producing, and consuming messages using the disk queue backend.

Application Setup #

package main

import (
    "context"
    "fmt"
    "time"

    log "github.com/cihub/seelog"
    "infini.sh/framework/core/queue"
    _ "infini.sh/framework/modules/queue/disk_queue"
)

func main() {
    // Flush buffered seelog output before the program exits
    defer log.Flush()

    // Define queue configuration
    cfg := &queue.QueueConfig{
        Name: "example_queue",
        Type: "disk",
        Labels: map[string]interface{}{
            "type": "demo",
        },
    }

    // Initialize the queue
    if err := queue.IniQueue(cfg); err != nil {
        log.Errorf("failed to init queue: %v", err)
        return
    }

    // Start a producer goroutine
    go produce(cfg)

    // Start a consumer
    consume(cfg)
}

func produce(cfg *queue.QueueConfig) {
    producer, err := queue.AcquireProducer(cfg)
    if err != nil {
        log.Errorf("failed to acquire producer: %v", err)
        return
    }
    defer queue.ReleaseProducer(cfg)

    for i := 0; i < 100; i++ {
        msg := []byte(fmt.Sprintf(`{"event": "click", "seq": %d}`, i))
        if err := producer.Push(msg); err != nil {
            log.Errorf("push error: %v", err)
            return
        }
    }
    log.Info("finished producing 100 messages")
}

func consume(cfg *queue.QueueConfig) {
    consumerCfg := &queue.ConsumerConfig{
        Group: "demo_group",
        Name:  "demo_consumer",
    }

    consumer, err := queue.AcquireConsumer(cfg, consumerCfg, "client-1")
    if err != nil {
        log.Errorf("failed to acquire consumer: %v", err)
        return
    }
    defer queue.ReleaseConsumer(cfg, consumerCfg, "client-1")

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    total := 0
    for {
        messages, timeout, err := consumer.FetchMessages(ctx, 10)
        if err != nil {
            log.Errorf("fetch error: %v", err)
            break
        }

        if timeout {
            log.Info("fetch timed out, stopping consumer")
            break
        }

        for _, msg := range messages {
            fmt.Printf("received: %s\n", string(msg.Data))
            total++
        }

        if len(messages) > 0 {
            last := messages[len(messages)-1]
            if err := consumer.CommitOffset(last.Offset); err != nil {
                log.Errorf("commit error: %v", err)
            }
        }
    }

    log.Infof("consumed %d messages total", total)
}

YAML Configuration #

queue:
  - name: "example_queue"
    type: "disk"
    label:
      type: "demo"

disk_queue:
  max_msg_size: 20485760
  max_bytes_per_file: 209715200
  sync_every_records: 10000
  retention:
    max_num_of_local_files: 20

pipeline:
  - name: queue_demo
    auto_start: true
    keep_running: false
    processor:
      - echo:
          message: "queue demo pipeline started"