Elasticsearch-py: error en la utilización de async_bulk, índice de la falta de acciones

0

Pregunta

En su docstring, elasticsearch.helpers.async_bulk se describe a sí mismo como un

Auxiliar para el :meth:~elasticsearch.AsyncElasticsearch.bulk api que proporciona una más humana y amigable interfaz que consume un iterador de acciones y envía a elasticsearch en trozos. fuente

Contexto

He estado usando AsyncElasticsearch.bulk() correctamente para enviar pandas dataframes para algunos ES la instancia

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
        yield (json.dumps(record, default=int))

async def send_to_elasticsearch(self, df: DataFrame):
    logger.info(f"{self.stage_name} sending batch to elastic")
    await self.elastic_client.bulk(self._rec_to_actions(df))

Problema

Sin embargo, cuando se trata de async_bulkEstoy obteniendo index is missing errores.

async def send_to_elasticsearch(self, df: DataFrame):
    await async_bulk(self.elastic_client, self._rec_to_actions(df))

Trató de sintonizar _rec_to_actions() en varias formas sin mucho efecto.

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
        record["index"] = self.index
        yield (json.dumps(record, default=int))

Supongo que el principal problema es que no estoy muy seguro de saber qué es una acción, en el contexto de elasticsearch. Esta noción está en todas partes en la documentación, pero no tiene una clara estructura de datos de la contraparte en esta biblioteca de código fuente (ninguno que he podido encontrar, de todos modos)

¿Qué es exactamente una acción y cómo debo ajustar mi generador para enviar datos del df self.index?

medio ambiente

  • python = "3.9.5"
  • elasticsearch = "7.14.1"
elasticsearch python
2021-11-18 16:22:30
1

Mejor respuesta

0

Esta documentación se hizo más fácil:

def _rec_to_actions(self, df):
    for record in df.to_dict(orient="records"):
       yield {"_index": self.index, "_source": json.dumps(record, default=int)}
2021-11-19 15:34:19

En otros idiomas

Esta página está en otros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Slovenský
..................................................................................................................