+++ /dev/null
-/*--------------------------------------------------------------------------\r
- * Copyright 2009 Taro L. Saito\r
- *\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- *--------------------------------------------------------------------------*/\r
-//--------------------------------------\r
-// XerialJ\r
-//\r
-// SilkPullParser.java\r
-// Since: Jan 28, 2009 1:00:02 PM\r
-//\r
-// $URL: http://www.xerial.org/svn/project/XerialJ/trunk/xerial-core/src/main/java/org/xerial/silk/SilkLinePullParser.java $\r
-// $Author: leo $\r
-//--------------------------------------\r
-package org.xerial.silk;\r
-\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.io.InputStreamReader;\r
-import java.io.Reader;\r
-import java.net.URL;\r
-import java.util.concurrent.ArrayBlockingQueue;\r
-import java.util.concurrent.Callable;\r
-import java.util.concurrent.ExecutorService;\r
-import java.util.concurrent.Executors;\r
-import java.util.concurrent.Future;\r
-import java.util.concurrent.TimeUnit;\r
-\r
-import org.xerial.core.XerialException;\r
-import org.xerial.silk.impl.SilkElement;\r
-import org.xerial.util.ArrayDeque;\r
-import org.xerial.util.log.Logger;\r
-\r
-/**\r
- * Pull parser of the Silk format. Pull-style means each parsing event is\r
- * generated when next() method is called, suited to stream processing.\r
- * \r
- * @author leo\r
- * \r
- */\r
-public class SilkLinePullParser\r
-{\r
- private static Logger _logger = Logger.getLogger(SilkLinePullParser.class);\r
- private static final SilkEvent EOFEvent = new SilkEvent(SilkEventType.END_OF_FILE, null);\r
- private static final SilkEvent BlankLineEvent = new SilkEvent(SilkEventType.BLANK_LINE, null);\r
-\r
- private int lineCount = 0;\r
-\r
- private final int eventQueueMax = 10000;\r
- volatile private boolean foundEOF = false;\r
-\r
- private ArrayBlockingQueue<SilkEvent> eventQueue = new ArrayBlockingQueue<SilkEvent>(eventQueueMax);\r
-\r
- /**\r
- * SilkEvents\r
- * \r
- * @author leo\r
- * \r
- */\r
- private static class EventItem\r
- {\r
- SilkEventType event;\r
- SilkElement element;\r
-\r
- public EventItem(SilkEventType event, SilkElement element)\r
- {\r
- this.event = event;\r
- this.element = element;\r
- }\r
-\r
- public EventItem(SilkEventType event)\r
- {\r
- this.event = event;\r
- this.element = null;\r
- }\r
- }\r
-\r
- private SilkLinePushParser parser;\r
- private ExecutorService threadPool;\r
- private Future<Boolean> future;\r
-\r
- public SilkLinePullParser(URL silkURL) throws IOException\r
- {\r
- this(silkURL.openStream());\r
- }\r
-\r
- public SilkLinePullParser(InputStream input) throws IOException\r
- {\r
- this(new InputStreamReader(input));\r
- }\r
-\r
- public SilkLinePullParser(Reader input) throws IOException\r
- {\r
- threadPool = Executors.newFixedThreadPool(1);\r
-\r
- parser = new SilkLinePushParser(input);\r
- future = threadPool.submit(new SilkEventProducer());\r
- }\r
-\r
- private class SilkEventProducer implements Callable<Boolean>, SilkEventHandler\r
- {\r
- public SilkEventProducer()\r
- {}\r
-\r
- public void handle(SilkEvent event) throws XerialException\r
- {\r
- try\r
- {\r
- if (!Thread.currentThread().isInterrupted())\r
- eventQueue.put(event);\r
- }\r
- catch (InterruptedException e)\r
- {\r
-\r
- }\r
- }\r
-\r
- public Boolean call() throws Exception\r
- {\r
- parser.parse(this);\r
- foundEOF = true;\r
- threadPool.shutdownNow();\r
- return true;\r
- }\r
-\r
- }\r
-\r
- private ArrayDeque<SilkEvent> prefetchedEventQueue = new ArrayDeque<SilkEvent>();\r
-\r
- public boolean hasNext() throws XerialException\r
- {\r
- if (!prefetchedEventQueue.isEmpty())\r
- return true;\r
-\r
- if (foundEOF)\r
- {\r
- return !eventQueue.isEmpty();\r
- }\r
-\r
- fetchNext();\r
-\r
- return hasNext();\r
- }\r
-\r
- public SilkEvent next() throws XerialException\r
- {\r
- if (!prefetchedEventQueue.isEmpty())\r
- return prefetchedEventQueue.removeFirst();\r
-\r
- if (foundEOF)\r
- return eventQueue.poll();\r
-\r
- fetchNext();\r
-\r
- return next();\r
- }\r
-\r
- private void fetchNext() throws XerialException\r
- {\r
- try\r
- {\r
- SilkEvent e = null;\r
- while (!foundEOF && (e = eventQueue.poll(1, TimeUnit.MILLISECONDS)) == null)\r
- {}\r
- if (e != null)\r
- prefetchedEventQueue.addLast(e);\r
- }\r
- catch (InterruptedException e)\r
- {\r
- foundEOF = true;\r
- }\r
- return;\r
- }\r
-\r
- public long getNumReadLine()\r
- {\r
- return parser.getNumReadLine();\r
- }\r
-\r
-}\r