Properly flush unique queues on startup (#23154)

There have been a number of reports of PRs being blocked whilst being
checked which have been difficult to debug. In investigating #23050 I
have realised that whilst the Warn there is somewhat of a miscall there
was a real bug in the way that the LevelUniqueQueue was being restored
on start-up of the PersistableChannelUniqueQueue.

Next there is a conflict in the setting of the internal leveldb queue
name - This wasn't being set so it was being overridden by other unique
queues.

This PR fixes these bugs and adds a testcase.

Thanks to @brechtvl  for noticing the second issue.

Fix #23050
and others

---------

Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: techknowlogick <techknowlogick@gitea.io>
This commit is contained in:
zeripath 2023-02-28 22:55:43 +00:00 committed by GitHub
parent 04347eb810
commit 27e49cd01c
Signed by: GitHub
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 332 additions and 21 deletions

@ -124,7 +124,10 @@ func (q *ChannelQueue) Shutdown() {
log.Trace("ChannelQueue: %s Flushing", q.name)
// We can't use Cleanup here because that will close the channel
if err := q.FlushWithContext(q.terminateCtx); err != nil {
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
count := atomic.LoadInt64(&q.numInQueue)
if count > 0 {
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
}
return
}
log.Debug("ChannelQueue: %s Flushed", q.name)

@ -94,7 +94,8 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
},
Workers: 0,
},
DataDir: config.DataDir,
DataDir: config.DataDir,
QueueName: config.Name + "-level",
}
levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
@ -172,16 +173,18 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.terminateCtx) != 0 {
// Just run the level queue - we shut it down once it's flushed
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
go func() {
for !q.IsEmpty() {
_ = q.internal.Flush(0)
for !lq.IsEmpty() {
_ = lq.Flush(0)
select {
case <-time.After(100 * time.Millisecond):
case <-q.internal.(*LevelQueue).shutdownCtx.Done():
log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
case <-lq.shutdownCtx.Done():
if lq.byteFIFO.Len(lq.terminateCtx) > 0 {
log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
}
return
}
}
@ -316,10 +319,22 @@ func (q *PersistableChannelQueue) Shutdown() {
// Redirect all remaining data in the chan to the internal channel
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
close(q.channelQueue.dataChan)
countOK, countLost := 0, 0
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
err := q.internal.Push(data)
if err != nil {
log.Error("PersistableChannelQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
countLost++
} else {
countOK++
}
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
}
if countLost > 0 {
log.Warn("PersistableChannelQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
} else if countOK > 0 {
log.Warn("PersistableChannelQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
}
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)

@ -39,7 +39,7 @@ func TestPersistableChannelQueue(t *testing.T) {
Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10,
Name: "first",
Name: "test-queue",
}, &testData{})
assert.NoError(t, err)
@ -135,7 +135,7 @@ func TestPersistableChannelQueue(t *testing.T) {
Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10,
Name: "second",
Name: "test-queue",
}, &testData{})
assert.NoError(t, err)
@ -227,7 +227,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10,
Name: "first",
Name: "test-queue",
}, &testData{})
assert.NoError(t, err)
@ -433,7 +433,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
Workers: 1,
BoostWorkers: 0,
MaxWorkers: 10,
Name: "second",
Name: "test-queue",
}, &testData{})
assert.NoError(t, err)
pausable, ok = queue.(Pausable)

