diff --git a/example/basic.dart b/example/basic.dart index b7bcdb15..e694405e 100644 --- a/example/basic.dart +++ b/example/basic.dart @@ -8,10 +8,14 @@ Stream input() async* { tagline } } - '''.trim(); + ''' + .trim(); } main() async { var lexer = new Lexer(), parser = new Parser(); - await input().transform(lexer).forEach(print); + var stream = input().transform(lexer).asBroadcastStream(); + await stream.forEach(print); + stream.pipe(parser); + await parser.onNode.forEach(print); } diff --git a/example/boolean.dart b/example/boolean.dart new file mode 100644 index 00000000..643f7588 --- /dev/null +++ b/example/boolean.dart @@ -0,0 +1,12 @@ +import 'dart:async'; +import 'package:graphql_parser/src/language/language.dart'; + +Stream input() async* { + yield 'true'; +} + +main() async { + var lexer = new Lexer(), parser = new Parser(); + input().transform(lexer).pipe(parser); + await parser.onBooleanValue.forEach(print); +} diff --git a/lib/src/language/ast/boolean_value.dart b/lib/src/language/ast/boolean_value.dart index 61d5697a..3126b176 100644 --- a/lib/src/language/ast/boolean_value.dart +++ b/lib/src/language/ast/boolean_value.dart @@ -5,7 +5,11 @@ import 'value.dart'; class BooleanValueContext extends ValueContext { final Token BOOLEAN; - BooleanValueContext(this.BOOLEAN); + BooleanValueContext(this.BOOLEAN) { + assert(BOOLEAN?.text == 'true' || BOOLEAN?.text == 'false'); + } + + bool get booleanValue => BOOLEAN.text == 'true'; @override SourceSpan get span => BOOLEAN.span; diff --git a/lib/src/language/parser.dart b/lib/src/language/parser.dart index ffcb9478..98d151d4 100644 --- a/lib/src/language/parser.dart +++ b/lib/src/language/parser.dart @@ -1,26 +1,96 @@ import 'dart:async'; import 'ast/ast.dart'; import 'stream_reader.dart'; +import 'syntax_error.dart'; import 'token.dart'; +import 'token_type.dart'; class Parser implements StreamConsumer { bool _closed = false; + final Completer _closer = new Completer(); + final List _errors = []; final StreamReader _reader = new StreamReader(); + final StreamController _onBooleanValue = + new StreamController(); + final StreamController _onDocument = + new StreamController(); final StreamController _onNode = new StreamController(); + List get errors => new List.unmodifiable(_errors); + + Stream get onBooleanValue => _onBooleanValue.stream; + Stream get onDocument => _onDocument.stream; Stream get onNode => _onNode.stream; - @override - Future addStream(Stream stream) async { - if (_closed) throw new StateError('Parser is already closed.'); - stream.pipe(_reader); + Future _waterfall(List futures) async { + for (var f in futures) { + var r = await f(); + if (r != null) return r; + } } @override - Future close() async { + Future addStream(Stream stream) { + if (_closed) throw new StateError('Parser is already closed.'); + _closed = true; - await _onNode.close(); + _reader.onData.listen((data) => _waterfall([document, booleanValue])) + ..onDone(() => Future.wait([ + _onBooleanValue.close(), + _onDocument.close(), + _onNode.close(), + ])) + ..onError(_closer.completeError); + + return stream.pipe(_reader); + } + + @override + Future close() { + return _closer.future; + } + + Future expect(TokenType type) async { + var peek = await _reader.peek(); + + if (peek?.type != type) { + _errors.add(new SyntaxError.fromSourceLocation( + "Expected $type, found '${peek?.text ?? 'empty text'}' instead.", + peek?.span?.start)); + return false; + } else { + await _reader.consume(); + return true; + } + } + + Future maybe(TokenType type) async { + var peek = await _reader.peek(); + + if (peek?.type == type) { + await _reader.consume(); + return true; + } + + return false; + } + + Future nextIs(TokenType type) => + _reader.peek().then((t) => t?.type == type); + + Future document() async { + return null; + } + + Future booleanValue() async { + if (await nextIs(TokenType.BOOLEAN)) { + var result = new BooleanValueContext(await _reader.consume()); + _onBooleanValue.add(result); + return result; + } + + return null; } } diff --git a/lib/src/language/stream_reader.dart b/lib/src/language/stream_reader.dart index 415f295d..ea7e75df 100644 --- a/lib/src/language/stream_reader.dart +++ b/lib/src/language/stream_reader.dart @@ -4,10 +4,40 @@ import 'dart:collection'; class StreamReader implements StreamConsumer { final Queue _buffer = new Queue(); bool _closed = false; + T _current; + bool _onDataListening = false; final Queue> _nextQueue = new Queue(); final Queue> _peekQueue = new Queue(); + StreamController _onData; + bool get isDone => _closed; + Stream get onData => _onData.stream; + + _onListen() { + _onDataListening = true; + } + + _onPause() { + _onDataListening = false; + } + + StreamReader() { + _onData = new StreamController( + onListen: _onListen, + onResume: _onListen, + onPause: _onPause, + onCancel: _onPause); + } + + Future current() { + if (_current == null) { + if (_nextQueue.isNotEmpty) return _nextQueue.first.future; + return consume(); + } + + return new Future.value(_current); + } Future peek() { if (isDone) throw new StateError('Cannot read from closed stream.'); @@ -18,9 +48,13 @@ class StreamReader implements StreamConsumer { return c.future; } - Future next() { + Future consume() { if (isDone) throw new StateError('Cannot read from closed stream.'); - if (_buffer.isNotEmpty) return new Future.value(_buffer.removeFirst()); + + if (_buffer.isNotEmpty) { + _current = _buffer.removeFirst(); + return close().then((_) => new Future.value(_current)); + } var c = new Completer(); _nextQueue.addLast(c); @@ -34,19 +68,21 @@ class StreamReader implements StreamConsumer { var c = new Completer(); stream.listen((data) { + if (_onDataListening) _onData.add(data); + if (_peekQueue.isNotEmpty || _nextQueue.isNotEmpty) { if (_peekQueue.isNotEmpty) { _peekQueue.removeFirst().complete(data); } if (_nextQueue.isNotEmpty) { - _nextQueue.removeFirst().complete(data); + _nextQueue.removeFirst().complete(_current = data); } } else { _buffer.add(data); } }) - ..onDone(c.complete) + ..onDone(() => close().then(c.complete)) ..onError(c.completeError); return c.future; @@ -54,7 +90,17 @@ class StreamReader implements StreamConsumer { @override Future close() async { - _closed = true; + if (_buffer.isEmpty && _nextQueue.isEmpty && _peekQueue.isEmpty) { + _closed = true; + + kill(Completer c) { + c.completeError(new StateError( + 'Reached end of stream, although more input was expected.')); + } + + _peekQueue.forEach(kill); + _nextQueue.forEach(kill); + } } }