Home > Archives > Spark基础之Java中使用DataFrame Join时的序列化问题

Spark基础之Java中使用DataFrame Join时的序列化问题

Published on

在DataFrame中,join方法有很多重载的版本,大致如下:

def join(right: DataFrame, usingColumn: String): DataFrame
def join(right: DataFrame, usingColumns: Seq[String]): DataFrame
def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame

我们重点来看一下第三个和第四个方法。第三个允许我们自己构造join表达式,并且指定连表类型,如果两边join的Column一样,则两个都会被保留。那么最后列名映射的时候,就会因为有两个相同的列名而抛出下面的AnalysisException。当然我们也可以在刚开始的时候分别重命名对应的列名。

val list1: List[(String, Int)] = List(("allen", 1001))
val dataFrame1 = sc.parallelize(list1).toDF("name", "id")

val list2: List[(String, Int)] = List(("allen", 26), ("zml", 28))
val dataFrame2 = sc.parallelize(list2).toDF("name", "age")
val dataFrame3 = dataFrame1.join(dataFrame2, dataFrame1.col("name") === dataFrame2.col("name"), "outer")
dataFrame3: org.apache.spark.sql.DataFrame = [name: string, id: int, name: string, age: int]

dataFrame3.show()
+-----+----+-----+---+                                                          
| name|  id| name|age|
+-----+----+-----+---+
| null|null|  zml| 28|
|allen|1001|allen| 26|
+-----+----+-----+---+

dataFrame3.select("name", "id", "age")
org.apache.spark.sql.AnalysisException: Reference 'name' is ambiguous, could be: name#12, name#16.;

第四个可以让我们直接指定join的列名以及连表类型,另外相较于第三个方法,当join列名相同的时候,它只会保留其中一个

val dataFrame3 = dataFrame1.join(dataFrame2, List("name"), "outer")
dataFrame3: org.apache.spark.sql.DataFrame = [name: string, id: int, age: int]

dataFrame3.show()

+-----+----+---+                                                                
| name|  id|age|
+-----+----+---+
|  zml|null| 28|
|allen|1001| 26|
+-----+----+---+

在Scala中我们可以非常方便的使用该方法,但是Java中目前(Spark 1.6)好像不能使用该方法。因为它的方法签名需要scala.collection.Seq,而Java并没有该类型,不过我们可以通过相应的方法来构造。我在标题中提到的序列化问题也正是由于这次尝试导致的

public static <T> scala.collection.Seq<T> asScalaSeq(T... elements) {
    // def toSeq: Seq[A] = toStream
    return collectionAsScalaIterable(Arrays.asList(elements)).toSeq();
}

通过上面的方法,将Java中的容器类型转换成为Scala的容器类型,但是上面的方法有一个潜在的问题 – 将driver上不能序列化的游标类(Itr, java.util.AbstractList中的一个内部类)直接传递到executor上面去了。

DataFrame dataFrame3 =
    dataFrame1.join(dataFrame2, CollectionUtil.asScalaSeq("name"), "outer");
    
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr

collectionAsScalaIterable将Java容器类转换成Scala的Iterable类JCollectionWrapper,而toSeq()则将该它转换成为Stream(scala中的流),这一个过程是非常自然的,因为它们两种类型的容器都有延迟计算的特性。关键就在这一步,转换过程中调用了底层Java容器的iterator方法,但恰好ArrayList(java.util.Arrays中的内部类,不是我们经常使用的那个)的iterator返回的是Itr*的实例(它没有实现Serializable接口)。

// scala.collection.convert.Wrappers

case class JCollectionWrapper[A](underlying: ju.Collection[A]) 
    extends AbstractIterable[A] with Iterable[A] {
    ...
    def iterator = underlying.iterator
    ...
}

上面提到过程的简化调用链

Arrays.asList(elements)
    ArrayList<E> extends AbstractList<E>
    Iterator<E> iterator() { return new Itr(); } // 该类不能序列化
    
collectionAsScalaIterable(Arrays.asList(elements))
    JCollectionWrapper { def iterator = underlying.iterator }
        JCollectionWrapper.toSeq()
            JCollectionWrapper.toStream()
                iterator.toStream
                    underlying.iterator

对于类不能被序列化而造成Task Failure,一般有两种解法:

在本场景中,由于Executor使用该类的过程我们无法更改,所以可以尝试让该类变得可以序列化,最直接的方式就强制该流的输出,即立马拿到结果,比如说转换成scala.collection.immutable.List

DataFrame dataFrame3 =
    dataFrame1.join(dataFrame2, CollectionUtil.asScalaSeq("row1").toList(), "outer");

所以,在Spark中,在处理Scala和Java互相使用的时候一定要注意是否可以序列化的问题。

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Allen写字的地方