Estoy tratando de escribir un fragmento de código que hace la siguiente:-
- Se lee en un gran archivo csv desde el remoto origen como s3.
- Procesar el archivo de registro por registro.
- Enviar una notificación al usuario
- Escribir el resultado en una ubicación remota
Ejemplo de un registro de entrada csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
Mi entrada de caso de la clase que representa un récord en la entrada de csv:
case class InputRecord(recordId: String, name: String, salary: Long)
Ejemplo de un registro de salida csv (que debe ser por escrito):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
Mi salida de caso de la clase que representa un récord en la entrada de csv:
case class OutputRecord(recordId: String, name: String, designation: String)
La lectura de un registro mediante akka flujo de csv (usa Alpakka reactiva s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
Ahora tengo una función para procesar los registros:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
Función para escribir el OutputRecord como csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
Función para enviar la notificación por correo electrónico:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Costuras todo junto
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
En la Línea 15 y 16 estoy recibiendo un error, soy capaz de agregar una Línea de 15 o 16, pero no tanto ya que ambos notify
& writeOutput
necesidades outputRecord
. Una vez que notificar se llama voy a perder mi outputRecord
.
Hay una manera que puedo añadir tanto notify
y writeOutput
a la misma gráfica?
No estoy buscando para la ejecución en paralelo como quiero a la primera llamada notify
y, a continuación, sólo writeOutput
. Así que esto no es de ayuda: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
El caso de uso que parece muy simple para mí, pero de alguna forma yo no soy capaz de encontrar una solución limpia.