文章目录
- 背景
 - 示例
 - FlinkLogicalCalcConverter
 - BatchPhysicalCalcRule
 - StreamPhysicalCalcRule
 - 其它算子
 - FlinkLogicalAggregate
 - FlinkLogicalCorrelate
 - FlinkLogicalDataStreamTableScan
 - FlinkLogicalDistribution
 - FlinkLogicalExpand
 - FlinkLogicalIntermediateTableScan
 - FlinkLogicalIntersect
 - FlinkLogicalJoin
 - FlinkLogicalLegacySink
 - FlinkLogicalLegacyTableSourceScan
 - FlinkLogicalMatch
 - FlinkLogicalMinus
 - FlinkLogicalOverAggregate
 - FlinkLogicalRank
 - FlinkLogicalSink
 - FlinkLogicalSnapshot
 - FlinkLogicalSort
 - FlinkLogicalUnion
 - FlinkLogicalValues
 
背景
本文主要介绍 Flink 如何通过 ConverterRule 把 Calcite 的 RelNode 转换成自定义的 RelNode

示例
FlinkLogicalCalcConverter
检查是不是 Calcite 的 LogicalCalc 算子,是的话,重写为带 RelTrait FlinkConventions.LOGICAL 的 rel,类型为 FlinkLogicalCalc
/**
 * Converter rule that turns a `LogicalCalc` into a `FlinkLogicalCalc`.
 *
 * @param config rule configuration forwarded to the `ConverterRule` constructor
 */
private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {

  /**
   * Builds the `FlinkLogicalCalc` counterpart of the given node.
   *
   * @param rel node to convert; it is cast to `LogicalCalc`
   * @return a `FlinkLogicalCalc` that keeps the original program and reads from the input
   *         converted to `FlinkConventions.LOGICAL`
   */
  override def convert(rel: RelNode): RelNode = {
    val logicalCalc = rel.asInstanceOf[LogicalCalc]
    FlinkLogicalCalc.create(
      RelOptRule.convert(logicalCalc.getInput, FlinkConventions.LOGICAL),
      logicalCalc.getProgram)
  }
}
 
BatchPhysicalCalcRule
检查是不是 FlinkLogicalCalc 的 relnode,并且表达式里不包含 Python 调用;满足时转换成 BatchPhysicalCalc
/**
 * Converter rule that turns a `FlinkLogicalCalc` into a `BatchPhysicalCalc`.
 *
 * @param config rule configuration forwarded to the `ConverterRule` constructor
 */
class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {

  /**
   * Accepts the calc only when none of its program expressions contains a Python call.
   *
   * @param call rule call whose first operand is the `FlinkLogicalCalc`
   * @return `true` if no expression of the program satisfies `containsPythonCall`
   */
  override def matches(call: RelOptRuleCall): Boolean = {
    val expressions = call.rel[FlinkLogicalCalc](0).getProgram.getExprList.asScala
    expressions.forall(expr => !containsPythonCall(expr))
  }

  /**
   * Creates the batch physical calc for the given logical calc.
   *
   * @param rel node to convert; it is cast to `FlinkLogicalCalc`
   * @return a `BatchPhysicalCalc` with the `BATCH_PHYSICAL` convention, the converted input,
   *         the original program and the original row type
   */
  def convert(rel: RelNode): RelNode = {
    val logicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
    val batchTraits = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
    val batchInput = RelOptRule.convert(logicalCalc.getInput, FlinkConventions.BATCH_PHYSICAL)
    new BatchPhysicalCalc(
      rel.getCluster,
      batchTraits,
      batchInput,
      logicalCalc.getProgram,
      rel.getRowType)
  }
}
 
StreamPhysicalCalcRule
检查是不是 FlinkLogicalCalc 的 relnode,并且表达式里不包含 Python 调用;满足时转换成 StreamPhysicalCalc
/**
 * Converter rule that turns a `FlinkLogicalCalc` into a `StreamPhysicalCalc`.
 *
 * @param config rule configuration forwarded to the `ConverterRule` constructor
 */
class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {

