En su docstring, elasticsearch.helpers.async_bulk
se describe a sí mismo como un
Auxiliar para el :meth:
~elasticsearch.AsyncElasticsearch.bulk
api que proporciona una más humana y amigable interfaz que consume un iterador de acciones y envía a elasticsearch en trozos. fuente
Contexto
He estado usando AsyncElasticsearch.bulk()
correctamente para enviar pandas dataframes para algunos ES la instancia
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
Problema
Sin embargo, cuando se trata de async_bulk
Estoy obteniendo index is missing
errores.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
Trató de sintonizar _rec_to_actions()
en varias formas sin mucho efecto.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
Supongo que el principal problema es que no estoy muy seguro de saber qué es una acción, en el contexto de elasticsearch. Esta noción está en todas partes en la documentación, pero no tiene una clara estructura de datos de la contraparte en esta biblioteca de código fuente (ninguno que he podido encontrar, de todos modos)
¿Qué es exactamente una acción y cómo debo ajustar mi generador para enviar datos del df self.index
?
medio ambiente
- python = "3.9.5"
- elasticsearch = "7.14.1"