Recursive Multiple Schema using Avro4s

Karan Gupta
5 min read · Oct 2, 2020

Today I came across a Scala library for generating Avro schemas called avro4s. One of the cool things I found was its ability to handle recursive schemas. The library lets us use our lovely case classes to generate Avro schemas. Before I found avro4s, I was using another popular Scala plugin called avrohugger. Avrohugger works great if you would like to generate your models as part of your compile task in sbt. However, it needs an .avsc file on your classpath to generate the Scala-like code in your target folder. I like avro4s because I don’t need to pre-write any schema files before I can start coding. In the following sections we will see how to leverage recursive case classes to define an Avro schema. After that, we will use that same schema to serialize and deserialize the models, or records.

Prerequisite

// build.sbt
libraryDependencies ++= Seq(
  "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.0",
  "com.sksamuel.avro4s" %% "avro4s-json" % "4.0.0"
)

Create your recursive case class structure. Make sure your trait is sealed.

// IMPORTANT: the trait must be sealed; otherwise you have to write your own Encoder/Decoder.
sealed trait Foo

case class Bar(i:Int) extends Foo

case class Baz(z: Int) extends Foo

case class Fighter(k: List[Foo]) extends Foo

Notice if you simply write the following to generate your schema using avro4s, you will get an error.

val fooSchema: Schema = AvroSchema[Foo]
Error:(115, 37) could not find implicit value for parameter schemaFor: com.sksamuel.avro4s.SchemaFor[Foo]

However, running the same code to generate the schema for Bar raises no such issue.

val barSchema: Schema = AvroSchema[Bar]

Rule: This works because Bar contains only fields of primitive types. avro4s knows how to convert Scala primitives to Avro primitives.

  • AvroSchema[Bar] works because of the Rule.
  • AvroSchema[Baz] works because of the Rule.
  • AvroSchema[Fighter] does not work
  • AvroSchema[Foo] does not work
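To see the Rule in action, here is a minimal sketch that derives the schema for Bar and inspects it through the plain Avro Schema API. It assumes avro4s-core 4.0.0 is on the classpath; the object name RuleDemo is made up for illustration, and Bar is redeclared without the Foo trait so the snippet stands alone.

```scala
import com.sksamuel.avro4s.AvroSchema
import org.apache.avro.Schema

// Same shape as Bar above: a single primitive field.
case class Bar(i: Int)

// Hypothetical demo object, not part of avro4s.
object RuleDemo {
  // Derived implicitly, because Int maps directly to Avro's "int".
  val barSchema: Schema = AvroSchema[Bar]

  def main(args: Array[String]): Unit = {
    println(barSchema.getType)                         // RECORD
    println(barSchema.getName)                         // Bar
    println(barSchema.getField("i").schema().getType)  // INT
  }
}
```

Nothing here had to be told what an Int is, which is exactly what is missing once a field's type is a trait such as Foo.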

AvroSchema[Foo|Fighter]

When we ask avro4s to generate a schema for Fighter it replies (if it were a person) …

hey, Fighter is defined like this Fighter(k: List[Foo]). I know how to create a schema for List[Int], List[String], List[Float], etc... but I am not sure about Foo. Foo is a trait (interface) that you defined. Foo can be so many things for me! If you can provide me a "list" of possible CONCRETE implementation of Foo then I can help you generate the schema.

In other words, instead of using an implicit schemaFor (which is only for primitive resolution), we need to supply our own.

val avroFooSchema = AvroSchema[Foo](fooSchemaFor)
val avroFighterSchema = AvroSchema[Fighter](fighterSchemaFor)

All we need to do is define a SchemaFor for Foo and for Fighter: fooSchemaFor and fighterSchemaFor respectively.

fooSchemaFor

import com.sksamuel.avro4s.{AvroSchema, SchemaFor}
import org.apache.avro.Schema

//leaves of tree
val barSchema: Schema = AvroSchema[Bar]
val bazSchema: Schema = AvroSchema[Baz]
//
//create the record type for fighter schema with meta stuff
//avro record meta configuration:
//
//name is "Fighter"
//namespace is "foo.fighter"
//doc is "my cool fighter"
//
//the last argument is isError; it is passed positionally because
//createRecord is a Java method
val fighterSchema: Schema = Schema.createRecord("Fighter", "my cool fighter", "foo.fighter", false)
//
//Foo can be one of the three types, so its schema is going to be a
//union of barSchema, bazSchema, fighterSchema in avro sense
//
val fooSchema: Schema = Schema.createUnion(barSchema, bazSchema, fighterSchema)
val fooSchemaFor: SchemaFor[Foo] = SchemaFor(fooSchema)

Sweet! fooSchemaFor is all set.

fighterSchemaFor

//needed for .asJava (on Scala 2.12 use scala.collection.JavaConverters._)
import scala.jdk.CollectionConverters._
//
//notice when we created fighterSchema above it was never given
//field information. let's set its field for k
//
fighterSchema.setFields(
  Seq(
    //k is an array of Foo elements, so create the field of array
    //type that has items of schema fooSchema we defined above
    new Schema.Field("k", Schema.createArray(fooSchema))
  ).asJava
)
val fighterSchemaFor: SchemaFor[Fighter] = SchemaFor(fighterSchema)

Sweet! fighterSchemaFor is all set.
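The trick above (declare the record empty, reference it from the union, then fill in its fields) can be checked in isolation. The following is a sketch using only the Avro Java API, so it does not depend on avro4s derivation at all. It assumes Avro 1.9 or later (for the two-argument Schema.Field constructor) and Scala 2.13; the object name RecursiveSchemaCheck and the hand-built Bar and Baz records are stand-ins for illustration.

```scala
import org.apache.avro.{Schema, SchemaBuilder}
import scala.jdk.CollectionConverters._

// Hypothetical demo object, not part of Avro or avro4s.
object RecursiveSchemaCheck {
  // Hand-built stand-ins for AvroSchema[Bar] and AvroSchema[Baz].
  val bar: Schema = SchemaBuilder.record("Bar").fields().requiredInt("i").endRecord()
  val baz: Schema = SchemaBuilder.record("Baz").fields().requiredInt("z").endRecord()

  // Step 1: declare the record without fields so it can be referenced early.
  val fighter: Schema =
    Schema.createRecord("Fighter", "my cool fighter", "foo.fighter", false)

  // Step 2: the union can already mention the still-empty Fighter record.
  val foo: Schema = Schema.createUnion(bar, baz, fighter)

  // Step 3: close the loop with a field whose array items are the union.
  fighter.setFields(List(new Schema.Field("k", Schema.createArray(foo))).asJava)

  def main(args: Array[String]): Unit = {
    // Names of the union branches, in declaration order.
    println(foo.getTypes.asScala.map(_.getName).mkString(", ")) // Bar, Baz, Fighter
    // The array's element type is the very same union instance.
    println(fighter.getField("k").schema().getElementType eq foo) // true
    // Pretty-printed JSON form of the recursive schema.
    println(fighter.toString(true))
  }
}
```

The order matters: setFields can only be called once on a record, so the union has to exist before the field that points back to it is created.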

Let’s test it out!

To test all of this, we can simply check whether the serialization/deserialization (serde) process works as expected. In other words:

  1. Print the schemas.
  2. Serialize: you can convert any implementation of the sealed trait Foo (Bar, Baz, Fighter) to an Avro record with the correct schema (case class to Avro record).
  3. Deserialize: you can convert an avro Record back to its respective case class (Bar, Baz, Fighter).

Print the Schema for Foo and Fighter

val avroFooSchema = AvroSchema[Foo](fooSchemaFor)
val avroFighterSchema = AvroSchema[Fighter](fighterSchemaFor)
println(avroFooSchema)
println(avroFighterSchema)

Serialization and Deserialization

Let’s quickly create some helper functions to do these conversions.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream}

// this function reads a set of Foo and writes it out to a
// ByteArrayOutputStream. this stream essentially is your AVRO
// record with all the Foo elements
def getFooAvroOutputStream(data: Seq[Foo]): ByteArrayOutputStream = {
  val stream = new ByteArrayOutputStream()
  val os = AvroOutputStream.data[Foo].to(stream).build()
  os.write(data)
  os.flush()
  os.close()
  stream
}
// this function is intended to be used in conjunction with the
// above function to convert stream (avro) to Foo case class.
def getFooFromAvro(stream: ByteArrayOutputStream): List[Foo] = {
  // reuse the SchemaFor we built by hand above
  val schema = AvroSchema[Foo](fooSchemaFor)
  val inputStream = new ByteArrayInputStream(stream.toByteArray)
  val is = AvroInputStream.data[Foo].from(inputStream).build(schema)
  val data = is.iterator.toList
  is.close()
  data
}

Please note that I am using a ByteArrayOutputStream to write out the Avro record. If you want to write to a file instead, you can use AvroOutputStream.data[Foo].to(new File("foo.avro")).build(). If you don’t want to deal with a file, just use ByteArrayOutputStream.

//test data
val bar = Bar(5)
val baz = Baz(65)
val fighter = Fighter(k = bar :: baz :: Nil)

val data: List[Foo] = List(fighter, bar, baz)
//serialize case classes to avro byte stream
val outputStream = getFooAvroOutputStream(data)
//deserialize back to case classes from the stream
val readFromAvro = getFooFromAvro(outputStream)

println(readFromAvro)
//output: List(Fighter(List(Bar(5), Baz(65))), Bar(5), Baz(65))

Easier composition to/from Avro

To perform conversions, use the RecordFormat typeclass, which converts between case classes and Avro records. Here Record is avro4s’s own record type, which implements both Avro’s GenericRecord and SpecificRecord.

val format = RecordFormat[Foo]
val fighterRecord: Record = format.to(fighter)
val barRecord: Record = format.to(bar)
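RecordFormat also goes the other way through from. Here is a minimal round-trip sketch, assuming avro4s-core 4.0.0 and using the simple Bar case class so that the schema can be derived without a hand-written SchemaFor. The object name RecordFormatRoundTrip is made up for illustration, and Bar is redeclared without the Foo trait so the snippet stands alone.

```scala
import com.sksamuel.avro4s.{Record, RecordFormat}

// Same shape as Bar above: a single primitive field.
case class Bar(i: Int)

// Hypothetical demo object, not part of avro4s.
object RecordFormatRoundTrip {
  val format: RecordFormat[Bar] = RecordFormat[Bar]

  // case class to Avro record
  val record: Record = format.to(Bar(5))

  // Avro record back to case class
  val restored: Bar = format.from(record)

  def main(args: Array[String]): Unit = {
    println(record.get("i")) // 5
    println(restored)        // Bar(5)
  }
}
```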

Final Code
