ETL – Complete guide to migrate an entire Elasticsearch index to Amazon S3 as CSV

If you use Elasticsearch, you may come across a situation where you need to export an entire index and store the data somewhere for further validation. In this article, I will show you how to migrate your Elasticsearch index to Amazon S3 as CSV files.

Why migrate an Elasticsearch index at all, when Elasticsearch itself is usually a copy of some other database (in my case, Amazon DynamoDB)? Well, in my case I wanted to compare the two and check whether the data in DynamoDB and Elasticsearch actually matched!

You can also find the code in my GitHub repository.

If you are going to run this code on your local machine, make sure to install the required packages first:

pip install boto3

pip install requests-aws4auth

Then open a command prompt and run “aws configure” to set up your AWS credentials, if you have not done so already.
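To confirm that boto3 can actually pick up those credentials, a quick sanity check like the sketch below can save some debugging later. It only assumes the default profile and region that aws configure just wrote, and uses the standard STS get_caller_identity call.

import boto3

# Fails fast if no valid AWS credentials or region are configured
session = boto3.session.Session()
print("Region:", session.region_name)
print("Identity:", boto3.client("sts").get_caller_identity()["Arn"])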

In the script below, make sure to replace the Elasticsearch hostname, the index name, and the column names with your own.

import json
import boto3
from requests_aws4auth import AWS4Auth
import logging
import sys
import os

# On AWS Lambda, requests is loaded from a layer mounted under /opt
sys.path.append('/opt/python/requests')
import requests

# Get logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# S3 configuration - replace the bucket name with your own
s3 = boto3.resource('s3')
bucketName = "BUCKET_NAME"
s3Folder = "elasticsearch-sync/data_"

my_session = boto3.session.Session()
region = my_session.region_name 
service = "es"
credentials = my_session.get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

headers = { "Content-Type": "application/json" }

# Elasticsearch endpoint and index - replace with your own
host = "https://YOUR_ENDPOINT.ap-south-1.es.amazonaws.com"
searchUri = "/DEMO-TABLE/_search"
url = host + searchUri

def lambda_handler(event, context):
    total_count = 0
    next_partitionKey = None
    next_sortKey = None
    fileCount = 1
    column_names = "id,date\n"
    content = column_names
    while True:
        out_file = s3Folder + f"{fileCount}.csv"
        print("current file - ",out_file)
        print("search after - ",next_partitionKey,next_sortKey)
        payload = {
            "_source": ["id", "date"],  # include only the fields that are required
            "size": 10000,
            "query": {
                "match_all": {}
            },
            "sort": [
                {
                    "id": "asc"
                },
                {
                    "date": "asc"
                }
            ]
        }
        # Paginate with search_after, skipping it on the first request.
        # Sort on keys that make each document unique; for a DynamoDB-backed
        # index, the partition key and sort key are a natural choice.
        if next_partitionKey is not None:
            payload["search_after"] = [next_partitionKey, next_sortKey]

        response = requests.post(url, auth=awsauth, data=json.dumps(payload), headers=headers)
        print(response.text)
        hits = json.loads(response.text)["hits"]["hits"]
        if hits:
            data = [hit["_source"] for hit in hits]
            # Remember the sort values of the last document for the next search_after request
            next_partitionKey = data[-1]["id"]
            next_sortKey = data[-1]["date"]
            total_count += len(data)
            for item in data:
                content += f"{item['id']},{item['date']}\n"
            # Upload this page of documents to S3 as one CSV file
            s3.Object(bucketName, out_file).put(Body=content)
            content = column_names
            fileCount += 1
        else:
            print("total_count ", total_count)
            break

lambda_handler("","") #remove this line if you are using in AWS lambda

If you want to deploy this on AWS Lambda, remove the final line of the code and add a layer that provides requests-aws4auth (the sys.path line at the top also expects requests to be available from a layer).
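Once the export has run, you may want to spot-check one of the generated files. Below is a minimal sketch, assuming the same BUCKET_NAME placeholder and the elasticsearch-sync/data_1.csv key that the script above writes first; it downloads that file and prints its row count.

import csv
import io
import boto3

s3 = boto3.resource('s3')
# Same bucket and key prefix as in the script above
obj = s3.Object("BUCKET_NAME", "elasticsearch-sync/data_1.csv")
body = obj.get()["Body"].read().decode("utf-8")

rows = list(csv.DictReader(io.StringIO(body)))
print("rows in first file:", len(rows))
print("first row:", rows[0] if rows else None)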

Happy programming!!
