Asynchronous Operations with Async/Await

In Swift, there are several ways to run multiple asynchronous tasks concurrently without blocking the calling thread. DispatchGroup, DispatchSemaphore, and async let work well for coordinating independent tasks. However, if your asynchronous tasks have complex inter-dependencies, you can leverage the power of OperationQueue with asynchronous operations. This lets you cancel operations, suspend and resume the queue, and monitor progress.

Swift’s Operation class does not directly support async tasks, though. In this post, we will implement a reusable type that adds support for Swift’s new concurrency model. If you’re new to operations, I recommend reviewing the documentation on Operation first to gain some understanding before diving into the content below.
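As a quick refresher on how dependencies between operations work, here is a minimal sketch using plain BlockOperation instances. These are synchronous, so the queue already tracks them correctly. The names download and parse are made up for illustration, and the print calls stand in for real work.

```swift
import Foundation

// Hypothetical operations; the print calls stand in for real work.
let download = BlockOperation { print("download") }
let parse = BlockOperation { print("parse") }

// parse does not become ready until download has finished.
parse.addDependency(download)

let queue = OperationQueue()
// The array order does not matter; the dependency decides the execution order.
queue.addOperations([parse, download], waitUntilFinished: true)

// Prints ->
// download
// parse
```

The rest of this post is about getting the same behaviour when the work inside an operation is an async function.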

Creating an Asynchronous Operation

To create an asynchronous operation, start by subclassing Operation and overriding the isAsynchronous property:

class AsyncOperation: Operation {

    override var isAsynchronous: Bool { true }

}

However, this alone won’t make AsyncOperation truly asynchronous: the operation is still marked as finished as soon as main() returns, even if the work it started is still running. Look at the example below:

class AsyncOperation: Operation {

    override var isAsynchronous: Bool { return true }

    override func main() {
        // main() only schedules the work and returns immediately.
        DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(3)) {
            print("Executing")
        }
    }
}

let queue = OperationQueue()
let operation = AsyncOperation()
queue.addOperations([operation], waitUntilFinished: true)
print("Finish Execution")

// Prints ->
// Finish Execution
// Executing

Here, the OperationQueue marks the operation as completed while its work is still in progress. This is because the isFinished and isExecuting states are still managed by the superclass, which considers the operation finished as soon as main() returns. To fix this, we need to do two things:

  1. Override the start method and avoid calling super.start().

  2. Manage isFinished and isExecuting within the subclass in a thread-safe manner.

Managing the State

To manage the state locally, we need to override both the isExecuting and isFinished properties. We will use a concurrent DispatchQueue to read and update those properties in a thread-safe way: reads run concurrently, while writes use a barrier so that they run exclusively. We also post KVO notifications (willChangeValue and didChangeValue) so that the OperationQueue observes every state change we make.

This is what it looks like once we override the isExecuting property.


    private let lockQueue = DispatchQueue(label: "com.asyncOperation", attributes: .concurrent)

    private var _isExecuting = false /// Local property to manage status
    override var isExecuting: Bool {
        get {
            lockQueue.sync { _isExecuting }
        }
        set {
            willChangeValue(forKey: "isExecuting")
            lockQueue.sync(flags: .barrier) {
                _isExecuting = newValue
            }
            didChangeValue(forKey: "isExecuting")
        }
    }

Similarly, we need to override the isFinished property. Finally, we add a finish() method that lets us mark the operation as finished once the task is completed.

In the end, our subclass will look like this.


class AsyncOperation: Operation {
    override var isAsynchronous: Bool { true }

    private let lockQueue = DispatchQueue(label: "com.asyncOperation", attributes: .concurrent)

    private var _isExecuting = false
    override var isExecuting: Bool {
        get {
            lockQueue.sync { _isExecuting }
        }
        set {
            willChangeValue(forKey: "isExecuting")
            lockQueue.sync(flags: .barrier) {
                _isExecuting = newValue
            }
            didChangeValue(forKey: "isExecuting")
        }
    }

    private var _isFinished = false
    override var isFinished: Bool {
        get {
            lockQueue.sync { _isFinished }
        } set {
            willChangeValue(forKey: "isFinished")
            lockQueue.sync(flags: .barrier) {
                _isFinished = newValue
            }
            didChangeValue(forKey: "isFinished")
        }
    }

    override func main() {
        /// Implementation
    }

    override func start() {
        guard !isCancelled else {
            finish()
            return
        }
        isFinished = false
        isExecuting = true
        main()
    }

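    func finish() {
        // Clear isExecuting before setting isFinished, so the queue never
        // observes an operation that is both finished and still executing.
        isExecuting = false
        isFinished = true
    }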
}

Extend to support Async/Await

The AsyncOperation type is still incomplete because we haven't implemented the main method. Let's go ahead and do that!

We need to extend the AsyncOperation class so that it accepts an asynchronous function as input and exposes the result once the task is completed.

There are two kinds of asynchronous functions: those that can throw an error and those that cannot. Note, however, that a non-throwing function type is a subtype of the throwing function type with the same signature. This allows us to define a single input type that supports both throwing and non-throwing tasks.
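To see that subtype relationship in isolation, here is a small sketch. The helper run, the SampleError type, and the demo function are hypothetical names used only for this illustration. The helper declares a throwing parameter, yet it accepts a non-throwing closure without any wrapping.

```swift
// Hypothetical helper: its parameter is declared with the throwing function type.
func run<T>(_ task: () async throws -> T) async -> Result<T, Error> {
    do {
        return .success(try await task())
    } catch {
        return .failure(error)
    }
}

// Hypothetical error type for the throwing case.
struct SampleError: Error {}

func demo() async -> (Result<Int, Error>, Result<Int, Error>) {
    let nonThrowing: () async -> Int = { 42 }
    let throwing: () async throws -> Int = { throw SampleError() }
    // The non-throwing closure converts implicitly to the throwing type.
    return (await run(nonThrowing), await run(throwing))
}

// Top-level await needs main.swift or a playground (Swift 5.7 or later).
let (first, second) = await demo()
// first is .success(42), second is .failure(SampleError())
```

The reverse conversion is not allowed: a throwing closure cannot be passed where a non-throwing one is expected.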

The type of the output is inferred from the signature of the input function. We use the Result type to represent both the success and the error case in a generic way. With all of this in place, the implementation looks as follows.


class AsyncOperation<Success>: Operation {
    private let task: () async throws -> Success
    private var result: Result<Success, Error>?

    init(task: @escaping () async throws -> Success) {
        self.task = task
    }

    func getResult() -> Result<Success, Error>? {
        result
    }
    ...

Now it is time to implement the main method and execute the task that has been assigned. At the end of the task execution, we will call finish().

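override func main() {
    // start() has already marked the operation as executing, so every exit
    // path must call finish(); otherwise the queue would wait forever.
    guard !isCancelled else {
        finish()
        return
    }
    Task { [weak self] in
        guard let self else { return }
        do {
            let value = try await self.task()
            self.result = .success(value)
        } catch {
            self.result = .failure(error)
        }
        self.finish()
    }
}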

Putting all of this together, we end up with a subclass that looks like this.

class AsyncOperation<Success>: Operation {

    private let task: () async throws -> Success
    private var result: Result<Success, Error>?
    override var isAsynchronous: Bool { true }
    private let lockQueue = DispatchQueue(label: "com.asyncOperation", attributes: .concurrent)

    init(task: @escaping () async throws -> Success) {
        self.task = task
    }

    private var _isExecuting = false
    override var isExecuting: Bool {
        get {
            lockQueue.sync {
                _isExecuting
            }
        }
        set {
            willChangeValue(forKey: "isExecuting")
            lockQueue.sync(flags: .barrier) {
                _isExecuting = newValue
            }
            didChangeValue(forKey: "isExecuting")
        }
    }

    private var _isFinished = false
    override var isFinished: Bool {
        get {
            lockQueue.sync {
                _isFinished
            }
        } set {
            willChangeValue(forKey: "isFinished")
            lockQueue.sync(flags: .barrier) {
                _isFinished = newValue
            }
            didChangeValue(forKey: "isFinished")
        }
    }

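    override func main() {
        // start() has already marked the operation as executing, so every exit
        // path must call finish(); otherwise the queue would wait forever.
        guard !isCancelled else {
            finish()
            return
        }
        Task { [weak self] in
            guard let self else { return }
            do {
                let value = try await self.task()
                self.result = .success(value)
            } catch {
                self.result = .failure(error)
            }
            self.finish()
        }
    }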

    override func start() {
        guard !isCancelled else {
            finish()
            return
        }
        isFinished = false
        isExecuting = true
        main()
    }

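    func finish() {
        // Clear isExecuting before setting isFinished, so the queue never
        // observes an operation that is both finished and still executing.
        isExecuting = false
        isFinished = true
    }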

    func getResult() -> Result<Success, Error>? {
        result
    }
}

Now let's verify whether our implementation is working correctly.

let operation = AsyncOperation {
    try await Task.sleep(nanoseconds: NSEC_PER_SEC * 2)
    print("Executing")
    return "Hello World!"
}

let queue = OperationQueue()
queue.addOperations([operation], waitUntilFinished: true)
print("Finished")

switch operation.getResult() {
case .success(let value):
    print(value)
default:
    break
}

// Prints ->
// Executing
// Finished
// Hello World!

The queue now waits until the async task has actually completed, which is exactly what we are looking for!
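One thing the class above does not do is forward cancel() to the Task it creates, so cancelling the operation only sets isCancelled while the async work keeps running. One possible way to propagate cancellation is to keep a handle to the Task and cancel it from an overridden cancel(). The sketch below is a separate, trimmed-down class, not the implementation above: CancellableAsyncOperation, work, runningTask, and the NSLock-based state handling are all my own illustrative choices, and it assumes the Swift 5 language mode.

```swift
import Foundation

/// Sketch only: a variant that forwards cancel() to the underlying Task.
final class CancellableAsyncOperation<Success>: Operation {
    private let work: () async throws -> Success
    private let lock = NSLock()
    private var runningTask: Task<Void, Never>?
    private var storedResult: Result<Success, Error>?
    private var state = (executing: false, finished: false)

    init(work: @escaping () async throws -> Success) {
        self.work = work
    }

    override var isAsynchronous: Bool { true }
    override var isExecuting: Bool { locked { state.executing } }
    override var isFinished: Bool { locked { state.finished } }
    var result: Result<Success, Error>? { locked { storedResult } }

    /// Runs body while holding the lock.
    private func locked<T>(_ body: () -> T) -> T {
        lock.lock()
        defer { lock.unlock() }
        return body()
    }

    /// Updates both flags and posts the KVO notifications the queue relies on.
    private func setState(executing: Bool, finished: Bool) {
        willChangeValue(forKey: "isExecuting")
        willChangeValue(forKey: "isFinished")
        locked { state = (executing: executing, finished: finished) }
        didChangeValue(forKey: "isFinished")
        didChangeValue(forKey: "isExecuting")
    }

    override func start() {
        guard !isCancelled else {
            setState(executing: false, finished: true)
            return
        }
        setState(executing: true, finished: false)
        let task = Task {
            let outcome: Result<Success, Error>
            do {
                outcome = .success(try await self.work())
            } catch {
                outcome = .failure(error)
            }
            self.locked { self.storedResult = outcome }
            self.setState(executing: false, finished: true)
        }
        locked { runningTask = task }
        // cancel() may have been called before the handle was stored.
        if isCancelled { task.cancel() }
    }

    override func cancel() {
        super.cancel()
        let current = locked { runningTask }
        current?.cancel()
    }
}

// Usage: the five-second sleep is cut short by cancel().
let cancellable = CancellableAsyncOperation<String> {
    try await Task.sleep(nanoseconds: 5_000_000_000)
    return "done"
}
let cancelQueue = OperationQueue()
cancelQueue.addOperation(cancellable)
cancellable.cancel()
cancelQueue.waitUntilAllOperationsAreFinished()
```

Depending on timing, the operation either finishes without ever starting its work (result stays nil) or ends with a failure carrying a CancellationError thrown by Task.sleep. In both cases the queue is released promptly instead of waiting for the full sleep.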

Conclusion

That's it!

We've now created a reusable asynchronous operation that integrates seamlessly with Swift’s concurrency model. This powerful tool allows you to manage complex asynchronous tasks with ease, ensuring that your operations are executed efficiently and correctly. In the next post, we will delve into monitoring the progress of the OperationQueue handling these asynchronous tasks. Stay tuned for more insights and advanced techniques.

Cheers!!!