Scala Symposium: Big Data Pipeline Powered by Scala
val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseForLanding(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(sourceRawDf.schema, EvergreenSchema)
)
import com.thoughtworks.awayday.ingest.models.ErrorModels.DataError
import org.apache.spark.sql.{DataFrame, Dataset}

trait DataStage[T <: Dataset[_]] extends Serializable {
  def apply(errors: Dataset[DataError], dataRecords: T): (Dataset[DataError], DataFrame)
  def stage: String
}
case class DataError(
  rowKey: String,
  stage: String,
  fieldName: String,
  fieldValue: String,
  error: String,
  severity: String,
  addlInfo: String = ""
)
import com.thoughtworks.awayday.ingest.DataFrameOps
import com.thoughtworks.awayday.ingest.UDFs.generateUUID
import com.thoughtworks.awayday.ingest.models.ErrorModels.DataError
import com.thoughtworks.awayday.ingest.stages.StageConstants.RowKey
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SparkSession}

class AddRowKeyStage(schemaWithRowKey: StructType)
                    (implicit spark: SparkSession, encoder: Encoder[DataError])
  extends DataStage[DataFrame] {

  override val stage: String = getClass.getSimpleName

  def apply(errors: Dataset[DataError], dataRecords: DataFrame): (Dataset[DataError], DataFrame) =
    addRowKeys(errors, dataRecords)

  def addRowKeys(errors: Dataset[DataError], data: DataFrame): (Dataset[DataError], DataFrame) = {
    val colOrder = schemaWithRowKey.fields.map(_.name)
    val returnDf = data
      .withColumn(RowKey, lit(generateUUID()))
      .select(colOrder.map(col): _*)
    (errors.union(DataFrameOps.emptyErrorStream(spark)), returnDf)
  }
}
val (initErr, initDf) = (DataFrameOps.emptyErrorStream(spark), sourceRawDf)

val validRecords = pipelineStages.foldLeft((initErr, initDf)) {
  case ((err, df), stage) => stage(err, df)
}
def getCurrentTemperature(): Future[Double] = ??? // 1
def getTomorrowsTempFromPredictionAPI(curr: Double): Future[Double] = ??? // 2
def publishItInOurWebsite(pred: Double): Future[Double] = ??? // 3
val published2: Future[Double] = for {
  curr <- getCurrentTemperature()
  pred <- getTomorrowsTempFromPredictionAPI(curr)
  pubw <- publishItInOurWebsite(pred)
} yield pubw
def getCurrentTemperatureW(): Writer[List[String], Double] = {
  Writer(List("Thermometer isn't broken yet"), 10.0)
}

def getTomorrowsTempFromPredictionAPIW(curr: Double): Writer[List[String], Double] = {
  Writer(List("Yay, the Prediction API works too"), 20.0)
}

def publishItInOurWebsiteW(pred: Double): Writer[List[String], Double] = {
  Writer(List("Published to our website"), 20.0)
}
val publishedWriter: Writer[List[String], Double] = for {
  curr <- getCurrentTemperatureW()
  pred <- getTomorrowsTempFromPredictionAPIW(curr)
  pubw <- publishItInOurWebsiteW(pred)
} yield pubw
val (logs, value) = publishedWriter.run
logs.foreach(println)
println(value)
Thermometer isn't broken yet
Yay, the Prediction API works too
Published to our website
20.0
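To make it explicit where the log concatenation happens, the for comprehension above can be desugared into plain flatMap and map calls. This is only a sketch (publishedDesugared is a name introduced here), and it assumes cats.implicits._ is in scope so that the List semigroup and the Id instances are available:

// Desugared form of the for comprehension: each flatMap is the point where
// the Writer's log (the List[String]) is combined with the next stage's log.
val publishedDesugared: Writer[List[String], Double] =
  getCurrentTemperatureW().flatMap { curr =>
    getTomorrowsTempFromPredictionAPIW(curr).flatMap { pred =>
      publishItInOurWebsiteW(pred).map(pubw => pubw)
    }
  }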
def flatMap[U](f: V => WriterT[F, L, U])(implicit flatMapF: FlatMap[F], semigroupL: Semigroup[L]): WriterT[F, L, U] =
  WriterT {
    flatMapF.flatMap(run) { lv =>
      flatMapF.map(f(lv._2).run) { lv2 =>
        (semigroupL.combine(lv._1, lv2._1), lv2._2)
      }
    }
  }
object DataFrameOps {
  ...
  ...
  implicit val dataFrameSemigroup: Semigroup[Dataset[_]] = new Semigroup[Dataset[_]] {
    override def combine(x: Dataset[_], y: Dataset[_]): Dataset[_] = x.union(y)
  }
}
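As a quick illustration (not part of the original pipeline code; errorsA and errorsB are assumed here to be Dataset[DataError] values already in scope), combining two error Datasets through this instance is just a union:

// Summon the implicit Semigroup[Dataset[_]] and combine two error Datasets.
// This is exactly what WriterT's flatMap does with the log side of each stage.
import cats.Semigroup
import DataFrameOps.dataFrameSemigroup

val combinedErrors: Dataset[_] = Semigroup[Dataset[_]].combine(errorsA, errorsB) // same as errorsA.union(errorsB)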
type DataSetWithErrors[A] = Writer[Dataset[DataError], A]

trait DataStage[T <: Dataset[_]] extends Serializable {
  def apply(data: T): DataSetWithErrors[T]
  def stage: String
}
import cats.data.Writer
import com.thoughtworks.awayday.ingest.UDFs._
import com.thoughtworks.awayday.ingest.models.ErrorModels.{DataError, DataSetWithErrors}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import StageConstants._

class DataTypeValidatorStage(schema: StructType)(implicit val spark: SparkSession)
  extends DataStage[DataFrame] {

  override val stage = getClass.getSimpleName

  def apply(dataRecords: DataFrame): DataSetWithErrors[DataFrame] = validateTypes(dataRecords)

  def validateTypes(data: DataFrame): DataSetWithErrors[DataFrame] = {
    val withErrorsDF = data.withColumn(
      RowLevelErrorListCol,
      validateRowUDF(schema, stage)(struct(data.columns.map(data(_)): _*))
    )

    import spark.implicits._

    val errorRecords = withErrorsDF
      .select(RowLevelErrorListCol)
      .select(explode(col(RowLevelErrorListCol)))
      .select("col.*")
      .map(row => DataError(row))

    Writer(errorRecords, withErrorsDF.drop(RowLevelErrorListCol))
  }
}
import DataFrameOps._

val initDf = Writer(DataFrameOps.emptyErrorStream(spark), sourceRawDf)

val validRecords = pipelineStages.foldLeft(initDf) {
  case (dfWithErrors, stage) =>
    for {
      df <- dfWithErrors
      applied <- stage.apply(df)
    } yield applied
}
val (errors, processedDf) = validRecords.run

val query = processedDf
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .start()
org.apache.spark.sql.AnalysisException: Union between streaming and batch DataFrames/Datasets is not supported;;
val emptyErrorStream = (spark: SparkSession) => {
  implicit val sqlC = spark.sqlContext
  MemoryStream[DataError].toDS()
}

Note: For data-type-specific row-level error handling of CSV and JSON sources, you could optionally use the "mode" option of the DataFrameReader.
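For illustration only (the schema, column names and path below are placeholders, not taken from the pipeline above), reading a CSV with Spark's built-in malformed-record handling looks roughly like this:

// Sketch of the DataFrameReader "mode" option mentioned in the note above.
val csvSchema = new StructType()
  .add("id", IntegerType)
  .add("temperature", DoubleType)
  .add("_corrupt_record", StringType) // raw text of malformed rows lands here in PERMISSIVE mode

val rawDf = spark.read
  .schema(csvSchema)
  .option("mode", "PERMISSIVE") // alternatives: DROPMALFORMED, FAILFAST
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("/path/to/landing/*.csv")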
Disclaimer: The statements and opinions expressed in this article are those of the author(s) and do not necessarily reflect the positions of Thoughtworks.