1 /*--------------------------------------------------------------------------
\r
2 * Copyright 2008 Taro L. Saito
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
15 *--------------------------------------------------------------------------*/
\r
16 //--------------------------------------
\r
17 // Xerial Raquel Project
\r
19 // StreamAmoebaJoin.java
\r
20 // Since: 2008/11/20 22:07:28
\r
22 // $URL: http://www.xerial.org/svn/project/XerialJ/trunk/xerial-core/src/main/java/org/xerial/lens/relation/query/StreamAmoebaJoin.java $
\r
24 //--------------------------------------
\r
25 package org.xerial.lens.relation.query;
\r
27 import java.io.IOException;
\r
28 import java.util.ArrayList;
\r
29 import java.util.HashMap;
\r
30 import java.util.Iterator;
\r
31 import java.util.List;
\r
32 import java.util.Map.Entry;
\r
34 import org.xerial.core.XerialError;
\r
35 import org.xerial.core.XerialErrorCode;
\r
36 import org.xerial.core.XerialException;
\r
37 import org.xerial.lens.ObjectLens;
\r
38 import org.xerial.lens.relation.Node;
\r
39 import org.xerial.lens.relation.TupleIndex;
\r
40 import org.xerial.lens.relation.Node.NodeBuilder;
\r
41 import org.xerial.lens.relation.schema.Schema;
\r
42 import org.xerial.util.ArrayDeque;
\r
43 import org.xerial.util.Deque;
\r
44 import org.xerial.util.HashedDeque;
\r
45 import org.xerial.util.graph.Edge;
\r
46 import org.xerial.util.graph.Lattice;
\r
47 import org.xerial.util.graph.LatticeCursor;
\r
48 import org.xerial.util.graph.LatticeNode;
\r
49 import org.xerial.util.log.Logger;
\r
50 import org.xerial.util.tree.TreeEventHandler;
\r
51 import org.xerial.util.tree.TreeParser;
\r
55 * DFA-based amoeba join processing algorithm, which consumes XML streams
\r
// NOTE(review): this listing is a lossy extraction of the original source: original line
// numbers are fused onto each line, and many short lines (closing braces, blank lines, short
// statements) are absent, so the file does not compile as-is.
//
// Streaming amoeba-join processor: consumes tree events, tracks the current path through a
// lattice of node names, and reports relations (amoebas), text and node exits to an
// AmoebaJoinHandler.
61 public class StreamAmoebaJoin {
\r
// Not referenced in the visible portion of this file.
62 public static final String ALTERNATIVE_ATTRIBUTE_SYMBOL = "-";
\r
// General trace logger.
64 private static Logger _logger = Logger.getLogger(StreamAmoebaJoin.class);
\r
// "lattice" sub-logger, used to trace the construction of per-edge actions.
65 private static Logger _logger2 = Logger.getLogger(StreamAmoebaJoin.class, "lattice");
\r
// Schemas to evaluate.
67 final QuerySet query;
\r
// Receives newAmoeba / text / leaveNode callbacks.
68 final AmoebaJoinHandler handler;
\r
// Stored in currentPath in place of a null node name.
70 private final static String EMPTY_NODE_NAME = "";
\r
72 // for running amoeba join
\r
// Pre-incremented for every visited node, so the first node receives ID 0.
73 private long nodeCount = -1;
\r
// Lattice over node names; the cursor holds the lattice state of the current path.
74 private Lattice<String> nodeNameLattice = new Lattice<String>();
\r
75 private LatticeCursor<String> latticeCursor;
\r
// Node names from the root down to the node currently being visited (deepest last).
77 private Deque<String> currentPath = new ArrayDeque<String>();
\r
// Lattice states entered along the current path; the last element is the current state.
78 private Deque<LatticeNode<String>> stateStack = new ArrayDeque<LatticeNode<String>>();
\r
// NOTE(review): commented-out earlier declaration; candidate for removal.
80 // HashedChainMap<String, XMLNode> nodeStackOfEachTag = new HashedChainMap<String, XMLNode>();
\r
// Per-tag stacks of nodes that have been visited but not yet left.
81 private HashedDeque<String, Node> nodeStackOfEachTag = new HashedDeque<String, Node>();
\r
// Lazily built action caches keyed by lattice edge (source state ID, destination state ID).
// Back actions are registered under the reversed edge of the forward edge that created them.
83 private HashMap<Edge, List<Operation>> operationSetOnForward = new HashMap<Edge, List<Operation>>();
\r
84 private HashMap<Edge, List<Operation>> operationSetOnBack = new HashMap<Edge, List<Operation>>();
\r
// Text operations per lattice edge, derived from that edge's forward actions on first use.
85 private HashMap<Edge, List<TextOperation>> operatSetOnText = new HashMap<Edge, List<TextOperation>>();
\r
// Upper bound on the hop counter used when the scoped operations scan currentPath (deepest
// node first) for a context node.
87 private int attributeAmoebaSize = 1;
\r
/**
 * Creates an amoeba-join processor.
 *
 * @param query the query set whose schemas are evaluated; must not be null
 * @param handler receives amoeba, text and node-exit callbacks
 * @throws XerialError (INVALID_INPUT, "query set is null") when the query set is null
 */
// NOTE(review): original lines 90 and 92-93 are missing from this listing. Presumably they
// assign the final field `query` and test it for null before the throw below; confirm against
// the original source. IOException is declared but no visible statement throws it.
89 public StreamAmoebaJoin(QuerySet query, AmoebaJoinHandler handler) throws IOException {
\r
91 this.handler = handler;
\r
94 throw new XerialError(XerialErrorCode.INVALID_INPUT, "query set is null");
\r
/**
 * Action run when a text fragment arrives for a node.
 */
97 static interface TextOperation {
\r
// testNodeName: sanitized name of the node carrying the text; textData: the text fragment.
98 void execute(String testNodeName, String textData) throws Exception;
\r
/**
 * Text operation bound to one schema and one core node name. It reports the text to the
 * handler together with the most recently visited, not-yet-left node of the core tag and of
 * the text-carrying tag.
 */
101 class SimpleTextOperation implements TextOperation {
\r
102 final Schema schema;
\r
103 final String coreNodeName;
\r
105 public SimpleTextOperation(Schema schema, String contextNodeName) {
\r
106 this.schema = schema;
\r
107 this.coreNodeName = contextNodeName;
\r
// Copies the schema and core node name of a forward push action.
110 public SimpleTextOperation(PushRelation pr) {
\r
111 this.schema = pr.schema;
\r
112 this.coreNodeName = pr.coreNodeName;
\r
115 public void execute(String textNodeName, String textData) throws Exception {
\r
// Innermost open node with the core node name.
116 Deque<Node> nodeStack = getNodeStack(coreNodeName);
\r
117 Node contextNode = nodeStack.getLast();
\r
// Innermost open node with the name of the node carrying the text.
119 Deque<Node> textNodeStack = getNodeStack(textNodeName);
\r
120 Node textNode = textNodeStack.getLast();
\r
122 handler.text(schema, contextNode, textNode, textData);
\r
/**
 * Text operation derived from a ScopedPushRelation, which is built when more than one push
 * applies on the same lattice edge. The operation to run is chosen by the core node found on
 * the current path.
 */
126 class ContextBasedTextOperation implements TextOperation {
\r
// Core node name -> text operation for that core node.
127 final HashMap<String, TextOperation> coreNode_action = new HashMap<String, TextOperation>();
\r
// Wraps each push action of the scoped relation as a SimpleTextOperation.
129 public ContextBasedTextOperation(ScopedPushRelation scopedPushOperation) {
\r
130 for (Entry<String, PushRelation> each : scopedPushOperation.coreNode_action.entrySet()) {
\r
131 coreNode_action.put(each.getKey(), new SimpleTextOperation(each.getValue()));
\r
// Scans currentPath from the deepest node toward the root, while the hop counter stays
// <= attributeAmoebaSize, and executes the text operation registered for a matching context
// node.
// NOTE(review): the declaration of hop (original line 136) and the lines after the call below
// are missing from this listing, so it cannot be seen whether the scan stops at the first
// match; confirm against the original source.
135 public void execute(String nodeName, String textData) throws Exception {
\r
137 for (Iterator<String> it = currentPath.descendingIterator(); it.hasNext()
\r
138 && hop <= attributeAmoebaSize; hop++) {
\r
139 String contextNode = it.next();
\r
140 if (coreNode_action.containsKey(contextNode)) {
\r
141 coreNode_action.get(contextNode).execute(nodeName, textData);
\r
150 * Defines an operation assigned to an edge of the node name lattice
\r
155 static interface Operation {
\r
// Runs the action against the current traversal state (per-tag node stacks, current path).
156 void execute() throws Exception;
\r
/**
 * Forward action for a (core node, attribute node) pair of one schema. It reports a new
 * amoeba built from the innermost open node of each of the two tags.
 */
159 class PushRelation implements Operation {
\r
160 final Schema schema;
\r
161 final String coreNodeName;
\r
162 final String attributeNodeName;
\r
// The tag whose arrival created this action; PopRelation instances are built from it.
163 final String newlyFoundNodeName;
\r
/**
 * Assigns the core and attribute roles. The previously found tag is tested first as the
 * schema's core node, then the newly found tag; the other tag becomes the attribute.
 *
 * @throws XerialError (INVALID_STATE) if neither tag is the schema's core node
 */
165 public PushRelation(Schema schema, String previouslyFoundTag, String newlyFoundTag) {
\r
166 this.schema = schema;
\r
167 this.newlyFoundNodeName = newlyFoundTag;
\r
169 if (isCoreNodeIndex(schema.getNodeIndex(previouslyFoundTag))) {
\r
170 this.coreNodeName = previouslyFoundTag;
\r
171 this.attributeNodeName = newlyFoundTag;
\r
173 else if (isCoreNodeIndex(schema.getNodeIndex(newlyFoundTag))) {
\r
174 this.coreNodeName = newlyFoundTag;
\r
175 this.attributeNodeName = previouslyFoundTag;
\r
178 throw new XerialError(XerialErrorCode.INVALID_STATE, "no core node in " + schema);
\r
// Reports the innermost open core node and attribute node to the handler.
181 public void execute() throws Exception {
\r
182 Node coreNode = getNodeStack(coreNodeName).getLast();
\r
183 Node attributeNode = getNodeStack(attributeNodeName).getLast();
\r
185 if (_logger.isTraceEnabled())
\r
186 _logger.trace(String.format("push:(%s, %s)", coreNode, attributeNode));
\r
188 handler.newAmoeba(schema, coreNode, attributeNode);
\r
192 public String toString() {
\r
193 return String.format("push: %s for (%s, %s)", schema, coreNodeName, attributeNodeName);
\r
/**
 * Forward action built when more than one PushRelation applies on the same lattice edge.
 * The push to run is chosen by the core node found on the current path.
 */
197 class ScopedPushRelation implements Operation {
\r
// Core node name -> push action.
// NOTE(review): candidates that share a core node name overwrite each other (HashMap.put
// keeps the last one), so only one of them can ever run.
198 final HashMap<String, PushRelation> coreNode_action = new HashMap<String, PushRelation>();
\r
200 public ScopedPushRelation(List<PushRelation> candidates) {
\r
201 for (PushRelation each : candidates) {
\r
// NOTE(review): local s is never used.
202 Schema s = each.schema;
\r
203 coreNode_action.put(each.coreNodeName, each);
\r
// Scans currentPath from the deepest node toward the root, while the hop counter stays
// <= attributeAmoebaSize, and executes the push registered for a matching context node.
// NOTE(review): the declaration of hop (original line 208) and the lines after the call below
// are missing from this listing, so it cannot be seen whether the scan stops at the first
// match. A check that would fail when no action runs is commented out.
207 public void execute() throws Exception {
\r
209 for (Iterator<String> it = currentPath.descendingIterator(); it.hasNext()
\r
210 && hop <= attributeAmoebaSize; hop++) {
\r
211 String contextNode = it.next();
\r
212 if (coreNode_action.containsKey(contextNode)) {
\r
213 coreNode_action.get(contextNode).execute();
\r
218 // throw new XerialError(XerialErrorCode.INVALID_STATE, String.format("no action is invoked: path=%s %s",
\r
219 // currentPath, coreNode_action));
\r
/**
 * Back action registered alongside a ScopedPushRelation. On leaving, the PopRelation to run
 * is chosen by the core node found on the current path.
 */
224 class ScopedPopRelation implements Operation {
\r
// Core node name -> pop action (a later candidate with the same core node name replaces an
// earlier one).
225 final HashMap<String, PopRelation> coreNode_action = new HashMap<String, PopRelation>();
\r
// Builds, per candidate's core node name, a PopRelation for the candidate's newly found tag.
227 public ScopedPopRelation(List<PushRelation> candidates) {
\r
228 for (PushRelation each : candidates) {
\r
229 Schema s = each.schema;
\r
231 coreNode_action.put(each.coreNodeName, new PopRelation(s, each.newlyFoundNodeName));
\r
// Scans currentPath from the deepest node toward the root, while the hop counter stays
// <= attributeAmoebaSize, and executes the pop registered for a matching context node.
// NOTE(review): the declaration of hop (original line 236) and the lines after the call below
// are missing from this listing, so it cannot be seen whether the scan stops at the first
// match. A check that would fail when no action runs is commented out.
235 public void execute() throws Exception {
\r
237 for (Iterator<String> it = currentPath.descendingIterator(); it.hasNext()
\r
238 && hop <= attributeAmoebaSize; hop++) {
\r
239 String contextNode = it.next();
\r
240 if (coreNode_action.containsKey(contextNode)) {
\r
241 coreNode_action.get(contextNode).execute();
\r
246 // throw new XerialError(XerialErrorCode.INVALID_STATE, String.format("no action is invoked: path=%s %s",
\r
247 // currentPath, coreNode_action));
\r
/**
 * Back action: passes the innermost open node with the popped tag to handler.leaveNode for
 * the given schema. The node is not removed from its stack by this action.
 */
252 class PopRelation implements Operation {
\r
253 final Schema schema;
\r
254 final String poppedTag;
\r
256 public PopRelation(Schema schema, String poppedTag) {
\r
257 this.schema = schema;
\r
258 this.poppedTag = poppedTag;
\r
261 public void execute() throws Exception {
\r
262 Node poppedNode = getNodeStack(poppedTag).getLast();
\r
263 handler.leaveNode(schema, poppedNode);
\r
// NOTE(review): leftover commented-out call; candidate for removal.
264 //container.pop(poppedNode);
\r
/**
 * Forward action for a schema's self-loop node (a looped edge, e.g. A -> A). It reports an
 * amoeba linking the previous open node of the tag to the newest one.
 */
269 class PushLoopedRelation implements Operation {
\r
270 final Schema schema;
\r
271 final String tagName;
\r
273 public PushLoopedRelation(Schema schema, String tagName) {
\r
274 this.schema = schema;
\r
275 this.tagName = tagName;
\r
/**
 * Calls handler.newAmoeba(schema, previousNode, newestNode) using the two most recent nodes
 * on the tag's stack.
 *
 * @throws XerialError (INVALID_STATE) when fewer than two nodes of the tag are on the stack
 */
278 public void execute() throws Exception {
\r
279 Deque<Node> nodeStack = getNodeStack(tagName);
\r
280 if (nodeStack.size() < 2)
\r
281 throw new XerialError(XerialErrorCode.INVALID_STATE);
\r
// Walk from the newest node backwards: first the newest, then the one before it.
283 Iterator<Node> reverseCursor = nodeStack.descendingIterator();
\r
284 Node newlyFoundNode = reverseCursor.next();
\r
285 Node previouslyFoundNode = reverseCursor.next();
\r
// NOTE(review): the rest of the trace call below (original lines 289-290) is missing from
// this listing.
287 if (_logger.isTraceEnabled())
\r
288 _logger.trace(String.format("loop back: %s and %s", previouslyFoundNode,
\r
291 handler.newAmoeba(schema, previouslyFoundNode, newlyFoundNode);
\r
/**
 * Returns the stack of nodes recorded under the given (sanitized) node name. Nodes are
 * appended when visited and removed when left.
 * NOTE(review): callers use the result without a null check, so HashedDeque.get presumably
 * creates an empty deque on first access; confirm against HashedDeque.
 */
296 public Deque<Node> getNodeStack(String nodeName) {
\r
297 return nodeStackOfEachTag.get(nodeName);
\r
/**
 * Returns the canonical form of a node name, delegating to
 * ObjectLens.getCanonicalParameterName. Every node name received from tree events is passed
 * through this before it is used as a key.
 */
300 public static String sanitize(String nodeName) {
\r
301 return ObjectLens.getCanonicalParameterName(nodeName);
\r
305 * Body of depth-first tree traverser
\r
310 private class AmoebaFinder implements TreeEventHandler {
\r
// End-of-sweep callback (presumably invoked by the parser after the last event); the visible
// code only emits a trace message.
// NOTE(review): original lines 313 and 316-318 are missing from this listing.
312 public void finish() throws Exception {
\r
314 if (_logger.isTraceEnabled())
\r
315 _logger.trace("sweep finished");
\r
// Starts a traversal: creates a fresh lattice cursor, records its starting state on
// stateStack, and visits a synthetic "root" node with a null value.
// NOTE(review): original lines 320 and 323-325 are missing from this listing.
319 public void init() throws Exception {
\r
321 latticeCursor = nodeNameLattice.cursor();
\r
322 stateStack.addLast(latticeCursor.getNode());
\r
326 visitNode("root", null);
\r
// Handles entry into a node:
//  - sanitizes the name and builds a Node with the next ID and the given value;
//  - appends the node to its tag's stack;
//  - advances the lattice cursor by the node name and appends the new state to stateStack;
//  - appends the name to currentPath (EMPTY_NODE_NAME if null).
// Throws XerialError (UNSUPPORTED) when the query marks this node name as a tree node.
// NOTE(review): the rest of the NodeBuilder chain (original line 333) and lines 342-344 are
// missing from this listing; prevState is not used in the visible code. The error message
// reads "tree not is not supported yet" (likely meant "tree node").
329 public void visitNode(String nodeName, String nodeValue) throws Exception {
\r
330 nodeName = sanitize(nodeName);
\r
332 Node currentNode = new NodeBuilder(nodeName).nodeID(++nodeCount).nodeValue(nodeValue)
\r
334 Deque<Node> nodeStack = getNodeStack(nodeName);
\r
335 nodeStack.add(currentNode);
\r
338 LatticeNode<String> prevState = latticeCursor.getNode();
\r
339 LatticeNode<String> nextState = latticeCursor.next(nodeName);
\r
340 stateStack.addLast(nextState);
\r
341 currentPath.addLast(nodeName != null ? nodeName : EMPTY_NODE_NAME);
\r
345 if (query.isTreeNode(nodeName)) {
\r
346 throw new XerialError(XerialErrorCode.UNSUPPORTED, "tree not is not supported yet");
\r
// Handles a text fragment of the current node. It runs the text operations cached for the
// lattice edge (previous state -> current state), deriving them from that edge's forward
// actions on first use:
//  - PushRelation -> SimpleTextOperation;
//  - ScopedPushRelation -> ContextBasedTextOperation;
//  - any other operation -> XerialError (INVALID_STATE).
// In the visible catch block, XerialException is rethrown unchanged and any other Exception
// is wrapped in XerialException (INHERITED).
// NOTE(review): the last argument of the getForwardActionList call (original line 367) and
// the start of the try (lines 382-383) are missing from this listing.
351 public void text(String nodeName, String textDataFragment) throws Exception {
\r
352 nodeName = sanitize(nodeName);
\r
// The top two entries of stateStack: the current state and its predecessor.
354 Iterator<LatticeNode<String>> it = stateStack.descendingIterator();
\r
355 LatticeNode<String> currentState = it.next();
\r
356 LatticeNode<String> prevState = it.next();
\r
358 Edge currentEdge = new Edge(prevState.getID(), currentState.getID());
\r
359 List<TextOperation> textOperation = operatSetOnText.get(currentEdge);
\r
361 // generate a text operation set
\r
362 if (textOperation == null) {
\r
363 textOperation = new ArrayList<TextOperation>();
\r
364 operatSetOnText.put(currentEdge, textOperation);
\r
366 List<Operation> forwardAction = getForwardActionList(prevState, currentState,
\r
368 for (Operation each : forwardAction) {
\r
369 if (each instanceof PushRelation) {
\r
370 textOperation.add(new SimpleTextOperation((PushRelation) each));
\r
372 else if (each instanceof ScopedPushRelation) {
\r
373 textOperation.add(new ContextBasedTextOperation((ScopedPushRelation) each));
\r
// NOTE(review): getForwardActionList adds PushLoopedRelation for looped edges; such an
// action reaches this branch and throws. Confirm that text cannot arrive on a looped edge.
// The empty list has already been cached above, so a later call on the same edge would
// find it and skip this check.
376 throw new XerialError(XerialErrorCode.INVALID_STATE, "unknown operation: "
\r
381 assert textOperation != null;
\r
384 for (TextOperation each : textOperation)
\r
385 each.execute(nodeName, textDataFragment);
\r
// NOTE(review): unlike leaveNode, this catch does not rethrow XerialError unchanged, so a
// XerialError raised inside the try is wrapped in XerialException.
387 catch (Exception e) {
\r
388 if (e instanceof XerialException)
\r
389 throw (XerialException) e;
\r
391 throw new XerialException(XerialErrorCode.INHERITED, e);
\r
// Handles exit from a node:
//  - sanitizes the name and reads the innermost node with that name;
//  - runs the try body;
//  - removes the last entry of currentPath and of the tag's node stack.
// In the catch block, XerialException and XerialError are rethrown unchanged and any other
// Exception is wrapped in XerialException (INHERITED).
// NOTE(review): original lines 400-403 (the try body, presumably passing currentNode to
// back()) are missing from this listing.
395 public void leaveNode(String nodeName) throws Exception {
\r
396 nodeName = sanitize(nodeName);
\r
398 Deque<Node> nodeStack = getNodeStack(nodeName);
\r
399 Node currentNode = nodeStack.getLast();
\r
404 catch (Exception e) {
\r
405 if (e instanceof XerialException)
\r
406 throw (XerialException) e;
\r
407 else if (e instanceof XerialError)
\r
408 throw (XerialError) e;
\r
410 throw new XerialException(XerialErrorCode.INHERITED, e);
\r
// Unwind the path and the tag's node stack.
413 currentPath.removeLast();
\r
415 nodeStack.removeLast();
\r
/**
 * Returns the forward actions for the lattice edge prevState -> nextState. On first use it
 * builds and caches them, and registers the matching back-action list under the reversed
 * edge.
 *
 * When the two states differ, a PushRelation is created for each schema and each other node
 * name in nextState that pairs with the newly found tag, provided at least one of the two is
 * the schema's core node:
 * <ul>
 * <li>more than one such push: they are wrapped in a ScopedPushRelation, with a
 * ScopedPopRelation as the back action;</li>
 * <li>otherwise: each push is added with its own PopRelation as back action.</li>
 * </ul>
 * For the loop-back case (e.g. A -> A), each schema with a self-loop node contributes a
 * PushLoopedRelation.
 *
 * @param prevState source lattice state
 * @param nextState destination lattice state (its node names are scanned for pairs)
 * @param nodeName the newly found tag that moved the cursor along this edge
 * @return the cached forward actions for the edge
 */
// NOTE(review): several original lines are missing from this listing: 424 (presumably
// returning the cached list), 446-448 and 451-453 (presumably checks on the node indexes),
// 474-476, 481-484 (the branch leading to the loop-back case) and 489-500.
418 private List<Operation> getForwardActionList(LatticeNode<String> prevState,
\r
419 LatticeNode<String> nextState, String nodeName) {
\r
420 Edge currentEdge = new Edge(prevState.getID(), nextState.getID());
\r
422 List<Operation> actionList = operationSetOnForward.get(currentEdge);
\r
423 if (actionList != null)
\r
426 int prevNodeID = currentEdge.getSourceNodeID();
\r
427 int nextNodeID = currentEdge.getDestNodeID();
\r
429 // lazily prepare the action list
\r
430 actionList = new ArrayList<Operation>();
\r
431 operationSetOnForward.put(currentEdge, actionList);
\r
// Back actions are looked up by back() under the reversed edge.
432 List<Operation> backActionList = new ArrayList<Operation>();
\r
433 operationSetOnBack.put(new Edge(nextNodeID, prevNodeID), backActionList);
\r
435 // search for the corresponding relations to newly found two node pair
\r
436 String newlyFoundTag = nodeName;
\r
438 if (_logger2.isTraceEnabled())
\r
439 _logger2.trace("create actions for " + newlyFoundTag);
\r
441 if (prevNodeID != nextNodeID) {
\r
442 List<PushRelation> foundAction = new ArrayList<PushRelation>();
\r
443 // (core node, attribute node)
\r
444 for (Schema r : query.getTargetQuerySet()) {
\r
445 TupleIndex ni = r.getNodeIndex(newlyFoundTag);
\r
449 for (String previouslyFoundNode : nextState) {
\r
450 TupleIndex pi = r.getNodeIndex(previouslyFoundNode);
\r
// A tag does not pair with itself.
454 if (previouslyFoundNode.equals(newlyFoundTag))
\r
// Only pairs involving the schema's core node become relations.
457 if (!(isCoreNodeIndex(ni) || isCoreNodeIndex(pi)))
\r
460 if (_logger2.isTraceEnabled())
\r
461 _logger2.trace(String.format("new pair: %s, %s (in %s)",
\r
462 previouslyFoundNode, newlyFoundTag, r));
\r
464 foundAction.add(new PushRelation(r, previouslyFoundNode, newlyFoundTag));
\r
469 // set the action list
\r
470 if (foundAction.size() > 1) {
\r
471 // context-dependent actions
\r
472 actionList.add(new ScopedPushRelation(foundAction));
\r
473 backActionList.add(new ScopedPopRelation(foundAction));
\r
477 for (PushRelation each : foundAction) {
\r
478 actionList.add(each);
\r
479 backActionList.add(new PopRelation(each.schema, each.newlyFoundNodeName));
\r
485 // loop back e.g. A -> A
\r
// No back actions are added in the visible loop-back code.
486 for (Schema r : query.getTargetQuerySet()) {
\r
487 String selfLoopNode = r.selfLoopNode();
\r
488 if (selfLoopNode == null)
\r
491 actionList.add(new PushLoopedRelation(r, selfLoopNode));
\r
/**
 * Processes leaving {@code node}:
 * <ol>
 * <li>reads the top two lattice states and pops the current one from stateStack;</li>
 * <li>iterates the forward actions of the edge that led into the current state;</li>
 * <li>resets the cursor to the new top state;</li>
 * <li>looks up the back actions registered for the edge (cursor state before reset ->
 * state after reset).</li>
 * </ol>
 * If that back-action list is empty, the handler receives leaveNode with a null schema for
 * the innermost open node of the node's tag.
 *
 * @throws XerialError (INVALID_STATE, "empty action list: ...") if no back-action list is
 *     registered for the edge
 */
// NOTE(review): the loop bodies (original lines 509-511 and 527-532) and lines 524-525 are
// missing from this listing; presumably each operation's execute() is called. Confirm.
501 void back(Node node) throws Exception {
\r
502 Iterator<LatticeNode<String>> it = stateStack.descendingIterator();
\r
503 LatticeNode<String> current = it.next();
\r
504 LatticeNode<String> prev = it.next();
\r
505 stateStack.removeLast();
\r
507 // process forward edge
\r
508 for (Operation each : getForwardActionList(prev, current, node.nodeName)) {
\r
512 // process back edge
\r
513 int prevState = latticeCursor.getNodeID();
\r
514 int nextState = latticeCursor.reset(stateStack.peekLast()).getID();
\r
516 Edge backEdge = new Edge(prevState, nextState);
\r
517 List<Operation> actionList = operationSetOnBack.get(backEdge);
\r
518 if (actionList == null) {
\r
519 throw new XerialError(XerialErrorCode.INVALID_STATE, "empty action list: " + node);
\r
521 if (actionList.isEmpty()) {
\r
522 Node poppedNode = getNodeStack(node.nodeName).getLast();
\r
523 handler.leaveNode(null, poppedNode);
\r
526 for (Operation each : actionList) {
\r
/**
 * Returns true when the tuple index has exactly one component and that component is 0.
 * Presumably this identifies a schema's core (first) node; confirm against TupleIndex.
 * The argument must not be null (it is dereferenced without a check).
 */
534 private boolean isCoreNodeIndex(TupleIndex ti) {
\r
535 return ti.size() == 1 && ti.get(0) == 0;
\r
/**
 * Runs the amoeba join over the events produced by {@code parser}, using a fresh AmoebaFinder
 * as the event handler.
 * NOTE(review): original lines 540-542 (presumably passing the finder to the parser) are
 * missing from this listing.
 */
538 public void sweep(TreeParser parser) throws Exception {
\r
539 AmoebaFinder f = new AmoebaFinder();
\r
543 public QuerySet getQuerySet() {
\r