Apollo port + graphQLWS
This commit is contained in:
parent
6c08f61a3d
commit
37d58529c2
7 changed files with 220 additions and 2 deletions
|
@ -7,6 +7,8 @@ import 'package:angel_validate/server.dart';
|
|||
import 'package:graphql_parser/graphql_parser.dart';
|
||||
import 'package:graphql_schema/graphql_schema.dart';
|
||||
import 'package:graphql_server/graphql_server.dart';
|
||||
import 'package:graphql_server/subscriptions_transport_ws.dart' as stw;
|
||||
import 'package:web_socket_channel/io.dart';
|
||||
|
||||
/// A [RequestHandler] that serves a spec-compliant GraphQL backend, over WebSockets.
|
||||
/// This endpoint only supports WebSockets, and can be used to deliver subscription events.
|
||||
|
@ -22,8 +24,10 @@ RequestHandler graphQLWS(GraphQL graphQL) {
|
|||
if (WebSocketTransformer.isUpgradeRequest(req.rawRequest)) {
|
||||
await res.detach();
|
||||
var socket = await WebSocketTransformer.upgrade(req.rawRequest);
|
||||
// TODO: Apollo protocol
|
||||
throw UnimplementedError('Apollo protocol not yet implemented.');
|
||||
var channel = IOWebSocketChannel(socket);
|
||||
var client = stw.RemoteClient(channel.cast<String>());
|
||||
var server = _GraphQLWSServer(client, graphQL, req, res);
|
||||
await server.done;
|
||||
} else {
|
||||
throw AngelHttpException.badRequest(
|
||||
message: 'The `graphQLWS` endpoint only accepts WebSockets.');
|
||||
|
@ -34,3 +38,35 @@ RequestHandler graphQLWS(GraphQL graphQL) {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
class _GraphQLWSServer extends stw.Server {
|
||||
final GraphQL graphQL;
|
||||
final RequestContext req;
|
||||
final ResponseContext res;
|
||||
|
||||
_GraphQLWSServer(stw.RemoteClient client, this.graphQL, this.req, this.res)
|
||||
: super(client);
|
||||
|
||||
@override
|
||||
bool onConnect(stw.RemoteClient client, [Map connectionParams]) => true;
|
||||
|
||||
@override
|
||||
Future<stw.GraphQLResult> onOperation(String id, String query,
|
||||
[Map<String, dynamic> variables, String operationName]) async {
|
||||
try {
|
||||
var globalVariables = <String, dynamic>{
|
||||
'__requestctx': req,
|
||||
'__responsectx': res,
|
||||
};
|
||||
var data = await graphQL.parseAndExecute(
|
||||
query,
|
||||
operationName: operationName,
|
||||
sourceUrl: 'input',
|
||||
globalVariables: globalVariables,
|
||||
);
|
||||
return stw.GraphQLResult(data);
|
||||
} on GraphQLException catch (e) {
|
||||
return stw.GraphQLResult(null, errors: e.errors);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ dependencies:
|
|||
graphql_schema: ^1.0.0
|
||||
graphql_server: ^1.0.0-beta
|
||||
http_parser: ^3.0.0
|
||||
web_socket_channel: ^1.0.0
|
||||
dev_dependencies:
|
||||
angel_serialize: ^2.0.0
|
||||
logging: ^0.11.0
|
||||
|
|
29
graphql_server/lib/src/apollo/remote_client.dart
Normal file
29
graphql_server/lib/src/apollo/remote_client.dart
Normal file
|
@ -0,0 +1,29 @@
|
|||
import 'dart:async';
|
||||
import 'package:stream_channel/stream_channel.dart';
|
||||
import 'transport.dart';
|
||||
|
||||
class RemoteClient extends StreamChannelMixin<OperationMessage> {
|
||||
final StreamChannel<Map> channel;
|
||||
final StreamChannelController<OperationMessage> _ctrl =
|
||||
StreamChannelController();
|
||||
|
||||
RemoteClient.withoutJson(this.channel) {
|
||||
_ctrl.local.stream.map((m) => m.toJson()).cast<Map>().pipe(channel.sink);
|
||||
channel.stream.listen((m) {
|
||||
_ctrl.local.sink.add(OperationMessage.fromJson(m));
|
||||
});
|
||||
}
|
||||
|
||||
RemoteClient(StreamChannel<String> channel)
|
||||
: this.withoutJson(jsonDocument.bind(channel).cast<Map>());
|
||||
@override
|
||||
StreamSink<OperationMessage> get sink => _ctrl.foreign.sink;
|
||||
|
||||
@override
|
||||
Stream<OperationMessage> get stream => _ctrl.foreign.stream;
|
||||
|
||||
void close() {
|
||||
channel.sink.close();
|
||||
_ctrl.local.sink.close();
|
||||
}
|
||||
}
|
92
graphql_server/lib/src/apollo/server.dart
Normal file
92
graphql_server/lib/src/apollo/server.dart
Normal file
|
@ -0,0 +1,92 @@
|
|||
import 'dart:async';
|
||||
import 'remote_client.dart';
|
||||
import 'transport.dart';
|
||||
|
||||
abstract class Server {
|
||||
final RemoteClient client;
|
||||
final Completer _done = Completer();
|
||||
StreamSubscription<OperationMessage> _sub;
|
||||
bool _init = false;
|
||||
|
||||
Future get done => _done.future;
|
||||
|
||||
Server(this.client) {
|
||||
_sub = client.stream.listen((msg) async {
|
||||
if (msg.type == OperationMessage.gqlConnectionInit && !_init) {
|
||||
try {
|
||||
Map connectionParams = null;
|
||||
if (msg.payload is Map)
|
||||
connectionParams = msg.payload as Map;
|
||||
else if (msg.payload != null)
|
||||
throw FormatException(
|
||||
'${msg.type} payload must be a map (object).');
|
||||
|
||||
var connect = await onConnect(client, connectionParams);
|
||||
if (!connect) throw false;
|
||||
_init = true;
|
||||
client.sink.add(OperationMessage(OperationMessage.gqlConnectionAck));
|
||||
} catch (e) {
|
||||
if (e == false)
|
||||
_reportError('The connection was rejected.');
|
||||
else
|
||||
_reportError(e.toString());
|
||||
}
|
||||
} else if (_init) {
|
||||
if (msg.type == OperationMessage.gqlStart) {
|
||||
if (msg.id == null)
|
||||
throw FormatException('${msg.type} id is required.');
|
||||
if (msg.payload == null)
|
||||
throw FormatException('${msg.type} payload is required.');
|
||||
else if (msg.payload is! Map)
|
||||
throw FormatException(
|
||||
'${msg.type} payload must be a map (object).');
|
||||
var payload = msg.payload as Map;
|
||||
var query = payload['query'];
|
||||
var variables = payload['variables'];
|
||||
var operationName = payload['operationName'];
|
||||
if (query == null || query is! String)
|
||||
throw FormatException(
|
||||
'${msg.type} payload must contain a string named "query".');
|
||||
if (variables != null && variables is! Map)
|
||||
throw FormatException(
|
||||
'${msg.type} payload\'s "variables" field must be a map (object).');
|
||||
if (operationName != null && operationName is! String)
|
||||
throw FormatException(
|
||||
'${msg.type} payload\'s "operationName" field must be a string.');
|
||||
var result = await onOperation(
|
||||
msg.id,
|
||||
query as String,
|
||||
(variables as Map).cast<String, dynamic>(),
|
||||
operationName as String);
|
||||
var data = result.data;
|
||||
|
||||
if (data is Stream) {
|
||||
await for (var event in data) {
|
||||
client.sink.add(OperationMessage(OperationMessage.gqlData,
|
||||
id: msg.id,
|
||||
payload: {'data': event, 'errors': result.errors}));
|
||||
}
|
||||
} else {
|
||||
client.sink.add(OperationMessage(OperationMessage.gqlData,
|
||||
id: msg.id, payload: {'data': data, 'errors': result.errors}));
|
||||
}
|
||||
|
||||
client.sink
|
||||
.add(OperationMessage(OperationMessage.gqlComplete, id: msg.id));
|
||||
} else if (msg.type == OperationMessage.gqlConnectionTerminate) {
|
||||
await _sub?.cancel();
|
||||
}
|
||||
}
|
||||
}, onError: _done.completeError, onDone: _done.complete);
|
||||
}
|
||||
|
||||
void _reportError(String message) {
|
||||
client.sink.add(OperationMessage(OperationMessage.gqlConnectionError,
|
||||
payload: {'message': message}));
|
||||
}
|
||||
|
||||
FutureOr<bool> onConnect(RemoteClient client, [Map connectionParams]);
|
||||
|
||||
FutureOr<GraphQLResult> onOperation(String id, String query,
|
||||
[Map<String, dynamic> variables, String operationName]);
|
||||
}
|
50
graphql_server/lib/src/apollo/transport.dart
Normal file
50
graphql_server/lib/src/apollo/transport.dart
Normal file
|
@ -0,0 +1,50 @@
|
|||
import 'dart:async';
|
||||
import 'package:graphql_schema/graphql_schema.dart';
|
||||
import 'package:stream_channel/stream_channel.dart';
|
||||
|
||||
/// A basic message in the Apollo WebSocket protocol.
|
||||
class OperationMessage {
|
||||
static const String gqlConnectionInit = 'GQL_CONNECTION_INIT',
|
||||
gqlConnectionAck = 'GQL_CONNECTION_ACK',
|
||||
gqlConnectionKeepAlive = 'GQL_CONNECTION_KEEP_ALIVE',
|
||||
gqlConnectionError = 'GQL_CONNECTION_ERROR',
|
||||
gqlStart = 'GQL_START',
|
||||
gqlStop = 'GQL_STOP',
|
||||
gqlConnectionTerminate = 'GQL_CONNECTION_TERMINATE',
|
||||
gqlData = 'GQL_DATA',
|
||||
gqlError = 'GQL_ERROR',
|
||||
gqlComplete = 'GQL_COMPLETE';
|
||||
final dynamic payload;
|
||||
final String id;
|
||||
final String type;
|
||||
|
||||
OperationMessage(this.type, {this.payload, this.id});
|
||||
|
||||
factory OperationMessage.fromJson(Map map) {
|
||||
var type = map['type'];
|
||||
var payload = map['payload'];
|
||||
var id = map['id'];
|
||||
|
||||
if (type == null)
|
||||
throw ArgumentError.notNull('type');
|
||||
else if (type is! String)
|
||||
throw ArgumentError.value(type, 'type', 'must be a string');
|
||||
else if (id != null && id is! String)
|
||||
throw ArgumentError.value(type, 'id', 'must be a string');
|
||||
return OperationMessage(type as String, id: id as String, payload: payload);
|
||||
}
|
||||
|
||||
Map<String, dynamic> toJson() {
|
||||
var out = <String, dynamic>{'type': type};
|
||||
if (id != null) out['id'] = id;
|
||||
if (payload != null) out['payload'] = payload;
|
||||
return out;
|
||||
}
|
||||
}
|
||||
|
||||
class GraphQLResult {
|
||||
final dynamic data;
|
||||
final Iterable<GraphQLExceptionError> errors;
|
||||
|
||||
GraphQLResult(this.data, {this.errors: const []});
|
||||
}
|
9
graphql_server/lib/subscriptions_transport_ws.dart
Normal file
9
graphql_server/lib/subscriptions_transport_ws.dart
Normal file
|
@ -0,0 +1,9 @@
|
|||
/// An implementation of Apollo's `subscriptions-transport-ws` in Dart.
|
||||
///
|
||||
/// See:
|
||||
/// https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
|
||||
library graphql_server.subscriptions_transport_ws;
|
||||
|
||||
export 'src/apollo/remote_client.dart';
|
||||
export 'src/apollo/server.dart';
|
||||
export 'src/apollo/transport.dart';
|
|
@ -12,6 +12,7 @@ dependencies:
|
|||
graphql_parser: ^1.0.0
|
||||
meta: ^1.0.0
|
||||
recase: ^2.0.0
|
||||
stream_channel: ^2.0.0
|
||||
tuple: ^1.0.0
|
||||
dev_dependencies:
|
||||
test: ">=0.12.0 <2.0.0"
|
Loading…
Reference in a new issue