From 9e567a7b4fe821af4fad882552ec06713fb8d291 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Tue, 4 Mar 2025 13:11:08 +0000 Subject: [PATCH 1/2] Revert "Add: exponential backoff for CAS operations on floats (#1661)" This reverts commit a934c3595116c0a1d5b21318718d8a0ab9d7c009. --- prometheus/atomic_update.go | 50 --------- prometheus/atomic_update_test.go | 167 ------------------------------- prometheus/counter.go | 10 +- prometheus/gauge.go | 10 +- prometheus/histogram.go | 10 +- prometheus/summary.go | 25 +++-- 6 files changed, 36 insertions(+), 236 deletions(-) delete mode 100644 prometheus/atomic_update.go delete mode 100644 prometheus/atomic_update_test.go diff --git a/prometheus/atomic_update.go b/prometheus/atomic_update.go deleted file mode 100644 index b65896a31..000000000 --- a/prometheus/atomic_update.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2014 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prometheus - -import ( - "math" - "sync/atomic" - "time" -) - -// atomicUpdateFloat atomically updates the float64 value pointed to by bits -// using the provided updateFunc, with an exponential backoff on contention. 
-func atomicUpdateFloat(bits *uint64, updateFunc func(float64) float64) { - const ( - // both numbers are derived from empirical observations - // documented in this PR: https://github.com/prometheus/client_golang/pull/1661 - maxBackoff = 320 * time.Millisecond - initialBackoff = 10 * time.Millisecond - ) - backoff := initialBackoff - - for { - loadedBits := atomic.LoadUint64(bits) - oldFloat := math.Float64frombits(loadedBits) - newFloat := updateFunc(oldFloat) - newBits := math.Float64bits(newFloat) - - if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { - break - } else { - // Exponential backoff with sleep and cap to avoid infinite wait - time.Sleep(backoff) - backoff *= 2 - if backoff > maxBackoff { - backoff = maxBackoff - } - } - } -} diff --git a/prometheus/atomic_update_test.go b/prometheus/atomic_update_test.go deleted file mode 100644 index 0233bc71f..000000000 --- a/prometheus/atomic_update_test.go +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright 2014 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package prometheus - -import ( - "math" - "sync" - "sync/atomic" - "testing" - "unsafe" -) - -var output float64 - -func TestAtomicUpdateFloat(t *testing.T) { - var val float64 = 0.0 - bits := (*uint64)(unsafe.Pointer(&val)) - var wg sync.WaitGroup - numGoroutines := 100000 - increment := 1.0 - - for i := 0; i < numGoroutines; i++ { - wg.Add(1) - go func() { - defer wg.Done() - atomicUpdateFloat(bits, func(f float64) float64 { - return f + increment - }) - }() - } - - wg.Wait() - expected := float64(numGoroutines) * increment - if val != expected { - t.Errorf("Expected %f, got %f", expected, val) - } -} - -// Benchmark for atomicUpdateFloat with single goroutine (no contention). -func BenchmarkAtomicUpdateFloat_SingleGoroutine(b *testing.B) { - var val float64 = 0.0 - bits := (*uint64)(unsafe.Pointer(&val)) - - for i := 0; i < b.N; i++ { - atomicUpdateFloat(bits, func(f float64) float64 { - return f + 1.0 - }) - } - - output = val -} - -// Benchmark for old implementation with single goroutine (no contention) -> to check overhead of backoff -func BenchmarkAtomicNoBackoff_SingleGoroutine(b *testing.B) { - var val float64 = 0.0 - bits := (*uint64)(unsafe.Pointer(&val)) - - for i := 0; i < b.N; i++ { - for { - loadedBits := atomic.LoadUint64(bits) - newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0) - if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { - break - } - } - } - - output = val -} - -// Benchmark varying the number of goroutines. 
-func benchmarkAtomicUpdateFloatConcurrency(b *testing.B, numGoroutines int) { - var val float64 = 0.0 - bits := (*uint64)(unsafe.Pointer(&val)) - b.SetParallelism(numGoroutines) - - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - atomicUpdateFloat(bits, func(f float64) float64 { - return f + 1.0 - }) - } - }) - - output = val -} - -func benchmarkAtomicNoBackoffFloatConcurrency(b *testing.B, numGoroutines int) { - var val float64 = 0.0 - bits := (*uint64)(unsafe.Pointer(&val)) - b.SetParallelism(numGoroutines) - - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - for { - loadedBits := atomic.LoadUint64(bits) - newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0) - if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { - break - } - } - } - }) - - output = val -} - -func BenchmarkAtomicUpdateFloat_1Goroutine(b *testing.B) { - benchmarkAtomicUpdateFloatConcurrency(b, 1) -} - -func BenchmarkAtomicNoBackoff_1Goroutine(b *testing.B) { - benchmarkAtomicNoBackoffFloatConcurrency(b, 1) -} - -func BenchmarkAtomicUpdateFloat_2Goroutines(b *testing.B) { - benchmarkAtomicUpdateFloatConcurrency(b, 2) -} - -func BenchmarkAtomicNoBackoff_2Goroutines(b *testing.B) { - benchmarkAtomicNoBackoffFloatConcurrency(b, 2) -} - -func BenchmarkAtomicUpdateFloat_4Goroutines(b *testing.B) { - benchmarkAtomicUpdateFloatConcurrency(b, 4) -} - -func BenchmarkAtomicNoBackoff_4Goroutines(b *testing.B) { - benchmarkAtomicNoBackoffFloatConcurrency(b, 4) -} - -func BenchmarkAtomicUpdateFloat_8Goroutines(b *testing.B) { - benchmarkAtomicUpdateFloatConcurrency(b, 8) -} - -func BenchmarkAtomicNoBackoff_8Goroutines(b *testing.B) { - benchmarkAtomicNoBackoffFloatConcurrency(b, 8) -} - -func BenchmarkAtomicUpdateFloat_16Goroutines(b *testing.B) { - benchmarkAtomicUpdateFloatConcurrency(b, 16) -} - -func BenchmarkAtomicNoBackoff_16Goroutines(b *testing.B) { - benchmarkAtomicNoBackoffFloatConcurrency(b, 16) -} - -func 
BenchmarkAtomicUpdateFloat_32Goroutines(b *testing.B) { - benchmarkAtomicUpdateFloatConcurrency(b, 32) -} - -func BenchmarkAtomicNoBackoff_32Goroutines(b *testing.B) { - benchmarkAtomicNoBackoffFloatConcurrency(b, 32) -} diff --git a/prometheus/counter.go b/prometheus/counter.go index 2996aef6a..4ce84e7a8 100644 --- a/prometheus/counter.go +++ b/prometheus/counter.go @@ -134,9 +134,13 @@ func (c *counter) Add(v float64) { return } - atomicUpdateFloat(&c.valBits, func(oldVal float64) float64 { - return oldVal + v - }) + for { + oldBits := atomic.LoadUint64(&c.valBits) + newBits := math.Float64bits(math.Float64frombits(oldBits) + v) + if atomic.CompareAndSwapUint64(&c.valBits, oldBits, newBits) { + return + } + } } func (c *counter) AddWithExemplar(v float64, e Labels) { diff --git a/prometheus/gauge.go b/prometheus/gauge.go index aa1846365..dd2eac940 100644 --- a/prometheus/gauge.go +++ b/prometheus/gauge.go @@ -120,9 +120,13 @@ func (g *gauge) Dec() { } func (g *gauge) Add(val float64) { - atomicUpdateFloat(&g.valBits, func(oldVal float64) float64 { - return oldVal + val - }) + for { + oldBits := atomic.LoadUint64(&g.valBits) + newBits := math.Float64bits(math.Float64frombits(oldBits) + val) + if atomic.CompareAndSwapUint64(&g.valBits, oldBits, newBits) { + return + } + } } func (g *gauge) Sub(val float64) { diff --git a/prometheus/histogram.go b/prometheus/histogram.go index 1a279035b..c453b754a 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -1647,9 +1647,13 @@ func waitForCooldown(count uint64, counts *histogramCounts) { // atomicAddFloat adds the provided float atomically to another float // represented by the bit pattern the bits pointer is pointing to. 
func atomicAddFloat(bits *uint64, v float64) { - atomicUpdateFloat(bits, func(oldVal float64) float64 { - return oldVal + v - }) + for { + loadedBits := atomic.LoadUint64(bits) + newBits := math.Float64bits(math.Float64frombits(loadedBits) + v) + if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) { + break + } + } } // atomicDecUint32 atomically decrements the uint32 p points to. See diff --git a/prometheus/summary.go b/prometheus/summary.go index 76a9e12f4..ac5203c6f 100644 --- a/prometheus/summary.go +++ b/prometheus/summary.go @@ -471,9 +471,13 @@ func (s *noObjectivesSummary) Observe(v float64) { n := atomic.AddUint64(&s.countAndHotIdx, 1) hotCounts := s.counts[n>>63] - atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 { - return oldVal + v - }) + for { + oldBits := atomic.LoadUint64(&hotCounts.sumBits) + newBits := math.Float64bits(math.Float64frombits(oldBits) + v) + if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { + break + } + } // Increment count last as we take it as a signal that the observation // is complete. atomic.AddUint64(&hotCounts.count, 1) @@ -515,13 +519,14 @@ func (s *noObjectivesSummary) Write(out *dto.Metric) error { // Finally add all the cold counts to the new hot counts and reset the cold counts. atomic.AddUint64(&hotCounts.count, count) atomic.StoreUint64(&coldCounts.count, 0) - - // Use atomicUpdateFloat to update hotCounts.sumBits atomically. 
- atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 { - return oldVal + sum.GetSampleSum() - }) - atomic.StoreUint64(&coldCounts.sumBits, 0) - + for { + oldBits := atomic.LoadUint64(&hotCounts.sumBits) + newBits := math.Float64bits(math.Float64frombits(oldBits) + sum.GetSampleSum()) + if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { + atomic.StoreUint64(&coldCounts.sumBits, 0) + break + } + } return nil } From 689f5900164029a73c9a2be6068e1db615107c1f Mon Sep 17 00:00:00 2001 From: bwplotka Date: Tue, 4 Mar 2025 13:14:20 +0000 Subject: [PATCH 2/2] Cut 1.21.1 Signed-off-by: bwplotka --- CHANGELOG.md | 4 ++++ VERSION | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fd402d1e..4e2519dd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## Unreleased +## 1.21.1 / 2025-03-04 + +* [BUGFIX] prometheus: Reverted the `Inc`, `Add` and `Observe` cumulative metric CAS optimizations (#1661), which caused performance regressions in low-contention cases. + ## 1.21.0 / 2025-02-17 :warning: This release contains potential breaking change if you upgrade `github.com/prometheus/common` to 0.62+ together with client_golang. :warning: diff --git a/VERSION b/VERSION index 3500250a4..284497740 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.21.0 +1.21.1