89 lines
2.1 KiB
Go
89 lines
2.1 KiB
Go
package burst_queue
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// BurstQueue is a queue that will flush its entries when it reaches a certain size or after a certain timeout.
|
|
type BurstQueue struct {
|
|
maxQueueSize uint
|
|
burstTimeout time.Duration
|
|
burstCallback func(entries []interface{}, elapsed time.Duration)
|
|
entries []interface{}
|
|
startTime time.Time
|
|
flushMu sync.Mutex
|
|
timerMu sync.Mutex
|
|
}
|
|
|
|
// NewBurstQueue creates a new BurstQueue.
|
|
// maxQueueSize is the maximum number of entries to queue before flushing.
|
|
// burstTimeout is the maximum time to wait before flushing.
|
|
// burstCallback is the function to call when flushing.
|
|
func NewBurstQueue(maxQueueSize uint, burstTimeout time.Duration, burstCallback func(entries []interface{}, elapsed time.Duration)) *BurstQueue {
|
|
return &BurstQueue{
|
|
maxQueueSize: maxQueueSize,
|
|
burstTimeout: burstTimeout,
|
|
burstCallback: burstCallback,
|
|
}
|
|
}
|
|
|
|
// startTimeout starts the timeout.
|
|
func (bq *BurstQueue) startTimeout() {
|
|
newStart := time.Now()
|
|
|
|
bq.timerMu.Lock()
|
|
bq.startTime = newStart
|
|
bq.timerMu.Unlock()
|
|
|
|
time.Sleep(bq.burstTimeout)
|
|
|
|
bq.timerMu.Lock()
|
|
shouldFlush := bq.startTime.Equal(newStart) && !bq.startTime.IsZero()
|
|
bq.timerMu.Unlock()
|
|
|
|
if shouldFlush {
|
|
bq.FlushEntries()
|
|
}
|
|
}
|
|
|
|
// Add adds an entry to the queue and restarts the timeout.
|
|
func (bq *BurstQueue) Add(entry interface{}) {
|
|
go bq.startTimeout()
|
|
|
|
bq.entries = append(bq.entries, entry)
|
|
|
|
if len(bq.entries) >= int(bq.maxQueueSize) {
|
|
bq.FlushEntries()
|
|
}
|
|
}
|
|
|
|
// FlushEntries flushes the queue.
|
|
func (bq *BurstQueue) FlushEntries() {
|
|
bq.timerMu.Lock()
|
|
if bq.startTime.IsZero() || bq.entries == nil {
|
|
bq.timerMu.Unlock()
|
|
return // not started yet or already flushed
|
|
}
|
|
actualTimeout := time.Since(bq.startTime)
|
|
bq.startTime = time.Time{}
|
|
bq.timerMu.Unlock()
|
|
|
|
bq.flushMu.Lock()
|
|
bq.burstCallback(bq.entries, actualTimeout)
|
|
bq.entries = nil
|
|
bq.flushMu.Unlock()
|
|
}
|
|
|
|
func ConvertSlice[T any](src []interface{}) []T {
|
|
dst := make([]T, len(src))
|
|
for i, v := range src {
|
|
var ok bool
|
|
if dst[i], ok = v.(T); !ok {
|
|
log.Fatalf("item at index %d is not of type %T", i, dst[i])
|
|
}
|
|
}
|
|
return dst
|
|
}
|