【Flink】第三十二篇:Flink SQL 字段血缘中树的构建与遍历

您所在的位置:网站首页 flinksql血缘关系 【Flink】第三十二篇:Flink SQL 字段血缘中树的构建与遍历

【Flink】第三十二篇:Flink SQL 字段血缘中树的构建与遍历

2023-10-24 12:12| 来源: 网络整理| 查看: 265

相关推荐:

【Flink】第二十七篇:三天撸了一个 Flink SQL 字段血缘算法

【Flink】第二十八篇:Flink SQL 与 Apache Calcite

【Flink】第二十九篇:源码分析 Blink Planner

从【Flink】第二十七篇:三天撸了一个 Flink SQL 字段血缘算法 这篇文章开始,笔者开启了一个Flink SQL字段血缘实现的探索之路。但是由于当时只是针对Flink 运行时中产生的calcitetree简单分析后写出的简易版,有诸多不成熟之处,所以也没说过多的实现细节。

经过一段时间的探索,笔者又寻找到了可能是更优的实现方案,这里先将算法部分的细节进行阐述,后续再陆续展开更多实现细节。

在进一步深入探索Flink SQL源码的过程中,笔者发现可以从源码中运行时中提取到这样的数据结构:

1. nodes:

nodes是Flink SQL中AST树的各个节点,每个节点包含两个重要的属性:id、fields

2. edges:

edges是Flink SQL中AST树的各个节点之间的边,包含两个重要属性:source、target。注意这里的source和target即是nodes中的id。

并且,顺着edges将nodes连线后发现这是一颗倒二叉树。例如,一个三个表关联后insert into到第四张表的SQL的倒二叉树结构简化后如下,

每个node都包含向下游node沉淀的fields,而每个边都是有向的说明了source node和target node。

顺着这些分析结论,我们接下来处理它,并最终画出source表到sink表的字段血缘关系。

递归构建树

我们先对前面提到的,从源码中提取的元数据进行分析,得到一些辅助构建和遍历的信息:

得到root node id:即寻找入度为0的节点,

