diff --git a/book/src/libs/sync/channel.md b/book/src/libs/sync/channel.md index 04d1bd6..ae6fef6 100644 --- a/book/src/libs/sync/channel.md +++ b/book/src/libs/sync/channel.md @@ -1,13 +1,17 @@ # channel -`channel` is used for communication between produces and consumers. `channel` is +rust_core supports two types of channels, "local" channels (same isolate) and "isolate" channels (different isolates). + +## Local Channels + +`channel` is used for communication between produces and consumers on the **same** isolate. `channel` is similar to `StreamController` except it buffers data until read and will never throw. In more detail, `channel` returns a `Sender` and `Receiver`. Each item `T` sent by the `Sender` will only be seen once by the `Receiver`. If the `Sender` calls `close` while the `Receiver`'s buffer is not empty, the `Receiver` will still yield the remaining items in the buffer until empty. -## Examples -### Single Sender, Single Receiver +### Examples +#### Single Sender, Single Receiver In this example, a single sender sends data to a single receiver. The receiver retrieves the data and processes it. ```dart @@ -120,4 +124,96 @@ void main() async { print(value); // Outputs: 1, 2, 3 } } +``` + +## Isolate Channels + +`isolateChannel` is used to bi-directional isolate communication with channels. The returned +`Sender` and `Receiver` can communicate with the spawned isolate and +the spawned isolate is passed a `Sender` and `Receiver` to communicate with the original isolate. +Each item `T` sent by the `Sender` will only be seen once by the `Receiver`. If the `Sender` calls `close` while the `Receiver`'s buffer +is not empty, the `Receiver` will still yield the remaining items in the buffer until empty. +Types that can be sent over a `SendPort` as defined [here](https://api.flutter.dev/flutter/dart-isolate/SendPort/send.html) +are allow to be sent between isolates. Otherwise a `toIsolateCodec` and/or a `fromIsolateCodec` can be passed +to encode and decode the messages. + +### Examples + +#### Simple String Communication +This example demonstrates a simple string message being sent and received between the main isolate and a spawned isolate. + +```dart +void main() async { + final (tx1, rx1) = await isolateChannel((tx2, rx2) async { + assert((await rx2.recv()).unwrap() == "hello"); + tx2.send("hi"); + }, toIsolateCodec: const StringCodec(), fromIsolateCodec: const StringCodec()); + + tx1.send("hello"); + expect((await rx1.recv()).unwrap(), "hi"); +} +``` +#### Different Codecs for Communication +This example demonstrates using different codecs for communication between the main isolate and a spawned isolate. + +```dart +void main() async { + final (tx1, rx1) = await isolateChannel((tx2, rx2) async { + assert((await rx2.recv()).unwrap() == "hello"); + tx2.send(1); + }, toIsolateCodec: const StringCodec(), fromIsolateCodec: const IntCodec()); + + tx1.send("hello"); + expect((await rx1.recv()).unwrap(), 1); +} +``` +#### No Codecs +This example demonstrates communication without specifying codecs, relying on the default codecs. + +```dart +void main() async { + final (tx1, rx1) = await isolateChannel((tx2, rx2) async { + assert((await rx2.recv()).unwrap() == "hello"); + tx2.send(1); + }); + + tx1.send("hello"); + expect((await rx1.recv()).unwrap(), 1); +} +``` +#### Bi-directional Send and Receive +This example demonstrates a more complex scenario where multiple messages are sent and received in both directions. + +```dart +void main() async { + final (tx1, rx1) = await isolateChannel((tx2, rx2) async { + await Future.delayed(Duration(milliseconds: 100)); + tx2.send((await rx2.recv()).unwrap() * 10); + await Future.delayed(Duration(milliseconds: 100)); + tx2.send((await rx2.recv()).unwrap() * 10); + await Future.delayed(Duration(milliseconds: 100)); + tx2.send(6); + await Future.delayed(Duration(milliseconds: 100)); + tx2.send((await rx2.recv()).unwrap() * 10); + await Future.delayed(Duration(milliseconds: 100)); + tx2.send((await rx2.recv()).unwrap() * 10); + await Future.delayed(Duration(milliseconds: 100)); + tx2.send(7); + await Future.delayed(Duration(milliseconds: 100)); + tx2.send((await rx2.recv()).unwrap() * 10); + }, toIsolateCodec: const IntCodec(), fromIsolateCodec: const IntCodec()); + + tx1.send(1); + expect(await rx1.recv().unwrap(), 10); + tx1.send(2); + expect(await rx1.recv().unwrap(), 20); + tx1.send(3); + expect(await rx1.recv().unwrap(), 6); + tx1.send(4); + expect(await rx1.recv().unwrap(), 30); + tx1.send(5); + expect(await rx1.recv().unwrap(), 40); + expect(await rx1.recv().unwrap(), 7); + expect(await rx1.recv().unwrap(), 50); +} ``` \ No newline at end of file diff --git a/lib/src/sync/channel.dart b/lib/src/sync/channel.dart index c27b97a..cd8a1f1 100644 --- a/lib/src/sync/channel.dart +++ b/lib/src/sync/channel.dart @@ -11,7 +11,7 @@ part 'isolate_channel.dart'; /// Creates a new channel, returning the [Sender] and [LocalReceiver]. Each item [T] sent by the [Sender] /// will only be seen once by the [LocalReceiver]. If the [Sender] calls [close] while the [LocalReceiver]s buffer /// is not empty, the [LocalReceiver] will still yield the remaining items in the buffer until empty. -(Sender, LocalReceiver) channel() { +(Sender, Receiver) channel() { // broadcast so no buffer StreamController controller = StreamController.broadcast(); return (LocalSender._(controller.sink), LocalReceiver._(controller.stream)); @@ -64,12 +64,13 @@ class LocalSender implements Sender { } /// [Receiver] for a single isolate. -class LocalReceiver { +class LocalReceiver implements Receiver { late final StreamSubscription _streamSubscription; final List> _buffer = []; bool _isClosed = false; Completer _waker = Completer(); + @override bool get isClosed => _isClosed; LocalReceiver._(Stream stream) { @@ -95,11 +96,7 @@ class LocalReceiver { }, cancelOnError: false); } - /// Attempts to wait for a value on this receiver, returning [Err] of: - /// - /// [DisconnectedError] if the [Sender] called [close] and the buffer is empty. - /// - /// [OtherError] if the item in the buffer is an error, indicated by the sender calling [addError]. + @override Future> recv() async { try { return await _next(); @@ -108,13 +105,7 @@ class LocalReceiver { } } - /// Attempts to wait for a value on this receiver with a time limit, returning [Err] of: - /// - /// [DisconnectedError] if the [Sender] called [close] and the buffer is empty. - /// - /// [OtherError] if the item in the buffer is an error, indicated by the sender calling [addError]. - /// - /// [TimeoutError] if the time limit is reached before the [Sender] sent any more data. + @override Future> recvTimeout(Duration timeLimit) async { try { return await _next() @@ -127,7 +118,7 @@ class LocalReceiver { } } - /// Returns an [RIterator] that drains the current buffer. + @override RIterator iter() { return RIterator.fromIterable(_iter()); } @@ -143,7 +134,7 @@ class LocalReceiver { } } - /// Returns a [Stream] of values ending once [DisconnectedError] is yielded. + @override Stream stream() async* { while (true) { final rec = await recv(); diff --git a/lib/src/sync/isolate_channel.dart b/lib/src/sync/isolate_channel.dart index 69cca68..5712856 100644 --- a/lib/src/sync/isolate_channel.dart +++ b/lib/src/sync/isolate_channel.dart @@ -33,6 +33,15 @@ class IsolateReceiver extends LocalReceiver { .cast()); } +/// [isolateChannel] is used to bi-directional isolate communication with channels. The returned +/// [Sender] and [Receiver] can communicate with the spawned isolate and +/// the spawned isolate is passed a [Sender] and [Receiver] to communicate with the original isolate. +/// Each item `T` sent by a [Sender] will only be seen once by the corresponding [Receiver]. +/// If the [Sender] calls `close` while the [Receiver]'s buffer +/// is not empty, the [Receiver] will still yield the remaining items in the buffer until empty. +/// Types that can be sent over a [SendPort] as defined here https://api.flutter.dev/flutter/dart-isolate/SendPort/send.html +/// are allow to be sent between isolates. Otherwise a [toIsolateCodec] and/or a [fromIsolateCodec] can be passed +/// to encode and decode the messages. Future<(IsolateSender tx1, IsolateReceiver rx1)> isolateChannel( FutureOr Function(IsolateSender tx2, IsolateReceiver rx2) func, {SendCodec? toIsolateCodec, diff --git a/lib/src/sync/send_codec.dart b/lib/src/sync/send_codec.dart index dba00ba..cbdf849 100644 --- a/lib/src/sync/send_codec.dart +++ b/lib/src/sync/send_codec.dart @@ -74,23 +74,23 @@ class BooleanCodec implements SendCodec { //************************************************************************// class ListSizedTCodec implements SendCodec> { - final SendCodec _codec; - final int _bytesPerT; + final SendCodec _tCodec; + final int tSizeInBytes; - const ListSizedTCodec(this._codec, this._bytesPerT) : assert(_bytesPerT > 0); + const ListSizedTCodec(this._tCodec, this.tSizeInBytes) : assert(tSizeInBytes > 0); @override List decode(ByteBuffer buffer) { final byteData = buffer.asByteData(); final length = byteData.getInt64(0, Endian.big); - assert(byteData.lengthInBytes == 8 + length * _bytesPerT); + assert(byteData.lengthInBytes == 8 + length * tSizeInBytes); final list = List.filled(length, null); for (int i = 0; i < length; i++) { - final elementBytes = ByteData(_bytesPerT); - for (int j = 0; j < _bytesPerT; j++) { - elementBytes.setUint8(j, byteData.getUint8(8 + j + i * _bytesPerT)); + final elementBytes = ByteData(tSizeInBytes); + for (int j = 0; j < tSizeInBytes; j++) { + elementBytes.setUint8(j, byteData.getUint8(8 + j + i * tSizeInBytes)); } - list[i] = _codec.decode(elementBytes.buffer); + list[i] = _tCodec.decode(elementBytes.buffer); } return list.cast(); } @@ -98,13 +98,13 @@ class ListSizedTCodec implements SendCodec> { @override ByteBuffer encode(List data) { final length = data.length; - final bytes = ByteData(8 + length * _bytesPerT); + final bytes = ByteData(8 + length * tSizeInBytes); bytes.setInt64(0, length, Endian.big); for (int i = 0; i < data.length; i++) { - final elementBytes = _codec.encode(data[i]).asByteData(); - assert(elementBytes.lengthInBytes == _bytesPerT); - for (int j = 0; j < _bytesPerT; j++) { - bytes.setUint8(8 + j + i * _bytesPerT, elementBytes.getUint8(j)); + final elementBytes = _tCodec.encode(data[i]).asByteData(); + assert(elementBytes.lengthInBytes == tSizeInBytes); + for (int j = 0; j < tSizeInBytes; j++) { + bytes.setUint8(8 + j + i * tSizeInBytes, elementBytes.getUint8(j)); } } return bytes.buffer; diff --git a/test/sync/isolate_channel_test.dart b/test/sync/isolate_channel_test.dart index 9b3fa1b..fd03c57 100644 --- a/test/sync/isolate_channel_test.dart +++ b/test/sync/isolate_channel_test.dart @@ -61,5 +61,49 @@ void main() async { expect(await rx1.recv().unwrap(), 7); expect(await rx1.recv().unwrap(), 50); }); + + test("Error handling in isolate", () async { + final (tx1, rx1) = await isolateChannel((tx2, rx2) async { + assert((await rx2.recv()).unwrap() == "hello"); + throw Exception("An error occurred"); + }, toIsolateCodec: const StringCodec(), fromIsolateCodec: const StringCodec()); + + tx1.send("hello"); + expect((await rx1.recv()).unwrapErr(), DisconnectedError()); + }); + + test("Complex data types", () async { + final (tx1, rx1) = await isolateChannel, List>((tx2, rx2) async { + List data = (await rx2.recv()).unwrap(); + data.sort(); + tx2.send(data); + }); + + tx1.send([3, 1, 2]); + expect((await rx1.recv()).unwrap(), [1, 2, 3]); + }); + + test("Timeouts and delays", () async { + final (tx1, rx1) = await isolateChannel((tx2, rx2) async { + await Future.delayed(Duration(milliseconds: 500)); + tx2.send((await rx2.recv()).unwrap() * 10); + }, toIsolateCodec: const IntCodec(), fromIsolateCodec: const IntCodec()); + + tx1.send(5); + final result = await rx1.recv().timeout(Duration(seconds: 1)); + expect(result.unwrap(), 50); + }); + + test("Bidirectional complex data type messages", () async { + final (tx1, rx1) = await isolateChannel, Map>((tx2, rx2) async { + var data = (await rx2.recv()).unwrap(); + data["b"] = data["a"]! * 10; + tx2.send(data); + }); + + tx1.send({"a": 5}); + var response = await rx1.recv(); + expect(response.unwrap(), {"a": 5, "b": 50}); + }); }); }