【Flink】第三十二篇:Flink SQL 字段血缘中树的构建与遍历 |
您所在的位置:网站首页 › flinksql血缘关系 › 【Flink】第三十二篇:Flink SQL 字段血缘中树的构建与遍历 |
相关推荐: 【Flink】第二十七篇:三天撸了一个 Flink SQL 字段血缘算法 【Flink】第二十八篇:Flink SQL 与 Apache Calcite 【Flink】第二十九篇:源码分析 Blink Planner 从【Flink】第二十七篇:三天撸了一个 Flink SQL 字段血缘算法 这篇文章开始,笔者开启了一个Flink SQL字段血缘实现的探索之路。但是由于当时只是针对Flink 运行时中产生的calcitetree简单分析后写出的简易版,有诸多不成熟之处,所以也没说过多的实现细节。 经过一段时间的探索,笔者又寻找到了可能是更优的实现方案,这里先将算法部分的细节进行阐述,后续再陆续展开更多实现细节。 在进一步深入探索Flink SQL源码的过程中,笔者发现可以从源码中运行时中提取到这样的数据结构: 1. nodes: nodes是Flink SQL中AST树的各个节点,每个节点包含两个重要的属性:id、fields 2. edges: edges是Flink SQL中AST树的各个节点之间的边,包含两个重要属性:source、target。注意这里的source和target即是nodes中的id。 并且,顺着edges将nodes连线后发现这是一颗倒二叉树。例如,一个三个表关联后insert into到第四张表的SQL的倒二叉树结构简化后如下, 每个node都包含向下游node沉淀的fields,而每个边都是有向的说明了source node和target node。 顺着这些分析结论,我们接下来处理它,并最终画出source表到sink表的字段血缘关系。 递归构建树 我们先对前面提到的,从源码中提取的元数据进行分析,得到一些辅助构建和遍历的信息: 得到root node id:即寻找入度为0的节点, public static Integer getRootNodeId(Map edgesMap) { List rootNodeIds = new LinkedList(); List sourceNodeIds = getAllSourceNodeIds(edgesMap); List targetNodeIds = getAllTargetNodeIds(edgesMap); // 寻找入度为0的节点 for (Integer targetNodeId : targetNodeIds) { if (!sourceNodeIds.contains(targetNodeId)) { rootNodeIds.add(targetNodeId); } } if (rootNodeIds.size() > 1) { throw new RuntimeException("more than one rootNodeId has been found!"); } return rootNodeIds.get(0); }接着我们开始由root node自顶向下递归构建这颗二叉树,代码如下 public void handleCreateTree(TreeNode node) { if (!addChilds(node)) { return; } else { if (node.getLeftChild() != null) { handleCreateTree(node.getLeftChild()); } if (node.getRightChild() != null) { handleCreateTree(node.getRightChild()); } } }为node添加左右子节点,注意在构建过程中,需要保证左子树的id小于右子树的id,因为关系到后续SQL在为重复字段重命名的规则, public boolean addChilds(TreeNode parent) { boolean optFlag = false; Integer parentId = parent.getData().getId(); for (Map.Entry edge : edgesMap.entrySet()) { if (edge.getValue().equals(parentId)) { if (parent.getLeftChild() == null) { parent.setLeftChild(new TreeNode(nodesMap.get(edge.getKey()))); } else if (parent.getRightChild() == null) { // 保证左子树比右子树的id小 Node left = parent.getLeftChild().getData(); Node tempNode = nodesMap.get(edge.getKey()); if (left.getId() < tempNode.getId()) { parent.setRightChild(new TreeNode(tempNode)); } else { parent.setRightChild(parent.getLeftChild()); parent.setLeftChild(new TreeNode(tempNode)); } } else { throw new RuntimeException(" more than two child node has been found in node:" + parent.getData().getId()); } optFlag = true; } } return optFlag; }这两部分的操作就从元数据中构建起了一颗类AST树。 递归遍历画血字段缘 接下来我们如何在遍历二叉树的过程中进行字段血缘的分析呢? 由于每个node都有本node的fields,而parent node的fields又来源于左右子节点上浮的fields,换句话说,上游的fields是由左右子node的fields而来,那么我们当然应该选择二叉树后续遍历的总体思路。 例如,下面这个情况下,在左边两个叶子结点的原始表中的fields均为id、name、ts,而上浮过程中由于上游取下了下游左子节点的id,和右子节点的name,但是name重名所以SQL在重复字段后面加"0",即成为name0。 在第二个join时,右子节点的 op_ts 进行了重命名为 ts,这种情况在上浮时依旧要出处理。 综上,代码如下, // 后续遍历 public List postOrder(TreeNode treeNode) { if (treeNode == null) { return null; } else { List leftFields = postOrder(treeNode.getLeftChild()); List rightFields = postOrder(treeNode.getRightChild()); return visit(treeNode, leftFields, rightFields); } }而最重要的visit过程又是如何将子节点的fields上浮的呢? public List visit(TreeNode node, List leftFields, List rightFields) { List fields; if (leftFields == null && rightFields == null) { fields = visitLeafNode(node); } else if (leftFields != null && rightFields != null) { fields = visitCrossNode(node, leftFields, rightFields); } else { if (leftFields != null) { fields = visitMiddleNode(node, leftFields); } else { fields = visitMiddleNode(node, rightFields); // 理论上不应该出现这种情况! System.out.println("A node with only the right subtree was found:" + node.getData().getId()); } } return fields; }这里笔者将节点的遍历分为了三种情况:叶子节点;单子树的中间节点;双子树的交叉节点。 理由是:叶子结点除了附带fields信息,还有catalog中这张source表的一些元信息,例如 在遍历具体每种node过程中,重要的操作是:将本层的fields向上浮动,并进行一些命名的特殊处理,例如之前所述的重复字段命名处理,AS字段重命处理等。三种类型的节点的主要遍历思想如下, public List visitLeafNode(TreeNode leafNode) { List fields = new LinkedList(); for (String field : leafNode.getData().getFields()) { fields.add(leafNode.getData().getIdentifier() + SPLITER + field); } return fields; }public List visitMiddleNode(TreeNode node, List childFields) { List fields = new LinkedList(); for (String afterAsField : node.getData().getFields()) { String beforeAsField = parseAsField(afterAsField, node.getData().getDescription()); String searchField = searchField(beforeAsField, childFields, afterAsField); if (searchField != null) { fields.add(searchField); } } return fields; }public List visitCrossNode(TreeNode node, List smallChildIdFields, List bigChildIdFields) { List newBigChildIdFields = handleDuplicateName(smallChildIdFields, bigChildIdFields); List fields = new LinkedList(); for (String afterAsField : node.getData().getFields()) { String beforeAsField = parseAsField(afterAsField, node.getData().getDescription()); String searchSmallChildIdField = searchField(beforeAsField, smallChildIdFields, afterAsField); String searchNewBigChildIdField = searchField(beforeAsField, newBigChildIdFields, afterAsField); if (searchSmallChildIdField != null && searchNewBigChildIdField == null) { fields.add(searchSmallChildIdField); } else if (searchNewBigChildIdField != null && searchSmallChildIdField == null) { fields.add(searchNewBigChildIdField); } else if (searchSmallChildIdField != null && searchNewBigChildIdField != null) { throw new RuntimeException(" match field both in left child and right child:" + node.getData().getId()); } else { throw new RuntimeException(" match field faild:" + node.getData().getId()); } } return fields; }综合以上,我们来看看最终程序的血缘分析运行结果: 测试用例: String insertSQL3 = " insert into sinkT " + " select leftT.id pk, rightT.name, rightT0.ts `time` " + "from leftT left join rightT on leftT.id = rightT.id left join rightT0 on leftT.id = rightT0.id ";除此之外,笔者还想总结一下递归的思想。 递归 To iterate is human,to recurse divine. 迭代的是人,递归的是神。 ——L. Peter Deutsch 虽然这句话听上去可能有几分夸大的成分,但是足以说明递归不好理解。 那为什么会出现这种情况呢?这似乎和人的思维方式有关系。我们人类的思维天生就不适合递归! 我将递归类比做我们反省自身的过程,古人云:"吾日三省吾身"。可见反省自己的难能可贵之处。而这仅仅是进行了一层的递归,更别说反省自己的反省了~~~~ 例如,在认知学中,我们经常提到元认知,这其实也是在进行一种递归反省自己的思维方式。 这个角度上看来人类在递归思维上本身就先天不足,那么我们就熟能生巧! 经过最近这段时间探索血缘中接触到的AST递归处理,总结了如下几点递归的思维技巧: 递归非常类似于我们在高数学中就接触的求数列递推公式:an = f(an-1) 我们当时在求递推公式中有一种方法叫做归纳法证明,大体思路就是先计算a1=?,再由递推公式an = f(an-1)进行一般性证明。 这种思想和递归思维非常相似,我们举例说明。 例如,斐波那契,递推定义:F(0)=0,F(1)=1, F(n)=F(n - 1)+F(n - 2),那么,直接可以写出: int fun(int i){ if(i |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |