Modern Concurrency: Beyond the Basics

Oct 20 2022 · Swift 5.5, iOS 15, Xcode 13.4

Part 1: AsyncStream & Continuations

02. AsyncStream

Notes: 02. AsyncStream

  • For more about push-based vs pull-based AsyncStreams, see AsyncSequence & AsyncStream Tutorial for iOS.
  • This video uses Xcode 14’s Task.sleep(until:clock:). If you use Xcode 13, replace this with Task.sleep(nanoseconds: 1_000_000_000).
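
If the same code has to build with both toolchains, the two sleep calls from the note above can sit behind one helper. This is only a sketch: pause(seconds:) is a hypothetical name that does not appear in the course materials, and the availability list assumes the clock-based API requires iOS 16 or macOS 13.

```swift
/// Hypothetical helper: suspends the current task for `seconds` seconds.
func pause(seconds: UInt64) async throws {
#if compiler(>=5.7)
  // Xcode 14 toolchain: the clock-based API exists, but only on newer OS versions.
  if #available(iOS 16, macOS 13, tvOS 16, watchOS 9, *) {
    try await Task.sleep(until: .now + .seconds(seconds), clock: .continuous)
    return
  }
#endif
  // Xcode 13, or an older OS: fall back to the nanoseconds-based API.
  try await Task.sleep(nanoseconds: seconds * 1_000_000_000)
}
```

Inside the typewriter code, try await pause(seconds: 1) could then stand in for either form of the sleep call.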

In this episode, you’ll learn about AsyncStream, an easier way to create a custom AsyncSequence.

Custom AsyncSequence from episode 4 of Getting Started

In the preceding course, you created this simple typewriter with a custom AsyncSequence that “types” a phrase, adding a character every second.

struct Typewriter: AsyncSequence {
  typealias Element = String
  let phrase: String

  func makeAsyncIterator() -> TypewriterIterator {
    return TypewriterIterator(phrase)
  }
}

struct TypewriterIterator: AsyncIteratorProtocol {
  typealias Element = String
  let phrase: String
  var index: String.Index

  init(_ phrase: String) {
    self.phrase = phrase
    self.index = phrase.startIndex
  }

  mutating func next() async throws -> String? {
    guard index < phrase.endIndex else { return nil }
    try await Task.sleep(until: .now + .seconds(1),
                         clock: .continuous)
    defer { index = phrase.index(after: index) }
    return String(phrase[phrase.startIndex...index])
  }
}

Iterate over the sequence in a Task:

Task {
  for try await item in Typewriter(phrase: "Hello, world!") {
    print(item)
  }
  print("AsyncSequence Done")
}

The output grows by one character every second:

H
He
Hel
Hell
Hello
Hello,
Hello, 
Hello, w
Hello, wo
Hello, wor
Hello, worl
Hello, world
Hello, world!
AsyncSequence Done
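
The growing output comes from the closed range phrase.startIndex...index, which includes the character at index. As a rough illustration, the slicing logic can be pulled out of the async code into a plain function. The name prefixes(of:) is hypothetical and not part of the course materials.

```swift
/// Hypothetical helper: returns every prefix of `phrase`, shortest first.
func prefixes(of phrase: String) -> [String] {
  var result: [String] = []
  var index = phrase.startIndex
  while index < phrase.endIndex {
    // The closed range includes the character at `index`.
    result.append(String(phrase[phrase.startIndex...index]))
    index = phrase.index(after: index)
  }
  return result
}

print(prefixes(of: "Hi!"))  // ["H", "Hi", "Hi!"]
```

The iterator's next() does the same work one element at a time, with a one-second pause before each element.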

AsyncStream Typewriter

In the next section, the starter has already set up test phrase and index properties:

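let phrase = "Hello, world!"
var index = phrase.startIndex

Start with an empty AsyncStream whose closure will produce each element:

let stream_pull = AsyncStream<String> {

}

Copy the body of TypewriterIterator's next() into the closure:

let stream_pull = AsyncStream<String> {
  guard index < phrase.endIndex else { return nil }
  try await Task.sleep(until: .now + .seconds(1),
                       clock: .continuous)
  defer { index = phrase.index(after: index) }
  return String(phrase[phrase.startIndex...index])
}

This doesn't compile yet: Task.sleep(until:clock:) throws, but the AsyncStream closure can't throw. One fix is to change the stream's type and leave the body as it is:

let stream_pull = AsyncThrowingStream<String, Error> {
  // same body as above
}

The other fix is to keep AsyncStream and catch the error around the sleep call, returning nil to end the stream:

do {
  try await Task.sleep(until: .now + .seconds(1),
                       clock: .continuous)
} catch {
  return nil
}

Then iterate over the stream in a Task:

Task {
  for try await item in stream_pull {
    print(item)
  }
  print("Pull AsyncStream Done")
}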
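
The pull-based stream above relies on top-level phrase and index variables. As a sketch under different assumptions, the same idea can be wrapped in a function that keeps its position in a small actor, so the closure captures no mutable variable. TypewriterState, makePullTypewriter and delayNanoseconds are hypothetical names, and the sketch uses Task.sleep(nanoseconds:) so it also builds with Xcode 13.

```swift
/// Hypothetical helper: holds the typing position for one stream.
actor TypewriterState {
  private let phrase: String
  private var index: String.Index

  init(_ phrase: String) {
    self.phrase = phrase
    self.index = phrase.startIndex
  }

  /// Returns the next prefix, one character longer each call, or nil at the end.
  func nextPrefix() -> String? {
    guard index < phrase.endIndex else { return nil }
    defer { index = phrase.index(after: index) }
    return String(phrase[phrase.startIndex...index])
  }
}

/// Hypothetical factory for a pull-based typewriter stream.
func makePullTypewriter(_ phrase: String,
                        delayNanoseconds: UInt64 = 1_000_000_000) -> AsyncStream<String> {
  let state = TypewriterState(phrase)
  // The unfolding closure runs each time the consumer asks for an element.
  return AsyncStream<String>(unfolding: {
    do {
      try await Task.sleep(nanoseconds: delayNanoseconds)
    } catch {
      return nil  // the sleep was cancelled, so end the stream
    }
    return await state.nextPrefix()
  })
}

Task {
  for await item in makePullTypewriter("Hi!") {
    print(item)
  }
  print("Pull sketch done")
}
```

Because each call creates its own state, two streams made this way don't interfere with each other.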

AsyncStream: pull or push?

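Option-click AsyncStream, then choose Open in Developer Documentation. Scroll down to Topics, which lists two ways to create a stream: from an asynchronous closure, as you just did (pull-based), or with a continuation (push-based).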
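
To make the difference concrete, here is a minimal sketch that uses each initializer once. The function names makeDiceRolls and makeCountToThree are hypothetical, and the dice and counter examples are stand-ins rather than anything from the course.

```swift
// Pull-based: the closure runs only when the consumer asks for an element.
func makeDiceRolls() -> AsyncStream<Int> {
  AsyncStream<Int>(unfolding: {
    Int.random(in: 1...6)  // never returns nil, so this stream is endless
  })
}

// Push-based: the closure runs once and pushes elements through the continuation.
func makeCountToThree() -> AsyncStream<Int> {
  AsyncStream<Int> { continuation in
    for number in 1...3 {
      continuation.yield(number)  // buffered until the consumer reads it
    }
    continuation.finish()  // without this, the consumer's loop never ends
  }
}

Task {
  // prefix(3) stops the endless pull-based stream after three elements.
  for await roll in makeDiceRolls().prefix(3) {
    print("rolled", roll)
  }
  for await number in makeCountToThree() {
    print("counted", number)
  }
}
```

In the pull-based version the consumer sets the pace. In the push-based version the producer does, and the stream buffers elements until they're read.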

// These two lines are already above the pull-based stream code
let phrase = "Hello, world!"
var index = phrase.startIndex

let stream_push = AsyncStream<String> { continuation in  // closure receives a continuation
  Task {  // wrap any asynchronous code in a Task
  }
}

Inside the Task, loop over the phrase:

while index < phrase.endIndex {  // while index hasn't reached the end of phrase

}

Copy the do-catch from the pull-based stream into the loop, then delete return nil:

while index < phrase.endIndex {
  do {
    try await Task.sleep(until: .now + .seconds(1), clock: .continuous)
  } catch {
    // delete return nil
  }
}

After the sleep, yield the next substring and advance index. Call finish() in the catch and again after the loop:

continuation.yield(String(phrase[phrase.startIndex...index])) // copy pull-based return value
index = phrase.index(after: index)  // copy index increment from pull-based defer
continuation.finish()

The complete push-based stream looks like this:

// These two lines are already above the pull-based stream code
let phrase = "Hello, world!"
var index = phrase.startIndex

let stream_push = AsyncStream<String> { continuation in
  Task {
    while index < phrase.endIndex {
      do {
        try await Task.sleep(until: .now + .seconds(1),
                             clock: .continuous)
        continuation.yield(String(phrase[phrase.startIndex...index]))
        index = phrase.index(after: index)
      } catch {
        continuation.finish()
        return  // the sleep was interrupted: stop instead of looping again
      }
    }
    continuation.finish()
  }
}

Task {
  for try await item in stream_push {  // change pull to push
    print(item)
  }
  print("Push AsyncStream Done")  // change pull to push
}
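
The push-based stream above shares the top-level index, and its inner Task keeps running even if the consumer stops listening. One possible variation, sketched here under those assumptions, keeps index local to the Task and cancels the Task from the continuation's onTermination handler. makePushTypewriter and delayNanoseconds are hypothetical names, and the sketch uses Task.sleep(nanoseconds:) so it also builds with Xcode 13.

```swift
/// Hypothetical factory for a push-based typewriter stream.
func makePushTypewriter(_ phrase: String,
                        delayNanoseconds: UInt64 = 1_000_000_000) -> AsyncStream<String> {
  AsyncStream<String> { continuation in
    let task = Task {
      var index = phrase.startIndex  // local to the Task, so nothing is shared
      while index < phrase.endIndex {
        do {
          try await Task.sleep(nanoseconds: delayNanoseconds)
        } catch {
          break  // cancelled: leave the loop and finish below
        }
        continuation.yield(String(phrase[phrase.startIndex...index]))
        index = phrase.index(after: index)
      }
      continuation.finish()
    }
    // Stop typing when the stream ends or the consumer goes away.
    continuation.onTermination = { _ in task.cancel() }
  }
}

Task {
  for await item in makePushTypewriter("Hi!") {
    print(item)
  }
  print("Push sketch done")
}
```

Cancelling in onTermination makes the pending sleep throw, so the loop exits promptly instead of typing into a stream nobody reads.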