Akka flujo de Entrada ("In") como de Salida ("Out")

0

Pregunta

Estoy tratando de escribir un fragmento de código que hace la siguiente:-

  1. Se lee en un gran archivo csv desde el remoto origen como s3.
  2. Procesar el archivo de registro por registro.
  3. Enviar una notificación al usuario
  4. Escribir el resultado en una ubicación remota

Ejemplo de un registro de entrada csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

Mi entrada de caso de la clase que representa un récord en la entrada de csv:

case class InputRecord(recordId: String, name: String, salary: Long)

Ejemplo de un registro de salida csv (que debe ser por escrito):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

Mi salida de caso de la clase que representa un récord en la entrada de csv:

case class OutputRecord(recordId: String, name: String, designation: String)

La lectura de un registro mediante akka flujo de csv (usa Alpakka reactiva s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

Ahora tengo una función para procesar los registros:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

Función para escribir el OutputRecord como csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

Función para enviar la notificación por correo electrónico:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Costuras todo junto

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

En la Línea 15 y 16 estoy recibiendo un error, soy capaz de agregar una Línea de 15 o 16, pero no tanto ya que ambos notify & writeOutput necesidades outputRecord. Una vez que notificar se llama voy a perder mi outputRecord.

Hay una manera que puedo añadir tanto notify y writeOutput a la misma gráfica?

No estoy buscando para la ejecución en paralelo como quiero a la primera llamada notify y, a continuación, sólo writeOutput. Así que esto no es de ayuda: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

El caso de uso que parece muy simple para mí, pero de alguna forma yo no soy capaz de encontrar una solución limpia.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

Mejor respuesta

1

La salida de notify es un PushResultpero la entrada de writeOutput es ByteString. Una vez que el cambio que se va a compilar. En caso de que usted necesita ByteString, obtener el mismo de OutputRecord.

Por CIERTO, en el código de ejemplo que usted ha proporcionado, un error similar existe en readCSV y process.

2021-11-24 03:36:16

En otros idiomas

Esta página está en otros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Slovenský
..................................................................................................................