1 /*--------------------------------------------------------------------------
\r
2 * Copyright 2009 Taro L. Saito
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
15 *--------------------------------------------------------------------------*/
\r
16 //--------------------------------------
\r
19 // SilkPullParser.java
\r
20 // Since: 2009/03/31 19:53:33
\r
24 //--------------------------------------
\r
25 package org.xerial.silk;
\r
27 import java.io.BufferedReader;
\r
28 import java.io.IOException;
\r
29 import java.io.InputStream;
\r
30 import java.io.InputStreamReader;
\r
31 import java.io.Reader;
\r
32 import java.net.URL;
\r
33 import java.util.concurrent.ArrayBlockingQueue;
\r
34 import java.util.concurrent.Callable;
\r
35 import java.util.concurrent.ExecutorService;
\r
36 import java.util.concurrent.Executors;
\r
37 import java.util.concurrent.TimeUnit;
\r
39 import org.xerial.core.XerialException;
\r
40 import org.xerial.util.ArrayDeque;
\r
41 import org.xerial.util.log.Logger;
\r
42 import org.xerial.util.tree.TreeEvent;
\r
43 import org.xerial.util.tree.TreeEventHandler;
\r
44 import org.xerial.util.tree.TreeStreamReader;
\r
47 * {@link TreeStreamReader} implementation for the Silk data format.
\r
52 public class SilkPullParser implements TreeStreamReader {
\r
// Pull-style adapter over the push-style SilkParser: the constructor submits a
// BackgroundParser task whose TreeEventHandler put()s every callback into eventQueue,
// and the reading side pulls them back out through hasNext()/peekNext()/next().
53 private static Logger _logger = Logger.getLogger(SilkPullParser.class);
\r
// NOTE(review): _logger is not referenced anywhere in the visible part of this class.
55 private final SilkParser parser;
\r
// Bounded hand-off queue between the parser thread (put) and the reader (poll/drainTo);
// put() blocks while the queue is full.
56 private final ArrayBlockingQueue<TreeEvent> eventQueue = new ArrayBlockingQueue<TreeEvent>(
\r
// Events already taken off eventQueue, consumed by peekNext()/next()/hasNext().
58 private final ArrayDeque<TreeEvent> prefetchedEventQueue = new ArrayDeque<TreeEvent>();
\r
// NOTE(review): nothing in the visible code increments this counter, so
// getNumReadLine() appears to always return 0 — confirm.
60 private long numReadLine = 0;
\r
62 // for changing push-parser to pull parser: runs the BackgroundParser task on its own thread
\r
63 private final ExecutorService threadManager;
\r
66 * Creates a new reader with the specified input stream
\r
69 * `@throws IOException
\r
71 protected SilkPullParser(InputStream input) throws IOException {
\r
72 this(new InputStreamReader(input));
\r
76 * Creates a new reader with the specified reader
\r
79 * @throws IOException
\r
81 protected SilkPullParser(Reader input) throws IOException {
\r
82 this(input, SilkEnv.newEnv(), new SilkParserConfig());
\r
85 public SilkPullParser(Reader input, SilkEnv env) throws IOException {
\r
86 this(input, env, new SilkParserConfig());
\r
90 * Creates a new reader inherited the given environment
\r
94 * @throws IOException
\r
96 public SilkPullParser(Reader input, SilkEnv env, SilkParserConfig config) throws IOException {
\r
97 this.parser = new SilkParser(input, env, config);
\r
99 this.threadManager = Executors.newFixedThreadPool(1);
\r
100 threadManager.submit(new BackgroundParser());
\r
105 * Create a new reader for reading local resources
\r
107 * @param resourceBasePath
\r
108 * @param resourceName
\r
109 * @throws IOException
\r
111 public SilkPullParser(String resourceBasePath, String resourceName) throws IOException {
\r
112 this(new BufferedReader(new InputStreamReader(SilkWalker.class
\r
113 .getResourceAsStream(SilkParser.getResourcePath(resourceBasePath, resourceName)))),
\r
114 SilkEnv.newEnv(resourceBasePath));
\r
118 * Create a new reader for reading the specified resource URL
\r
120 * @param resourcePath
\r
121 * @throws IOException
\r
123 public SilkPullParser(URL resourcePath) throws IOException {
\r
124 this(resourcePath, SilkEnv.newEnv());
\r
127 public SilkPullParser(URL resource, SilkEnv env) throws IOException {
\r
128 this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env,
\r
129 SilkParser.getResourceBasePath(resource)));
\r
132 public SilkPullParser(URL resource, SilkEnv env, SilkParserConfig config) throws IOException {
\r
133 this(new BufferedReader(new InputStreamReader(resource.openStream())), SilkEnv.newEnv(env,
\r
134 SilkParser.getResourceBasePath(resource)), config);
\r
137 public SilkPullParser(URL resource, SilkParserConfig config) throws IOException {
\r
138 this(resource, SilkEnv.newEnv(), config);
\r
// Task run on threadManager: drives the push-style SilkParser and forwards every
// handler callback as a TreeEvent into eventQueue for the reading side.
141 private class BackgroundParser implements Callable<Void> {
\r
143 public Void call() throws Exception {
\r
145 parser.parse(new TreeEventHandler() {
\r
// Marks parsing as complete; hasNext() then drains whatever is still in eventQueue.
146 public void finish() throws Exception {
\r
147 hasParsingFinished = true;
\r
150 public void init() throws Exception {
\r
151 hasParsingFinished = false;
\r
// The callbacks below use put(), which blocks while eventQueue is full and so throttles
// the parser to the reader's pace.
// NOTE(review): if the reader stops consuming before the end of the data, this thread
// stays blocked in put() and never reaches threadManager.shutdown() — confirm callers
// always drain the stream.
154 public void leaveNode(String nodeName) throws Exception {
\r
155 eventQueue.put(TreeEvent.newLeaveEvent(nodeName));
\r
158 public void text(String nodeName, String textDataFragment) throws Exception {
\r
159 eventQueue.put(TreeEvent.newTextEvent(nodeName, textDataFragment));
\r
162 public void visitNode(String nodeName, String immediateNodeValue)
\r
164 eventQueue.put(TreeEvent.newVisitEvent(nodeName, immediateNodeValue));
\r
170 catch (Exception e) {
\r
// A parse failure also ends the stream, so the polling loop in hasNext() stops waiting.
// NOTE(review): the exception is not visibly rethrown or reported, so the reader just
// sees an early end of events — confirm it is at least logged.
171 hasParsingFinished = true;
\r
// Lets the single worker thread terminate once parsing is over.
// NOTE(review): presumably in a finally clause so the error path also reaches it — confirm.
175 threadManager.shutdown();
\r
// Returns the next event without removing it from prefetchedEventQueue.
// NOTE(review): getFirst() on an empty org.xerial.util.ArrayDeque — presumably guarded
// by a hasNext() call; confirm what is returned or thrown at end of stream.
181 public TreeEvent peekNext() throws XerialException {
\r
183 return prefetchedEventQueue.getFirst();
\r
// Removes and returns the next event from prefetchedEventQueue.
// NOTE(review): as with peekNext(), removeFirst() is presumably guarded by hasNext();
// confirm end-of-stream behaviour.
188 public TreeEvent next() throws XerialException {
\r
190 TreeEvent e = prefetchedEventQueue.removeFirst();
\r
198 * Has finished reading the stream? Set by the background parser in finish() or after a parse exception.
\r
200 private volatile boolean hasParsingFinished = false;
\r
// Set once eventQueue has been drained for the last time; only read/written in hasNext().
201 private boolean hasPrefetchFinished = false;
\r
204 * Is next event available? Moves events from eventQueue into prefetchedEventQueue as needed.
\r
206 * @return true if there are remaining events, otherwise false
\r
207 * @throws XerialException
\r
209 private boolean hasNext() throws XerialException {
\r
// Fast path: an event has already been prefetched.
210 if (!prefetchedEventQueue.isEmpty())
\r
// After the final drain no further events can appear.
213 if (hasPrefetchFinished)
\r
// The parser thread has stopped: move everything still queued in one step.
216 if (hasParsingFinished) {
\r
217 int count = eventQueue.drainTo(prefetchedEventQueue);
\r
218 hasPrefetchFinished = true;
\r
223 TreeEvent e = null;
\r
// Wait in 1-second slices so that completion (hasParsingFinished) is noticed even when
// no further event arrives.
// NOTE(review): the loop exits with e == null as soon as the flag is set, even if events
// are still queued; the code that follows must handle that case (e.g. by re-checking and
// draining) — confirm.
224 while (!hasParsingFinished && (e = eventQueue.poll(1, TimeUnit.SECONDS)) == null) {}
\r
227 prefetchedEventQueue.addLast(e);
\r
// NOTE(review): the handler should restore the interrupt status with
// Thread.currentThread().interrupt() before converting to XerialException — confirm.
233 catch (InterruptedException e) {
\r
240 public long getNumReadLine() {
\r
241 return numReadLine;
\r