platform/packages/pub_sub/test/json_rpc_2_test.dart

191 lines
5.8 KiB
Dart
Raw Normal View History

2021-03-08 12:56:39 +00:00
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:pub_sub/pub_sub.dart';
import 'package:pub_sub/json_rpc_2.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
2021-05-01 00:44:20 +00:00
/// Integration tests for the JSON-RPC 2.0 transport of `pub_sub`.
///
/// Each test talks to a [Server] through real loopback TCP sockets: `setUp`
/// binds a [ServerSocket] on an ephemeral port, wires it into a trusted
/// [JsonRpc2Adapter], and connects four clients (three with registered client
/// IDs, one with a `null` ID).
void main() {
  ServerSocket serverSocket;
  Server server;
  Client client1, client2, client3;
  JsonRpc2Client trustedClient;
  JsonRpc2Adapter adapter;

  /// Connects a fresh [JsonRpc2Client] identifying itself as [clientId].
  ///
  /// The client is closed automatically when the current test finishes, so
  /// tests that expect the server to reject the client do not leak sockets.
  Future<JsonRpc2Client> connectAs(String clientId) async {
    var sock =
        await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port);
    var client = JsonRpc2Client(clientId, streamSocket(sock));
    addTearDown(client.close);
    return client;
  }

  setUp(() async {
    // Port 0 asks the OS for any free port; clients read it back below.
    serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, 0);

    adapter = JsonRpc2Adapter(
        serverSocket.map<StreamChannel<String>>(streamSocket),
        isTrusted: true);

    var socket1 =
        await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port);
    var socket2 =
        await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port);
    var socket3 =
        await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port);
    var socket4 =
        await Socket.connect(InternetAddress.loopbackIPv4, serverSocket.port);

    client1 = JsonRpc2Client('json_rpc_2_test::secret', streamSocket(socket1));
    client2 = JsonRpc2Client('json_rpc_2_test::secret2', streamSocket(socket2));
    client3 = JsonRpc2Client('json_rpc_2_test::secret3', streamSocket(socket3));
    // The trusted client sends no ID of its own; the tests below check that it
    // ends up with one after its first request.
    trustedClient = JsonRpc2Client(null, streamSocket(socket4));

    server = Server([adapter])
      ..registerClient(const ClientInfo('json_rpc_2_test::secret'))
      ..registerClient(const ClientInfo('json_rpc_2_test::secret2'))
      ..registerClient(const ClientInfo('json_rpc_2_test::secret3'))
      ..registerClient(
          const ClientInfo('json_rpc_2_test::no_publish', canPublish: false))
      ..registerClient(const ClientInfo('json_rpc_2_test::no_subscribe',
          canSubscribe: false))
      ..start();

    // Keep one extra subscriber on 'foo' so published events are logged.
    var sub = await client3.subscribe('foo');
    sub.listen((data) {
      print('Client3 caught foo: $data');
    });
  });

  tearDown(() async {
    // Await the shutdown so the test runner does not start the next test
    // while this one's connections are still closing. Entries are null when
    // setUp failed before assigning them; those are skipped because
    // Future.wait cannot handle null elements.
    var closing = [
      server?.close(),
      client1?.close(),
      client2?.close(),
      client3?.close(),
      trustedClient?.close(),
    ].where((pending) => pending != null);
    await Future.wait(closing);
  });

  group('trusted', () {
    test('can publish', () async {
      await trustedClient.publish('hey', 'bye');
      expect(trustedClient.clientId, isNotNull);
    });

    test('can sub/unsub', () async {
      String clientId;
      await trustedClient.publish('heyaaa', 'byeaa');
      expect(clientId = trustedClient.clientId, isNotNull);

      // The ID must stay stable across later requests.
      var sub = await trustedClient.subscribe('yeppp');
      expect(trustedClient.clientId, clientId);

      await sub.unsubscribe();
      expect(trustedClient.clientId, clientId);
    });
  });

  test('subscribers receive published events', () async {
    var sub = await client2.subscribe('foo');
    await client1.publish('foo', 'bar');
    expect(await sub.first, 'bar');
  });

  test('subscribers are not sent their own events', () async {
    var sub = await client1.subscribe('foo');
    await client1.publish('foo',
        '<this should never be sent to client1, because client1 sent it.>');
    await sub.unsubscribe();
    expect(await sub.isEmpty, isTrue);
  });

  test('can unsubscribe', () async {
    var sub = await client2.subscribe('foo');
    await client1.publish('foo', 'bar');
    await sub.unsubscribe();
    await client1.publish('foo', '<client2 will not catch this!>');
    expect(await sub.length, 1);
  });

  group('json_rpc_2_server', () {
    test('reject unknown client id', () async {
      var client = await connectAs('json_rpc_2_test::invalid');
      try {
        await client.publish('foo', 'bar');
        // fail() throws a TestFailure, which the handler below does not catch.
        fail('Invalid client ID\'s should throw an error, but they do not.');
      } on PubSubException catch (e) {
        print('Expected exception was thrown: ${e.message}');
      }
    });

    test('reject unprivileged publish', () async {
      var client = await connectAs('json_rpc_2_test::no_publish');
      try {
        await client.publish('foo', 'bar');
        fail('Unprivileged publishes should throw an error, but they do not.');
      } on PubSubException catch (e) {
        print('Expected exception was thrown: ${e.message}');
      }
    });

    test('reject unprivileged subscribe', () async {
      var client = await connectAs('json_rpc_2_test::no_subscribe');
      try {
        await client.subscribe('foo');
        fail('Unprivileged subscribes should throw an error, but they do not.');
      } on PubSubException catch (e) {
        print('Expected exception was thrown: ${e.message}');
      }
    });
  });
}
/// Wraps [socket] in a [StreamChannel] that exchanges strings.
///
/// The raw byte channel over the socket is passed through a
/// [StreamChannelTransformer] built from the [utf8] codec, so both directions
/// of the channel carry `String`s instead of byte lists.
StreamChannel<String> streamSocket(Socket socket) {
  final utf8Transformer = StreamChannelTransformer.fromCodec(utf8);
  final byteChannel = _SocketStreamChannel(socket).cast<List<int>>();
  return byteChannel.transform(utf8Transformer);
}
/// A byte-oriented [StreamChannel] backed by a single [Socket].
///
/// Incoming data is the socket itself (a stream of byte chunks); outgoing
/// data goes through a [_SocketSink] that is created on first access and
/// reused afterwards.
class _SocketStreamChannel extends StreamChannelMixin<List<int>> {
  /// The socket both halves of this channel operate on.
  final Socket socket;

