diff --git a/.idea/runConfigurations/go_build_InfantrySkillCalculator.xml b/.idea/runConfigurations/go_build_InfantrySkillCalculator.xml index 87e4f97..17fa215 100644 --- a/.idea/runConfigurations/go_build_InfantrySkillCalculator.xml +++ b/.idea/runConfigurations/go_build_InfantrySkillCalculator.xml @@ -7,7 +7,7 @@ - \ No newline at end of file diff --git a/go.mod b/go.mod index 28b53fa..a04ba88 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,8 @@ require ( replace internal/session => ./internal/session +replace internal/burst_queue => ./internal/burst_queue + require ( github.com/bytedance/sonic v1.9.1 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect diff --git a/go.work b/go.work index 0f4511e..ee395d6 100644 --- a/go.work +++ b/go.work @@ -4,4 +4,5 @@ use ( . internal/cache internal/session + internal/burst_queue ) \ No newline at end of file diff --git a/internal/burst_queue/burst_queue.go b/internal/burst_queue/burst_queue.go new file mode 100644 index 0000000..24ecbb8 --- /dev/null +++ b/internal/burst_queue/burst_queue.go @@ -0,0 +1,88 @@ +package burst_queue + +import ( + "log" + "sync" + "time" +) + +// BurstQueue is a queue that will flush its entries when it reaches a certain size or after a certain timeout. +type BurstQueue struct { + maxQueueSize uint + burstTimeout time.Duration + burstCallback func(entries []interface{}, elapsed time.Duration) + entries []interface{} + startTime time.Time + flushMu sync.Mutex + timerMu sync.Mutex +} + +// NewBurstQueue creates a new BurstQueue. +// maxQueueSize is the maximum number of entries to queue before flushing. +// burstTimeout is the maximum time to wait before flushing. +// burstCallback is the function to call when flushing. +func NewBurstQueue(maxQueueSize uint, burstTimeout time.Duration, burstCallback func(entries []interface{}, elapsed time.Duration)) *BurstQueue { + return &BurstQueue{ + maxQueueSize: maxQueueSize, + burstTimeout: burstTimeout, + burstCallback: burstCallback, + } +} + +// startTimeout starts the timeout. +func (bq *BurstQueue) startTimeout() { + newStart := time.Now() + + bq.timerMu.Lock() + bq.startTime = newStart + bq.timerMu.Unlock() + + time.Sleep(bq.burstTimeout) + + bq.timerMu.Lock() + shouldFlush := bq.startTime.Equal(newStart) && !bq.startTime.IsZero() + bq.timerMu.Unlock() + + if shouldFlush { + bq.FlushEntries() + } +} + +// Add adds an entry to the queue and restarts the timeout. +func (bq *BurstQueue) Add(entry interface{}) { + go bq.startTimeout() + + bq.entries = append(bq.entries, entry) + + if len(bq.entries) >= int(bq.maxQueueSize) { + bq.FlushEntries() + } +} + +// FlushEntries flushes the queue. +func (bq *BurstQueue) FlushEntries() { + bq.timerMu.Lock() + if bq.startTime.IsZero() || bq.entries == nil { + bq.timerMu.Unlock() + return // not started yet or already flushed + } + actualTimeout := time.Since(bq.startTime) + bq.startTime = time.Time{} + bq.timerMu.Unlock() + + bq.flushMu.Lock() + bq.burstCallback(bq.entries, actualTimeout) + bq.entries = nil + bq.flushMu.Unlock() +} + +func ConvertSlice[T any](src []interface{}) []T { + dst := make([]T, len(src)) + for i, v := range src { + var ok bool + if dst[i], ok = v.(T); !ok { + log.Fatalf("item at index %d is not of type %T", i, dst[i]) + } + } + return dst +} diff --git a/internal/burst_queue/go.mod b/internal/burst_queue/go.mod new file mode 100644 index 0000000..e463e15 --- /dev/null +++ b/internal/burst_queue/go.mod @@ -0,0 +1,3 @@ +module burst_queue + +go 1.21 \ No newline at end of file