Concurrency using Async/Await

7 minute read

Overview

1func download(url: URL, completionHandler: @escaping (Data) -> ()) {
2    let task = URLSession.shared.dataTask(with: url) { data, _, error in
3        if let data = data {
4            completionHandler(data)
5        }
6    }
7    task.resume()
8}

⬇⬇⬇

1func download(url: URL) async throws -> Data {
2    let result = try await URLSession.shared.data(from: url)
3    return result.0 // the response data
4}
  • async code runs IN ORDER. No inversion as using completion handler
  • async code can return value to its caller.
  • async code can throw error

await

  • used to call async methods.
  • cause our code to pause and wait until the call to async method finishes. BUT, it does NOT block.
  • use await on background thread, but also legal to put it on main thread.

but, only in an async context, how about calling an async method from a normal method? e.g. call download(from:) in viewDidLoad() of UIViewController

👇

Tasks

  • This is a unit of asynchronous work
  • Call Task.init(operation:), it inherits the characteristics of its surroundings
 1// Ⓜ️  RUN ON MAIN THREAD
 2override func viewDidLoad() {
 3    super.viewDidLoad()
 4    
 5    Task {
 6    // Ⓜ️ STILL RUN ON MAIN THREAD
 7        do {
 8            let imgUrl = "https://cdn.pixabay.com/photo/2015/04/23/22/00/tree-736885__480.jpg"
 9            let data = try await download(url: URL(string: imgUrl)!)
10            print(data)
11        } catch(let error) {
12            print(error.localizedDescription)
13        }
14    }
15    print(imgUrl)
16}
17
18func download(url: URL) async throws -> Data {
19    let result = try await URLSession.shared.data(from: url)
20    return result.0 // the response data
21}
22	
23///  CONSOLE
24https://cdn.pixabay.com/photo/2015/04/23/22/00/tree-736885__480.jpg
2544304 bytes
  • Call Task.detached(operation:), it cuts off relationship to the surrounding context, running on its own background thread
 1// Ⓜ️ RUN ON MAIN THREAD
 2override func viewDidLoad() {
 3    super.viewDidLoad()
 4    
 5    Task.detached {
 6        // 🅱️ NOW, RUN ON BACKGROUND THREAD
 7        ...
 8    }
 9    print(imgUrl)
10}

Wrapping a Completion Handler

⁉️Want to adopt structured concurrency but keep old-fashioned async code based on completion handler?

✅ Wrap it with

  • withUnsafeContinuation(_:)
  • withUnsafeThrowingContinuation(_:)
  • withCheckedContinuation(_:)
  • withCheckedThrowingContinuation(_:)
 1override func viewDidLoad() {
 2    super.viewDidLoad()
 3    
 4    let imgUrl = "https://cdn.pixabay.com/photo/2015/04/23/22/00/tree-736885__480.jpg"
 5    Task {
 6        do {
 7            let data = try await download(url: URL(string: imgUrl)!)
 8            print(data)
 9        } catch(let error) {
10            print(error.localizedDescription)
11        }
12    }
13    print(imgUrl)
14}
15
16func download(url: URL) async throws -> Data {
17    return try await withUnsafeThrowingContinuation { continuation in
18        download(url: url) { data in
19            continuation.resume(returning: data) // ⚠️ call resume ONLY ONCE
20        }
21    }
22    
23//        let result = try await URLSession.shared.data(from: url)
24//        return result.0 // the response data
25}
26
27// The old-fashion async functino with completion handler
28func download(url: URL, completionHandler: @escaping (Data) -> ()) {
29    let task = URLSession.shared.dataTask(with: url) { data, _, error in
30        if let data = data {
31            completionHandler(data)
32        }
33    }
34    task.resume()
35}

UnsafeContinuation is basically the same kind of object that forms the basis of the await keyword itself.

  • when we wait for an async function call using await, a continuation object is created and stored behind the scenes
  • when the async function finishes, a method of that continuation object is called, and our code resumes. So what we’re doing in this wrapper method is enacting manually the same sort of wait-and-resume architecture that is performed for us automatically when we say await.

Continuation? A mechanism to interface between synchronous code and an asynchronous stream

Do multiple concurrent tasks with async let

Don’t do this, it is serial, not concurrent:

1func fetchTwoURLs() async throws -> (Data, Data) {
2    let url1 = URL(string:"https://www.apeth.com/pep/manny.jpg")!
3    let url2 = URL(string:"https://www.apeth.com/pep/moe.jpg")!
4    let data1 = try await self.download(url: url1)
5    let data2 = try await self.download(url: url2) /// 😢 this one waits for the above ☝ to be done 
6    return (data1, data2)
7}

Do this instead, using async let:

1func fetchTwoURLs() async throws -> (Data, Data) {
2    let url1 = URL(string:"https://www.tinhpv.com/pep/manny.jpg")!
3    let url2 = URL(string:"https://www.tinh.com/pep/moe.jpg")!
4    async let data1 = self.download(url: url1) /// (1)
5    async let data2 = self.download(url: url2) /// (2)
6    return (try await data1, try await data2) /// try wait (data1, data2)
7}
  • when we know the number of tasks to do (if perform an indeterminate number of tasks, use task group)
  • Useful when we want to return the results of both download(url:) calls as a single result. Both calls need to finish before return statement.
  • ⚠️ if the async function throws an error, it does NOT interrupt code. For e.g., when (1) fails, throws error, (2) still goes ahead and still be awaited. The thrown error from the subtask doesn’t interrupt our code until later, when we use try.

