ScalarFunction#eval()`
- /**
-  * Table function that splits a string on "#" and emits one row per piece.
-  *
-  * A null input, or an input that contains no "#", produces no rows.
-  */
- class MySplit extends TableFunction[String] {
-
-   /**
-    * Emits every "#"-separated piece of `str`.
-    *
-    * @param str the string to split; may be null
-    */
-   def eval(str: String): Unit = {
-     // Guard against null before calling contains, which would otherwise throw.
-     if (str != null && str.contains("#")) {
-       str.split("#").foreach(collect)
-     }
-   }
-
-   /**
-    * Emits every "#"-separated piece of `str`, each one prepended with `prefix`.
-    *
-    * @param str    the string to split; may be null
-    * @param prefix text concatenated in front of every emitted piece
-    */
-   def eval(str: String, prefix: String): Unit = {
-     // Same null guard as the single-argument overload.
-     if (str != null && str.contains("#")) {
-       str.split("#").foreach(s => collect(prefix + s))
-     }
-   }
- }
b. 使用
- ...
- // Create the table function and register it under the name "mySplit".
- val fun = new MySplit()
- tEnv.registerFunction("mySplit", fun)
- // The query invokes the registered function via LATERAL TABLE(mySplit(c)); its output is aliased as T(s).
- val sql = "SELECT c, s FROM MyTable, LATERAL TABLE(mySplit(c)) AS T(s)"
- ...
3. UDAF
a. 定义
UDAF 需要实现的接口比较多，这里以一个简单的 CountAGG 为例，给出一个简单的实现：
- /**
-  * Accumulator for the count aggregate function: a one-field tuple whose
-  * `f0` holds the running count.
-  */
- class CountAccumulator extends JTuple1[Long] {
-   // The count starts at zero.
-   this.f0 = 0L
- }
-
- /**
-  * User-defined count aggregate function.
-  *
-  * The running count lives in the `f0` field of a `CountAccumulator`.
-  */
- class MyCount extends AggregateFunction[JLong, CountAccumulator] {
-
-   /** Creates a new accumulator instance. */
-   override def createAccumulator(): CountAccumulator = new CountAccumulator
-
-   /** Sets the count stored in `acc` back to zero. */
-   def resetAccumulator(acc: CountAccumulator): Unit = {
-     acc.f0 = 0L
-   }
-
-   /**
-    * Adds one to the count unconditionally.
-    *
-    * The argument is optimized away by Calcite: for instance count(42) or
-    * count(*) will be optimized to count().
-    */
-   def accumulate(acc: CountAccumulator): Unit = {
-     acc.f0 = acc.f0 + 1L
-   }
-
-   /** Adds one to the count when `value` is not null. */
-   def accumulate(acc: CountAccumulator, value: Any): Unit = {
-     if (value != null) acc.f0 = acc.f0 + 1L
-   }
-
-   /**
-    * Subtracts one from the count unconditionally.
-    *
-    * The argument is optimized away by Calcite: for instance count(42) or
-    * count(*) will be optimized to count().
-    */
-   def retract(acc: CountAccumulator): Unit = {
-     acc.f0 = acc.f0 - 1L
-   }
-
-   /** Subtracts one from the count when `value` is not null. */
-   def retract(acc: CountAccumulator, value: Any): Unit = {
-     if (value != null) acc.f0 = acc.f0 - 1L
-   }
-
-   /** Adds the count of every accumulator in `its` to `acc`. */
-   def merge(acc: CountAccumulator, its: JIterable[CountAccumulator]): Unit = {
-     val pending = its.iterator()
-     while (pending.hasNext) {
-       val other = pending.next()
-       acc.f0 = acc.f0 + other.f0
-     }
-   }
-
-   /** Returns the count currently stored in `acc`. */
-   override def getValue(acc: CountAccumulator): JLong = acc.f0
-
-   /** Describes the accumulator as a tuple type with a single long field. */
-   override def getAccumulatorType: TypeInformation[CountAccumulator] =
-     new TupleTypeInfo(classOf[CountAccumulator], BasicTypeInfo.LONG_TYPE_INFO)
-
-   /** Describes the aggregate result as a long. */
-   override def getResultType: TypeInformation[JLong] =
-     BasicTypeInfo.LONG_TYPE_INFO
- }
(编辑:好传媒网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容!
|