Kinesis Firehose lambda transformación se ha producido Un error durante la serialización JSON de respuesta

0

Pregunta

Yo estoy usando el siguiente código en Python (Función Lambda) para la transformación de datos utilizando kinesis datos firehose. Estoy recibiendo por debajo de error.

Código:

#This function is created to Transform the data from Kinesis Data Firehose -> S3 Bucket
#It converts single line json to multi line json as expected by AWS Athena best practice.
#It also removes special characters from json keys (column name in Athena) as Athena expects column names without special characters

import json
import boto3
import base64
import string
from typing import Optional, Iterable, Union

delete_dict = {sp_character: '' for sp_character in string.punctuation}
PUNCT_TABLE = str.maketrans(delete_dict)
output = []

def lambda_handler(event, context):
    
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        
        remove_special_char = json.loads(payload, object_pairs_hook=clean_keys)
        row_w_newline = str(remove_special_char) + "\n"
        row_w_newline = base64.b64encode(row_w_newline.encode('utf-8'))
        
        
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline
        }
        output.append(output_record)

    print('Processed {} records.'.format(len(event['records'])))
    
    return {'records': output}
    
def strip_punctuation(s: str,
                      exclude_chars: Optional[Union[str, Iterable]] = None) -> str:
    """
    Remove punctuation and spaces from a string.

    If `exclude_chars` is passed, certain characters will not be removed
    from the string.

    """
    punct_table = PUNCT_TABLE.copy()
    if exclude_chars:
        for char in exclude_chars:
            punct_table.pop(ord(char), None)

    # Next, remove the desired punctuation from the string
    return s.translate(punct_table)

def clean_keys(o):
    return {strip_punctuation(k): v for k, v in o}

Error:

An error occurred during JSON serialization of response: b'eyd2ZXJzaW9uJzogJzAnLCAnaWQnOiAnNjFhMGI4YjQtOGRhYS0xNGMwLTllOTMtNzhhNjk0MTY0MDgxJywgJ2RldGFpbHR5cGUnOiAnQVdTIEFQSSBDYWxsIHZpYSBDbG91ZFRyYWlsJywgJ3NvdXJjZSc6ICdhd3Muc2VjdXJpdHlodWInLCAnYWNjb3VudCc6ICc5MzQ3NTU5ODkxNzYnLCAndGltZSc6ICcyMDIxLTExLTIzVDE1OjQxOjQ3WicsICdyZWdpb24nOiAndXMtZWFzdC0xJywgJ3Jlc291cmNlcyc6IFtdLCAnZGV0YWlsJzogeydldmVudFZlcnNpb24nOiAnMS4wOCcsICd1c2VySWRlbnRpdH'
     is not JSON serializable
    Traceback (most recent call last):
      File "/var/lang/lib/python3.6/json/__init__.py", line 238, in dumps
        **kw).encode(obj)
      File "/var/lang/lib/python3.6/json/encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "/var/lang/lib/python3.6/json/encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "/var/runtime/bootstrap.py", line 135, in decimal_serializer
        raise TypeError(repr(o) + " is not JSON serializable")

Evento :

