Fighting NotSerializableException in Apache Spark

Category: spark

Using the Spark context in a class constructor can cause serialization issues. Moving the logic and variables into a member method avoids some of these problems. There are many reasons why you can get the nasty SparkException: Task not serializable. Stack Overflow is full of answers, but this one was not so obvious. At least not for me.

I had a simple Spark application which created a direct stream to Kafka, did some filtering and then saved the results to Cassandra. When I ran it, I got an exception saying that the filtering task could not be serialized. Check the code and try to tell me what’s wrong with it:

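import akka.actor._
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils    // Kafka 0.8 direct-stream API
import com.datastax.spark.connector._                 // adds cassandraTable to SparkContext
import com.datastax.spark.connector.streaming._       // adds saveToCassandra to DStream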

class MyActor(ssc: StreamingContext) extends Actor {
  // Create direct stream to Kafka
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, ...)

  // Save raw data to Cassandra
  kafkaStream.saveToCassandra("cassandraKeyspace", "cassandraTableRaw")

  // Get some data from another Cassandra table
  val someTable = ssc.sparkContext.cassandraTable[SomeTable]("cassandraKeyspace", "someTable")

  // Filter and save data to Cassandra
  kafkaStream
    .filter { message =>
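      // Whatever logic can be here; the point is that "someTable" (a member of MyActor) is used.
      // (Even once serialization works, Spark does not allow RDD operations inside another
      // transformation, see SPARK-5063; real code would need a join or a broadcast instead.)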
      someTable.filter(_.message == message).count > 42
    }
    .saveToCassandra("cassandraKeyspace", "cassandraTableAggNewVisitors")

  def receive = Actor.emptyBehavior
}

OK. Do you see the someTable variable inside the filter function? That’s the cause of the problem. It is an RDD, which is, of course, serializable by definition. At first I thought that the concrete implementation was for some reason not serializable, but that is the wrong way of thinking too.

Who does the variable belong to? I looked at it as a “local” variable inside the class constructor. But it’s not. The someTable variable is a public member of the MyActor class! It belongs to the class, which is not serializable. (Side note: we don’t want Akka actors to be serializable because it doesn’t make sense to send actors over the wire.)

That explains everything. Inside the filter function, someTable really means this.someTable, so the closure holds a reference to the actor instance, and Spark has to serialize the whole closure, actor included. Let’s just put the whole logic inside a method. That makes all the variables method-local, so the actor no longer has to be serialized.

import akka.actor._

class MyActor(ssc: StreamingContext) extends Actor {
  def init(): Unit = {
    // Create direct stream to Kafka ... the same code as before, only inside this method
    val kafkaStream = ...
    ...
  }

  init()

  def receive = Actor.emptyBehavior
}
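The capture rule is not specific to Spark or Akka, so it can be reproduced with plain Scala and Java serialization. The following is a minimal sketch under stated assumptions, not the original application: it assumes Scala 2.12 or later, where function literals compile to serializable lambdas that capture this only when their body actually touches a member. The names NotSerializableOwner, capturesOwner, capturesLocalOnly and isJavaSerializable are hypothetical. Plain java.io.ObjectOutputStream stands in for Spark, which serializes task closures with Java serialization after its ClosureCleaner pass. The member-reading function should fail to serialize because it drags its owner along, while the method-local one should carry only the copied Int.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for MyActor: deliberately NOT Serializable.
class NotSerializableOwner(val threshold: Int) {
  // Defined in the class body, like someTable in the original actor.
  // The lambda reads the member `threshold`, i.e. this.threshold,
  // so it captures the whole NotSerializableOwner instance.
  val capturesOwner: Int => Boolean = x => x > threshold

  // The fix from the article in miniature: build the function inside a method
  // and copy the member into a method-local val, so only that Int is captured.
  def capturesLocalOnly(): Int => Boolean = {
    val localThreshold = threshold
    x => x > localThreshold
  }
}

object ClosureCaptureDemo {
  // Tries plain Java serialization, a rough approximation of what Spark does
  // with task closures. Returns false if anything reachable from `obj`
  // (for a lambda, its captured values) is not Serializable.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val owner = new NotSerializableOwner(42)
    println(s"member-capturing closure serializable: ${isJavaSerializable(owner.capturesOwner)}")
    println(s"method-local closure serializable:     ${isJavaSerializable(owner.capturesLocalOnly())}")
  }
}
```

Copying a member into a method-local val right before the closure is the same idea as wrapping the whole pipeline in init(): the closure then refers only to locals, so the non-serializable owner never ends up inside it.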

How simple. You’re welcome.