Skip to content

Deprecate NIOAtomics in favor of Atomics #2204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 1, 2022
11 changes: 8 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@

import PackageDescription

let swiftAtomics: PackageDescription.Target.Dependency = .product(name: "Atomics", package: "swift-atomics")

var targets: [PackageDescription.Target] = [
.target(name: "NIOCore",
dependencies: ["NIOConcurrencyHelpers", "CNIOLinux", "CNIOWindows"]),
.target(name: "_NIODataStructures"),
.target(name: "NIOEmbedded",
dependencies: ["NIOCore",
"NIOConcurrencyHelpers",
"_NIODataStructures"]),
"_NIODataStructures",
swiftAtomics]),
.target(name: "NIOPosix",
dependencies: ["CNIOLinux",
"CNIODarwin",
"CNIOWindows",
"NIOConcurrencyHelpers",
"NIOCore",
"_NIODataStructures"]),
"_NIODataStructures",
swiftAtomics]),
.target(name: "NIO",
dependencies: ["NIOCore",
"NIOEmbedded",
Expand Down Expand Up @@ -87,7 +91,7 @@ var targets: [PackageDescription.Target] = [
dependencies: ["NIOPosix", "NIOCore"],
exclude: ["README.md"]),
.target(name: "NIOTestUtils",
dependencies: ["NIOPosix", "NIOCore", "NIOEmbedded", "NIOHTTP1"]),
dependencies: ["NIOPosix", "NIOCore", "NIOEmbedded", "NIOHTTP1", swiftAtomics]),
.executableTarget(name: "NIOCrashTester",
dependencies: ["NIOPosix", "NIOCore", "NIOEmbedded", "NIOHTTP1", "NIOWebSocket", "NIOFoundationCompat"]),
.executableTarget(name: "NIOAsyncAwaitDemo",
Expand Down Expand Up @@ -135,6 +139,7 @@ let package = Package(
.library(name: "NIOTestUtils", targets: ["NIOTestUtils"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
],
targets: targets
)
2 changes: 2 additions & 0 deletions Sources/NIOConcurrencyHelpers/NIOAtomic.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ extension UInt: NIOAtomicPrimitive {
/// By necessity, all atomic values are references: after all, it makes no
/// sense to talk about managing an atomic value when each time it's modified
/// the thread that modified it gets a local copy!
@available(*, deprecated, message:"please use ManagedAtomic from https://github.com/apple/swift-atomics instead")
public final class NIOAtomic<T: NIOAtomicPrimitive> {
@usableFromInline
typealias Manager = ManagedBufferPointer<Void, T.AtomicWrapper>
Expand Down Expand Up @@ -313,6 +314,7 @@ public final class NIOAtomic<T: NIOAtomicPrimitive> {
}

#if compiler(>=5.5) && canImport(_Concurrency)
@available(*, deprecated)
extension NIOAtomic: Sendable {

}
Expand Down
3 changes: 2 additions & 1 deletion Sources/NIOConcurrencyHelpers/atomics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ fileprivate func sys_sched_yield() {
/// Atomic primitives are useful when building constructs that need to
/// communicate or cooperate across multiple threads. In the case of
/// SwiftNIO this usually involves communicating across multiple event loops.
@available(*, deprecated, message: "please use UnsafeAtomic from https://github.com/apple/swift-atomics instead")
public struct UnsafeEmbeddedAtomic<T: AtomicPrimitive> {
@usableFromInline
internal let value: OpaquePointer
Expand Down Expand Up @@ -173,7 +174,7 @@ public struct UnsafeEmbeddedAtomic<T: AtomicPrimitive> {
/// By necessity, all atomic values are references: after all, it makes no
/// sense to talk about managing an atomic value when each time it's modified
/// the thread that modified it gets a local copy!
@available(*, deprecated, message:"please use NIOAtomic instead")
@available(*, deprecated, message:"please use ManagedAtomic from https://github.com/apple/swift-atomics instead")
public final class Atomic<T: AtomicPrimitive> {
@usableFromInline
internal let embedded: UnsafeEmbeddedAtomic<T>
Expand Down
15 changes: 8 additions & 7 deletions Sources/NIOEmbedded/AsyncEmbeddedEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//
#if compiler(>=5.5.2) && canImport(_Concurrency)
import Atomics
import Dispatch
import _NIODataStructures
import NIOCore
Expand Down Expand Up @@ -62,9 +63,9 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {

/// The current "time" for this event loop. This is an amount in nanoseconds.
/// As we need to access this from any thread, we store this as an atomic.
private let _now = NIOAtomic<UInt64>.makeAtomic(value: 0)
private let _now = ManagedAtomic<UInt64>(0)
internal var now: NIODeadline {
return NIODeadline.uptimeNanoseconds(self._now.load())
return NIODeadline.uptimeNanoseconds(self._now.load(ordering: .relaxed))
}

/// This is used to derive an identifier for this loop.
Expand All @@ -80,7 +81,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
// arbitrary threads. This is required by the EventLoop protocol and cannot be avoided.
// Specifically, Scheduled<T> creation requires us to be able to define the cancellation
// operation, so the task ID has to be created early.
private let scheduledTaskCounter = NIOAtomic<UInt64>.makeAtomic(value: 0)
private let scheduledTaskCounter = ManagedAtomic<UInt64>(0)
private var scheduledTasks = PriorityQueue<EmbeddedScheduledTask>()

/// Keep track of where promises are allocated to ensure we can identify their source if they leak.
Expand Down Expand Up @@ -143,7 +144,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
@discardableResult
public func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled<T> {
let promise: EventLoopPromise<T> = self.makePromise()
let taskID = self.scheduledTaskCounter.add(1)
let taskID = self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed)

let scheduled = Scheduled(promise: promise, cancellationTask: {
if self.inEventLoop {
Expand Down Expand Up @@ -270,7 +271,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {

// Set the time correctly before we call into user code, then
// call in for all tasks.
self._now.store(nextTask.readyTime.uptimeNanoseconds)
self._now.store(nextTask.readyTime.uptimeNanoseconds, ordering: .relaxed)

for task in tasks {
task.task()
Expand All @@ -280,7 +281,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
}

// Finally ensure we got the time right.
self._now.store(newTime.uptimeNanoseconds)
self._now.store(newTime.uptimeNanoseconds, ordering: .relaxed)

continuation.resume()
}
Expand Down Expand Up @@ -311,7 +312,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
internal func drainScheduledTasksByRunningAllCurrentlyScheduledTasks() {
var currentlyScheduledTasks = self.scheduledTasks
while let nextTask = currentlyScheduledTasks.pop() {
self._now.store(nextTask.readyTime.uptimeNanoseconds)
self._now.store(nextTask.readyTime.uptimeNanoseconds, ordering: .relaxed)
nextTask.task()
}
// Just fail all the remaining scheduled tasks. Despite having run all the tasks that were
Expand Down
13 changes: 7 additions & 6 deletions Sources/NIOPosix/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import NIOCore
import NIOConcurrencyHelpers
import Atomics

private struct SocketChannelLifecycleManager {
// MARK: Types
Expand All @@ -35,7 +36,7 @@ private struct SocketChannelLifecycleManager {
// MARK: properties
private let eventLoop: EventLoop
// this is queried from the Channel, ie. must be thread-safe
internal let isActiveAtomic: NIOAtomic<Bool>
internal let isActiveAtomic: ManagedAtomic<Bool>
// these are only to be accessed on the EventLoop

// have we seen the `.readEOF` notification
Expand All @@ -50,9 +51,9 @@ private struct SocketChannelLifecycleManager {
self.eventLoop.assertInEventLoop()
switch (oldValue, self.currentState) {
case (_, .activated):
self.isActiveAtomic.store(true)
self.isActiveAtomic.store(true, ordering: .relaxed)
case (.activated, _):
self.isActiveAtomic.store(false)
self.isActiveAtomic.store(false, ordering: .relaxed)
default:
()
}
Expand All @@ -63,7 +64,7 @@ private struct SocketChannelLifecycleManager {
// isActiveAtomic needs to be injected as it's accessed from arbitrary threads and `SocketChannelLifecycleManager` is usually held mutable
internal init(
eventLoop: EventLoop,
isActiveAtomic: NIOAtomic<Bool>,
isActiveAtomic: ManagedAtomic<Bool>,
supportReconnect: Bool
) {
self.eventLoop = eventLoop
Expand Down Expand Up @@ -238,7 +239,7 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
private let closePromise: EventLoopPromise<Void>
internal let selectableEventLoop: SelectableEventLoop
private let _offEventLoopLock = Lock()
private let isActiveAtomic: NIOAtomic<Bool> = .makeAtomic(value: false)
private let isActiveAtomic: ManagedAtomic<Bool> = .init(false)
// just a thread-safe way of having something to print about the socket from any thread
internal let socketDescription: String

Expand Down Expand Up @@ -345,7 +346,7 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan

// This is `Channel` API so must be thread-safe.
public var isActive: Bool {
return self.isActiveAtomic.load()
return self.isActiveAtomic.load(ordering: .relaxed)
}

// This is `Channel` API so must be thread-safe.
Expand Down
9 changes: 5 additions & 4 deletions Sources/NIOPosix/MultiThreadedEventLoopGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import NIOCore
import NIOConcurrencyHelpers
import Dispatch
import Atomics

struct NIORegistration: Registration {
enum ChannelType {
Expand All @@ -33,7 +34,7 @@ struct NIORegistration: Registration {
var registrationID: SelectorRegistrationID
}

private let nextEventLoopGroupID = NIOAtomic.makeAtomic(value: 0)
private let nextEventLoopGroupID = ManagedAtomic(0)

/// Called per `NIOThread` that is created for an EventLoop to do custom initialization of the `NIOThread` before the actual `EventLoop` is run on it.
typealias ThreadInitializer = (NIOThread) -> Void
Expand Down Expand Up @@ -62,7 +63,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
private static let threadSpecificEventLoop = ThreadSpecificVariable<SelectableEventLoop>()

private let myGroupID: Int
private let index = NIOAtomic<Int>.makeAtomic(value: 0)
private let index = ManagedAtomic<Int>(0)
private var eventLoops: [SelectableEventLoop]
private let shutdownLock: Lock = Lock()
private var runState: RunState = .running
Expand Down Expand Up @@ -148,7 +149,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
/// - threadInitializers: The `ThreadInitializer`s to use.
internal init(threadInitializers: [ThreadInitializer],
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>.init) {
let myGroupID = nextEventLoopGroupID.add(1)
let myGroupID = nextEventLoopGroupID.loadThenWrappingIncrement(ordering: .relaxed)
self.myGroupID = myGroupID
var idx = 0
self.eventLoops = [] // Just so we're fully initialised and can vend `self` to the `SelectableEventLoop`.
Expand Down Expand Up @@ -187,7 +188,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
///
/// - returns: The next `EventLoop` to use.
public func next() -> EventLoop {
return eventLoops[abs(index.add(1) % eventLoops.count)]
return eventLoops[abs(index.loadThenWrappingIncrement(ordering: .relaxed) % eventLoops.count)]
}

/// Returns the current `EventLoop` if we are on an `EventLoop` of this `MultiThreadedEventLoopGroup` instance.
Expand Down
9 changes: 5 additions & 4 deletions Sources/NIOPosix/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//
import NIOCore
import NIOConcurrencyHelpers
import Atomics

private struct PendingDatagramWrite {
var data: ByteBuffer
Expand Down Expand Up @@ -400,7 +400,7 @@ final class PendingDatagramWritesManager: PendingWritesManager {
private var state = PendingDatagramWritesState()

internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
internal let channelWritabilityFlag: NIOAtomic<Bool> = .makeAtomic(value: true)
internal let channelWritabilityFlag = ManagedAtomic<Bool>(true)
internal var publishedWritability = true
internal var writeSpinCount: UInt = 16
private(set) var isOpen = true
Expand Down Expand Up @@ -452,7 +452,8 @@ final class PendingDatagramWritesManager: PendingWritesManager {
assert(self.isOpen)
self.state.append(pendingWrite)

if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
if self.state.bytes > waterMark.high &&
channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged {
// Returns false to signal the Channel became non-writable and we need to notify the user.
self.publishedWritability = false
return false
Expand Down Expand Up @@ -550,7 +551,7 @@ final class PendingDatagramWritesManager: PendingWritesManager {
let (promise, result) = self.state.didWrite(data, messages: messages)

if self.state.bytes < waterMark.low {
channelWritabilityFlag.store(true)
channelWritabilityFlag.store(true, ordering: .relaxed)
}

self.fulfillPromise(promise)
Expand Down
15 changes: 8 additions & 7 deletions Sources/NIOPosix/PendingWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//
import NIOCore
import NIOConcurrencyHelpers
import Atomics

private struct PendingStreamWrite {
var data: IOData
Expand Down Expand Up @@ -283,7 +283,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
private var storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>

internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
internal let channelWritabilityFlag: NIOAtomic<Bool> = .makeAtomic(value: true)
internal let channelWritabilityFlag = ManagedAtomic(true)
internal var publishedWritability = true

internal var writeSpinCount: UInt = 16
Expand Down Expand Up @@ -315,7 +315,8 @@ final class PendingStreamWritesManager: PendingWritesManager {
assert(self.isOpen)
self.state.append(.init(data: data, promise: promise))

if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
if self.state.bytes > waterMark.high &&
channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged {
// Returns false to signal the Channel became non-writable and we need to notify the user.
self.publishedWritability = false
return false
Expand Down Expand Up @@ -364,7 +365,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
let (promise, result) = self.state.didWrite(itemCount: itemCount, result: result)

if self.state.bytes < waterMark.low {
channelWritabilityFlag.store(true)
channelWritabilityFlag.store(true, ordering: .relaxed)
}

promise?.succeed(())
Expand Down Expand Up @@ -459,7 +460,7 @@ internal protocol PendingWritesManager: AnyObject {
var isFlushPending: Bool { get }
var writeSpinCount: UInt { get }
var currentBestWriteMechanism: WriteMechanism { get }
var channelWritabilityFlag: NIOAtomic<Bool> { get }
var channelWritabilityFlag: ManagedAtomic<Bool> { get }

/// Represents the writability state the last time we published a writability change to the `Channel`.
/// This is used in `triggerWriteOperations` to determine whether we need to trigger a writability
Expand All @@ -470,7 +471,7 @@ internal protocol PendingWritesManager: AnyObject {
extension PendingWritesManager {
// This is called from `Channel` API so must be thread-safe.
var isWritable: Bool {
return self.channelWritabilityFlag.load()
return self.channelWritabilityFlag.load(ordering: .relaxed)
}

internal func triggerWriteOperations(triggerOneWriteOperation: (WriteMechanism) throws -> OneWriteOperationResult) throws -> OverallWriteResult {
Expand Down Expand Up @@ -514,6 +515,6 @@ extension PendingWritesManager {
extension PendingStreamWritesManager: CustomStringConvertible {
var description: String {
return "PendingStreamWritesManager { isFlushPending: \(self.isFlushPending), " +
/* */ "writabilityFlag: \(self.channelWritabilityFlag.load())), state: \(self.state) }"
/* */ "writabilityFlag: \(self.channelWritabilityFlag.load(ordering: .relaxed))), state: \(self.state) }"
}
}
7 changes: 4 additions & 3 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Dispatch
import NIOCore
import NIOConcurrencyHelpers
import _NIODataStructures
import Atomics

/// Execute the given closure and ensure we release all auto pools if needed.
@inlinable
Expand Down Expand Up @@ -73,7 +74,7 @@ internal final class SelectableEventLoop: EventLoop {
// This may only be read/written while holding the _tasksLock.
internal var _pendingTaskPop = false
@usableFromInline
internal var scheduledTaskCounter = NIOAtomic.makeAtomic(value: UInt64(0))
internal var scheduledTaskCounter = ManagedAtomic<UInt64>(0)
@usableFromInline
internal var _scheduledTasks = PriorityQueue<ScheduledTask>()

Expand Down Expand Up @@ -276,7 +277,7 @@ Further information:
@inlinable
internal func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled<T> {
let promise: EventLoopPromise<T> = self.makePromise()
let task = ScheduledTask(id: self.scheduledTaskCounter.add(1), {
let task = ScheduledTask(id: self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed), {
do {
promise.succeed(try task())
} catch let err {
Expand Down Expand Up @@ -317,7 +318,7 @@ Further information:
@inlinable
internal func execute(_ task: @escaping () -> Void) {
// nothing we can do if we fail enqueuing here.
try? self._schedule0(ScheduledTask(id: self.scheduledTaskCounter.add(1), task, { error in
try? self._schedule0(ScheduledTask(id: self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed), task, { error in
// do nothing
}, .now()))
}
Expand Down
Loading