@ -177,7 +177,9 @@ func (q *ChannelUniqueQueue) Shutdown() {
go func() {
log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
if err := q.FlushWithContext(q.terminateCtx); err != nil {
log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
if !q.IsEmpty() {
log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
}
return
}
log.Debug("ChannelUniqueQueue: %s Flushed", q.name)

@ -8,10 +8,13 @@ import (
"testing"
"time"
"code.gitea.io/gitea/modules/log"
"github.com/stretchr/testify/assert"
)
func TestChannelUniqueQueue(t *testing.T) {
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
handleChan := make(chan *testData)
handle := func(data ...Data) []Data {
for _, datum := range data {
@ -52,6 +55,8 @@ func TestChannelUniqueQueue(t *testing.T) {
}
func TestChannelUniqueQueue_Batch(t *testing.T) {
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
handleChan := make(chan *testData)
handle := func(data ...Data) []Data {
for _, datum := range data {
@ -98,6 +103,8 @@ func TestChannelUniqueQueue_Batch(t *testing.T) {
}
func TestChannelUniqueQueue_Pause(t *testing.T) {
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
lock := sync.Mutex{}
var queue Queue
var err error

@ -94,7 +94,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
},
Workers: 0,
},
DataDir: config.DataDir,
DataDir: config.DataDir,
QueueName: config.Name + "-level",
}
queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
@ -209,17 +210,29 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
atTerminate(q.Terminate)
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() {
// Just run the level queue - we shut it down once it's flushed
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
go luq.Run(func(_ func()) {}, func(_ func()) {})
go func() {
_ = q.internal.Flush(0)
log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelUniqueQueue).Name())
q.internal.(*LevelUniqueQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
_ = luq.Flush(0)
for !luq.IsEmpty() {
_ = luq.Flush(0)
select {
case <-time.After(100 * time.Millisecond):
case <-luq.shutdownCtx.Done():
if luq.byteFIFO.Len(luq.terminateCtx) > 0 {
log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name())
}
return
}
}
log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name())
luq.Shutdown()
GetManager().Remove(luq.qid)
}()
} else {
log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
_ = q.internal.Flush(0)
q.internal.(*LevelUniqueQueue).Shutdown()
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
}
@ -285,8 +298,20 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
// Redirect all remaining data in the chan to the internal channel
close(q.channelQueue.dataChan)
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
countOK, countLost := 0, 0
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
err := q.internal.(*LevelUniqueQueue).Push(data)
if err != nil {
log.Error("PersistableChannelUniqueQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
countLost++
} else {
countOK++
}
}
if countLost > 0 {
log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
} else if countOK > 0 {
log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
}
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)

@ -0,0 +1,259 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package queue
import (
"fmt"
"strconv"
"sync"
"testing"
"time"
"code.gitea.io/gitea/modules/log"
"github.com/stretchr/testify/assert"
)
func TestPersistableChannelUniqueQueue(t *testing.T) {
tmpDir := t.TempDir()
fmt.Printf("TempDir %s\n", tmpDir)
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
// Common function to create the Queue
newQueue := func(name string, handle func(data ...Data) []Data) Queue {
q, err := NewPersistableChannelUniqueQueue(handle,
PersistableChannelUniqueQueueConfiguration{
Name: name,
DataDir: tmpDir,
QueueLength: 200,
MaxWorkers: 1,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 1,
Workers: 0,
}, "task-0")
assert.NoError(t, err)
return q
}
// runs the provided queue and provides some timer function
type channels struct {
readyForShutdown chan struct{} // closed when shutdown functions have been assigned
readyForTerminate chan struct{} // closed when terminate functions have been assigned
signalShutdown chan struct{} // Should close to signal shutdown
doneShutdown chan struct{} // closed when shutdown function is done
queueTerminate []func() // list of atTerminate functions to call atTerminate - need to be accessed with lock
}
runQueue := func(q Queue, lock *sync.Mutex) *channels {
chans := &channels{
readyForShutdown: make(chan struct{}),
readyForTerminate: make(chan struct{}),
signalShutdown: make(chan struct{}),
doneShutdown: make(chan struct{}),
}
go q.Run(func(atShutdown func()) {
go func() {
lock.Lock()
select {
case <-chans.readyForShutdown:
default:
close(chans.readyForShutdown)
}
lock.Unlock()
<-chans.signalShutdown
atShutdown()
close(chans.doneShutdown)
}()
}, func(atTerminate func()) {
lock.Lock()
defer lock.Unlock()
select {
case <-chans.readyForTerminate:
default:
close(chans.readyForTerminate)
}
chans.queueTerminate = append(chans.queueTerminate, atTerminate)
})
return chans
}
// call to shutdown and terminate the queue associated with the channels
doTerminate := func(chans *channels, lock *sync.Mutex) {
<-chans.readyForTerminate
lock.Lock()
callbacks := []func(){}
callbacks = append(callbacks, chans.queueTerminate...)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
}
mapLock := sync.Mutex{}
executedInitial := map[string][]string{}
hasInitial := map[string][]string{}
fillQueue := func(name string, done chan struct{}) {
t.Run("Initial Filling: "+name, func(t *testing.T) {
lock := sync.Mutex{}
startAt100Queued := make(chan struct{})
stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item
handle := func(data ...Data) []Data {
<-startAt100Queued
for _, datum := range data {
s := datum.(string)
mapLock.Lock()
executedInitial[name] = append(executedInitial[name], s)
mapLock.Unlock()
if s == "task-20" {
close(stopAt20Shutdown)
}
}
return nil
}
q := newQueue(name, handle)
// add 100 tasks to the queue
for i := 0; i < 100; i++ {
_ = q.Push("task-" + strconv.Itoa(i))
}
close(startAt100Queued)
chans := runQueue(q, &lock)
<-chans.readyForShutdown
<-stopAt20Shutdown
close(chans.signalShutdown)
<-chans.doneShutdown
_ = q.Push("final")
// check which tasks are still in the queue
for i := 0; i < 100; i++ {
if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
mapLock.Lock()
hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i))
mapLock.Unlock()
}
}
if has, _ := q.(UniqueQueue).Has("final"); has {
mapLock.Lock()
hasInitial[name] = append(hasInitial[name], "final")
mapLock.Unlock()
} else {
assert.Fail(t, "UnqueQueue %s should have \"final\"", name)
}
doTerminate(chans, &lock)
mapLock.Lock()
assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name]))
mapLock.Unlock()
})
close(done)
}
doneA := make(chan struct{})
doneB := make(chan struct{})
go fillQueue("QueueA", doneA)
go fillQueue("QueueB", doneB)
<-doneA
<-doneB
executedEmpty := map[string][]string{}
hasEmpty := map[string][]string{}
emptyQueue := func(name string, done chan struct{}) {
t.Run("Empty Queue: "+name, func(t *testing.T) {
lock := sync.Mutex{}
stop := make(chan struct{})
// collect the tasks that have been executed
handle := func(data ...Data) []Data {
lock.Lock()
for _, datum := range data {
mapLock.Lock()
executedEmpty[name] = append(executedEmpty[name], datum.(string))
mapLock.Unlock()
if datum.(string) == "final" {
close(stop)
}
}
lock.Unlock()
return nil
}
q := newQueue(name, handle)
chans := runQueue(q, &lock)
<-chans.readyForShutdown
<-stop
close(chans.signalShutdown)
<-chans.doneShutdown
// check which tasks are still in the queue
for i := 0; i < 100; i++ {
if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
mapLock.Lock()
hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i))
mapLock.Unlock()
}
}
doTerminate(chans, &lock)
mapLock.Lock()
assert.Equal(t, 101, len(executedInitial[name])+len(executedEmpty[name]))
assert.Equal(t, 0, len(hasEmpty[name]))
mapLock.Unlock()
})
close(done)
}
doneA = make(chan struct{})
doneB = make(chan struct{})
go emptyQueue("QueueA", doneA)
go emptyQueue("QueueB", doneB)
<-doneA
<-doneB
mapLock.Lock()
t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
// reset and rerun
executedInitial = map[string][]string{}
hasInitial = map[string][]string{}
executedEmpty = map[string][]string{}
hasEmpty = map[string][]string{}
mapLock.Unlock()
doneA = make(chan struct{})
doneB = make(chan struct{})
go fillQueue("QueueA", doneA)
go fillQueue("QueueB", doneB)
<-doneA
<-doneB
doneA = make(chan struct{})
doneB = make(chan struct{})
go emptyQueue("QueueA", doneA)
go emptyQueue("QueueB", doneB)
<-doneA
<-doneB
mapLock.Lock()
t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
mapLock.Unlock()
}