Recursive Multiple Schema using Avro4s
Today I came across a new Scala library for generating Avro schemas called avro4s. One of the cool things I found was its ability to manage recursive schemas. The library lets us use our lovely case classes to generate Avro schemas. Before I found avro4s, I was using another popular Scala plugin called avrohugger. Avrohugger works great if you would like to generate your models as part of your compile task in sbt. However, it needs an .avsc file on your classpath to generate the semi-Scala-like code in your target folder. Anyway, I like avro4s because I don’t need to pre-write any schema files to generate my model before I can start coding. In the following sections we will see how we can leverage recursive case classes to define an Avro schema. After that, we will use that very schema to serialize and deserialize the models or records.
Prerequisite
// build.sbt
libraryDependencies ++= Seq(
  "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.0",
  "com.sksamuel.avro4s" %% "avro4s-json" % "4.0.0"
)
Create your recursive case class structure. Make sure your trait is sealed.
// IMPORTANT: sealed is necessary; without it this won't work unless you write your own Encoder/Decoder.
sealed trait Foo
case class Bar(i:Int) extends Foo
case class Baz(z: Int) extends Foo
case class Fighter(k: List[Foo]) extends Foo
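To make the role of sealed concrete, here is a minimal sketch in plain Scala (no avro4s involved). Because the trait is sealed, the compiler knows that Bar, Baz and Fighter are the only implementations of Foo, which is the same kind of closed-set information a schema derivation needs in order to enumerate the members of a union. The FooOps object and its total function are hypothetical names introduced only for this illustration.

```scala
sealed trait Foo
case class Bar(i: Int) extends Foo
case class Baz(z: Int) extends Foo
case class Fighter(k: List[Foo]) extends Foo

// Hypothetical helper, not part of avro4s.
object FooOps {
  // Sums every Int stored anywhere in the (possibly nested) structure.
  // The match covers all three subtypes; since Foo is sealed, the compiler
  // can warn if a case is missing.
  def total(foo: Foo): Int = foo match {
    case Bar(i)     => i
    case Baz(z)     => z
    case Fighter(k) => k.map(total).sum // recursion mirrors the recursive type
  }
}
```

For example, FooOps.total(Fighter(List(Bar(5), Baz(65)))) evaluates to 70.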
Notice that if you simply write the following to generate your schema using avro4s, you will get an error.
val fooSchema: Schema = AvroSchema[Foo]
Error:(115, 37) could not find implicit value for parameter schemaFor: com.sksamuel.avro4s.SchemaFor[Foo]
However, running the same code to generate the schema for Bar causes no such issue.
val barSchema: Schema = AvroSchema[Bar]
Rule: This works because Bar contains only fields of primitive types, and avro4s knows how to convert Scala primitives to Avro primitives.
- AvroSchema[Bar] works because of the Rule.
- AvroSchema[Baz] works because of the Rule.
- AvroSchema[Fighter] does not work.
- AvroSchema[Foo] does not work.
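As a rough illustration of the Rule, the sketch below writes by hand the kind of Avro record schema one would expect for Bar, using only the plain Apache Avro API. The namespace com.example is a placeholder assumption (the derived schema would normally carry the package of your case class), and BarSchemaSketch is a hypothetical name used only here.

```scala
import org.apache.avro.Schema

// Hypothetical holder object for the illustration.
object BarSchemaSketch {
  // A record with a single field "i" whose type is the Avro primitive "int",
  // matching the Scala field i: Int on Bar.
  val json: String =
    """{
      |  "type": "record",
      |  "name": "Bar",
      |  "namespace": "com.example",
      |  "fields": [ { "name": "i", "type": "int" } ]
      |}""".stripMargin

  val barSchema: Schema = new Schema.Parser().parse(json)
}
```

Every field here maps to an Avro primitive, so no knowledge of user-defined traits is needed to describe it.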
AvroSchema[Foo|Fighter]
When we ask avro4s to generate a schema for Fighter, it replies (if it were a person):
Hey, Fighter is defined like this: Fighter(k: List[Foo]). I know how to create a schema for List[Int], List[String], List[Float], etc., but I am not sure about Foo. Foo is a trait (interface) that you defined. Foo can be so many things for me! If you can provide me a "list" of the possible CONCRETE implementations of Foo, then I can help you generate the schema.
In other words, instead of relying on an implicit schemaFor (which is only resolved for primitives), we need to supply our own.
val avroFooSchema = AvroSchema[Foo](fooSchemaFor)
val avroFighterSchema = AvroSchema[Fighter](fighterSchemaFor)
All we need to do is define a SchemaFor for Foo and for Fighter: fooSchemaFor and fighterSchemaFor, respectively.
fooSchemaFor
import com.sksamuel.avro4s.{AvroSchema, SchemaFor}
import org.apache.avro.Schema

// leaves of the tree
val barSchema: Schema = AvroSchema[Bar]
val bazSchema: Schema = AvroSchema[Baz]

// create the record type for the fighter schema with its metadata
// avro record meta configuration:
//
// name is "Fighter"
// namespace is "foo.fighter"
// doc is "my cool fighter"
//
// the last argument is isError; it is passed positionally because named
// arguments are generally not available for Java methods
val fighterSchema: Schema = Schema.createRecord("Fighter", "my cool fighter", "foo.fighter", false)

// Foo can be one of the three types, so its schema is going to be a
// union of barSchema, bazSchema, fighterSchema in the avro sense
val fooSchema: Schema = Schema.createUnion(barSchema, bazSchema, fighterSchema)

val fooSchemaFor: SchemaFor[Foo] = SchemaFor(fooSchema)
Sweet! fooSchemaFor is all set.
fighterSchemaFor
import scala.collection.JavaConverters._ // provides .asJava

// notice that when we created fighterSchema above, it was never given
// field information. let's set its field for k
fighterSchema.setFields(
  Seq(
    // k is an array of Foo elements, so create a field of array
    // type whose items have the fooSchema we defined above
    new Schema.Field("k", Schema.createArray(fooSchema))
  ).asJava
)

val fighterSchemaFor: SchemaFor[Fighter] = SchemaFor(fighterSchema)
Sweet! fighterSchemaFor is all set.
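The two steps above can be condensed into one self-contained sketch that uses only the plain Apache Avro API, which makes it easier to see how the recursion closes: the record is created empty, referenced from the union, and only then given its fields. This is an illustration under assumptions, not what avro4s does internally: the leaf schemas are built by hand with SchemaBuilder instead of being derived, the namespace "foo" for the leaves is a placeholder, and RecursiveSchemaSketch is a hypothetical name.

```scala
import java.util.Collections
import org.apache.avro.{Schema, SchemaBuilder}

// Hypothetical holder object for the illustration.
object RecursiveSchemaSketch {
  // Leaf records, written by hand here (assumed shape of Bar and Baz).
  val barSchema: Schema =
    SchemaBuilder.record("Bar").namespace("foo").fields().requiredInt("i").endRecord()
  val bazSchema: Schema =
    SchemaBuilder.record("Baz").namespace("foo").fields().requiredInt("z").endRecord()

  // Step 1: a named record with no fields yet (last argument is isError).
  val fighterSchema: Schema =
    Schema.createRecord("Fighter", "my cool fighter", "foo.fighter", false)

  // Step 2: the union can already reference the still-empty record.
  val fooSchema: Schema = Schema.createUnion(barSchema, bazSchema, fighterSchema)

  // Step 3: close the loop by giving Fighter a field whose items are the union.
  fighterSchema.setFields(
    Collections.singletonList(new Schema.Field("k", Schema.createArray(fooSchema)))
  )
}
```

Calling RecursiveSchemaSketch.fighterSchema.toString(true) prints the schema as JSON, which is a convenient way to inspect how the nested reference to Fighter is represented.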
Let’s test it out!
To test all of this, we can simply check that the serialization/deserialization (serde) process works as expected. In other words:
- Print the schemas.
- Serialize: you can convert any implementation of the sealed trait Foo (Bar, Baz, Fighter) to an Avro record with the correct schema (case class → avro.record).
- Deserialize: you can convert an Avro record back to its respective case class (Bar, Baz, Fighter).
Print the Schema for Foo and Fighter
val avroFooSchema = AvroSchema[Foo](fooSchemaFor)
val avroFighterSchema = AvroSchema[Fighter](fighterSchemaFor)

println(avroFooSchema)
println(avroFighterSchema)
Serialization and Deserialization
Let’s quickly create some helper functions to do these conversions.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}

// this function reads a sequence of Foo and writes it out to a
// ByteArrayOutputStream. this stream is essentially your AVRO
// record with all the Foo elements
def getFooAvroOutputStream(data: Seq[Foo]): ByteArrayOutputStream = {
  val stream = new ByteArrayOutputStream()
  val os = AvroOutputStream.data[Foo].to(stream).build()
  os.write(data)
  os.flush()
  os.close()
  stream
}

// this function is intended to be used in conjunction with the
// above function to convert the stream (avro) back to Foo case classes.
def getFooFromAvro(stream: ByteArrayOutputStream): List[Foo] = {
  // pass fooSchemaFor explicitly, since a bare AvroSchema[Foo] does not resolve (see above)
  val schema = AvroSchema[Foo](fooSchemaFor)
  val inputStream = new ByteArrayInputStream(stream.toByteArray)
  val is = AvroInputStream.data[Foo].from(inputStream).build(schema)
  val data = is.iterator.toList
  is.close()
  data
}
Please note that here I am using a ByteArrayOutputStream to write out the Avro record. If you want to write to a file instead, you can use AvroOutputStream.data[Foo].to(new File("foo.avsc")).build(). If you don’t want to deal with a file, just use a ByteArrayOutputStream.
// test data
val bar = Bar(5)
val baz = Baz(65)
val fighter = Fighter(k = bar :: baz :: Nil)
val data: List[Foo] = List(fighter, bar, baz)

// serialize case classes to an avro byte stream
val outputStream = getFooAvroOutputStream(data)

// deserialize back to case classes from the stream
val readFromAvro = getFooFromAvro(outputStream)
println(readFromAvro)
// output: List(Fighter(List(Bar(5), Baz(65))), Bar(5), Baz(65))
Easier composition to/from Avro
To perform conversions, use the RecordFormat typeclass, which converts between case classes and Avro records. Here Record is avro4s’s own record type, which extends Avro’s GenericRecord with SpecificRecord.
val format = RecordFormat[Foo]
val fighterRecord: Record = format.to(fighter)
val barRecord: Record = format.to(bar)