another style for writing async let:

 1func fetchTwoURLs() async throws -> (Data, Data) {
 2    let url1 = URL(string:"https://www.tinhpv.com/pep/manny.jpg")!
 3    let url2 = URL(string:"https://www.tinh.com/pep/moe.jpg")!
 4//    async let data1 = self.download(url: url1) /// (1)
 5//    async let data2 = self.download(url: url2) /// (2)
 6
 7        
 8    async let data1 = {         
 9        return try await download(url: url1)
10    }()
11                     
12    async let data2 = {
13        return try await download(url: url2)
14    }()    
15    
16    return (try await data1, try await data2) /// try wait (data1, data2)
17}

Task Groups

use 1 of these 2 methods:

  • withTaskGroup(of:returning:body:)
  • withThrowingTaskGroup(of:returning:body:) e.g.:
 1func fetchData(from urls: [URL]) async throws -> [Data] {
 2    var result: [Data] = []
 3    
 4    try await withThrowingTaskGroup(of: Data.self) { group in
 5        for url in urls {
 6            group.addTask {
 7                return try await self.download(url: url)
 8            }
 9        }
10        
11        // loop through the group as an async sequence
12        // get the value from each subtasks and append to the list result.
13        for try await data in group {
14            result.append(data)
15        }
16    }
17    return result
18}

… But result is not in order as the url array that being passed in because it is concurrent. so rather than return an array, we return a dictionary.

 1func fetchData(from urls: [URL]) async throws -> [URL: Data] {
 2    var result: [URL: Data] = [:]
 3    
 4    try await withThrowingTaskGroup(of: [URL: Data].self) { group in
 5        for url in urls {
 6            group.addTask {
 7                // this return here have to match with `of` type
 8                return [url: try await self.download(url: url)]
 9            }
10        }
11        
12        for try await dictionaryData in group {
13            // merge dictionaryData to result dictionary
14            // if there is a duplicate key, keep the key of result
15            result.merge(dictionaryData) { current, _ in current }
16        }
17    }
18    
19    return result
20}

Full code 🐣

 1func fetchData(from urls: [URL]) async throws -> [URL: Data] {
 2    var result: [URL: Data] = [:]
 3    
 4    return try await withThrowingTaskGroup(of: [URL: Data].self, returning: [URL: Data].self) { group in
 5        for url in urls {
 6            group.addTask {
 7                // this return here have to match with `of` type
 8                return [url: try await self.download(url: url)]
 9            }
10        }
11        
12        for try await dictionaryData in group {
13            // merge dictionaryData to result dictionary
14            // if there is a duplicate key, keep the key of result
15            result.merge(dictionaryData) { current, _ in current }
16        }
17        
18        return result
19    }
20}
  • specify returning, denoting that this block will return with the type of returning ([URL: Data] dictionary as above)

Asynchronous Sequence

  • task group as above is an asynchronous sequence
  • an asynchronous sequence is a type that conforms to AsyncSequence protocol
  • values of an asynchronous sequence are generated ASYNCHRONOUSLY

An AsyncSequence may have all, some, or none of its values available when you first use it. Instead, you use await to receive values as they become available.

  • for await loop pauses after each iteration → resumes when the next value of the sequence is generated → performing another iteration → waiting again, until the sequence signals that it has terminated.

Create an Asynchronous Sequence

use the AsyncStream initializer (or AsyncThrowingStream)

  • as a stream of elements that could potentially result in a thrown error.
  • values deliver over time, and the stream can be closed by a finish event.
  • a finish event could either be a success ✅ or a failure ❌ once an error occurs.

An asynchronous sequence generated from a closure that calls a continuation

  • AsyncStream conforms to AsyncSequence, providing a convenient way to create an asynchronous sequence without manually implementing an asynchronous iterator.
  • In particular, an asynchronous stream is well-suited to adapt CALLBACK- or DELEGATION-based APIs to participate with async-await.
 1class TextFieldStreamer: NSObject, UITextFieldDelegate {
 2    var values: AsyncStream<UITextField>
 3    var continuation: AsyncStream<UITextField>.Continuation?
 4    
 5    override init() {
 6        var myContinuation: AsyncStream<UITextField>.Continuation?
 7        self.values = AsyncStream { continuation in
 8            myContinuation = continuation
 9        }
10        super.init()
11        self.continuation = myContinuation
12    }
13    
14    func textFieldDidChangeSelection(_ textField: UITextField) {
15        self.continuation?.yield(textField)
16    }
17}
18
19/// USAGE
20textField.delegate = self.textFieldStreamer
21Task {
22     for await textField in textFieldStreamer.values {
23	 print(textField.text ?? "")
24     }
25}

Another example from avanderlee.com, function download return an async sequence with value of type Status

 1extension FileDownloader {
 2    func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
 3        return AsyncThrowingStream { continuation in
 4            do {
 5                try self.download(url, progressHandler: { progress in
 6                    continuation.yield(.downloading(progress))
 7                }, completion: { result in
 8                    switch result {
 9                    case .success(let data):
10                        continuation.yield(.finished(data))
11                        continuation.finish()
12                    case .failure(let error):
13                        continuation.finish(throwing: error)
14                    }
15                })
16            } catch {
17                continuation.finish(throwing: error)
18            }
19        }
20    }
21}

☝ If using AsyncStream instead of AsyncThrowingStream, it accepts 1 parameter only and cannot finish with error as continuation.finish(throwing: error)

func download(_ url: URL) -> AsyncStream<Status> {}


🙏🏻📚 Learned from

  1. https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html
  2. https://www.avanderlee.com/swift/async-await/
  3. https://www.avanderlee.com/swift/async-let-asynchronous-functions-in-parallel/
  4. https://www.avanderlee.com/swift/asyncthrowingstream-asyncstream/