OSDN Git Service

imported from subversion repository
[xerial/xerial-core.git] / src / main / java / org / xerial / lens / relation / query / StreamAmoebaJoin.java
1 /*--------------------------------------------------------------------------\r
2  *  Copyright 2008 Taro L. Saito\r
3  *\r
4  *  Licensed under the Apache License, Version 2.0 (the "License");\r
5  *  you may not use this file except in compliance with the License.\r
6  *  You may obtain a copy of the License at\r
7  *\r
8  *     http://www.apache.org/licenses/LICENSE-2.0\r
9  *\r
10  *  Unless required by applicable law or agreed to in writing, software\r
11  *  distributed under the License is distributed on an "AS IS" BASIS,\r
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
13  *  See the License for the specific language governing permissions and\r
14  *  limitations under the License.\r
15  *--------------------------------------------------------------------------*/\r
16 //--------------------------------------\r
17 // Xerial Raquel Project\r
18 //\r
19 // StreamAmoebaJoin.java\r
20 // Since: 2008/11/20 22:07:28\r
21 //\r
22 // $URL: http://www.xerial.org/svn/project/XerialJ/trunk/xerial-core/src/main/java/org/xerial/lens/relation/query/StreamAmoebaJoin.java $\r
23 // $Author: leo $\r
24 //--------------------------------------\r
25 package org.xerial.lens.relation.query;\r
26 \r
27 import java.io.IOException;\r
28 import java.util.ArrayList;\r
29 import java.util.HashMap;\r
30 import java.util.Iterator;\r
31 import java.util.List;\r
32 import java.util.Map.Entry;\r
33 \r
34 import org.xerial.core.XerialError;\r
35 import org.xerial.core.XerialErrorCode;\r
36 import org.xerial.core.XerialException;\r
37 import org.xerial.lens.ObjectLens;\r
38 import org.xerial.lens.relation.Node;\r
39 import org.xerial.lens.relation.TupleIndex;\r
40 import org.xerial.lens.relation.Node.NodeBuilder;\r
41 import org.xerial.lens.relation.schema.Schema;\r
42 import org.xerial.util.ArrayDeque;\r
43 import org.xerial.util.Deque;\r
44 import org.xerial.util.HashedDeque;\r
45 import org.xerial.util.graph.Edge;\r
46 import org.xerial.util.graph.Lattice;\r
47 import org.xerial.util.graph.LatticeCursor;\r
48 import org.xerial.util.graph.LatticeNode;\r
49 import org.xerial.util.log.Logger;\r
50 import org.xerial.util.tree.TreeEventHandler;\r
51 import org.xerial.util.tree.TreeParser;\r
52 \r
53 /**\r
54  * \r
55  * DFA-based amoeba join processing algorithm, which consumes XML streams\r
56  * \r
57  * \r
58  * @author leo\r
59  * \r
60  */\r
61 public class StreamAmoebaJoin {\r
62     public static final String ALTERNATIVE_ATTRIBUTE_SYMBOL = "-";\r
63 \r
64     private static Logger _logger = Logger.getLogger(StreamAmoebaJoin.class);\r
65     private static Logger _logger2 = Logger.getLogger(StreamAmoebaJoin.class, "lattice");\r
66 \r
67     final QuerySet query;\r
68     final AmoebaJoinHandler handler;\r
69 \r
70     private final static String EMPTY_NODE_NAME = "";\r
71 \r
72     // for running amoeba join\r
73     private long nodeCount = -1;\r
74     private Lattice<String> nodeNameLattice = new Lattice<String>();\r
75     private LatticeCursor<String> latticeCursor;\r
76 \r
77     private Deque<String> currentPath = new ArrayDeque<String>();\r
78     private Deque<LatticeNode<String>> stateStack = new ArrayDeque<LatticeNode<String>>();\r
79 \r
80     //  HashedChainMap<String, XMLNode> nodeStackOfEachTag = new HashedChainMap<String, XMLNode>();\r
81     private HashedDeque<String, Node> nodeStackOfEachTag = new HashedDeque<String, Node>();\r
82 \r
83     private HashMap<Edge, List<Operation>> operationSetOnForward = new HashMap<Edge, List<Operation>>();\r
84     private HashMap<Edge, List<Operation>> operationSetOnBack = new HashMap<Edge, List<Operation>>();\r
85     private HashMap<Edge, List<TextOperation>> operatSetOnText = new HashMap<Edge, List<TextOperation>>();\r
86 \r
87     private int attributeAmoebaSize = 1;\r
88 \r
/**
 * Creates a join processor bound to a query set and an event handler.
 *
 * @param query
 *            the query set to evaluate; must not be null
 * @param handler
 *            the handler that receives the join events
 * @throws IOException
 *             declared by the signature; nothing in this constructor throws it
 * @throws XerialError
 *             with code INVALID_INPUT when {@code query} is null
 */
public StreamAmoebaJoin(QuerySet query, AmoebaJoinHandler handler) throws IOException {
    if (query == null) {
        throw new XerialError(XerialErrorCode.INVALID_INPUT, "query set is null");
    }

    this.handler = handler;
    this.query = query;
}
96 \r
/**
 * An action run when text data is reported for a node.
 */
interface TextOperation {
    /**
     * Handles one piece of text data.
     *
     * @param testNodeName
     *            name of the node the text belongs to
     * @param textData
     *            the text content
     * @throws Exception
     *             when the action fails
     */
    void execute(String testNodeName, String textData) throws Exception;
}
100 \r
101     class SimpleTextOperation implements TextOperation {\r
102         final Schema schema;\r
103         final String coreNodeName;\r
104 \r
105         public SimpleTextOperation(Schema schema, String contextNodeName) {\r
106             this.schema = schema;\r
107             this.coreNodeName = contextNodeName;\r
108         }\r
109 \r
110         public SimpleTextOperation(PushRelation pr) {\r
111             this.schema = pr.schema;\r
112             this.coreNodeName = pr.coreNodeName;\r
113         }\r
114 \r
115         public void execute(String textNodeName, String textData) throws Exception {\r
116             Deque<Node> nodeStack = getNodeStack(coreNodeName);\r
117             Node contextNode = nodeStack.getLast();\r
118 \r
119             Deque<Node> textNodeStack = getNodeStack(textNodeName);\r
120             Node textNode = textNodeStack.getLast();\r
121 \r
122             handler.text(schema, contextNode, textNode, textData);\r
123         }\r
124     }\r
125 \r
126     class ContextBasedTextOperation implements TextOperation {\r
127         final HashMap<String, TextOperation> coreNode_action = new HashMap<String, TextOperation>();\r
128 \r
129         public ContextBasedTextOperation(ScopedPushRelation scopedPushOperation) {\r
130             for (Entry<String, PushRelation> each : scopedPushOperation.coreNode_action.entrySet()) {\r
131                 coreNode_action.put(each.getKey(), new SimpleTextOperation(each.getValue()));\r
132             }\r
133         }\r
134 \r
135         public void execute(String nodeName, String textData) throws Exception {\r
136             int hop = 0;\r
137             for (Iterator<String> it = currentPath.descendingIterator(); it.hasNext()\r
138                     && hop <= attributeAmoebaSize; hop++) {\r
139                 String contextNode = it.next();\r
140                 if (coreNode_action.containsKey(contextNode)) {\r
141                     coreNode_action.get(contextNode).execute(nodeName, textData);\r
142                     return;\r
143                 }\r
144             }\r
145         }\r
146 \r
147     }\r
148 \r
/**
 * An operation assigned to an edge of the node name lattice.
 * 
 * @author leo
 * 
 */
interface Operation {
    /**
     * Runs the operation.
     *
     * @throws Exception
     *             when the operation fails
     */
    void execute() throws Exception;
}
158 \r
159     class PushRelation implements Operation {\r
160         final Schema schema;\r
161         final String coreNodeName;\r
162         final String attributeNodeName;\r
163         final String newlyFoundNodeName;\r
164 \r
165         public PushRelation(Schema schema, String previouslyFoundTag, String newlyFoundTag) {\r
166             this.schema = schema;\r
167             this.newlyFoundNodeName = newlyFoundTag;\r
168 \r
169             if (isCoreNodeIndex(schema.getNodeIndex(previouslyFoundTag))) {\r
170                 this.coreNodeName = previouslyFoundTag;\r
171                 this.attributeNodeName = newlyFoundTag;\r
172             }\r
173             else if (isCoreNodeIndex(schema.getNodeIndex(newlyFoundTag))) {\r
174                 this.coreNodeName = newlyFoundTag;\r
175                 this.attributeNodeName = previouslyFoundTag;\r
176             }\r
177             else\r
178                 throw new XerialError(XerialErrorCode.INVALID_STATE, "no core node in " + schema);\r
179         }\r
180 \r
181         public void execute() throws Exception {\r
182             Node coreNode = getNodeStack(coreNodeName).getLast();\r
183             Node attributeNode = getNodeStack(attributeNodeName).getLast();\r
184 \r
185             if (_logger.isTraceEnabled())\r
186                 _logger.trace(String.format("push:(%s, %s)", coreNode, attributeNode));\r
187 \r
188             handler.newAmoeba(schema, coreNode, attributeNode);\r
189         }\r
190 \r
191         @Override\r
192         public String toString() {\r
193             return String.format("push: %s for (%s, %s)", schema, coreNodeName, attributeNodeName);\r
194         }\r
195     }\r
196 \r
197     class ScopedPushRelation implements Operation {\r
198         final HashMap<String, PushRelation> coreNode_action = new HashMap<String, PushRelation>();\r
199 \r
200         public ScopedPushRelation(List<PushRelation> candidates) {\r
201             for (PushRelation each : candidates) {\r
202                 Schema s = each.schema;\r
203                 coreNode_action.put(each.coreNodeName, each);\r
204             }\r
205         }\r
206 \r
207         public void execute() throws Exception {\r
208             int hop = 0;\r
209             for (Iterator<String> it = currentPath.descendingIterator(); it.hasNext()\r
210                     && hop <= attributeAmoebaSize; hop++) {\r
211                 String contextNode = it.next();\r
212                 if (coreNode_action.containsKey(contextNode)) {\r
213                     coreNode_action.get(contextNode).execute();\r
214                     return;\r
215                 }\r
216             }\r
217 \r
218             //            throw new XerialError(XerialErrorCode.INVALID_STATE, String.format("no action is invoked: path=%s %s",\r
219             //                    currentPath, coreNode_action));\r
220         }\r
221 \r
222     }\r
223 \r
224     class ScopedPopRelation implements Operation {\r
225         final HashMap<String, PopRelation> coreNode_action = new HashMap<String, PopRelation>();\r
226 \r
227         public ScopedPopRelation(List<PushRelation> candidates) {\r
228             for (PushRelation each : candidates) {\r
229                 Schema s = each.schema;\r
230 \r
231                 coreNode_action.put(each.coreNodeName, new PopRelation(s, each.newlyFoundNodeName));\r
232             }\r
233         }\r
234 \r
235         public void execute() throws Exception {\r
236             int hop = 0;\r
237             for (Iterator<String> it = currentPath.descendingIterator(); it.hasNext()\r
238                     && hop <= attributeAmoebaSize; hop++) {\r
239                 String contextNode = it.next();\r
240                 if (coreNode_action.containsKey(contextNode)) {\r
241                     coreNode_action.get(contextNode).execute();\r
242                     return;\r
243                 }\r
244             }\r
245 \r
246             //            throw new XerialError(XerialErrorCode.INVALID_STATE, String.format("no action is invoked: path=%s %s",\r
247             //                    currentPath, coreNode_action));\r
248         }\r
249 \r
250     }\r
251 \r
252     class PopRelation implements Operation {\r
253         final Schema schema;\r
254         final String poppedTag;\r
255 \r
256         public PopRelation(Schema schema, String poppedTag) {\r
257             this.schema = schema;\r
258             this.poppedTag = poppedTag;\r
259         }\r
260 \r
261         public void execute() throws Exception {\r
262             Node poppedNode = getNodeStack(poppedTag).getLast();\r
263             handler.leaveNode(schema, poppedNode);\r
264             //container.pop(poppedNode);\r
265         }\r
266 \r
267     }\r
268 \r
269     class PushLoopedRelation implements Operation {\r
270         final Schema schema;\r
271         final String tagName;\r
272 \r
273         public PushLoopedRelation(Schema schema, String tagName) {\r
274             this.schema = schema;\r
275             this.tagName = tagName;\r
276         }\r
277 \r
278         public void execute() throws Exception {\r
279             Deque<Node> nodeStack = getNodeStack(tagName);\r
280             if (nodeStack.size() < 2)\r
281                 throw new XerialError(XerialErrorCode.INVALID_STATE);\r
282 \r
283             Iterator<Node> reverseCursor = nodeStack.descendingIterator();\r
284             Node newlyFoundNode = reverseCursor.next();\r
285             Node previouslyFoundNode = reverseCursor.next();\r
286 \r
287             if (_logger.isTraceEnabled())\r
288                 _logger.trace(String.format("loop back: %s and %s", previouslyFoundNode,\r
289                         newlyFoundNode));\r
290 \r
291             handler.newAmoeba(schema, previouslyFoundNode, newlyFoundNode);\r
292         }\r
293 \r
294     }\r
295 \r
296     public Deque<Node> getNodeStack(String nodeName) {\r
297         return nodeStackOfEachTag.get(nodeName);\r
298     }\r
299 \r
300     public static String sanitize(String nodeName) {\r
301         return ObjectLens.getCanonicalParameterName(nodeName);\r
302     }\r
303 \r
304     /**\r
305      * Body of depth-first tree traverser\r
306      * \r
307      * @author leo\r
308      * \r
309      */\r
310     private class AmoebaFinder implements TreeEventHandler {\r
311 \r
312         public void finish() throws Exception {\r
313             leaveNode("root");\r
314             if (_logger.isTraceEnabled())\r
315                 _logger.trace("sweep finished");\r
316             handler.finish();\r
317         }\r
318 \r
319         public void init() throws Exception {\r
320             nodeCount = -1;\r
321             latticeCursor = nodeNameLattice.cursor();\r
322             stateStack.addLast(latticeCursor.getNode());\r
323 \r
324             handler.init();\r
325 \r
326             visitNode("root", null);\r
327         }\r
328 \r
329         public void visitNode(String nodeName, String nodeValue) throws Exception {\r
330             nodeName = sanitize(nodeName);\r
331 \r
332             Node currentNode = new NodeBuilder(nodeName).nodeID(++nodeCount).nodeValue(nodeValue)\r
333                     .build();\r
334             Deque<Node> nodeStack = getNodeStack(nodeName);\r
335             nodeStack.add(currentNode);\r
336 \r
337             // forward\r
338             LatticeNode<String> prevState = latticeCursor.getNode();\r
339             LatticeNode<String> nextState = latticeCursor.next(nodeName);\r
340             stateStack.addLast(nextState);\r
341             currentPath.addLast(nodeName != null ? nodeName : EMPTY_NODE_NAME);\r
342 \r
343             // for tree nodes\r
344 \r
345             if (query.isTreeNode(nodeName)) {\r
346                 throw new XerialError(XerialErrorCode.UNSUPPORTED, "tree not is not supported yet");\r
347             }\r
348 \r
349         }\r
350 \r
351         public void text(String nodeName, String textDataFragment) throws Exception {\r
352             nodeName = sanitize(nodeName);\r
353 \r
354             Iterator<LatticeNode<String>> it = stateStack.descendingIterator();\r
355             LatticeNode<String> currentState = it.next();\r
356             LatticeNode<String> prevState = it.next();\r
357 \r
358             Edge currentEdge = new Edge(prevState.getID(), currentState.getID());\r
359             List<TextOperation> textOperation = operatSetOnText.get(currentEdge);\r
360 \r
361             // generate a text operation set\r
362             if (textOperation == null) {\r
363                 textOperation = new ArrayList<TextOperation>();\r
364                 operatSetOnText.put(currentEdge, textOperation);\r
365 \r
366                 List<Operation> forwardAction = getForwardActionList(prevState, currentState,\r
367                         nodeName);\r
368                 for (Operation each : forwardAction) {\r
369                     if (each instanceof PushRelation) {\r
370                         textOperation.add(new SimpleTextOperation((PushRelation) each));\r
371                     }\r
372                     else if (each instanceof ScopedPushRelation) {\r
373                         textOperation.add(new ContextBasedTextOperation((ScopedPushRelation) each));\r
374                     }\r
375                     else\r
376                         throw new XerialError(XerialErrorCode.INVALID_STATE, "unknown operation: "\r
377                                 + each);\r
378                 }\r
379             }\r
380 \r
381             assert textOperation != null;\r
382 \r
383             try {\r
384                 for (TextOperation each : textOperation)\r
385                     each.execute(nodeName, textDataFragment);\r
386             }\r
387             catch (Exception e) {\r
388                 if (e instanceof XerialException)\r
389                     throw (XerialException) e;\r
390                 else\r
391                     throw new XerialException(XerialErrorCode.INHERITED, e);\r
392             }\r
393         }\r
394 \r
395         public void leaveNode(String nodeName) throws Exception {\r
396             nodeName = sanitize(nodeName);\r
397 \r
398             Deque<Node> nodeStack = getNodeStack(nodeName);\r
399             Node currentNode = nodeStack.getLast();\r
400 \r
401             try {\r
402                 back(currentNode);\r
403             }\r
404             catch (Exception e) {\r
405                 if (e instanceof XerialException)\r
406                     throw (XerialException) e;\r
407                 else if (e instanceof XerialError)\r
408                     throw (XerialError) e;\r
409                 else\r
410                     throw new XerialException(XerialErrorCode.INHERITED, e);\r
411             }\r
412 \r
413             currentPath.removeLast();\r
414 \r
415             nodeStack.removeLast();\r
416         }\r
417 \r
418         private List<Operation> getForwardActionList(LatticeNode<String> prevState,\r
419                 LatticeNode<String> nextState, String nodeName) {\r
420             Edge currentEdge = new Edge(prevState.getID(), nextState.getID());\r
421 \r
422             List<Operation> actionList = operationSetOnForward.get(currentEdge);\r
423             if (actionList != null)\r
424                 return actionList;\r
425 \r
426             int prevNodeID = currentEdge.getSourceNodeID();\r
427             int nextNodeID = currentEdge.getDestNodeID();\r
428 \r
429             // lazily prepare the action list\r
430             actionList = new ArrayList<Operation>();\r
431             operationSetOnForward.put(currentEdge, actionList);\r
432             List<Operation> backActionList = new ArrayList<Operation>();\r
433             operationSetOnBack.put(new Edge(nextNodeID, prevNodeID), backActionList);\r
434 \r
435             // search for the corresponding relations to newly found two node pair \r
436             String newlyFoundTag = nodeName;\r
437 \r
438             if (_logger2.isTraceEnabled())\r
439                 _logger2.trace("create actions for " + newlyFoundTag);\r
440 \r
441             if (prevNodeID != nextNodeID) {\r
442                 List<PushRelation> foundAction = new ArrayList<PushRelation>();\r
443                 // (core node, attribute node)\r
444                 for (Schema r : query.getTargetQuerySet()) {\r
445                     TupleIndex ni = r.getNodeIndex(newlyFoundTag);\r
446                     if (ni == null)\r
447                         continue;\r
448 \r
449                     for (String previouslyFoundNode : nextState) {\r
450                         TupleIndex pi = r.getNodeIndex(previouslyFoundNode);\r
451                         if (pi == null)\r
452                             continue;\r
453 \r
454                         if (previouslyFoundNode.equals(newlyFoundTag))\r
455                             continue;\r
456 \r
457                         if (!(isCoreNodeIndex(ni) || isCoreNodeIndex(pi)))\r
458                             continue;\r
459 \r
460                         if (_logger2.isTraceEnabled())\r
461                             _logger2.trace(String.format("new pair: %s, %s (in %s)",\r
462                                     previouslyFoundNode, newlyFoundTag, r));\r
463 \r
464                         foundAction.add(new PushRelation(r, previouslyFoundNode, newlyFoundTag));\r
465                         break;\r
466                     }\r
467                 }\r
468 \r
469                 // set the action list\r
470                 if (foundAction.size() > 1) {\r
471                     // context-dependent actions\r
472                     actionList.add(new ScopedPushRelation(foundAction));\r
473                     backActionList.add(new ScopedPopRelation(foundAction));\r
474                 }\r
475                 else {\r
476                     // a single action\r
477                     for (PushRelation each : foundAction) {\r
478                         actionList.add(each);\r
479                         backActionList.add(new PopRelation(each.schema, each.newlyFoundNodeName));\r
480                     }\r
481                 }\r
482 \r
483             }\r
484             else {\r
485                 // loop back e.g. A -> A\r
486                 for (Schema r : query.getTargetQuerySet()) {\r
487                     String selfLoopNode = r.selfLoopNode();\r
488                     if (selfLoopNode == null)\r
489                         continue;\r
490                     else {\r
491                         actionList.add(new PushLoopedRelation(r, selfLoopNode));\r
492                         break;\r
493                     }\r
494                 }\r
495             }\r
496 \r
497             return actionList;\r
498 \r
499         }\r
500 \r
501         void back(Node node) throws Exception {\r
502             Iterator<LatticeNode<String>> it = stateStack.descendingIterator();\r
503             LatticeNode<String> current = it.next();\r
504             LatticeNode<String> prev = it.next();\r
505             stateStack.removeLast();\r
506 \r
507             // process forward edge\r
508             for (Operation each : getForwardActionList(prev, current, node.nodeName)) {\r
509                 each.execute();\r
510             }\r
511 \r
512             // process back edge\r
513             int prevState = latticeCursor.getNodeID();\r
514             int nextState = latticeCursor.reset(stateStack.peekLast()).getID();\r
515 \r
516             Edge backEdge = new Edge(prevState, nextState);\r
517             List<Operation> actionList = operationSetOnBack.get(backEdge);\r
518             if (actionList == null) {\r
519                 throw new XerialError(XerialErrorCode.INVALID_STATE, "empty action list: " + node);\r
520             }\r
521             if (actionList.isEmpty()) {\r
522                 Node poppedNode = getNodeStack(node.nodeName).getLast();\r
523                 handler.leaveNode(null, poppedNode);\r
524             }\r
525             else\r
526                 for (Operation each : actionList) {\r
527                     each.execute();\r
528                 }\r
529 \r
530         }\r
531 \r
532     }\r
533 \r
534     private boolean isCoreNodeIndex(TupleIndex ti) {\r
535         return ti.size() == 1 && ti.get(0) == 0;\r
536     }\r
537 \r
538     public void sweep(TreeParser parser) throws Exception {\r
539         AmoebaFinder f = new AmoebaFinder();\r
540         parser.parse(f);\r
541     }\r
542 \r
543     public QuerySet getQuerySet() {\r
544         return query;\r
545     }\r
546 \r
547 }