\r
import org.xerial.silk.impl.SilkNode;\r
\r
-public class SilkContext\r
+class SilkContext\r
{\r
public final SilkNode contextNode;\r
boolean isOpen;\r
private static Logger _logger = Logger.getLogger(SilkLineFastParser.class);
private final BufferedReader buffer;
-
- private int numLinesPerJob = 5000;
- private int numWorkers = 4;
private final ExecutorService threadManager;
-
- private final LinkedBlockingQueue<Future<List<SilkEvent>>> eventContainer = new LinkedBlockingQueue<Future<List<SilkEvent>>>(
- numWorkers);
-
- private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1M
+ private final LinkedBlockingQueue<Future<List<SilkEvent>>> eventContainer;
+ private final SilkParserConfig config;
public SilkLineFastParser(URL resourceURL) throws IOException
{
- this(resourceURL, DEFAULT_BUFFER_SIZE);
+ this(resourceURL, new SilkParserConfig());
}
- public SilkLineFastParser(URL resourceURL, int bufferSize) throws IOException
+ public SilkLineFastParser(URL resourceURL, SilkParserConfig config) throws IOException
{
- this(new InputStreamReader(resourceURL.openStream()), bufferSize);
+ this(new InputStreamReader(resourceURL.openStream()), config);
}
public SilkLineFastParser(Reader reader)
{
- this(reader, DEFAULT_BUFFER_SIZE);
+ this(reader, new SilkParserConfig());
}
- public SilkLineFastParser(Reader reader, int bufferSize)
+ /**
+ * Creates a parser that reads lines from the given reader.
+ *
+ * The result queue is bounded by config.numWorkers and the thread pool is
+ * created with config.numWorkers + 1 threads.
+ *
+ * @param reader
+ * input source; used as-is when it already is a BufferedReader,
+ * otherwise wrapped in a BufferedReader of config.bufferSize chars
+ * @param config
+ * parser settings (buffer size, number of workers, lines per block)
+ */
+ public SilkLineFastParser(Reader reader, SilkParserConfig config)
{
+ this.config = config;
+ this.eventContainer = new LinkedBlockingQueue<Future<List<SilkEvent>>>(config.numWorkers);
+
- if (reader.getClass().isAssignableFrom(BufferedReader.class))
+ // instanceof also accepts subclasses of BufferedReader (e.g. LineNumberReader);
+ // the reversed isAssignableFrom test matched the exact BufferedReader class only,
+ // so already-buffered subclasses were wrapped in a second buffer
+ if (reader instanceof BufferedReader)
buffer = BufferedReader.class.cast(reader);
else
- buffer = new BufferedReader(reader, bufferSize);
+ buffer = new BufferedReader(reader, config.bufferSize);
- threadManager = Executors.newFixedThreadPool(numWorkers + 1);
+ threadManager = Executors.newFixedThreadPool(config.numWorkers + 1);
}
private volatile boolean foundEOF = false;
while (!foundEOF)
{
- ArrayList<String> cache = new ArrayList<String>(numLinesPerJob);
+ ArrayList<String> cache = new ArrayList<String>(config.numLinesInBlock);
int lineCount = 0;
- while (lineCount < numLinesPerJob)
+ while (lineCount < config.numLinesInBlock)
{
lineCount++;
String line = buffer.readLine();
private final BufferedReader buffer;
private long lineCount = 0;
private SilkEventHandler handler = null;
+ private final SilkParserConfig config;
private static final SilkEvent EOFEvent = new SilkEvent(SilkEventType.END_OF_FILE, null);
private static final SilkEvent BlankLineEvent = new SilkEvent(SilkEventType.BLANK_LINE, null);
this(new InputStreamReader(resourceURL.openStream()));
}
- public SilkLinePushParser(URL resourceURL, int bufferSize) throws IOException
+ public SilkLinePushParser(URL resourceURL, SilkParserConfig config) throws IOException
{
- this(new InputStreamReader(resourceURL.openStream()), bufferSize);
+ this(new InputStreamReader(resourceURL.openStream()), config);
}
public SilkLinePushParser(Reader reader)
{
- this(reader, 1024 * 1024); // 1MB
+ this(reader, new SilkParserConfig()); // 1MB
}
- public SilkLinePushParser(Reader reader, int bufferSize)
+ /**
+ * Creates a push parser that reads lines from the given reader.
+ *
+ * @param reader
+ * input source; used as-is when it already is a BufferedReader,
+ * otherwise wrapped in a BufferedReader of config.bufferSize chars
+ * @param config
+ * parser settings; only bufferSize is read in this constructor
+ */
+ public SilkLinePushParser(Reader reader, SilkParserConfig config)
{
+ this.config = config;
+
- if (reader.getClass().isAssignableFrom(BufferedReader.class))
+ // instanceof also accepts subclasses of BufferedReader (e.g. LineNumberReader);
+ // the reversed isAssignableFrom test matched the exact BufferedReader class only,
+ // so already-buffered subclasses were wrapped in a second buffer
+ if (reader instanceof BufferedReader)
buffer = BufferedReader.class.cast(reader);
else
- buffer = new BufferedReader(reader, bufferSize);
+ buffer = new BufferedReader(reader, config.bufferSize);
lexer = new SilkLexer();
}
{
private static Logger _logger = Logger.getLogger(SilkStreamReader.class);
- private final SilkLinePushParser parser;
+ private final SilkLineFastParser parser;
private final SilkEnv parseContext;
private TreeEventQueue eventQueue = new TreeEventQueue();
private final ArrayDeque<TreeStreamReader> readerStack = new ArrayDeque<TreeStreamReader>();
private long numReadLine = 0;
+ private final SilkParserConfig config;
+
/**
* Creates a new reader with the specified reader
*
*/
private SilkParser(Reader input, SilkEnv env) throws IOException
{
- this.parser = new SilkLinePushParser(input);
+ this.config = new SilkParserConfig();
+ // pass the stored config to the line parser so that this.config is the
+ // instance the parser actually uses; the one-argument constructor would
+ // create a second, unrelated SilkParserConfig
+ this.parser = new SilkLineFastParser(input, this.config);
this.parseContext = env;
}
this(resourcePath, SilkEnv.newEnv());
}
- public SilkParser(URL resourcePath, int bufferSize) throws IOException
+ public SilkParser(URL resourcePath, SilkParserConfig config) throws IOException
{
- this(resourcePath, SilkEnv.newEnv(), bufferSize);
+ this(resourcePath, SilkEnv.newEnv(), config);
}
public SilkParser(URL resource, SilkEnv env) throws IOException
{
- this(resource, env, 1024 * 1024);
+ this(resource, env, new SilkParserConfig());
}
- public SilkParser(URL resource, SilkEnv env, int bufferSize) throws IOException
+ public SilkParser(URL resource, SilkEnv env, SilkParserConfig config) throws IOException
{
+ this.config = config;
String path = resource.toExternalForm();
int fileNamePos = path.lastIndexOf("/");
String resourceBasePath = fileNamePos > 0 ? path.substring(0, fileNamePos) : null;
- this.parser = new SilkLinePushParser(new InputStreamReader(resource.openStream()), bufferSize);
+ this.parser = new SilkLineFastParser(new InputStreamReader(resource.openStream()), config);
this.parseContext = SilkEnv.newEnv(env, resourceBasePath);
}
}
catch (JSONException e)
{
- throw new XerialException(e.getErrorCode(), String.format("line=%d: %s", parser.getNumReadLine(), e
- .getMessage()));
+ throw new XerialException(e.getErrorCode(), String.format("line=%d: %s", numReadLine, e.getMessage()));
}
}
--- /dev/null
+/*--------------------------------------------------------------------------
+ * Copyright 2009 Taro L. Saito
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *--------------------------------------------------------------------------*/
+//--------------------------------------
+// XerialJ
+//
+// SilkParserConfig.java
+// Since: Jun 3, 2009 5:33:54 PM
+//
+// $URL$
+// $Author$
+//--------------------------------------
+package org.xerial.silk;
+
+/**
+ * SilkParserConfig holds the tuning parameters handed to the Silk parsers
+ * (SilkParser, SilkLinePushParser, SilkLineFastParser). The fields are public
+ * and mutable: create an instance, overwrite the values that should differ
+ * from the defaults, then pass it to a parser constructor.
+ *
+ * @author leo
+ *
+ */
+public class SilkParserConfig
+{
+ /**
+ * Size given to the BufferedReader that a parser creates when its input is
+ * not already buffered. Default: 1024 * 1024.
+ */
+ public int bufferSize = 1024 * 1024; // 1M
+ /**
+ * Used by SilkLineFastParser as the capacity of its queue of pending
+ * results and to size its thread pool (numWorkers + 1 threads). Default: 2.
+ */
+ public int numWorkers = 2;
+ /**
+ * Number of input lines SilkLineFastParser collects into one block while
+ * reading. Default: 1000. NOTE(review): presumably one block is the unit
+ * of work given to a worker thread -- confirm against the parser.
+ */
+ public int numLinesInBlock = 1000;
+
+}
import java.io.File;
import java.io.FileReader;
+import org.xerial.lens.ObjectLens;
import org.xerial.silk.SilkEvent;
import org.xerial.silk.SilkEventHandler;
import org.xerial.silk.SilkEventType;
import org.xerial.silk.SilkLineFastParser;
import org.xerial.silk.SilkLinePushParser;
import org.xerial.silk.SilkParser;
+import org.xerial.silk.SilkParserConfig;
import org.xerial.util.StopWatch;
import org.xerial.util.log.Logger;
import org.xerial.util.opt.Argument;
@Option(symbol = "b", longName = "buffer", description = "buffer size in MB (default = 1)")
private int bufferSizeInMB = 1;
+ // number of worker threads, copied into SilkParserConfig.numWorkers by execute()
+ @Option(symbol = "n", longName = "thread", description = "num worker threads")
+ private int numThreads = 2;
+
+ @Option(symbol = "c", longName = "lines", description = "num assigned lines for each worker threads")
+ private int numLines = 1000;
+
private void reportReadSpeed(double time, long fileSize)
{
double speedInMBS = fileSize / 1024 / 1024 / time;
- _logger.info(String.format("time=%.2f, %3.2f MB/s", time, speedInMBS));
+ _logger.info(String.format("\ntime=%.2f, %3.2f MB/s", time, speedInMBS));
}
+ private void reportLinesPerSec(double time, long lineCount)
+ {
+ double speed = lineCount / time;
+ System.err.print(String.format("time=%5.2f line=%,10d %,10.0f lines/s\r", time, lineCount, speed));
+ }
+
public void execute() throws Exception
{
File f = new File(inputSilkFile);
final long fileSize = f.length();
- int bufferSize = bufferSizeInMB * 1024 * 1024;
+ final SilkParserConfig config = new SilkParserConfig();
+ config.bufferSize = bufferSizeInMB * 1024 * 1024;
+ config.numWorkers = numThreads;
+ config.numLinesInBlock = numLines;
+
+ _logger.info("config: " + ObjectLens.toJSON(config));
switch (mode)
{
case NODE:
{
- SilkParser parser = new SilkParser(f.toURL(), bufferSize);
+ SilkParser parser = new SilkParser(f.toURL(), config);
parser.parse(new TreeEventHandlerBase() {
{
double time = timer.getElapsedTime();
double speed = count / time;
- _logger.info(String.format("node=%,15d time=%5.2f %,10.0f nodes/s", count, time, speed));
+ System.err.print(String.format("node=%,15d time=%5.2f %,10.0f nodes/s\r", count, time, speed));
}
}
double time = timer.getElapsedTime();
double speedPerNode = ((double) count) / time;
double speedInMBS = fileSize / 1024 / 1024 / time;
- _logger
- .info(String
- .format("time=%.2f %,10.0f nodes/s, %3.2f MB/s", time, speedPerNode, speedInMBS));
+ _logger.info(String.format("\ntime=%.2f %,10.0f nodes/s, %3.2f MB/s", time, speedPerNode,
+ speedInMBS));
}
});
}
case LINE:
{
- SilkLinePushParser parser = new SilkLinePushParser(f.toURL(), bufferSize);
+ SilkLinePushParser parser = new SilkLinePushParser(f.toURL(), config);
parser.parse(new SilkEventHandler() {
int lineCount = 0;
lineCount++;
if (lineCount % 100000 == 0)
{
- double time = timer.getElapsedTime();
- double speed = lineCount / time;
- _logger.info(String.format("time=%5.2f line=%,10d %,10.0f lines/s", time, lineCount, speed));
+ reportLinesPerSec(timer.getElapsedTime(), lineCount);
}
}
}
case FASTLINE:
{
- SilkLineFastParser parser = new SilkLineFastParser(f.toURL(), bufferSize);
+ SilkLineFastParser parser = new SilkLineFastParser(f.toURL(), config);
parser.parse(new SilkEventHandler() {
int lineCount = 0;
lineCount++;
if (lineCount % 100000 == 0)
{
- double time = timer.getElapsedTime();
- double speed = lineCount / time;
- _logger.info(String.format("time=%5.2f line=%,10d %,10.0f lines/s", time, lineCount, speed));
+ reportLinesPerSec(timer.getElapsedTime(), lineCount);
}
}
}
case READONLY:
{
- BufferedReader reader = new BufferedReader(new FileReader(f), bufferSize);
+ BufferedReader reader = new BufferedReader(new FileReader(f), config.bufferSize);
String line;
int lineCount = 0;
lineCount++;
if (lineCount % 100000 == 0)
{
- double time = timer.getElapsedTime();
- double speed = lineCount / time;
- _logger.info(String.format("time=%5.2f, line=%,10d, %,10.0f lines/s", time, lineCount, speed));
+ reportLinesPerSec(timer.getElapsedTime(), lineCount);
}
}
private static final double largeFileLines = 111965;\r
private static final int numNodes = 5826313;\r
\r
+ private SilkParserConfig config;\r
+\r
@Before\r
public void setUp() throws Exception\r
- {}\r
+ {\r
+ config = new SilkParserConfig();\r
+ config.bufferSize = 1024 * 1024 * 16; // 16MB\r
+ }\r
\r
@After\r
public void tearDown() throws Exception\r
speedPerNode, speedInMBS, speedPerLine));\r
}\r
\r
- private final int bufferSize = 1024 * 1024 * 16;\r
-\r
@Test\r
public void maxReadSpeed() throws Exception\r
{\r
- BufferedReader reader = new BufferedReader(new InputStreamReader(largeFile.openStream()), bufferSize);\r
+ BufferedReader reader = new BufferedReader(new InputStreamReader(largeFile.openStream()), config.bufferSize);\r
int lineCount = 0;\r
String line = null;\r
StopWatch timer = new StopWatch();\r
int lineCount = 0;\r
\r
StopWatch timer = new StopWatch();\r
- char[] buf = new char[bufferSize];\r
+ char[] buf = new char[config.bufferSize];\r
int numBytes = 0;\r
int numReadBytes = 0;\r
- while ((numReadBytes = reader.read(buf, 0, bufferSize)) != -1)\r
+ while ((numReadBytes = reader.read(buf, 0, config.bufferSize)) != -1)\r
{\r
numBytes += numReadBytes;\r
}\r
\r
int count = 0;\r
\r
- @Ignore\r
@Test\r
public void parserPerformance() throws Exception\r
{\r
- SilkParser parser = new SilkParser(largeFile, bufferSize);\r
+ SilkParser parser = new SilkParser(largeFile, config);\r
final StopWatch timer = new StopWatch();\r
\r
count = 0;\r
@Test\r
public void pushParserPerformance() throws Exception\r
{\r
- final SilkLinePushParser reader = new SilkLinePushParser(largeFile, bufferSize);\r
+ final SilkLinePushParser reader = new SilkLinePushParser(largeFile, config);\r
final StopWatch timer = new StopWatch();\r
\r
reader.parse(new SilkEventHandler() {\r
@Test\r
public void fastPushParserPerformance() throws Exception\r
{\r
- SilkLineFastParser parser = new SilkLineFastParser(largeFile, bufferSize);\r
+ SilkLineFastParser parser = new SilkLineFastParser(largeFile, config);\r
StopWatch timer = new StopWatch();\r
parser.parse(new SilkEventHandler() {\r
\r