  /**
   * Accepts the calc only when none of its program expressions contains a Python call.
   *
   * @param call rule call whose first operand is the `FlinkLogicalCalc`
   * @return `true` if no expression of the program satisfies `containsPythonCall`
   */
  override def matches(call: RelOptRuleCall): Boolean = {
    val expressions = call.rel[FlinkLogicalCalc](0).getProgram.getExprList.asScala
    expressions.forall(expr => !containsPythonCall(expr))
  }

  /**
   * Creates the stream physical calc for the given logical calc.
   *
   * @param rel node to convert; it is cast to `FlinkLogicalCalc`
   * @return a `StreamPhysicalCalc` with the `STREAM_PHYSICAL` convention, the converted input,
   *         the original program and the original row type
   */
  def convert(rel: RelNode): RelNode = {
    val logicalCalc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
    val streamTraits: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
    val streamInput = RelOptRule.convert(logicalCalc.getInput, FlinkConventions.STREAM_PHYSICAL)
    new StreamPhysicalCalc(
      rel.getCluster,
      streamTraits,
      streamInput,
      logicalCalc.getProgram,
      rel.getRowType)
  }
}
 
其它算子
介绍下算子的匹配条件
FlinkLogicalAggregate
对应的SQL语义是聚合函数
 FlinkLogicalAggregateBatchConverter
 不存在准确的distinct调用并且支持聚合函数,则返回true
/**
 * Decides whether the batch converter may handle this `LogicalAggregate`.
 *
 * @param call rule call whose first operand is the `LogicalAggregate`
 * @return `true` when every aggregate call has a supported kind and
 *         `AggregateUtil.containsAccurateDistinctCall` reports no accurate distinct call
 */
override def matches(call: RelOptRuleCall): Boolean = {
  val aggregate: LogicalAggregate = call.rel(0)
  // AVG itself is supported; the remaining members of SqlKind.AVG_AGG_FUNCTIONS are not
  // supported natively and have to be converted by the FlinkAggregateReduceFunctionsRule.
  val allKindsSupported = aggregate.getAggCallList.forall { aggCall =>
    aggCall.getAggregation.getKind match {
      case SqlKind.AVG => true
      case kind => !SqlKind.AVG_AGG_FUNCTIONS.contains(kind)
    }
  }
  val hasAccurateDistinct = AggregateUtil.containsAccurateDistinctCall(aggregate.getAggCallList)
  allKindsSupported && !hasAccurateDistinct
}
 
FlinkLogicalAggregateStreamConverter
只要聚合调用里没有 SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP
 这几种函数,就支持转换
/**
 * Decides whether the stream converter may handle this `LogicalAggregate`.
 *
 * @param call rule call whose first operand is the `LogicalAggregate`
 * @return `true` when no aggregate call is of kind STDDEV_POP, STDDEV_SAMP, VAR_POP or VAR_SAMP
 */
override def matches(call: RelOptRuleCall): Boolean = {
  val aggregate: LogicalAggregate = call.rel(0)
  val aggKinds = aggregate.getAggCallList.map(_.getAggregation.getKind)
  // These kinds are not supported natively; they have to be converted by the
  // FlinkAggregateReduceFunctionsRule first.
  !aggKinds.exists {
    case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => true
    case _ => false
  }
}
 
FlinkLogicalCorrelate
对应的SQL语义是,LogicalCorrelate 用于处理关联子查询和某些特殊的连接操作
 检查relnode 是不是LogicalCorrelate,重写relnode
默认的onMatch 函数
 
FlinkLogicalDataStreamTableScan
对应的SQL语义是,检查数据源是不是流式的
 检查 TableScan 的表能不能 unwrap 成 DataStreamTable,可以的话重写 relnode
  /**
   * Matches table scans whose table can be unwrapped to a `DataStreamTable`.
   *
   * @param call rule call whose first operand is the `TableScan`
   * @return `true` if unwrapping the scanned table to `DataStreamTable` yields a non-null value
   */
  override def matches(call: RelOptRuleCall): Boolean = {
    val tableScan: TableScan = call.rel(0)
    Option(tableScan.getTable.unwrap(classOf[DataStreamTable[_]])).isDefined
  }
  /**
   * Creates a `FlinkLogicalDataStreamTableScan` from the given scan.
   *
   * @param rel node to convert; it is cast to `TableScan`
   * @return the scan built from the node's cluster and the original scan's hints and table
   */
  def convert(rel: RelNode): RelNode = {
    val tableScan = rel.asInstanceOf[TableScan]
    FlinkLogicalDataStreamTableScan.create(
      rel.getCluster,
      tableScan.getHints,
      tableScan.getTable)
  }
 
