If you use Elasticsearch, you may come across a situation where you need to export an entire index and store it somewhere for further validation. In this article, I will show you how to migrate your Elasticsearch index to Amazon S3 as CSV files.
You might wonder why you would need to migrate an Elasticsearch index at all, since Elasticsearch is usually just a copy of some other database (in my case, AWS DynamoDB). For me, the goal was to compare the data in DynamoDB and Elasticsearch and verify that they matched.
You can also find the complete code in my GitHub repository.
If you are going to run this code on your local machine, make sure to run the following commands first.
pip install boto3
pip install requests-aws4auth
Open the command prompt and run “aws configure” to set up your AWS credentials if you have not already done so.
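If you want to confirm that boto3 can actually see those credentials before running the export, a quick sanity check like the one below can save some debugging time. This is my addition, not part of the migration script itself.

import boto3

# Print the IAM identity boto3 will use; this fails fast if credentials are missing or invalid.
sts = boto3.client("sts")
identity = sts.get_caller_identity()
print("Running as:", identity["Arn"])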
Make sure to replace the hostname, index name, and column names with your own.
import json
import boto3
from requests_aws4auth import AWS4Auth
import logging
import sys
import os
sys.path.append('/opt/python/requests')  # make the requests package from the Lambda layer importable
import requests
# Get logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# s3 configurations
s3 = boto3.resource('s3')
bucketName = "BUCKET_NAME"
s3Folder = "elasticsearch-sync/data_"
my_session = boto3.session.Session()
region = my_session.region_name
service = "es"
credentials = my_session.get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
headers = { "Content-Type": "application/json" }
# Elasticsearch endpoint and search URI (replace with your own domain and index)
host = "https://YOUR_ENDPOINT.ap-south-1.es.amazonaws.com"
searchUri = "/DEMO-TABLE/_search"
url = host + searchUri
def lambda_handler(event, context):
    total_count = 0
    next_partitionKey = None
    next_sortKey = None
    fileCount = 1
    column_names = "id,date\n"
    content = column_names
    while True:
        out_file = s3Folder + f"{fileCount}.csv"
        print("current file - ", out_file)
        print("search after - ", next_partitionKey, next_sortKey)
        payload = {
            "_source": ["id", "date"],  # include only the fields you want in the CSV
            "size": 10000,
            "query": {
                "match_all": {}
            },
            "sort": [
                {"id": "asc"},
                {"date": "asc"}
            ]
        }
        # Paginate with search_after from the second request onwards. Use a combination
        # of keys that is unique; for a DynamoDB-backed index, the partition and sort key.
        if next_partitionKey is not None:
            payload["search_after"] = [next_partitionKey, next_sortKey]
        response = requests.post(url, auth=awsauth, data=json.dumps(payload), headers=headers)
        # print(response.text)  # uncomment to inspect the raw Elasticsearch response
        hits = response.json()["hits"]["hits"]
        if hits:
            data = [hit["_source"] for hit in hits]
            next_partitionKey = data[-1]["id"]
            next_sortKey = data[-1]["date"]
            total_count += len(data)
            for item in data:
                content += f"{item['id']},{item['date']}\n"
            s3.Object(bucketName, out_file).put(Body=content)
            content = column_names
            fileCount += 1
        else:
            print("total_count ", total_count)
            break

lambda_handler("", "")  # remove this line if you are running it as an AWS Lambda function
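Once the script finishes, you can quickly confirm how many CSV batches landed in S3. This is just an optional check I am adding here, using the same bucket name and prefix as the script above.

import boto3

s3 = boto3.resource("s3")
bucket = s3.Bucket("BUCKET_NAME")  # same bucket as in the script
for obj in bucket.objects.filter(Prefix="elasticsearch-sync/data_"):
    print(obj.key, obj.size, "bytes")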
If you want to deploy this on AWS Lambda, remove the final line of the script (the local call to lambda_handler) and attach a layer that provides requests and requests-aws4auth.
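If you prefer to attach the layer from code rather than the console, a sketch like the one below could work. Note that the zip file name, layer name, function name, and runtime here are placeholders I am assuming, not values from the original setup.

import boto3

lambda_client = boto3.client("lambda")

# Publish a layer built locally with:
#   pip install requests requests-aws4auth -t python/   (then zip the "python" folder)
with open("aws4auth-layer.zip", "rb") as f:          # placeholder zip name
    layer = lambda_client.publish_layer_version(
        LayerName="aws4auth-layer",                  # placeholder layer name
        Content={"ZipFile": f.read()},
        CompatibleRuntimes=["python3.9"],            # assumed runtime
    )

# Attach the published layer version to the function (this replaces its existing layer list).
lambda_client.update_function_configuration(
    FunctionName="es-to-s3-export",                  # placeholder function name
    Layers=[layer["LayerVersionArn"]],
)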
Happy programming!!