public static Integer getRootNodeId(Map edgesMap) { List rootNodeIds = new LinkedList(); List sourceNodeIds = getAllSourceNodeIds(edgesMap); List targetNodeIds = getAllTargetNodeIds(edgesMap); // 寻找入度为0的节点 for (Integer targetNodeId : targetNodeIds) { if (!sourceNodeIds.contains(targetNodeId)) { rootNodeIds.add(targetNodeId); } } if (rootNodeIds.size() > 1) { throw new RuntimeException("more than one rootNodeId has been found!"); } return rootNodeIds.get(0); }

接着我们开始由root node自顶向下递归构建这颗二叉树,代码如下

public void handleCreateTree(TreeNode node) { if (!addChilds(node)) { return; } else { if (node.getLeftChild() != null) { handleCreateTree(node.getLeftChild()); } if (node.getRightChild() != null) { handleCreateTree(node.getRightChild()); } } }

为node添加左右子节点,注意在构建过程中,需要保证左子树的id小于右子树的id,因为关系到后续SQL在为重复字段重命名的规则,

public boolean addChilds(TreeNode parent) { boolean optFlag = false; Integer parentId = parent.getData().getId(); for (Map.Entry edge : edgesMap.entrySet()) { if (edge.getValue().equals(parentId)) { if (parent.getLeftChild() == null) { parent.setLeftChild(new TreeNode(nodesMap.get(edge.getKey()))); } else if (parent.getRightChild() == null) { // 保证左子树比右子树的id小 Node left = parent.getLeftChild().getData(); Node tempNode = nodesMap.get(edge.getKey()); if (left.getId() < tempNode.getId()) { parent.setRightChild(new TreeNode(tempNode)); } else { parent.setRightChild(parent.getLeftChild()); parent.setLeftChild(new TreeNode(tempNode)); } } else { throw new RuntimeException(" more than two child node has been found in node:" + parent.getData().getId()); } optFlag = true; } } return optFlag; }

这两部分的操作就从元数据中构建起了一颗类AST树。

递归遍历画血字段缘

接下来我们如何在遍历二叉树的过程中进行字段血缘的分析呢?

由于每个node都有本node的fields,而parent node的fields又来源于左右子节点上浮的fields,换句话说,上游的fields是由左右子node的fields而来,那么我们当然应该选择二叉树后续遍历的总体思路。

例如,下面这个情况下,在左边两个叶子结点的原始表中的fields均为id、name、ts,而上浮过程中由于上游取下了下游左子节点的id,和右子节点的name,但是name重名所以SQL在重复字段后面加"0",即成为name0。

在第二个join时,右子节点的 op_ts 进行了重命名为 ts,这种情况在上浮时依旧要出处理。

综上,代码如下,

// 后续遍历 public List postOrder(TreeNode treeNode) { if (treeNode == null) { return null; } else { List leftFields = postOrder(treeNode.getLeftChild()); List rightFields = postOrder(treeNode.getRightChild()); return visit(treeNode, leftFields, rightFields); } }

而最重要的visit过程又是如何将子节点的fields上浮的呢?

public List visit(TreeNode node, List leftFields, List rightFields) { List fields; if (leftFields == null && rightFields == null) { fields = visitLeafNode(node); } else if (leftFields != null && rightFields != null) { fields = visitCrossNode(node, leftFields, rightFields); } else { if (leftFields != null) { fields = visitMiddleNode(node, leftFields); } else { fields = visitMiddleNode(node, rightFields); // 理论上不应该出现这种情况! System.out.println("A node with only the right subtree was found:" + node.getData().getId()); } } return fields; }

这里笔者将节点的遍历分为了三种情况:叶子节点;单子树的中间节点;双子树的交叉节点。

理由是:叶子结点除了附带fields信息,还有catalog中这张source表的一些元信息,例如

在遍历具体每种node过程中,重要的操作是:将本层的fields向上浮动,并进行一些命名的特殊处理,例如之前所述的重复字段命名处理,AS字段重命处理等。三种类型的节点的主要遍历思想如下,

public List visitLeafNode(TreeNode leafNode) { List fields = new LinkedList(); for (String field : leafNode.getData().getFields()) { fields.add(leafNode.getData().getIdentifier() + SPLITER + field); } return fields; }public List visitMiddleNode(TreeNode node, List childFields) { List fields = new LinkedList(); for (String afterAsField : node.getData().getFields()) { String beforeAsField = parseAsField(afterAsField, node.getData().getDescription()); String searchField = searchField(beforeAsField, childFields, afterAsField); if (searchField != null) { fields.add(searchField); } } return fields; }public List visitCrossNode(TreeNode node, List smallChildIdFields, List bigChildIdFields) { List newBigChildIdFields = handleDuplicateName(smallChildIdFields, bigChildIdFields); List fields = new LinkedList(); for (String afterAsField : node.getData().getFields()) { String beforeAsField = parseAsField(afterAsField, node.getData().getDescription()); String searchSmallChildIdField = searchField(beforeAsField, smallChildIdFields, afterAsField); String searchNewBigChildIdField = searchField(beforeAsField, newBigChildIdFields, afterAsField); if (searchSmallChildIdField != null && searchNewBigChildIdField == null) { fields.add(searchSmallChildIdField); } else if (searchNewBigChildIdField != null && searchSmallChildIdField == null) { fields.add(searchNewBigChildIdField); } else if (searchSmallChildIdField != null && searchNewBigChildIdField != null) { throw new RuntimeException(" match field both in left child and right child:" + node.getData().getId()); } else { throw new RuntimeException(" match field faild:" + node.getData().getId()); } } return fields; }

综合以上,我们来看看最终程序的血缘分析运行结果:

测试用例:

String insertSQL3 = " insert into sinkT " + " select leftT.id pk, rightT.name, rightT0.ts `time` " + "from leftT left join rightT on leftT.id = rightT.id left join rightT0 on leftT.id = rightT0.id ";

除此之外,笔者还想总结一下递归的思想。

递归

To iterate is human,to recurse divine.

迭代的是人,递归的是神。

——L. Peter Deutsch

虽然这句话听上去可能有几分夸大的成分,但是足以说明递归不好理解。

那为什么会出现这种情况呢?这似乎和人的思维方式有关系。我们人类的思维天生就不适合递归!

我将递归类比做我们反省自身的过程,古人云:"吾日三省吾身"。可见反省自己的难能可贵之处。而这仅仅是进行了一层的递归,更别说反省自己的反省了~~~~

例如,在认知学中,我们经常提到元认知,这其实也是在进行一种递归反省自己的思维方式。

这个角度上看来人类在递归思维上本身就先天不足,那么我们就熟能生巧!

经过最近这段时间探索血缘中接触到的AST递归处理,总结了如下几点递归的思维技巧:

递归非常类似于我们在高数学中就接触的求数列递推公式:an = f(an-1)

我们当时在求递推公式中有一种方法叫做归纳法证明,大体思路就是先计算a1=?,再由递推公式an = f(an-1)进行一般性证明。

这种思想和递归思维非常相似,我们举例说明。

例如,斐波那契,递推定义:F(0)=0,F(1)=1, F(n)=F(n - 1)+F(n - 2),那么,直接可以写出:

int fun(int i){ if(i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3