Skip to content

Commit

Permalink
feat: Channel docs, test, refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mcmah309 committed Jun 30, 2024
1 parent c46ac33 commit a437f1b
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 32 deletions.
102 changes: 99 additions & 3 deletions book/src/libs/sync/channel.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
# channel

`channel` is used for communication between produces and consumers. `channel` is
rust_core supports two types of channels, "local" channels (same isolate) and "isolate" channels (different isolates).

## Local Channels

`channel` is used for communication between produces and consumers on the **same** isolate. `channel` is
similar to `StreamController` except it buffers data until read and will never throw.
In more detail, `channel` returns a `Sender` and `Receiver`. Each item `T` sent by the `Sender`
will only be seen once by the `Receiver`. If the `Sender` calls `close` while the `Receiver`'s buffer
is not empty, the `Receiver` will still yield the remaining items in the buffer until empty.

## Examples
### Single Sender, Single Receiver
### Examples
#### Single Sender, Single Receiver
In this example, a single sender sends data to a single receiver. The receiver retrieves the data and processes it.

```dart
Expand Down Expand Up @@ -120,4 +124,96 @@ void main() async {
print(value); // Outputs: 1, 2, 3
}
}
```

## Isolate Channels

`isolateChannel` is used to bi-directional isolate communication with channels. The returned
`Sender` and `Receiver` can communicate with the spawned isolate and
the spawned isolate is passed a `Sender` and `Receiver` to communicate with the original isolate.
Each item `T` sent by the `Sender` will only be seen once by the `Receiver`. If the `Sender` calls `close` while the `Receiver`'s buffer
is not empty, the `Receiver` will still yield the remaining items in the buffer until empty.
Types that can be sent over a `SendPort` as defined [here](https://api.flutter.dev/flutter/dart-isolate/SendPort/send.html)
are allow to be sent between isolates. Otherwise a `toIsolateCodec` and/or a `fromIsolateCodec` can be passed
to encode and decode the messages.

### Examples

#### Simple String Communication
This example demonstrates a simple string message being sent and received between the main isolate and a spawned isolate.

```dart
void main() async {
final (tx1, rx1) = await isolateChannel<String, String>((tx2, rx2) async {
assert((await rx2.recv()).unwrap() == "hello");
tx2.send("hi");
}, toIsolateCodec: const StringCodec(), fromIsolateCodec: const StringCodec());
tx1.send("hello");
expect((await rx1.recv()).unwrap(), "hi");
}
```
#### Different Codecs for Communication
This example demonstrates using different codecs for communication between the main isolate and a spawned isolate.

```dart
void main() async {
final (tx1, rx1) = await isolateChannel<String, int>((tx2, rx2) async {
assert((await rx2.recv()).unwrap() == "hello");
tx2.send(1);
}, toIsolateCodec: const StringCodec(), fromIsolateCodec: const IntCodec());
tx1.send("hello");
expect((await rx1.recv()).unwrap(), 1);
}
```
#### No Codecs
This example demonstrates communication without specifying codecs, relying on the default codecs.

```dart
void main() async {
final (tx1, rx1) = await isolateChannel<String, int>((tx2, rx2) async {
assert((await rx2.recv()).unwrap() == "hello");
tx2.send(1);
});
tx1.send("hello");
expect((await rx1.recv()).unwrap(), 1);
}
```
#### Bi-directional Send and Receive
This example demonstrates a more complex scenario where multiple messages are sent and received in both directions.

```dart
void main() async {
final (tx1, rx1) = await isolateChannel<int, int>((tx2, rx2) async {
await Future.delayed(Duration(milliseconds: 100));
tx2.send((await rx2.recv()).unwrap() * 10);
await Future.delayed(Duration(milliseconds: 100));
tx2.send((await rx2.recv()).unwrap() * 10);
await Future.delayed(Duration(milliseconds: 100));
tx2.send(6);
await Future.delayed(Duration(milliseconds: 100));
tx2.send((await rx2.recv()).unwrap() * 10);
await Future.delayed(Duration(milliseconds: 100));
tx2.send((await rx2.recv()).unwrap() * 10);
await Future.delayed(Duration(milliseconds: 100));
tx2.send(7);
await Future.delayed(Duration(milliseconds: 100));
tx2.send((await rx2.recv()).unwrap() * 10);
}, toIsolateCodec: const IntCodec(), fromIsolateCodec: const IntCodec());
tx1.send(1);
expect(await rx1.recv().unwrap(), 10);
tx1.send(2);
expect(await rx1.recv().unwrap(), 20);
tx1.send(3);
expect(await rx1.recv().unwrap(), 6);
tx1.send(4);
expect(await rx1.recv().unwrap(), 30);
tx1.send(5);
expect(await rx1.recv().unwrap(), 40);
expect(await rx1.recv().unwrap(), 7);
expect(await rx1.recv().unwrap(), 50);
}
```
23 changes: 7 additions & 16 deletions lib/src/sync/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ part 'isolate_channel.dart';
/// Creates a new channel, returning the [Sender] and [LocalReceiver]. Each item [T] sent by the [Sender]
/// will only be seen once by the [LocalReceiver]. If the [Sender] calls [close] while the [LocalReceiver]s buffer
/// is not empty, the [LocalReceiver] will still yield the remaining items in the buffer until empty.
(Sender<T>, LocalReceiver<T>) channel<T>() {
(Sender<T>, Receiver<T>) channel<T>() {
// broadcast so no buffer
StreamController<T> controller = StreamController<T>.broadcast();
return (LocalSender._(controller.sink), LocalReceiver._(controller.stream));
Expand Down Expand Up @@ -64,12 +64,13 @@ class LocalSender<T> implements Sender<T> {
}

/// [Receiver] for a single isolate.
class LocalReceiver<T> {
class LocalReceiver<T> implements Receiver<T> {
late final StreamSubscription<T> _streamSubscription;
final List<Result<T, Object>> _buffer = [];
bool _isClosed = false;
Completer _waker = Completer();

@override
bool get isClosed => _isClosed;

LocalReceiver._(Stream<T> stream) {
Expand All @@ -95,11 +96,7 @@ class LocalReceiver<T> {
}, cancelOnError: false);
}

/// Attempts to wait for a value on this receiver, returning [Err] of:
///
/// [DisconnectedError] if the [Sender] called [close] and the buffer is empty.
///
/// [OtherError] if the item in the buffer is an error, indicated by the sender calling [addError].
@override
Future<Result<T, RecvError>> recv() async {
try {
return await _next();
Expand All @@ -108,13 +105,7 @@ class LocalReceiver<T> {
}
}

/// Attempts to wait for a value on this receiver with a time limit, returning [Err] of:
///
/// [DisconnectedError] if the [Sender] called [close] and the buffer is empty.
///
/// [OtherError] if the item in the buffer is an error, indicated by the sender calling [addError].
///
/// [TimeoutError] if the time limit is reached before the [Sender] sent any more data.
@override
Future<Result<T, RecvTimeoutError>> recvTimeout(Duration timeLimit) async {
try {
return await _next()
Expand All @@ -127,7 +118,7 @@ class LocalReceiver<T> {
}
}

/// Returns an [RIterator] that drains the current buffer.
@override
RIterator<T> iter() {
return RIterator.fromIterable(_iter());
}
Expand All @@ -143,7 +134,7 @@ class LocalReceiver<T> {
}
}

/// Returns a [Stream] of values ending once [DisconnectedError] is yielded.
@override
Stream<T> stream() async* {
while (true) {
final rec = await recv();
Expand Down
9 changes: 9 additions & 0 deletions lib/src/sync/isolate_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ class IsolateReceiver<T> extends LocalReceiver<T> {
.cast<T>());
}

/// [isolateChannel] is used to bi-directional isolate communication with channels. The returned
/// [Sender] and [Receiver] can communicate with the spawned isolate and
/// the spawned isolate is passed a [Sender] and [Receiver] to communicate with the original isolate.
/// Each item `T` sent by a [Sender] will only be seen once by the corresponding [Receiver].
/// If the [Sender] calls `close` while the [Receiver]'s buffer
/// is not empty, the [Receiver] will still yield the remaining items in the buffer until empty.
/// Types that can be sent over a [SendPort] as defined here https://api.flutter.dev/flutter/dart-isolate/SendPort/send.html
/// are allow to be sent between isolates. Otherwise a [toIsolateCodec] and/or a [fromIsolateCodec] can be passed
/// to encode and decode the messages.
Future<(IsolateSender<T> tx1, IsolateReceiver<U> rx1)> isolateChannel<T, U>(
FutureOr<void> Function(IsolateSender<U> tx2, IsolateReceiver<T> rx2) func,
{SendCodec<T>? toIsolateCodec,
Expand Down
26 changes: 13 additions & 13 deletions lib/src/sync/send_codec.dart
Original file line number Diff line number Diff line change
Expand Up @@ -74,37 +74,37 @@ class BooleanCodec implements SendCodec<bool> {
//************************************************************************//

class ListSizedTCodec<T> implements SendCodec<List<T>> {
final SendCodec<T> _codec;
final int _bytesPerT;
final SendCodec<T> _tCodec;
final int tSizeInBytes;

const ListSizedTCodec(this._codec, this._bytesPerT) : assert(_bytesPerT > 0);
const ListSizedTCodec(this._tCodec, this.tSizeInBytes) : assert(tSizeInBytes > 0);

@override
List<T> decode(ByteBuffer buffer) {
final byteData = buffer.asByteData();
final length = byteData.getInt64(0, Endian.big);
assert(byteData.lengthInBytes == 8 + length * _bytesPerT);
assert(byteData.lengthInBytes == 8 + length * tSizeInBytes);
final list = List<T?>.filled(length, null);
for (int i = 0; i < length; i++) {
final elementBytes = ByteData(_bytesPerT);
for (int j = 0; j < _bytesPerT; j++) {
elementBytes.setUint8(j, byteData.getUint8(8 + j + i * _bytesPerT));
final elementBytes = ByteData(tSizeInBytes);
for (int j = 0; j < tSizeInBytes; j++) {
elementBytes.setUint8(j, byteData.getUint8(8 + j + i * tSizeInBytes));
}
list[i] = _codec.decode(elementBytes.buffer);
list[i] = _tCodec.decode(elementBytes.buffer);
}
return list.cast<T>();
}

@override
ByteBuffer encode(List<T> data) {
final length = data.length;
final bytes = ByteData(8 + length * _bytesPerT);
final bytes = ByteData(8 + length * tSizeInBytes);
bytes.setInt64(0, length, Endian.big);
for (int i = 0; i < data.length; i++) {
final elementBytes = _codec.encode(data[i]).asByteData();
assert(elementBytes.lengthInBytes == _bytesPerT);
for (int j = 0; j < _bytesPerT; j++) {
bytes.setUint8(8 + j + i * _bytesPerT, elementBytes.getUint8(j));
final elementBytes = _tCodec.encode(data[i]).asByteData();
assert(elementBytes.lengthInBytes == tSizeInBytes);
for (int j = 0; j < tSizeInBytes; j++) {
bytes.setUint8(8 + j + i * tSizeInBytes, elementBytes.getUint8(j));
}
}
return bytes.buffer;
Expand Down
44 changes: 44 additions & 0 deletions test/sync/isolate_channel_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,49 @@ void main() async {
expect(await rx1.recv().unwrap(), 7);
expect(await rx1.recv().unwrap(), 50);
});

test("Error handling in isolate", () async {
final (tx1, rx1) = await isolateChannel<String, String>((tx2, rx2) async {
assert((await rx2.recv()).unwrap() == "hello");
throw Exception("An error occurred");
}, toIsolateCodec: const StringCodec(), fromIsolateCodec: const StringCodec());

tx1.send("hello");
expect((await rx1.recv()).unwrapErr(), DisconnectedError());
});

test("Complex data types", () async {
final (tx1, rx1) = await isolateChannel<List<int>, List<int>>((tx2, rx2) async {
List<int> data = (await rx2.recv()).unwrap();
data.sort();
tx2.send(data);
});

tx1.send([3, 1, 2]);
expect((await rx1.recv()).unwrap(), [1, 2, 3]);
});

test("Timeouts and delays", () async {
final (tx1, rx1) = await isolateChannel<int, int>((tx2, rx2) async {
await Future.delayed(Duration(milliseconds: 500));
tx2.send((await rx2.recv()).unwrap() * 10);
}, toIsolateCodec: const IntCodec(), fromIsolateCodec: const IntCodec());

tx1.send(5);
final result = await rx1.recv().timeout(Duration(seconds: 1));
expect(result.unwrap(), 50);
});

test("Bidirectional complex data type messages", () async {
final (tx1, rx1) = await isolateChannel<Map<String, int>, Map<String, int>>((tx2, rx2) async {
var data = (await rx2.recv()).unwrap();
data["b"] = data["a"]! * 10;
tx2.send(data);
});

tx1.send({"a": 5});
var response = await rx1.recv();
expect(response.unwrap(), {"a": 5, "b": 50});
});
});
}

0 comments on commit a437f1b

Please sign in to comment.