Add burst_queue (for now just to archive it)
This commit is contained in:
@@ -7,7 +7,7 @@
|
||||
<directory value="$PROJECT_DIR$" />
|
||||
<filePath value="$PROJECT_DIR$/cmd/main.go" />
|
||||
<method v="2">
|
||||
<option name="RunConfigurationTask" enabled="true" run_configuration_name="Redis" run_configuration_type="docker-deploy" />
|
||||
<option name="RunConfigurationTask" enabled="false" run_configuration_name="Redis" run_configuration_type="docker-deploy" />
|
||||
</method>
|
||||
</configuration>
|
||||
</component>
|
||||
2
go.mod
2
go.mod
@@ -20,6 +20,8 @@ require (
|
||||
|
||||
replace internal/session => ./internal/session
|
||||
|
||||
replace internal/burst_queue => ./internal/burst_queue
|
||||
|
||||
require (
|
||||
github.com/bytedance/sonic v1.9.1 // indirect
|
||||
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
|
||||
|
||||
88
internal/burst_queue/burst_queue.go
Normal file
88
internal/burst_queue/burst_queue.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package burst_queue
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// BurstQueue is a queue that will flush its entries when it reaches a certain size or after a certain timeout.
|
||||
type BurstQueue struct {
|
||||
maxQueueSize uint
|
||||
burstTimeout time.Duration
|
||||
burstCallback func(entries []interface{}, elapsed time.Duration)
|
||||
entries []interface{}
|
||||
startTime time.Time
|
||||
flushMu sync.Mutex
|
||||
timerMu sync.Mutex
|
||||
}
|
||||
|
||||
// NewBurstQueue creates a new BurstQueue.
|
||||
// maxQueueSize is the maximum number of entries to queue before flushing.
|
||||
// burstTimeout is the maximum time to wait before flushing.
|
||||
// burstCallback is the function to call when flushing.
|
||||
func NewBurstQueue(maxQueueSize uint, burstTimeout time.Duration, burstCallback func(entries []interface{}, elapsed time.Duration)) *BurstQueue {
|
||||
return &BurstQueue{
|
||||
maxQueueSize: maxQueueSize,
|
||||
burstTimeout: burstTimeout,
|
||||
burstCallback: burstCallback,
|
||||
}
|
||||
}
|
||||
|
||||
// startTimeout starts the timeout.
|
||||
func (bq *BurstQueue) startTimeout() {
|
||||
newStart := time.Now()
|
||||
|
||||
bq.timerMu.Lock()
|
||||
bq.startTime = newStart
|
||||
bq.timerMu.Unlock()
|
||||
|
||||
time.Sleep(bq.burstTimeout)
|
||||
|
||||
bq.timerMu.Lock()
|
||||
shouldFlush := bq.startTime.Equal(newStart) && !bq.startTime.IsZero()
|
||||
bq.timerMu.Unlock()
|
||||
|
||||
if shouldFlush {
|
||||
bq.FlushEntries()
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds an entry to the queue and restarts the timeout.
|
||||
func (bq *BurstQueue) Add(entry interface{}) {
|
||||
go bq.startTimeout()
|
||||
|
||||
bq.entries = append(bq.entries, entry)
|
||||
|
||||
if len(bq.entries) >= int(bq.maxQueueSize) {
|
||||
bq.FlushEntries()
|
||||
}
|
||||
}
|
||||
|
||||
// FlushEntries flushes the queue.
|
||||
func (bq *BurstQueue) FlushEntries() {
|
||||
bq.timerMu.Lock()
|
||||
if bq.startTime.IsZero() || bq.entries == nil {
|
||||
bq.timerMu.Unlock()
|
||||
return // not started yet or already flushed
|
||||
}
|
||||
actualTimeout := time.Since(bq.startTime)
|
||||
bq.startTime = time.Time{}
|
||||
bq.timerMu.Unlock()
|
||||
|
||||
bq.flushMu.Lock()
|
||||
bq.burstCallback(bq.entries, actualTimeout)
|
||||
bq.entries = nil
|
||||
bq.flushMu.Unlock()
|
||||
}
|
||||
|
||||
func ConvertSlice[T any](src []interface{}) []T {
|
||||
dst := make([]T, len(src))
|
||||
for i, v := range src {
|
||||
var ok bool
|
||||
if dst[i], ok = v.(T); !ok {
|
||||
log.Fatalf("item at index %d is not of type %T", i, dst[i])
|
||||
}
|
||||
}
|
||||
return dst
|
||||
}
|
||||
3
internal/burst_queue/go.mod
Normal file
3
internal/burst_queue/go.mod
Normal file
@@ -0,0 +1,3 @@
|
||||
module burst_queue
|
||||
|
||||
go 1.21
|
||||
Reference in New Issue
Block a user