From 06b9d553bc02f88553ced9822c55ae901e2ac28e Mon Sep 17 00:00:00 2001 From: zeripath Date: Mon, 30 Aug 2021 05:27:51 +0100 Subject: [PATCH] Timeout on flush in testing (#16864) * Timeout on flush in testing At the end of each test the queues are flushed. At present there is no limit on the length of time a flush can take which can lead to long flushes. However, if the CI task is cancelled we lose the log information as to where the long flush was taking place. This PR simply adds a default time limit of 2 minutes - at which point an error will be produced. This should allow us to more easily find the culprit. Signed-off-by: Andrew Thornton * return better error Signed-off-by: Andrew Thornton Co-authored-by: 6543 <6543@obermui.de> --- integrations/testlogger.go | 2 +- modules/queue/manager.go | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/integrations/testlogger.go b/integrations/testlogger.go index 9498ad655..ff408b314 100644 --- a/integrations/testlogger.go +++ b/integrations/testlogger.go @@ -121,7 +121,7 @@ func PrintCurrentTest(t testing.TB, skip ...int) func() { fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", t.Name(), slowFlush) } }) - if err := queue.GetManager().FlushAll(context.Background(), -1); err != nil { + if err := queue.GetManager().FlushAll(context.Background(), 2*time.Minute); err != nil { t.Errorf("Flushing queues failed with error %v", err) } timer.Stop() diff --git a/modules/queue/manager.go b/modules/queue/manager.go index a88933191..23e96155a 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -9,6 +9,7 @@ import ( "fmt" "reflect" "sort" + "strings" "sync" "time" @@ -169,7 +170,17 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error for { select { case <-ctx.Done(): - return ctx.Err() + mqs := m.ManagedQueues() + nonEmptyQueues := []string{} + for _, mq := range mqs { + if !mq.IsEmpty() { + nonEmptyQueues = append(nonEmptyQueues, mq.Name) + } + } + if len(nonEmptyQueues) > 0 { + return fmt.Errorf("flush timeout with non-empty queues: %s", strings.Join(nonEmptyQueues, ", ")) + } + return nil default: } mqs := m.ManagedQueues()