This commit is contained in:
thosakwe 2017-08-08 17:04:36 -04:00
parent 1ac827e537
commit b2a7af039d
6 changed files with 164 additions and 0 deletions

1
.travis.yml Normal file
View file

@ -0,0 +1 @@
language: dart

View file

@ -1,2 +1,21 @@
# sync
[![Pub](https://img.shields.io/pub/v/angel_sync.svg)](https://pub.dartlang.org/packages/angel_sync)
[![build status](https://travis-ci.org/angel-dart/sync.svg)](https://travis-ci.org/angel-dart/sync)
Easily synchronize and scale WebSockets using package:pub_sub.
# Usage
This package exposes `PubSubWebSocketSynchronizer`, which
can simply be dropped into any `AngelWebSocket` constructor.
Once you've set that up, instances of your application will
automatically fire events in-sync. That's all you have to do
to scale a real-time application with Angel!
```dart
await app.configure(new AngelWebSocket(
synchronizer: new PubSubWebSocketSynchronizer(
new pub_sub.IsolateClient('<client-id>', adapter.receivePort.sendPort),
),
));
```

2
analysis_options.yaml Normal file
View file

@ -0,0 +1,2 @@
analyzer:
strong-mode: true

41
lib/angel_sync.dart Normal file
View file

@ -0,0 +1,41 @@
import 'dart:async';
import 'package:angel_websocket/server.dart';
import 'package:pub_sub/pub_sub.dart' as pub_sub;
/// Synchronizes WebSockets using `package:pub_sub`.
class PubSubWebSocketSynchronizer extends WebSocketSynchronizer {
/// The event name used to synchronize events on the server.
static const String eventName = 'angel_sync::event';
final StreamController<WebSocketEvent> _stream =
new StreamController<WebSocketEvent>();
pub_sub.ClientSubscription _subscription;
final pub_sub.Client client;
PubSubWebSocketSynchronizer(this.client) {
client.subscribe(eventName).then((sub) {
_subscription = sub
..listen((Map data) {
if (!_stream.isClosed) _stream.add(new WebSocketEvent.fromJson(data));
}, onError: _stream.addError);
}).catchError(_stream.addError);
}
@override
Stream<WebSocketEvent> get stream => _stream.stream;
Future close() {
if (_subscription != null) {
_subscription.unsubscribe().then((_) => client.close());
} else
client.close();
return new Future.value();
}
@override
void notifyOthers(WebSocketEvent e) {
client.publish(eventName, e.toJson()).catchError(_stream.addError);
}
}

14
pubspec.yaml Normal file
View file

@ -0,0 +1,14 @@
name: angel_sync
version: 1.0.0
description: Easily synchronize and scale WebSockets using package:pub_sub.
author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/sync
environment:
sdk: ">=1.19.0"
dependencies:
angel_framework: ^1.0.0
angel_websocket: ^1.0.0
pub_sub: ^1.0.0
dev_dependencies:
angel_test: ^1.0.0
test: ^0.12.0

87
test/all_test.dart Normal file
View file

@ -0,0 +1,87 @@
import 'dart:isolate';
import 'package:angel_framework/angel_framework.dart';
import 'package:angel_sync/angel_sync.dart';
import 'package:angel_test/angel_test.dart';
import 'package:angel_websocket/io.dart' as client;
import 'package:angel_websocket/server.dart';
import 'package:pub_sub/isolate.dart' as pub_sub;
import 'package:pub_sub/pub_sub.dart' as pub_sub;
import 'package:test/test.dart';
main() {
Angel app1, app2;
TestClient app1Client;
client.WebSockets app2Client;
pub_sub.Server server;
ReceivePort app1Port, app2Port;
setUp(() async {
var adapter = new pub_sub.IsolateAdapter();
server = new pub_sub.Server([
adapter,
])
..registerClient(const pub_sub.ClientInfo('angel_sync1'))
..registerClient(const pub_sub.ClientInfo('angel_sync2'))
..start();
app1 = new Angel();
app2 = new Angel();
app1.post('/message', (RequestContext req, AngelWebSocket ws) async {
// Manually broadcast. Even though app1 has no clients, it *should*
// propagate to app2.
ws.batchEvent(new WebSocketEvent(
eventName: 'message',
data: req.body['message'],
));
return 'Sent: ${req.body['message']}';
});
app1Port = new ReceivePort();
await app1.configure(new AngelWebSocket(
synchronizer: new PubSubWebSocketSynchronizer(
new pub_sub.IsolateClient('angel_sync1', adapter.receivePort.sendPort),
),
));
app1Client = await connectTo(app1);
app2Port = new ReceivePort();
await app2.configure(new AngelWebSocket(
synchronizer: new PubSubWebSocketSynchronizer(
new pub_sub.IsolateClient('angel_sync2', adapter.receivePort.sendPort),
),
));
var http = await app2.startServer();
app2Client =
new client.WebSockets('ws://${http.address.address}:${http.port}/ws');
await app2Client.connect();
});
tearDown(() {
server.close();
app1Port.close();
app2Port.close();
app1.close();
app2.close();
app1Client.close();
app2Client.close();
});
test('events propagate', () async {
// The point of this test is that neither app1 nor app2
// is aware that the other even exists.
//
// Regardless, a WebSocket event broadcast in app1 will be
// broadcast by app2 as well.
var stream = app2Client.on['message'];
var response =
await app1Client.post('/message', body: {'message': 'Hello, world!'});
print('app1 response: ${response.body}');
var msg = await stream.first.timeout(const Duration(seconds: 5));
print('app2 got message: ${msg.data}');
});
}