Absolutely working

This commit is contained in:
thosakwe 2017-01-24 23:28:09 -05:00
parent b99eaf1965
commit 4478f72015
5 changed files with 150 additions and 14 deletions

View file

@ -8,10 +8,14 @@ Stream<String> input() async* {
tagline
}
}
'''.trim();
'''
.trim();
}
main() async {
var lexer = new Lexer(), parser = new Parser();
await input().transform(lexer).forEach(print);
var stream = input().transform(lexer).asBroadcastStream();
await stream.forEach(print);
stream.pipe(parser);
await parser.onNode.forEach(print);
}

12
example/boolean.dart Normal file
View file

@ -0,0 +1,12 @@
import 'dart:async';
import 'package:graphql_parser/src/language/language.dart';
Stream<String> input() async* {
yield 'true';
}
main() async {
var lexer = new Lexer(), parser = new Parser();
input().transform(lexer).pipe(parser);
await parser.onBooleanValue.forEach(print);
}

View file

@ -5,7 +5,11 @@ import 'value.dart';
class BooleanValueContext extends ValueContext {
final Token BOOLEAN;
BooleanValueContext(this.BOOLEAN);
BooleanValueContext(this.BOOLEAN) {
assert(BOOLEAN?.text == 'true' || BOOLEAN?.text == 'false');
}
bool get booleanValue => BOOLEAN.text == 'true';
@override
SourceSpan get span => BOOLEAN.span;

View file

@ -1,26 +1,96 @@
import 'dart:async';
import 'ast/ast.dart';
import 'stream_reader.dart';
import 'syntax_error.dart';
import 'token.dart';
import 'token_type.dart';
class Parser implements StreamConsumer<Token> {
bool _closed = false;
final Completer _closer = new Completer();
final List<SyntaxError> _errors = [];
final StreamReader<Token> _reader = new StreamReader();
final StreamController<BooleanValueContext> _onBooleanValue =
new StreamController<BooleanValueContext>();
final StreamController<DocumentContext> _onDocument =
new StreamController<DocumentContext>();
final StreamController<Node> _onNode = new StreamController<Node>();
List<SyntaxError> get errors => new List<SyntaxError>.unmodifiable(_errors);
Stream<Node> get onBooleanValue => _onBooleanValue.stream;
Stream<Node> get onDocument => _onDocument.stream;
Stream<Node> get onNode => _onNode.stream;
@override
Future addStream(Stream<Token> stream) async {
if (_closed) throw new StateError('Parser is already closed.');
stream.pipe(_reader);
Future _waterfall(List<Function> futures) async {
for (var f in futures) {
var r = await f();
if (r != null) return r;
}
}
@override
Future close() async {
Future addStream(Stream<Token> stream) {
if (_closed) throw new StateError('Parser is already closed.');
_closed = true;
await _onNode.close();
_reader.onData.listen((data) => _waterfall([document, booleanValue]))
..onDone(() => Future.wait([
_onBooleanValue.close(),
_onDocument.close(),
_onNode.close(),
]))
..onError(_closer.completeError);
return stream.pipe(_reader);
}
@override
Future close() {
return _closer.future;
}
Future<bool> expect(TokenType type) async {
var peek = await _reader.peek();
if (peek?.type != type) {
_errors.add(new SyntaxError.fromSourceLocation(
"Expected $type, found '${peek?.text ?? 'empty text'}' instead.",
peek?.span?.start));
return false;
} else {
await _reader.consume();
return true;
}
}
Future<bool> maybe(TokenType type) async {
var peek = await _reader.peek();
if (peek?.type == type) {
await _reader.consume();
return true;
}
return false;
}
Future<bool> nextIs(TokenType type) =>
_reader.peek().then((t) => t?.type == type);
Future<DocumentContext> document() async {
return null;
}
Future<BooleanValueContext> booleanValue() async {
if (await nextIs(TokenType.BOOLEAN)) {
var result = new BooleanValueContext(await _reader.consume());
_onBooleanValue.add(result);
return result;
}
return null;
}
}

View file

@ -4,10 +4,40 @@ import 'dart:collection';
class StreamReader<T> implements StreamConsumer<T> {
final Queue<T> _buffer = new Queue();
bool _closed = false;
T _current;
bool _onDataListening = false;
final Queue<Completer<T>> _nextQueue = new Queue();
final Queue<Completer<T>> _peekQueue = new Queue();
StreamController<T> _onData;
bool get isDone => _closed;
Stream<T> get onData => _onData.stream;
_onListen() {
_onDataListening = true;
}
_onPause() {
_onDataListening = false;
}
StreamReader() {
_onData = new StreamController<T>(
onListen: _onListen,
onResume: _onListen,
onPause: _onPause,
onCancel: _onPause);
}
Future<T> current() {
if (_current == null) {
if (_nextQueue.isNotEmpty) return _nextQueue.first.future;
return consume();
}
return new Future.value(_current);
}
Future<T> peek() {
if (isDone) throw new StateError('Cannot read from closed stream.');
@ -18,9 +48,13 @@ class StreamReader<T> implements StreamConsumer<T> {
return c.future;
}
Future<T> next() {
Future<T> consume() {
if (isDone) throw new StateError('Cannot read from closed stream.');
if (_buffer.isNotEmpty) return new Future.value(_buffer.removeFirst());
if (_buffer.isNotEmpty) {
_current = _buffer.removeFirst();
return close().then((_) => new Future.value(_current));
}
var c = new Completer<T>();
_nextQueue.addLast(c);
@ -34,19 +68,21 @@ class StreamReader<T> implements StreamConsumer<T> {
var c = new Completer();
stream.listen((data) {
if (_onDataListening) _onData.add(data);
if (_peekQueue.isNotEmpty || _nextQueue.isNotEmpty) {
if (_peekQueue.isNotEmpty) {
_peekQueue.removeFirst().complete(data);
}
if (_nextQueue.isNotEmpty) {
_nextQueue.removeFirst().complete(data);
_nextQueue.removeFirst().complete(_current = data);
}
} else {
_buffer.add(data);
}
})
..onDone(c.complete)
..onDone(() => close().then(c.complete))
..onError(c.completeError);
return c.future;
@ -54,7 +90,17 @@ class StreamReader<T> implements StreamConsumer<T> {
@override
Future close() async {
_closed = true;
if (_buffer.isEmpty && _nextQueue.isEmpty && _peekQueue.isEmpty) {
_closed = true;
kill(Completer c) {
c.completeError(new StateError(
'Reached end of stream, although more input was expected.'));
}
_peekQueue.forEach(kill);
_nextQueue.forEach(kill);
}
}
}