fix: make config read/write thread safe (#3432)

* fix: make config read/write thread safe and fix some other similar issues

1. The config config has a lock, and safe methods to update and read the attributes
2. The config has methods to retrieve copies of specific attributes, such as the extyensions config, the auth config, and the authz config.
These are needed, as the config object may mutate in the middle of an auth/authz requests, and we avoid partial configuration being applied for that request.
3. Fix an issue with the monitoring server not stopping when the controller is shut down.
4. Fix an issue with the HTPasswdWatcher not stopping when the background tasks are supposed to finish.
5. Fix some tests using hardcoded ports.

Moved some of the methods which were on the main config to the auth, access control and extension configs

Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com>
This commit is contained in:
Andrei Aaron
2025-10-18 11:20:58 +03:00
committed by GitHub
parent 2402296e9a
commit dfb5d1df54
41 changed files with 6029 additions and 661 deletions
+96
View File
@@ -1,14 +1,18 @@
package common
import (
"bytes"
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/phayes/freeport"
@@ -143,6 +147,23 @@ func GetFreePort() string {
return strconv.Itoa(port)
}
// GetFreePorts returns multiple unique free ports, useful for cluster tests.
func GetFreePorts(count int) []string {
// Use the freeport library's GetFreePorts function which guarantees uniqueness
intPorts, err := freeport.GetFreePorts(count)
if err != nil {
panic(err)
}
// Convert to strings
ports := make([]string, count)
for i, port := range intPorts {
ports[i] = strconv.Itoa(port)
}
return ports
}
func GetBaseURL(port string) string {
return fmt.Sprintf(BaseURL, port)
}
@@ -233,3 +254,78 @@ func ContainSameElements[T comparable](list1, list2 []T) bool {
return true
}
// ThreadSafeLogBuffer is a thread-safe wrapper around bytes.Buffer for concurrent log capture.
type ThreadSafeLogBuffer struct {
buffer *bytes.Buffer
mutex sync.RWMutex
}
// NewThreadSafeLogBuffer creates a new thread-safe log buffer.
func NewThreadSafeLogBuffer() *ThreadSafeLogBuffer {
return &ThreadSafeLogBuffer{
buffer: &bytes.Buffer{},
}
}
// Write implements io.Writer interface with thread safety.
func (tsb *ThreadSafeLogBuffer) Write(p []byte) (int, error) {
tsb.mutex.Lock()
defer tsb.mutex.Unlock()
return tsb.buffer.Write(p)
}
// String returns the buffer contents as a string with thread safety.
func (tsb *ThreadSafeLogBuffer) String() string {
tsb.mutex.RLock()
defer tsb.mutex.RUnlock()
return tsb.buffer.String()
}
// WaitForLogMessages waits for a specific number of log messages to appear in the log buffer
// within the given timeout. This is useful for verifying goroutine termination or other
// asynchronous operations that log specific messages.
//
// Parameters:
// - logBuffer: A ThreadSafeLogBuffer that captures log output
// - message: The log message to search for (e.g., "htpasswd watcher terminating...")
// - minCount: Minimum number of occurrences to wait for
// - timeout: Maximum time to wait for the messages
//
// Returns:
// - true if at least minCount messages were found within the timeout
// - false if the timeout was reached before finding enough messages
func WaitForLogMessages(logBuffer *ThreadSafeLogBuffer, message string, minCount int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
logOutput := logBuffer.String()
actualCount := strings.Count(logOutput, message)
if actualCount >= minCount {
return true
}
time.Sleep(10 * time.Millisecond)
}
return false
}
// CreateLogCapturingWriter creates a multi-writer that captures log output to a thread-safe buffer
// while also writing to the original writer (typically os.Stdout). This is useful for
// tests that need to programmatically verify log messages.
//
// Parameters:
// - originalWriter: The original writer to continue writing to (e.g., os.Stdout)
//
// Returns:
// - A ThreadSafeLogBuffer that captures the log output
// - An io.Writer that writes to both the original writer and the buffer
func CreateLogCapturingWriter(originalWriter io.Writer) (*ThreadSafeLogBuffer, io.Writer) {
logBuffer := NewThreadSafeLogBuffer()
multiWriter := io.MultiWriter(originalWriter, logBuffer)
return logBuffer, multiWriter
}
+140
View File
@@ -58,3 +58,143 @@ func TestControllerManager(t *testing.T) {
So(func() { ctlrManager.RunServer() }, ShouldPanic)
})
}
func TestWaitForLogMessages(t *testing.T) {
Convey("Test WaitForLogMessages", t, func() {
Convey("should return true when message count reaches minimum", func() {
logBuffer := tcommon.NewThreadSafeLogBuffer()
// Write some log messages
_, _ = logBuffer.Write([]byte("Starting server\n"))
_, _ = logBuffer.Write([]byte("Server started successfully\n"))
_, _ = logBuffer.Write([]byte("Starting server\n"))
_, _ = logBuffer.Write([]byte("Processing request\n"))
_, _ = logBuffer.Write([]byte("Starting server\n"))
// Wait for "Starting server" message to appear at least 3 times
result := tcommon.WaitForLogMessages(logBuffer, "Starting server", 3, 100*time.Millisecond)
So(result, ShouldBeTrue)
})
Convey("should return false when message count never reaches minimum", func() {
logBuffer := tcommon.NewThreadSafeLogBuffer()
// Write some log messages (only 1 occurrence of target message)
_, _ = logBuffer.Write([]byte("Starting server\n"))
_, _ = logBuffer.Write([]byte("Server started successfully\n"))
_, _ = logBuffer.Write([]byte("Processing request\n"))
// Wait for "Starting server" message to appear at least 3 times
result := tcommon.WaitForLogMessages(logBuffer, "Starting server", 3, 50*time.Millisecond)
So(result, ShouldBeFalse)
})
Convey("should return true immediately when count already meets requirement", func() {
logBuffer := tcommon.NewThreadSafeLogBuffer()
// Write messages before calling WaitForLogMessages
_, _ = logBuffer.Write([]byte("Starting server\n"))
_, _ = logBuffer.Write([]byte("Starting server\n"))
_, _ = logBuffer.Write([]byte("Starting server\n"))
// Wait for "Starting server" message to appear at least 3 times
result := tcommon.WaitForLogMessages(logBuffer, "Starting server", 3, 100*time.Millisecond)
So(result, ShouldBeTrue)
})
Convey("should handle empty log buffer", func() {
logBuffer := tcommon.NewThreadSafeLogBuffer()
// Wait for any message in empty buffer
result := tcommon.WaitForLogMessages(logBuffer, "Starting server", 1, 50*time.Millisecond)
So(result, ShouldBeFalse)
})
Convey("should handle partial message matches", func() {
logBuffer := tcommon.NewThreadSafeLogBuffer()
// Write messages with partial matches
_, _ = logBuffer.Write([]byte("Starting server process\n"))
_, _ = logBuffer.Write([]byte("Starting server\n"))
_, _ = logBuffer.Write([]byte("Starting server instance\n"))
// Wait for exact "Starting server" message (not partial matches)
result := tcommon.WaitForLogMessages(logBuffer, "Starting server", 2, 100*time.Millisecond)
So(result, ShouldBeTrue)
})
Convey("should timeout after specified duration", func() {
logBuffer := tcommon.NewThreadSafeLogBuffer()
// Write only one message
_, _ = logBuffer.Write([]byte("Starting server\n"))
// Wait for 3 occurrences with short timeout
start := time.Now()
result := tcommon.WaitForLogMessages(logBuffer, "Starting server", 3, 10*time.Millisecond)
duration := time.Since(start)
So(result, ShouldBeFalse)
So(duration, ShouldBeGreaterThanOrEqualTo, 10*time.Millisecond)
So(duration, ShouldBeLessThan, 50*time.Millisecond) // Should timeout quickly
})
Convey("should handle concurrent writes", func() {
logBuffer := tcommon.NewThreadSafeLogBuffer()
// Simulate concurrent writes
go func() {
for i := 0; i < 5; i++ {
_, _ = logBuffer.Write([]byte("Starting server\n"))
time.Sleep(5 * time.Millisecond)
}
}()
// Wait for messages to appear
result := tcommon.WaitForLogMessages(logBuffer, "Starting server", 3, 100*time.Millisecond)
So(result, ShouldBeTrue)
})
Convey("should handle case-sensitive message matching", func() {
logBuffer := tcommon.NewThreadSafeLogBuffer()
// Write messages with different cases
_, _ = logBuffer.Write([]byte("Starting server\n"))
_, _ = logBuffer.Write([]byte("starting server\n"))
_, _ = logBuffer.Write([]byte("STARTING SERVER\n"))
// Wait for exact case match
result := tcommon.WaitForLogMessages(logBuffer, "Starting server", 2, 100*time.Millisecond)
So(result, ShouldBeFalse) // Only 1 exact match
})
Convey("should handle zero minimum count", func() {
logBuffer := tcommon.NewThreadSafeLogBuffer()
// Wait for 0 occurrences (should always return true)
result := tcommon.WaitForLogMessages(logBuffer, "Starting server", 0, 100*time.Millisecond)
So(result, ShouldBeTrue)
})
Convey("should handle very short timeout", func() {
logBuffer := tcommon.NewThreadSafeLogBuffer()
// Write a message
_, _ = logBuffer.Write([]byte("Starting server\n"))
// Wait with very short timeout
result := tcommon.WaitForLogMessages(logBuffer, "Starting server", 1, 1*time.Millisecond)
So(result, ShouldBeTrue) // Should find it immediately
})
})
}