FlinkLogicalDistribution
描述数据是不是打散的
  /**
   * Creates a `FlinkLogicalDistribution` from a `LogicalDistribution`.
   *
   * @param rel node to convert; it is cast to `LogicalDistribution`
   * @return the distribution over the input converted to `FlinkConventions.LOGICAL`, keeping
   *         the original collation and distribution keys
   */
  override def convert(rel: RelNode): RelNode = {
    val logicalDistribution = rel.asInstanceOf[LogicalDistribution]
    val logicalInput =
      RelOptRule.convert(logicalDistribution.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalDistribution.create(
      logicalInput,
      logicalDistribution.getCollation,
      logicalDistribution.getDistKeys)
  }
 
FlinkLogicalExpand
支持复杂聚合操作(如 ROLLUP 和 CUBE)的逻辑运算符
 /**
  * Creates a `FlinkLogicalExpand` from a `LogicalExpand`.
  *
  * @param rel node to convert; it is cast to `LogicalExpand`
  * @return the expand over the input converted to `FlinkConventions.LOGICAL`, keeping the
  *         original `projects` and `expandIdIndex`
  */
 override def convert(rel: RelNode): RelNode = {
   val logicalExpand = rel.asInstanceOf[LogicalExpand]
   val logicalInput = RelOptRule.convert(logicalExpand.getInput, FlinkConventions.LOGICAL)
   FlinkLogicalExpand.create(
     logicalInput,
     logicalExpand.projects,
     logicalExpand.expandIdIndex)
 }
 
FlinkLogicalIntermediateTableScan
FlinkLogicalIntermediateTableScan 用于表示对这些中间结果表进行扫描的逻辑操作
/**
 * Matches table scans whose table can be unwrapped to an `IntermediateRelTable`.
 *
 * @param call rule call whose first operand is the `TableScan`
 * @return `true` if unwrapping the scanned table to `IntermediateRelTable` yields a non-null
 *         value
 */
override def matches(call: RelOptRuleCall): Boolean = {
  val tableScan: TableScan = call.rel(0)
  Option(tableScan.getTable.unwrap(classOf[IntermediateRelTable])).isDefined
}
  /**
   * Creates a `FlinkLogicalIntermediateTableScan` from the given scan.
   *
   * @param rel node to convert; it is cast to `TableScan`
   * @return the scan built from the node's cluster and the original scan's table
   */
  def convert(rel: RelNode): RelNode = {
    val tableScan = rel.asInstanceOf[TableScan]
    FlinkLogicalIntermediateTableScan.create(
      rel.getCluster,
      tableScan.getTable)
  }
 
FlinkLogicalIntersect
用于表示 SQL 中 INTERSECT 操作的逻辑运算符
/**
 * Creates a `FlinkLogicalIntersect` from a `LogicalIntersect`.
 *
 * @param rel node to convert; it is cast to `LogicalIntersect`
 * @return the intersect over all inputs converted to `FlinkConventions.LOGICAL`, keeping the
 *         original `all` flag
 */
override def convert(rel: RelNode): RelNode = {
  val logicalIntersect = rel.asInstanceOf[LogicalIntersect]
  val logicalInputs =
    logicalIntersect.getInputs.map(child => RelOptRule.convert(child, FlinkConventions.LOGICAL))
  FlinkLogicalIntersect.create(logicalInputs, logicalIntersect.all)
}
 
FlinkLogicalJoin
用于表示 SQL 中 JOIN 操作的逻辑运算符
 /**
  * Creates a `FlinkLogicalJoin` from a `LogicalJoin`.
  *
  * @param rel node to convert; it is cast to `LogicalJoin`
  * @return the join over both sides converted to `FlinkConventions.LOGICAL`, keeping the
  *         original condition, hints and join type
  */
 override def convert(rel: RelNode): RelNode = {
   val logicalJoin = rel.asInstanceOf[LogicalJoin]
   val leftInput = RelOptRule.convert(logicalJoin.getLeft, FlinkConventions.LOGICAL)
   val rightInput = RelOptRule.convert(logicalJoin.getRight, FlinkConventions.LOGICAL)
   FlinkLogicalJoin.create(
     leftInput,
     rightInput,
     logicalJoin.getCondition,
     logicalJoin.getHints,
     logicalJoin.getJoinType)
 }
 
FlinkLogicalLegacySink
写数据到传统的数据源
/**
 * Creates a `FlinkLogicalLegacySink` from a `LogicalLegacySink`.
 *
 * @param rel node to convert; it is cast to `LogicalLegacySink`
 * @return the sink over the input converted to `FlinkConventions.LOGICAL`, keeping the
 *         original hints, sink, sink name, catalog table and static partitions
 */
override def convert(rel: RelNode): RelNode = {
  val legacySink = rel.asInstanceOf[LogicalLegacySink]
  val logicalInput = RelOptRule.convert(legacySink.getInput, FlinkConventions.LOGICAL)
  FlinkLogicalLegacySink.create(
    logicalInput,
    legacySink.hints,
    legacySink.sink,
    legacySink.sinkName,
    legacySink.catalogTable,
    legacySink.staticPartitions
  )
}
 
FlinkLogicalLegacyTableSourceScan
读传统的数据源
/**
 * Matches table scans accepted by `isTableSourceScan`.
 *
 * @param call rule call whose first operand is the `TableScan`
 * @return the result of `isTableSourceScan` for that scan
 */
override def matches(call: RelOptRuleCall): Boolean =
  isTableSourceScan(call.rel[TableScan](0))
  /**
   * Creates a `FlinkLogicalLegacyTableSourceScan` from the given scan.
   *
   * @param rel node to convert; it is cast to `TableScan`, and its table is cast to
   *            `FlinkPreparingTableBase`
   * @return the scan built from the node's cluster, the original hints and the cast table
   */
  def convert(rel: RelNode): RelNode = {
    val tableScan = rel.asInstanceOf[TableScan]
    val preparingTable = tableScan.getTable.asInstanceOf[FlinkPreparingTableBase]
    FlinkLogicalLegacyTableSourceScan.create(
      rel.getCluster,
      tableScan.getHints,
      preparingTable)
  }
 
FlinkLogicalMatch
MATCH_RECOGNIZE 语句的逻辑运算符。MATCH_RECOGNIZE 语句允许用户在流数据中进行复杂的事件模式匹配,这对于实时数据处理和复杂事件处理(CEP)非常有用。
/**
 * Creates a `FlinkLogicalMatch` from a `LogicalMatch`.
 *
 * @param rel node to convert; it is cast to `LogicalMatch`
 * @return a `FlinkLogicalMatch` with the `FlinkConventions.LOGICAL` convention and converted
 *         input; every other constructor argument is copied from the original match
 */
override def convert(rel: RelNode): RelNode = {
  val source = rel.asInstanceOf[LogicalMatch]
  val logicalTraits = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
  val logicalInput = RelOptRule.convert(source.getInput, FlinkConventions.LOGICAL)
  new FlinkLogicalMatch(
    rel.getCluster,
    logicalTraits,
    logicalInput,
    source.getRowType,
    source.getPattern,
    source.isStrictStart,
    source.isStrictEnd,
    source.getPatternDefinitions,
    source.getMeasures,
    source.getAfter,
    source.getSubsets,
    source.isAllRows,
    source.getPartitionKeys,
    source.getOrderKeys,
    source.getInterval
  )
}
 
FlinkLogicalMinus
用于表示 SQL 中 minus 操作的逻辑运算符
 /**
  * Creates a `FlinkLogicalMinus` from a `LogicalMinus`.
  *
  * @param rel node to convert; it is cast to `LogicalMinus`
  * @return the minus over all inputs converted to `FlinkConventions.LOGICAL`, keeping the
  *         original `all` flag
  */
 override def convert(rel: RelNode): RelNode = {
   val logicalMinus = rel.asInstanceOf[LogicalMinus]
   val logicalInputs =
     logicalMinus.getInputs.map(child => RelOptRule.convert(child, FlinkConventions.LOGICAL))
   FlinkLogicalMinus.create(logicalInputs, logicalMinus.all)
 }
 
FlinkLogicalOverAggregate
用于表示 SQL 中 窗口函数操作的逻辑运算符
FlinkLogicalRank
SQL 中 RANK 或 DENSE_RANK 函数的逻辑运算符。这些函数通常用于对数据进行排序和排名
/**
 * Creates a `FlinkLogicalRank` from a `LogicalRank`.
 *
 * @param rel node to convert; it is cast to `LogicalRank`
 * @return the rank over the input converted to `FlinkConventions.LOGICAL`, keeping the
 *         original partition key, order key, rank type, rank range, rank number type and
 *         `outputRankNumber` flag
 */
override def convert(rel: RelNode): RelNode = {
  val logicalRank = rel.asInstanceOf[LogicalRank]
  val logicalInput = RelOptRule.convert(logicalRank.getInput, FlinkConventions.LOGICAL)
  FlinkLogicalRank.create(
    logicalInput,
    logicalRank.partitionKey,
    logicalRank.orderKey,
    logicalRank.rankType,
    logicalRank.rankRange,
    logicalRank.rankNumberType,
    logicalRank.outputRankNumber)
}
 
FlinkLogicalSink
表示SQL里的写
FlinkLogicalSnapshot
SQL 语句中 FOR SYSTEM_TIME AS OF 子句的逻辑运算符。该子句用于对表做快照,从而在处理数据时可以引用特定时间点的数据快照
/**
 * Converts a `LogicalSnapshot` depending on the kind of its period expression.
 *
 * @param rel node to convert; it is cast to `LogicalSnapshot`
 * @return a `FlinkLogicalSnapshot` over the converted input when the period is a
 *         `RexFieldAccess`; just the converted input when the period is a `RexLiteral`
 */
def convert(rel: RelNode): RelNode = {
  val logicalSnapshot = rel.asInstanceOf[LogicalSnapshot]
  val logicalInput = RelOptRule.convert(logicalSnapshot.getInput, FlinkConventions.LOGICAL)
  // NOTE(review): a period of any other type matches no case and raises scala.MatchError;
  // presumably such periods are rejected before convert is reached - confirm.
  logicalSnapshot.getPeriod match {
    case _: RexFieldAccess => FlinkLogicalSnapshot.create(logicalInput, logicalSnapshot.getPeriod)
    case _: RexLiteral => logicalInput
  }
}
 
FlinkLogicalSort
表示SQL里的排序
FlinkLogicalUnion
表示SQL里的 UNION 操作;规则只匹配 UNION ALL(union.all 为 true)
 /**
  * Matches only unions whose `all` flag is set.
  *
  * @param call rule call whose first operand is the `LogicalUnion`
  * @return the value of the union's `all` flag
  */
 override def matches(call: RelOptRuleCall): Boolean =
   call.rel[LogicalUnion](0).all
  /**
   * Creates a `FlinkLogicalUnion` from a `LogicalUnion`.
   *
   * @param rel node to convert; it is cast to `LogicalUnion`
   * @return the union over all inputs converted to `FlinkConventions.LOGICAL`, keeping the
   *         original `all` flag
   */
  override def convert(rel: RelNode): RelNode = {
    val logicalUnion = rel.asInstanceOf[LogicalUnion]
    val logicalInputs =
      logicalUnion.getInputs.map(child => RelOptRule.convert(child, FlinkConventions.LOGICAL))
    FlinkLogicalUnion.create(logicalInputs, logicalUnion.all)
  }
 
FlinkLogicalValues
SQL 中 VALUES 表达式的逻辑运算符。VALUES 表达式允许在查询中直接定义一组值,这在需要构造临时数据或进行简单的数据输入时非常有用。










