This commit is contained in:
thosakwe 2017-04-17 07:03:42 -04:00
parent 7120dafc0c
commit 6e1c1f4191
3 changed files with 28 additions and 16 deletions

View file

@ -1,5 +1,5 @@
# angel_websocket # angel_websocket
[![1.0.4+2](https://img.shields.io/badge/pub-1.0.4+2-brightgreen.svg)](https://pub.dartlang.org/packages/angel_websocket) [![1.0.4+3](https://img.shields.io/badge/pub-1.0.4+3-brightgreen.svg)](https://pub.dartlang.org/packages/angel_websocket)
[![build status](https://travis-ci.org/angel-dart/websocket.svg)](https://travis-ci.org/angel-dart/websocket) [![build status](https://travis-ci.org/angel-dart/websocket.svg)](https://travis-ci.org/angel-dart/websocket)
WebSocket plugin for Angel. WebSocket plugin for Angel.

View file

@ -1,4 +1,5 @@
import 'dart:async'; import 'dart:async';
import 'dart:collection';
import 'dart:convert'; import 'dart:convert';
import 'package:angel_client/angel_client.dart'; import 'package:angel_client/angel_client.dart';
import 'package:http/src/base_client.dart' as http; import 'package:http/src/base_client.dart' as http;
@ -14,6 +15,7 @@ final RegExp _straySlashes = new RegExp(r"(^/)|(/+$)");
abstract class BaseWebSocketClient extends BaseAngelClient { abstract class BaseWebSocketClient extends BaseAngelClient {
Duration _reconnectInterval; Duration _reconnectInterval;
WebSocketChannel _socket; WebSocketChannel _socket;
final Queue<WebSocketAction> _queue = new Queue<WebSocketAction>();
final StreamController _onData = new StreamController(); final StreamController _onData = new StreamController();
final StreamController<WebSocketEvent> _onAllEvents = final StreamController<WebSocketEvent> _onAllEvents =
@ -99,6 +101,12 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
getConnectedWebSocket().then((socket) { getConnectedWebSocket().then((socket) {
if (!c.isCompleted) { if (!c.isCompleted) {
if (timer.isActive) timer.cancel(); if (timer.isActive) timer.cancel();
while (_queue.isNotEmpty) {
var action = _queue.removeFirst();
socket.sink.add(serialize(action));
}
c.complete(socket); c.complete(socket);
} }
}).catchError((e, st) { }).catchError((e, st) {
@ -174,6 +182,7 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
}, },
cancelOnError: true, cancelOnError: true,
onDone: () { onDone: () {
_socket = null;
if (reconnectOnClose == true) { if (reconnectOnClose == true) {
new Timer.periodic(reconnectInterval, (Timer timer) async { new Timer.periodic(reconnectInterval, (Timer timer) async {
var result; var result;
@ -199,6 +208,9 @@ abstract class BaseWebSocketClient extends BaseAngelClient {
/// Sends the given [action] on the [socket]. /// Sends the given [action] on the [socket].
void sendAction(WebSocketAction action) { void sendAction(WebSocketAction action) {
if (_socket == null)
_queue.addLast(action);
else
socket.sink.add(serialize(action)); socket.sink.add(serialize(action));
} }
@ -321,56 +333,56 @@ class BaseWebSocketService extends Service {
/// Sends the given [action] on the [socket]. /// Sends the given [action] on the [socket].
void send(WebSocketAction action) { void send(WebSocketAction action) {
socket.sink.add(serialize(action)); app.sendAction(action);
} }
@override @override
Future index([Map params]) async { Future index([Map params]) async {
socket.sink.add(serialize(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_INDEX}', params: params ?? {}))); eventName: '$path::${ACTION_INDEX}', params: params ?? {}));
return null; return null;
} }
@override @override
Future read(id, [Map params]) async { Future read(id, [Map params]) async {
socket.sink.add(serialize(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_READ}', id: id, params: params ?? {}))); eventName: '$path::${ACTION_READ}', id: id, params: params ?? {}));
return null; return null;
} }
@override @override
Future create(data, [Map params]) async { Future create(data, [Map params]) async {
socket.sink.add(serialize(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_CREATE}', eventName: '$path::${ACTION_CREATE}',
data: data, data: data,
params: params ?? {}))); params: params ?? {}));
return null; return null;
} }
@override @override
Future modify(id, data, [Map params]) async { Future modify(id, data, [Map params]) async {
socket.sink.add(serialize(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_MODIFY}', eventName: '$path::${ACTION_MODIFY}',
id: id, id: id,
data: data, data: data,
params: params ?? {}))); params: params ?? {}));
return null; return null;
} }
@override @override
Future update(id, data, [Map params]) async { Future update(id, data, [Map params]) async {
socket.sink.add(serialize(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_UPDATE}', eventName: '$path::${ACTION_UPDATE}',
id: id, id: id,
data: data, data: data,
params: params ?? {}))); params: params ?? {}));
return null; return null;
} }
@override @override
Future remove(id, [Map params]) async { Future remove(id, [Map params]) async {
socket.sink.add(serialize(new WebSocketAction( app.sendAction(new WebSocketAction(
eventName: '$path::${ACTION_REMOVE}', id: id, params: params ?? {}))); eventName: '$path::${ACTION_REMOVE}', id: id, params: params ?? {}));
return null; return null;
} }
} }

View file

@ -2,7 +2,7 @@ name: angel_websocket
description: WebSocket plugin for Angel. description: WebSocket plugin for Angel.
environment: environment:
sdk: ">=1.19.0" sdk: ">=1.19.0"
version: 1.0.4+2 version: 1.0.4+3
author: Tobe O <thosakwe@gmail.com> author: Tobe O <thosakwe@gmail.com>
homepage: https://github.com/angel-dart/angel_websocket homepage: https://github.com/angel-dart/angel_websocket
dependencies: dependencies: