Streams in Dart — Multiple Values Over Time
Streams deliver sequences of asynchronous events. From user input to network data to real-time updates, streams power reactive Dart applications.
What is a Stream?
A Future delivers one value. But what if you need multiple values over time? That's what Streams are for.
Think about user input. A button can be tapped many times. A text field changes with every keystroke. A GPS sensor sends location updates continuously. These aren't single values — they're sequences of events arriving over time.
// A Future returns one value
Future<User> fetchUser(int id) async { ... }
// A Stream returns multiple values over time
Stream<int> countDown(int from) async* {
  for (var i = from; i >= 0; i--) {
    await Future.delayed(Duration(seconds: 1));
    yield i; // emit a value
  }
}
A Stream is like an asynchronous iterable. Just as you can loop over a List to get values, you can listen to a Stream to receive values as they arrive.
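To make the analogy concrete, here is a minimal sketch that loops over a List and then over the countDown stream from above. The shortened 100 ms delay and the main function are assumptions added for illustration, and the await for loop it uses is explained later in this article.

```dart
import 'dart:async';

// Same generator as above, with a shorter delay so the demo finishes quickly.
Stream<int> countDown(int from) async* {
  for (var i = from; i >= 0; i--) {
    await Future.delayed(const Duration(milliseconds: 100));
    yield i;
  }
}

Future<void> main() async {
  // Looping over a List: every value is already in memory.
  for (final n in [3, 2, 1, 0]) {
    print('list: $n');
  }

  // Looping over a Stream: each value arrives when the generator yields it.
  await for (final n in countDown(3)) {
    print('stream: $n');
  }
}
```

Both loops see the values 3, 2, 1, 0 in the same order. The only difference is that the stream loop waits between values.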
Single-subscription vs broadcast streams
Not all streams behave the same. Dart has two kinds, and mixing them up causes bugs.
Single-subscription streams can only be listened to once. They're designed for sequences where you don't want to miss any data — like reading a file or receiving an HTTP response body.
// Single-subscription: only one listener allowed (File comes from dart:io)
var stream = File('data.txt').openRead();
stream.listen((chunk) => print('Got ${chunk.length} bytes'));
// This throws StateError!
stream.listen((chunk) => print('Another listener'));
Broadcast streams allow multiple listeners. They're for events that happen regardless of who's listening — like button clicks or timer ticks.
// Broadcast: multiple listeners allowed
var controller = StreamController<int>.broadcast();
controller.stream.listen((n) => print('Listener A: $n'));
controller.stream.listen((n) => print('Listener B: $n'));
controller.add(1); // Both listeners receive 1
controller.add(2); // Both listeners receive 2
Key difference: A single-subscription stream holds its events until someone listens, so nothing is missed. A broadcast stream fires events immediately; if no one is listening at that moment, the data is lost.
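The difference is easiest to see with two controllers that receive events before anyone listens. This is a small sketch, and the demo function and its log list are names made up for the example.

```dart
import 'dart:async';

Future<List<String>> demo() async {
  final log = <String>[];

  // Single-subscription controller: events added before listen() are buffered.
  final single = StreamController<int>();
  single.add(1);
  single.add(2);
  single.stream.listen((n) => log.add('single: $n'));

  // Broadcast controller: events added while nobody listens are dropped.
  final broadcast = StreamController<int>.broadcast();
  broadcast.add(1); // no listener yet, so this event is lost
  broadcast.stream.listen((n) => log.add('broadcast: $n'));
  broadcast.add(2);

  // Closing both controllers and awaiting lets pending events be delivered.
  await Future.wait([single.close(), broadcast.close()]);
  return log;
}

Future<void> main() async {
  print(await demo());
}
```

The log should contain both single-subscription events but only the second broadcast event, because the first one was added before the listener existed.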
Converting between them. You can convert a single-subscription stream to broadcast with asBroadcastStream().
var singleStream = countDown(5);
var broadcast = singleStream.asBroadcastStream();
broadcast.listen((n) => print('A: $n'));
broadcast.listen((n) => print('B: $n')); // Now this works
Listening to streams
There are two main ways to consume a stream: listen() for fine-grained control, and an await for loop for simpler code.
The listen() method gives you full control. It returns a StreamSubscription that you can pause, resume, or cancel.
var subscription = stream.listen(
  (data) => print('Data: $data'), // onData
  onError: (err) => print('Error: $err'),
  onDone: () => print('Stream closed'),
  cancelOnError: false, // keep listening after errors
);
// Later...
subscription.pause(); // stop receiving events
subscription.resume(); // start again
subscription.cancel(); // unsubscribe completely
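As a rough illustration of pause() and resume(), the sketch below pauses a subscription partway through and resumes it shortly after. The periodic source, the timings, and the collectWithPause name are assumptions chosen for the demo.

```dart
import 'dart:async';

// Collects ticks from a short periodic stream, pausing once along the way.
Future<List<int>> collectWithPause() {
  final received = <int>[];
  final done = Completer<List<int>>();

  // Assumed source: one tick every 100 ms, five ticks in total.
  final ticks = Stream<int>.periodic(
    const Duration(milliseconds: 100),
    (i) => i,
  ).take(5);

  late final StreamSubscription<int> subscription;
  subscription = ticks.listen(
    (n) {
      received.add(n);
      if (n == 1) {
        // Stop receiving events for 300 ms, then pick up again.
        subscription.pause();
        Timer(const Duration(milliseconds: 300), subscription.resume);
      }
    },
    onDone: () => done.complete(received),
  );

  return done.future;
}

Future<void> main() async {
  print(await collectWithPause()); // [0, 1, 2, 3, 4]
}
```

No tick is skipped: pausing only delays delivery, and the listener still sees every value in order.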
The await for loop is cleaner when you just want to process each value.
Future<void> processStream(Stream<int> stream) async {
  await for (var value in stream) {
    print('Got: $value');
  }
  print('Stream finished');
}
Differences between listen() and await for:
• listen() is non-blocking: code after it runs immediately
• await for suspends the enclosing async function until the stream closes
• listen() gives you a subscription to control
• await for has simpler syntax but less control
// listen() is non-blocking
stream.listen((v) => print(v));
print('This prints immediately');
// await for suspends the enclosing function (use it instead of listen(), not on the same stream)
await for (var v in stream) {
  print(v);
}
print('This prints after stream closes');
Cancellation with await for. Breaking out of the loop cancels the subscription automatically.
await for (var value in stream) {
  if (value > 100) break; // automatically cancels subscription
  print(value);
}
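One way to observe the automatic cancellation is to give the source generator a finally block, which is expected to run once the loop breaks. This is a sketch: numbers, run, and the log list are hypothetical names used only here.

```dart
import 'dart:async';

final log = <String>[];

// Infinite generator with a finally block standing in for real cleanup.
Stream<int> numbers() async* {
  try {
    var i = 0;
    while (true) {
      yield i++;
    }
  } finally {
    log.add('generator cleaned up');
  }
}

Future<List<String>> run() async {
  await for (final value in numbers()) {
    if (value > 2) break; // cancels the subscription to numbers()
    log.add('value $value');
  }
  log.add('loop exited');
  return log;
}

Future<void> main() async {
  print(await run());
}
```

Even though numbers() never ends on its own, the program finishes, because breaking out of the loop cancels the subscription and stops the generator.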
Stream transformations
Just like Lists have map() and where(), Streams have transformation methods. Each one returns a new Stream.
// Transform values
stream.map((x) => x * 2)
// Filter values
stream.where((x) => x > 10)
// Take only first N
stream.take(5)
// Skip first N
stream.skip(3)
// Remove consecutive duplicates
stream.distinct()
Chaining transformations. Build pipelines by chaining methods.
// From button tap events to processed data
buttonTaps
    .where((tap) => tap.button == 'submit')
    .map((tap) => tap.formData)
    .distinct()
    .take(10)
    .listen((data) => submitForm(data));
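The same kind of pipeline can be tried on plain numbers. The sketch below uses a made-up list of integers as the source, and the comments trace what each stage lets through.

```dart
import 'dart:async';

Future<List<int>> pipeline() {
  final source = Stream<int>.fromIterable([1, 5, 5, 6, 7, 7, 8, 9, 10, 11]);
  return source
      .where((x) => x > 4) // 5, 5, 6, 7, 7, 8, 9, 10, 11
      .map((x) => x * 2) // 10, 10, 12, 14, 14, 16, 18, 20, 22
      .distinct() // 10, 12, 14, 16, 18, 20, 22
      .skip(1) // 12, 14, 16, 18, 20, 22
      .take(3) // 12, 14, 16
      .toList();
}

Future<void> main() async {
  print(await pipeline()); // [12, 14, 16]
}
```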
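Aggregating to a single value. Some methods consume the stream and return a Future. Each call below subscribes to the stream, so on a single-subscription stream you can use only one of them.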
// Get first element
var first = await stream.first;
// Get last element (waits for stream to close)
var last = await stream.last;
// Collect all into a List
var all = await stream.toList();
// Reduce to single value
var sum = await stream.reduce((a, b) => a + b);
// Check conditions
var hasNegative = await stream.any((x) => x < 0);
var allPositive = await stream.every((x) => x > 0);
Warning: Methods like toList() and last wait for the stream to close. On an infinite stream, they never return.
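A common way around this is to bound the stream before aggregating it. The sketch below assumes a hypothetical infinite ticks() source and uses take() and takeWhile() to make it finite.

```dart
import 'dart:async';

// Hypothetical infinite source: emits 0, 1, 2, ... every 50 ms.
Stream<int> ticks() =>
    Stream<int>.periodic(const Duration(milliseconds: 50), (i) => i);

Future<void> main() async {
  // take(3) closes the derived stream after three events, so toList() completes.
  final firstThree = await ticks().take(3).toList();
  print(firstThree); // [0, 1, 2]

  // takeWhile() closes the derived stream as soon as the condition fails.
  final lastSmall = await ticks().takeWhile((n) => n < 4).last;
  print(lastSmall); // 3
}
```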
StreamController — creating your own streams
StreamController is the bridge between imperative code and streams. You push values in with add(), and listeners receive them through the stream.
var controller = StreamController<int>();
// The stream side — what listeners see
controller.stream.listen((value) {
  print('Received: $value');
});
// The sink side — where you push values
controller.add(1);
controller.add(2);
controller.add(3);
controller.close(); // signal no more values
Handling errors. Use addError() to send errors through the stream.
controller.add(1);
controller.addError(Exception('Something went wrong'));
controller.add(2); // listeners still receive this
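Put together as a runnable sketch, the listener below records both data and errors. The run function, the log list, and the choice of StateError are assumptions for the example.

```dart
import 'dart:async';

Future<List<String>> run() async {
  final log = <String>[];
  final controller = StreamController<int>();

  controller.stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () => log.add('done'),
  );

  controller.add(1);
  controller.addError(StateError('boom'));
  controller.add(2); // still delivered because cancelOnError defaults to false
  await controller.close();
  return log;
}

Future<void> main() async {
  print(await run());
}
```

The error arrives between the two data events, in the order it was added, and the subscription stays active afterwards.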
Broadcast controllers. By default, StreamController creates single-subscription streams. For multiple listeners, use the broadcast constructor.
var broadcast = StreamController<int>.broadcast();
broadcast.stream.listen((v) => print('A: $v'));
broadcast.stream.listen((v) => print('B: $v'));
broadcast.add(42); // Both receive 42
Cleanup is essential. Always close controllers when done to prevent memory leaks.
class CounterService {
  final _controller = StreamController<int>.broadcast();
  int _count = 0;
  Stream<int> get countStream => _controller.stream;
  void increment() => _controller.add(++_count); // emit the new count
  void dispose() {
    _controller.close(); // Important!
  }
}
Back-pressure with onListen/onPause/onResume. For more control, use the callback parameters.
var controller = StreamController<int>(
  onListen: () => print('Someone subscribed'),
  onPause: () => print('Listener paused'),
  onResume: () => print('Listener resumed'),
  onCancel: () => print('Listener cancelled'),
);
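These callbacks are typically used to start and stop whatever produces the events. The sketch below, with a hypothetical ticker function, runs a Timer only while a listener is active and not paused.

```dart
import 'dart:async';

// Emits an increasing counter, but only runs its Timer while someone listens.
Stream<int> ticker(Duration interval) {
  late final StreamController<int> controller;
  Timer? timer;
  var count = 0;

  void start() {
    timer = Timer.periodic(interval, (_) => controller.add(count++));
  }

  void stop() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
    onListen: start,
    onPause: stop, // stop producing while the listener is paused
    onResume: start,
    onCancel: stop, // release the timer when the listener goes away
  );
  return controller.stream;
}

Future<void> main() async {
  final values =
      await ticker(const Duration(milliseconds: 50)).take(3).toList();
  print(values); // [0, 1, 2]
}
```

Because take(3) cancels its subscription after the third event, onCancel fires and the timer is released, so the program can exit.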
async* generators and yield
We saw sync* generators in the functions episode — they produce synchronous iterables. The async variant, async*, produces Streams.
// sync* returns Iterable, uses yield
Iterable<int> countSync(int n) sync* {
  for (var i = 0; i < n; i++) {
    yield i;
  }
}
// async* returns Stream, uses yield
Stream<int> countAsync(int n) async* {
  for (var i = 0; i < n; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }
}
yield* for delegation. Use yield* to forward all values from another stream.
Stream<int> countAndDouble(int n) async* {
  yield* countAsync(n); // yield all values from countAsync
  yield* countAsync(n).map((x) => x * 2); // then doubled values
}
When to use async* vs StreamController:
• async* is simpler for producing values in a loop
• StreamController gives more control: pause, resume, multiple listeners
• async* always creates a single-subscription stream
// Real-world example: polling an API
Stream<StockPrice> watchStock(String symbol) async* {
  while (true) {
    var price = await fetchPrice(symbol);
    yield price;
    await Future.delayed(Duration(seconds: 5));
  }
}
// Usage
await for (var price in watchStock('AAPL')) {
  updateDisplay(price);
  if (marketClosed) break; // cancels the stream
}
Practical patterns — debounce, throttle, and combining
Real applications need more than basic transformations. Let's look at common patterns.
Debounce — wait for pause in events. Useful for search-as-you-type where you don't want to fire a request on every keystroke.
// Using rxdart package
textFieldStream
    .debounceTime(Duration(milliseconds: 300))
    .listen((query) => search(query));
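// Manual implementation (a sketch; needs import 'dart:async')
Stream<T> debounce<T>(Stream<T> source, Duration duration) {
  late final StreamController<T> controller;
  late StreamSubscription<T> subscription;
  Timer? timer;
  var sourceDone = false;
  controller = StreamController<T>(
    onListen: () {
      subscription = source.listen(
        (value) {
          timer?.cancel(); // a newer event restarts the quiet period
          timer = Timer(duration, () {
            controller.add(value); // no newer event arrived in time
            if (sourceDone) controller.close();
          });
        },
        onError: controller.addError,
        onDone: () {
          sourceDone = true;
          // Close now unless a value is still waiting for its timer
          if (!(timer?.isActive ?? false)) controller.close();
        },
      );
    },
    onCancel: () {
      timer?.cancel();
      return subscription.cancel();
    },
  );
  return controller.stream;
}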
Throttle — limit event frequency. Emit at most one value per time window.
// Limit scroll events to about 60fps (throttleTime comes from the rxdart package)
scrollEvents
    .throttleTime(Duration(milliseconds: 16))
    .listen((pos) => updateParallax(pos));
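For comparison with the manual debounce, a throttle can also be sketched without a package. The version below is a simplified leading-edge throttle, not rxdart's implementation: it lets an event through only if the window has elapsed since the last event it let through, and drops the rest. The throttle function name and the timings in main are assumptions.

```dart
import 'dart:async';

// Passes an event only if at least `window` has elapsed since the last
// event that was passed; everything in between is dropped.
Stream<T> throttle<T>(Stream<T> source, Duration window) async* {
  final clock = Stopwatch()..start();
  Duration? lastEmit;
  await for (final value in source) {
    final now = clock.elapsed;
    if (lastEmit == null || now - lastEmit >= window) {
      lastEmit = now;
      yield value;
    }
    // Otherwise the event arrived inside the window and is ignored.
  }
}

Future<void> main() async {
  // Ten events 20 ms apart, throttled to at most one per 100 ms.
  final fast = Stream<int>.periodic(
    const Duration(milliseconds: 20),
    (i) => i,
  ).take(10);
  print(await throttle(fast, const Duration(milliseconds: 100)).toList());
}
```

The exact values printed depend on timer timing, but the first event always passes and the output is much shorter than the input.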
Combining streams. Merge multiple sources into one.
// Merge: interleave events from multiple streams (StreamGroup is in package:async)
var combined = StreamGroup.merge([stream1, stream2, stream3]);
// Zip: pair up values (waits for both); Rx comes from rxdart
var zipped = Rx.zip2(namesStream, agesStream,
    (name, age) => '$name is $age');
// CombineLatest: once every stream has emitted, emit whenever any of them emits
var latest = Rx.combineLatest2(tempStream, humidityStream,
    (temp, humidity) => 'Temp: $temp, Humidity: $humidity');
Error recovery. Handle errors without stopping the stream.
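stream
    .handleError((error) => print('Caught: $error'))
    .listen((data) => process(data));
// Or retry on failure with rxdart's Rx.retry, which re-creates the stream
// from a factory (createStream is a placeholder here) up to 3 times
Rx.retry(() => createStream(), 3)
    .listen((data) => process(data));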
Disposing properly in Flutter. Always cancel subscriptions when your widget is disposed.
class _MyWidgetState extends State<MyWidget> {
  late StreamSubscription _subscription;
  @override
  void initState() {
    super.initState();
    _subscription = dataStream.listen(_onData);
  }
  @override
  void dispose() {
    _subscription.cancel(); // Prevent memory leaks!
    super.dispose();
  }
  void _onData(data) => setState(() => _data = data);
}