Files
2024-01-23 18:41:00 +01:00

89 lines
2.1 KiB
Go

package burst_queue
import (
"log"
"sync"
"time"
)
// BurstQueue is a queue that will flush its entries when it reaches a certain size or after a certain timeout.
type BurstQueue struct {
maxQueueSize uint
burstTimeout time.Duration
burstCallback func(entries []interface{}, elapsed time.Duration)
entries []interface{}
startTime time.Time
flushMu sync.Mutex
timerMu sync.Mutex
}
// NewBurstQueue creates a new BurstQueue.
// maxQueueSize is the maximum number of entries to queue before flushing.
// burstTimeout is the maximum time to wait before flushing.
// burstCallback is the function to call when flushing.
func NewBurstQueue(maxQueueSize uint, burstTimeout time.Duration, burstCallback func(entries []interface{}, elapsed time.Duration)) *BurstQueue {
return &BurstQueue{
maxQueueSize: maxQueueSize,
burstTimeout: burstTimeout,
burstCallback: burstCallback,
}
}
// startTimeout starts the timeout.
func (bq *BurstQueue) startTimeout() {
newStart := time.Now()
bq.timerMu.Lock()
bq.startTime = newStart
bq.timerMu.Unlock()
time.Sleep(bq.burstTimeout)
bq.timerMu.Lock()
shouldFlush := bq.startTime.Equal(newStart) && !bq.startTime.IsZero()
bq.timerMu.Unlock()
if shouldFlush {
bq.FlushEntries()
}
}
// Add adds an entry to the queue and restarts the timeout.
func (bq *BurstQueue) Add(entry interface{}) {
go bq.startTimeout()
bq.entries = append(bq.entries, entry)
if len(bq.entries) >= int(bq.maxQueueSize) {
bq.FlushEntries()
}
}
// FlushEntries flushes the queue.
func (bq *BurstQueue) FlushEntries() {
bq.timerMu.Lock()
if bq.startTime.IsZero() || bq.entries == nil {
bq.timerMu.Unlock()
return // not started yet or already flushed
}
actualTimeout := time.Since(bq.startTime)
bq.startTime = time.Time{}
bq.timerMu.Unlock()
bq.flushMu.Lock()
bq.burstCallback(bq.entries, actualTimeout)
bq.entries = nil
bq.flushMu.Unlock()
}
func ConvertSlice[T any](src []interface{}) []T {
dst := make([]T, len(src))
for i, v := range src {
var ok bool
if dst[i], ok = v.(T); !ok {
log.Fatalf("item at index %d is not of type %T", i, dst[i])
}
}
return dst
}