{'recordId': '49623720050963652954313901532126731765249603147428528130000000', 'approximateArrivalTimestamp': 1637711607661, 'data': 'eyJ2ZXJzaW9uIjoiMCIsImlkIjoiMzFkOGE3MmItYWUxNC02ZDYzLWRjODUtMTZmNWViMzk3ZTAyIiwiZGV0YWlsLXR5cGUiOiJBV1MgQVBJIENhbGwgdmlhIENsb3VkVHJhaWwiLCJzb3VyY2UiOiJhd3Muc2VjdXJpdHlodWIiLCJhY2NvdW50IjoiMjIwMzA3MjAyMzYyIiwidGltZSI6IjIwMjEtMTEtMjNUMjM6NTM6MTdaIiwicmVnaW9uIjoidXMtd2VzdC0yIiwicmVzb3VyY2VzIjpbXSwiZGV0YWlsIjp7ImV2ZW50VmVyc2lvbiI6IjEuMDgiLCJ1c2VySWRlbnRpdHkiOnsidHlwZSI6IlJvb3QiLCJwcmluY2lwYWxJZCI6IjIyMDMwNzIwMjM2MiIsImFybiI6ImFybjphd3M6aWFtOjoyMjAzMDcyMDIzNjI6cm9vdCIsImFjY291bnRJZCI6IjIyMDMwNzIwMjM2MiIsImFjY2Vzc0tleUlkIjoiQVNJQVRHUzJWRUU1TEQ2TUZFRlYiLCJzZXNzaW9uQ29udGV4dCI6eyJzZXNzaW9uSXNzdWVyIjp7fSwid2ViSWRGZWRlcmF0aW9uRGF0YSI6e30sImF0dHJpYnV0ZXMiOnsiY3JlYXRpb25EYXRlIjoiMjAyMS0xMS0yM1QxNToxMDo1N1oiLCJtZmFBdXRoZW50aWNhdGVkIjoiZmFsc2UifX19LCJldmVudFRpbWUiOiIyMDIxLTExLTIzVDIzOjUzOjE3WiIsImV2ZW50U291cmNlIjoic2VjdXJpdHlodWIuYW1hem9uYXdzLmNvbSIsImV2ZW50TmFtZSI6IkJhdGNoRGlzYWJsZVN0YW5kYXJkcyIsImF3c1JlZ2lvbiI6InVzLXdlc3QtMiIsInNvdXJjZUlQQWRkcmVzcyI6IjEwNC4xMjkuMTk4LjEwMSIsInVzZXJBZ2VudCI6ImF3cy1pbnRlcm5hbC8zIGF3cy1zZGstamF2YS8xLjEyLjExMiBMaW51eC81LjQuMTU2LTk0LjI3My5hbXpuMmludC54ODZfNjQgT3BlbkpES182NC1CaXRfU2VydmVyX1ZNLzI1LjMxMi1iMDcgamF2YS8xLjguMF8zMTIgdmVuZG9yL09yYWNsZV9Db3Jwb3JhdGlvbiBjZmcvcmV0cnktbW9kZS9zdGFuZGFyZCIsInJlcXVlc3RQYXJhbWV0ZXJzIjp7IlN0YW5kYXJkc1N1YnNjcmlwdGlvbkFybnMiOlsiYXJuOmF3czpzZWN1cml0eWh1Yjp1cy13ZXN0LTI6MjIwMzA3MjAyMzYyOnN1YnNjcmlwdGlvbi9hd3MtZm91bmRhdGlvbmFsLXNlY3VyaXR5LWJlc3QtcHJhY3RpY2VzL3YvMS4wLjAiXX0sInJlc3BvbnNlRWxlbWVudHMiOnsiU3RhbmRhcmRzU3Vic2NyaXB0aW9ucyI6W3siU3RhbmRhcmRzQXJuIjoiYXJuOmF3czpzZWN1cml0eWh1Yjp1cy13ZXN0LTI6OnN0YW5kYXJkcy9hd3MtZm91bmRhdGlvbmFsLXNlY3VyaXR5LWJlc3QtcHJhY3RpY2VzL3YvMS4wLjAiLCJTdGFuZGFyZHNJbnB1dCI6e30sIlN0YW5kYXJkc1N0YXR1cyI6IkRFTEVUSU5HIiwiU3RhbmRhcmRzU3Vic2NyaXB0aW9uQXJuIjoiYXJuOmF3czpzZWN1cml0eWh1Yjp1cy13ZXN0LTI6MjIwMzA3MjAyMzYyOnN1YnNjcmlwdGlvbi9hd3MtZm91bmRhdGlvbmFsLXNlY3VyaXR5LWJlc3QtcHJhY3RpY2VzL3YvMS4wLjAiLCJTdGFuZGFyZHNTdGF0dXNSZWFzb24iOnsiU3RhdHVzUmVhc29uQ29kZSI6Ik5PX0FWQUlMQUJMRV9DT05GSUdVUkFUSU9OX1JFQ09SREVSIn19XX0sInJlcXVlc3RJRCI6IjcyYzVjODYyLTJmOWEtNDBjYS05NDExLTY2YzIxMTcyNjIxMCIsImV2ZW50SUQiOiI3YWY4NjFiZS03YjExLTRmOTQtOWZlYS0yYTgyZjg5NDIxNWYiLCJyZWFkT25seSI6ZmFsc2UsImV2ZW50VHlwZSI6IkF3c0FwaUNhbGwiLCJtYW5hZ2VtZW50RXZlbnQiOnRydWUsInJlY2lwaWVudEFjY291bnRJZCI6IjIyMDMwNzIwMjM2MiIsImV2ZW50Q2F0ZWdvcnkiOiJNYW5hZ2VtZW50In19'}
1

Mejor respuesta

0

Este código me ayudó con el problema anterior

def lambda_handler(event, context):
    
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        remove_special_char = json.loads(payload, object_pairs_hook=clean_keys)
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(remove_special_char).encode('utf-8') + b'\n').decode('utf-8')
        }
        output.append(output_record)

    print('Processed {} records.'.format(len(event['records'])))
    return {'records': output}
2021-11-24 21:26:32

En otros idiomas

Esta página está en otros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Slovenský
..................................................................................................................