  // Created lazily by [sink]; stays null until the sink is first requested.
  _SocketSink _sink;

  _SocketStreamChannel(this.socket);

  @override
  Stream<List<int>> get stream => socket;

  @override
  StreamSink<List<int>> get sink {
    _sink ??= _SocketSink(socket);
    return _sink;
  }
}
/// A [StreamSink] over the write side of a [Socket].
///
/// Every member delegates to the corresponding member of [socket].
/// [StreamSink] is used purely as an interface here, so it is implemented
/// rather than extended.
class _SocketSink implements StreamSink<List<int>> {
  /// The socket all sink operations are forwarded to.
  final Socket socket;

  _SocketSink(this.socket);

  @override
  void add(List<int> event) {
    socket.add(event);
  }

  /// Forwards [error] (and the optional [stackTrace]) to [socket].
  ///
  /// Delegating keeps the error observable by whoever owns the socket.
  /// `Zone.current.errorCallback` is not a reporting mechanism: it only gives
  /// the zone a chance to replace an error, so calling it and ignoring the
  /// result drops the error silently.
  @override
  void addError(Object error, [StackTrace stackTrace]) {
    socket.addError(error, stackTrace);
  }

  @override
  Future addStream(Stream<List<int>> stream) {
    return socket.addStream(stream);
  }

  @override
  Future close() {
    return socket.close();
  }

  @override
  Future get done => socket.done;
}