package burst_queue import ( "log" "sync" "time" ) // BurstQueue is a queue that will flush its entries when it reaches a certain size or after a certain timeout. type BurstQueue struct { maxQueueSize uint burstTimeout time.Duration burstCallback func(entries []interface{}, elapsed time.Duration) entries []interface{} startTime time.Time flushMu sync.Mutex timerMu sync.Mutex } // NewBurstQueue creates a new BurstQueue. // maxQueueSize is the maximum number of entries to queue before flushing. // burstTimeout is the maximum time to wait before flushing. // burstCallback is the function to call when flushing. func NewBurstQueue(maxQueueSize uint, burstTimeout time.Duration, burstCallback func(entries []interface{}, elapsed time.Duration)) *BurstQueue { return &BurstQueue{ maxQueueSize: maxQueueSize, burstTimeout: burstTimeout, burstCallback: burstCallback, } } // startTimeout starts the timeout. func (bq *BurstQueue) startTimeout() { newStart := time.Now() bq.timerMu.Lock() bq.startTime = newStart bq.timerMu.Unlock() time.Sleep(bq.burstTimeout) bq.timerMu.Lock() shouldFlush := bq.startTime.Equal(newStart) && !bq.startTime.IsZero() bq.timerMu.Unlock() if shouldFlush { bq.FlushEntries() } } // Add adds an entry to the queue and restarts the timeout. func (bq *BurstQueue) Add(entry interface{}) { go bq.startTimeout() bq.entries = append(bq.entries, entry) if len(bq.entries) >= int(bq.maxQueueSize) { bq.FlushEntries() } } // FlushEntries flushes the queue. func (bq *BurstQueue) FlushEntries() { bq.timerMu.Lock() if bq.startTime.IsZero() || bq.entries == nil { bq.timerMu.Unlock() return // not started yet or already flushed } actualTimeout := time.Since(bq.startTime) bq.startTime = time.Time{} bq.timerMu.Unlock() bq.flushMu.Lock() bq.burstCallback(bq.entries, actualTimeout) bq.entries = nil bq.flushMu.Unlock() } func ConvertSlice[T any](src []interface{}) []T { dst := make([]T, len(src)) for i, v := range src { var ok bool if dst[i], ok = v.(T); !ok { log.Fatalf("item at index %d is not of type %T", i, dst[i]) } } return dst }