Fighting NotSerializableException in Apache Spark

Using the Spark context in a class constructor can cause serialization issues. Move the logic and variables into a member method to avoid some of these problems. There are many reasons why you can get the nasty SparkException: Task not serializable. StackOverflow is full of answers, but this cause was not so obvious. At least not for me.

I had a simple Spark application that created a direct stream to Kafka, did some filtering, and then saved the results to Cassandra. When I ran it, I got an exception saying that the filtering task could not be serialized. Check the code and try to tell me what’s wrong with it:


class MyActor(ssc: StreamingContext) extends Actor {
  // Create direct stream to Kafka
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, ...)

  // Save raw data to Cassandra
  kafkaStream.saveToCassandra("cassandraKeyspace", "cassandraTableRaw")

  // Get some data from another Cassandra table
  val someTable = ssc.sparkContext.cassandraTable[SomeTable]("cassandraKeyspace", "someTable")

  // Filter and save data to Cassandra
  kafkaStream
    .filter { message =>
      // Whatever logic can be here, the point is that "someTable" is used
      someTable.filter(_.message == message).count > 42
    }
    .saveToCassandra(cassandraKeyspace, cassandraTableAggNewVisitors)

  def receive = Actor.emptyBehavior
}

OK. Do you see that someTable variable inside the filter function? That’s the cause of the problem. It is an RDD, which is, of course, serializable by definition. At first I thought that the concrete implementation was not serializable for some reason, but that was also the wrong way of thinking.

Whom does the variable belong to? I looked at it as a “local” variable inside the class constructor. But it’s not. The someTable variable is a public member of the MyActor class! It belongs to a class which is not serializable. (Side note: we don’t want Akka actors to be serializable because it doesn’t make sense to send actors over the wire.)

That explains everything. Spark needs to serialize the whole closure, and because the filter function reads someTable through the actor instance, the actor instance is part of it. Let’s just put the whole logic inside a method. That makes all the variables method-local, so the actor no longer has to be serialized.


class MyActor(ssc: StreamingContext) extends Actor {
  def init(): Unit = {
    // Create direct stream to Kafka ... the same code as before, only inside this method
    val kafkaStream = ...
  }

  def receive = Actor.emptyBehavior
}
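
The difference between a constructor member and a method-local variable can be reproduced without Spark, Kafka or Akka. The following is only a minimal sketch: ConstructorStyle, MethodStyle and ClosureCheck are hypothetical names made up for this illustration, and it uses plain Java serialization (which Spark also relies on for task closures) instead of a real job. It assumes Scala 2.12 or later, compiled as a regular source file.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the actor: deliberately NOT Serializable.
class ConstructorStyle {
  // A val in the class body is a member, so the lambda reads this.limit
  // and therefore captures the whole ConstructorStyle instance.
  val limit = 42
  val check: Int => Boolean = x => x > limit
}

// Same logic moved into a method, mirroring the init() approach.
class MethodStyle {
  def build(): Int => Boolean = {
    // Method-local value: the lambda captures only this Int, not `this`.
    val limit = 42
    x => x > limit
  }
}

object ClosureCheck {
  // True if plain Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(s"constructor member: ${isSerializable(new ConstructorStyle().check)}")
    println(s"method-local:       ${isSerializable(new MethodStyle().build())}")
  }
}
```

Under those assumptions, the closure built in the constructor should be rejected because it drags the enclosing instance along, while the one built inside the method should serialize fine. That is the same mechanism that made the actor part of the filtering task.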

How simple. You’re welcome.