Ankit Ranjan

Streams in Dart — Multiple Values Over Time

Streams deliver sequences of asynchronous events. From user input to network data to real-time updates, streams power reactive Dart applications.

May 24, 2026 7 topics 7 quiz questions
1

What is a Stream?

A Future delivers one value. But what if you need multiple values over time? That's what Streams are for.

Think about user input. A button can be tapped many times. A text field changes with every keystroke. A GPS sensor sends location updates continuously. These aren't single values — they're sequences of events arriving over time.

[Figure: Future vs Stream. Future<T> delivers one value: one request, one response, done (for example, await fetchUser()); it completes once. Stream<T> delivers many values: events keep arriving and the stream stays open (for example, listening to button taps); it emits many times and may never complete.]

// A Future returns one value
Future<User> fetchUser(int id) async { ... }

// A Stream returns multiple values over time
Stream<int> countDown(int from) async* {
  for (var i = from; i >= 0; i--) {
    await Future.delayed(Duration(seconds: 1));
    yield i;  // emit a value
  }
}
A Stream is like an asynchronous iterable. Just as you can loop over a List to get values, you can listen to a Stream to receive values as they arrive.

2

Single-subscription vs broadcast streams

Not all streams behave the same. Dart has two kinds, and mixing them up causes bugs.

Single-subscription streams can only be listened to once. They're designed for sequences where you don't want to miss any data — like reading a file or receiving an HTTP response body.

// Single-subscription: only one listener allowed
// (File comes from dart:io)
var stream = File('data.txt').openRead();

stream.listen((chunk) => print('Got ${chunk.length} bytes'));

// This throws StateError!
stream.listen((chunk) => print('Another listener'));
Broadcast streams allow multiple listeners. They're for events that happen regardless of who's listening — like button clicks or timer ticks.

// Broadcast: multiple listeners allowed
var controller = StreamController<int>.broadcast();

controller.stream.listen((n) => print('Listener A: $n'));
controller.stream.listen((n) => print('Listener B: $n'));

controller.add(1);  // Both listeners receive 1
controller.add(2);  // Both listeners receive 2
[Figure: Single-subscription vs broadcast. Single-subscription (file streams, HTTP bodies): one source, one listener; a second listener gets "StateError: Stream already listened to"; data is buffered until a listener attaches. Broadcast (button clicks, sensor data): one source, listeners A and B; events fire whether anyone listens or not.]

Key difference: a single-subscription stream holds its events until someone listens (a StreamController buffers them, and most sources don't start producing until then). A broadcast stream fires events immediately; if no one is listening, the data is lost.
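The difference is easy to observe with two controllers. The following is a minimal sketch; the helper name lateListener is made up for this illustration. It adds two events before anyone listens, subscribes, then adds a third.

```dart
import 'dart:async';

/// Adds events before and after subscribing, and returns what the
/// listener actually received. (Hypothetical helper for illustration.)
Future<List<int>> lateListener(StreamController<int> controller) async {
  // Nobody is listening yet.
  controller.add(1);
  controller.add(2);

  final received = <int>[];
  controller.stream.listen(received.add);

  controller.add(3);
  await controller.close(); // completes once the listener has seen "done"
  return received;
}

Future<void> main() async {
  // Single-subscription: the early events were buffered.
  print(await lateListener(StreamController<int>())); // [1, 2, 3]

  // Broadcast: the early events had no listener and were dropped.
  print(await lateListener(StreamController<int>.broadcast())); // [3]
}
```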

Converting between them. You can convert a single-subscription stream to broadcast with asBroadcastStream().

var singleStream = countDown(5);
var broadcast = singleStream.asBroadcastStream();

broadcast.listen((n) => print('A: $n'));
broadcast.listen((n) => print('B: $n'));  // Now this works

3

Listening to streams

There are two main ways to consume a stream: listen() for fine-grained control, and await for for simpler code.

The listen() method gives you full control. It returns a StreamSubscription that you can pause, resume, or cancel.

var subscription = stream.listen(
  (data) => print('Data: $data'),     // onData
  onError: (err) => print('Error: $err'),
  onDone: () => print('Stream closed'),
  cancelOnError: false,  // keep listening after errors
);

// Later...
subscription.pause();   // stop receiving events
subscription.resume();  // start again
subscription.cancel();  // unsubscribe completely
The await for loop is cleaner when you just want to process each value.

Future<void> processStream(Stream<int> stream) async {
  await for (var value in stream) {
    print('Got: $value');
  }
  print('Stream finished');
}
Differences between listen() and await for:
listen() is non-blocking: code after it runs immediately
await for suspends the enclosing async function until the stream closes (other code keeps running on the event loop)
listen() gives you a subscription to control
await for has simpler syntax but less control

// listen() is non-blocking
stream.listen((v) => print(v));
print('This prints immediately');

// await for suspends the enclosing async function until the stream closes
await for (var v in stream) {
  print(v);
}
print('This prints after stream closes');
Cancellation with await for. Breaking out of the loop cancels the subscription automatically.

await for (var value in stream) {
  if (value > 100) break;  // automatically cancels subscription
  print(value);
}
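To see that the cancellation reaches the producer, here is a small sketch with an endless async* generator. The names naturals, firstThree, and generatorCleanedUp are invented for this example; the flag exists only to make the generator's cleanup visible.

```dart
import 'dart:async';

/// Set by the generator's finally block; only here to make cleanup visible.
var generatorCleanedUp = false;

/// An endless stream of 0, 1, 2, ...
Stream<int> naturals() async* {
  try {
    for (var i = 0;; i++) {
      yield i;
    }
  } finally {
    generatorCleanedUp = true; // runs once the subscription is cancelled
  }
}

/// Collects values until the first one that is >= 3.
Future<List<int>> firstThree() async {
  final seen = <int>[];
  await for (final n in naturals()) {
    if (n >= 3) break; // leaving the loop cancels the subscription
    seen.add(n);
  }
  return seen;
}

Future<void> main() async {
  print(await firstThree()); // [0, 1, 2]
  print(generatorCleanedUp); // true
}
```

Without the break, the loop would never finish, because naturals() never closes.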

4

Stream transformations

Just like Lists have map() and where(), Streams have transformation methods. Each one returns a new Stream.

// Transform values
stream.map((x) => x * 2)

// Filter values
stream.where((x) => x > 10)

// Take only first N
stream.take(5)

// Skip first N
stream.skip(3)

// Remove consecutive duplicates
stream.distinct()
Chaining transformations. Build pipelines by chaining methods.

// From button tap events to processed data
buttonTaps
    .where((tap) => tap.button == 'submit')
    .map((tap) => tap.formData)
    .distinct()
    .take(10)
    .listen((data) => submitForm(data));
[Figure: Stream transformation pipeline. Source [1, 2, 3, 4, 5] → .where(x > 2) gives [3, 4, 5] → .map(x * 10) gives [30, 40, 50] → .take(2) gives [30, 40] → listen() receives 30, then 40, then done.]

Each transformation returns a new Stream. The pipeline is lazy: values flow through on demand.
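The same pipeline as in the diagram, written as runnable code. This is a sketch; the function name pipeline is illustrative, and toList() is used in place of listen() so the result can be printed in one go.

```dart
import 'dart:async';

/// Filters, transforms, and truncates a stream of five numbers.
Future<List<int>> pipeline() {
  return Stream.fromIterable([1, 2, 3, 4, 5])
      .where((x) => x > 2) // 3, 4, 5
      .map((x) => x * 10) // 30, 40, 50
      .take(2) // 30, 40
      .toList();
}

Future<void> main() async {
  print(await pipeline()); // [30, 40]
}
```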

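Aggregating to a single value. These methods subscribe to the stream and return a Future. Each call counts as a listener, so on a single-subscription stream you can use only one of them.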

// Get first element
var first = await stream.first;

// Get last element (waits for stream to close)
var last = await stream.last;

// Collect all into a List
var all = await stream.toList();

// Reduce to single value
var sum = await stream.reduce((a, b) => a + b);

// Check conditions
var hasNegative = await stream.any((x) => x < 0);
var allPositive = await stream.every((x) => x > 0);
Warning: Methods like toList() and last wait for the stream to close. On an infinite stream, they never return.
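One way around this is to bound the stream before aggregating. A minimal sketch, assuming an endless ticker built with Stream.periodic (the names ticks and firstTicks are made up here):

```dart
import 'dart:async';

/// An endless ticker: 0, 1, 2, ... every 10 ms. It never closes on its own.
Stream<int> ticks() =>
    Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i);

/// take(n) closes the derived stream after n events, so toList() can return.
Future<List<int>> firstTicks(int n) => ticks().take(n).toList();

Future<void> main() async {
  print(await firstTicks(3)); // [0, 1, 2]

  // By contrast, `await ticks().toList()` would never complete.
}
```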

5

StreamController — creating your own streams

StreamController is the bridge between imperative code and streams. You push values in with add(), and listeners receive them through the stream.

var controller = StreamController<int>();

// The stream side — what listeners see
controller.stream.listen((value) {
  print('Received: $value');
});

// The sink side — where you push values
controller.add(1);
controller.add(2);
controller.add(3);
controller.close();  // signal no more values
Handling errors. Use addError() to send errors through the stream.

controller.add(1);
controller.addError(Exception('Something went wrong'));
controller.add(2);  // listeners still receive this
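A runnable sketch of that sequence, with an onError handler attached so the error is observed rather than left unhandled. The function name errorDemo and the log format are illustrative only.

```dart
import 'dart:async';

/// Sends data, an error, then more data, and records what the listener saw.
Future<List<String>> errorDemo() async {
  final controller = StreamController<int>();
  final log = <String>[];

  controller.stream.listen(
    (value) => log.add('data $value'),
    onError: (Object error) => log.add('error'),
    // cancelOnError defaults to false, so the subscription stays active.
  );

  controller.add(1);
  controller.addError(Exception('Something went wrong'));
  controller.add(2);
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await errorDemo()); // [data 1, error, data 2]
}
```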
Broadcast controllers. By default, StreamController creates single-subscription streams. For multiple listeners, use the broadcast constructor.

var broadcast = StreamController<int>.broadcast();

broadcast.stream.listen((v) => print('A: $v'));
broadcast.stream.listen((v) => print('B: $v'));

broadcast.add(42);  // Both receive 42
Cleanup is essential. Always close controllers when done to prevent memory leaks.

class CounterService {
  final _controller = StreamController<int>.broadcast();
  int _count = 0;

  Stream<int> get countStream => _controller.stream;

  // Bump the counter, then emit the new value.
  void increment() => _controller.add(++_count);

  void dispose() {
    _controller.close();  // Important!
  }
}
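Back-pressure with onListen/onPause/onResume/onCancel. The constructor's callback parameters tell the producer when a listener subscribes, pauses, resumes, or cancels, so it can stop producing while nobody wants events. A broadcast controller accepts only onListen and onCancel.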

var controller = StreamController<int>(
  onListen: () => print('Someone subscribed'),
  onPause: () => print('Listener paused'),
  onResume: () => print('Listener resumed'),
  onCancel: () => print('Listener cancelled'),
);
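As a sketch of how these callbacks are typically put to work, the following ticker only runs its timer while a listener is actively subscribed. The name ticker and its structure are assumptions for illustration, not something defined earlier in this article.

```dart
import 'dart:async';

/// Emits 0, 1, 2, ... once per [interval], but only while someone listens.
Stream<int> ticker(Duration interval) {
  late final StreamController<int> controller;
  Timer? timer;
  var count = 0;

  void start() {
    // Create the timer only if one is not already running.
    timer ??= Timer.periodic(interval, (_) => controller.add(count++));
  }

  void stop() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
    onListen: start, // begin producing on the first subscription
    onPause: stop, // stop producing instead of buffering
    onResume: start,
    onCancel: stop, // release the timer when the listener leaves
  );
  return controller.stream;
}

Future<void> main() async {
  // take(3) cancels after three events, which triggers onCancel.
  print(await ticker(const Duration(milliseconds: 10)).take(3).toList());
  // [0, 1, 2]
}
```

Stopping the timer in onPause means a paused listener does not come back to a backlog of buffered ticks.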

6

async* generators and yield

We saw sync* generators in the functions episode — they produce synchronous iterables. The async variant, async*, produces Streams.

// sync* returns Iterable, uses yield
Iterable<int> countSync(int n) sync* {
  for (var i = 0; i < n; i++) {
    yield i;
  }
}

// async* returns Stream, uses yield
Stream<int> countAsync(int n) async* {
  for (var i = 0; i < n; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }
}
yield* for delegation. Use yield* to forward all values from another stream.

Stream<int> countAndDouble(int n) async* {
  yield* countAsync(n);      // yield all values from countAsync
  yield* countAsync(n).map((x) => x * 2);  // then doubled values
}
[Figure: async* generator flow. An async* function such as Stream<int> count() awaits a 1-second delay and yields 1, 2, 3; it returns a Stream<int> that emits 1, 2, 3 one second apart; a listener using await for (var n in count()) receives each value as it is yielded.]

When to use async* vs StreamController:
async* is simpler for producing values in a loop
StreamController gives more control: push events from callbacks, react to pause and resume, or support multiple listeners with a broadcast controller
async* always creates a single-subscription stream

// Real-world example: polling an API
Stream<StockPrice> watchStock(String symbol) async* {
  while (true) {
    var price = await fetchPrice(symbol);
    yield price;
    await Future.delayed(Duration(seconds: 5));
  }
}

// Usage
await for (var price in watchStock('AAPL')) {
  updateDisplay(price);
  if (marketClosed) break;  // cancels the subscription, which stops the generator
}

7

Practical patterns — debounce, throttle, and combining

Real applications need more than basic transformations. Let's look at common patterns.

Debounce: wait for a pause in events. Useful for search-as-you-type, where you don't want to fire a request on every keystroke.

// Using rxdart package
textFieldStream
    .debounceTime(Duration(milliseconds: 300))
    .listen((query) => search(query));

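// Manual implementation (needs import 'dart:async' for Timer).
// A value is emitted only after `duration` passes with no newer value.
Stream<T> debounce<T>(Stream<T> source, Duration duration) {
  late final StreamController<T> controller;
  StreamSubscription<T>? subscription;
  Timer? timer;
  void Function()? flush;  // emits the pending value, if any

  controller = StreamController<T>(
    onListen: () {
      subscription = source.listen(
        (value) {
          timer?.cancel();  // a newer event restarts the quiet period
          flush = () {
            flush = null;
            controller.add(value);
          };
          timer = Timer(duration, () => flush?.call());
        },
        onError: controller.addError,
        onDone: () {
          timer?.cancel();
          flush?.call();  // emit the last pending value before closing
          controller.close();
        },
      );
    },
    onCancel: () {
      timer?.cancel();
      return subscription?.cancel();
    },
  );
  return controller.stream;
}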
Throttle — limit event frequency. Emit at most one value per time window.

// Using rxdart package: limit scroll events to about 60 per second
scrollEvents
    .throttleTime(Duration(milliseconds: 16))
    .listen((pos) => updateParallax(pos));
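For comparison, a leading-edge throttle can be sketched with nothing but dart:async. This is a simplified illustration, not how rxdart implements throttleTime: the function name throttle is invented here, the first event in each window passes, and later events in the same window are dropped.

```dart
import 'dart:async';

/// Lets an event through only if at least [window] has elapsed since the
/// last event that was let through. (Simplified sketch: the timing state is
/// shared, so use the returned stream with a single listener.)
Stream<T> throttle<T>(Stream<T> source, Duration window) {
  final clock = Stopwatch()..start();
  Duration? lastEmit;

  return source.where((_) {
    final now = clock.elapsed;
    final last = lastEmit;
    if (last == null || now - last >= window) {
      lastEmit = now;
      return true; // first event of a new window
    }
    return false; // still inside the current window: drop it
  });
}

Future<void> main() async {
  // All four events arrive almost at once, so only the first one passes.
  final burst = Stream.fromIterable([1, 2, 3, 4]);
  print(await throttle(burst, const Duration(hours: 1)).toList()); // [1]
}
```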
Combining streams. Merge multiple sources into one.

// Merge: interleave events from multiple streams
// (StreamGroup comes from package:async)
var combined = StreamGroup.merge([stream1, stream2, stream3]);

// Zip: pair up values (waits for both); Rx comes from package:rxdart
var zipped = Rx.zip2(namesStream, agesStream,
    (name, age) => '$name is $age');

// CombineLatest: once every stream has emitted, emit whenever any of them emits
var latest = Rx.combineLatest2(tempStream, humidityStream,
    (temp, humidity) => 'Temp: $temp, Humidity: $humidity');
Error recovery. Handle errors without stopping the stream.

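stream
    .handleError((error) => print('Caught: $error'))
    .listen((data) => process(data));

// Or retry on failure (rxdart). Retrying means re-subscribing, so pass a
// factory that builds a fresh stream; createStream is a placeholder name.
Rx.retry(() => createStream(), 3)
    .listen((data) => process(data));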
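The effect of handleError() is easiest to see on an aggregating method, which would otherwise fail on the first error. A minimal sketch using only dart:async; the function name collectIgnoringErrors is illustrative.

```dart
import 'dart:async';

/// Collects the data events of a stream that also carries an error.
Future<List<int>> collectIgnoringErrors() async {
  final controller = StreamController<int>();
  final caught = <Object>[];

  // Without handleError(), toList() would complete with the error instead.
  final result = controller.stream
      .handleError((Object error) => caught.add(error))
      .toList();

  controller
    ..add(1)
    ..addError(const FormatException('bad input'))
    ..add(2);
  await controller.close();

  print('Errors caught: ${caught.length}');
  return result;
}

Future<void> main() async {
  print(await collectIgnoringErrors()); // [1, 2]
}
```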
Disposing properly in Flutter. Always cancel subscriptions when your widget is disposed.

class _MyWidgetState extends State<MyWidget> {
  late StreamSubscription _subscription;

  @override
  void initState() {
    super.initState();
    _subscription = dataStream.listen(_onData);
  }

  @override
  void dispose() {
    _subscription.cancel();  // Prevent memory leaks!
    super.dispose();
  }

  void _onData(data) => setState(() => _data = data);
}

