In this article, we are going to see how to reconcile mismatched data between Amazon DynamoDB and Amazon OpenSearch Service (Elasticsearch) with the help of Amazon Redshift.
In case you are wondering how to migrate the DynamoDB data to Elasticsearch in the first place, take a look at this article.
Note: This reconciliation method uses Amazon Redshift to find the mismatched data. You can use any other database, but I am not sure it will be as fast as Redshift.
Now let us see the steps that I used to reconcile these databases.
- Create tables in Redshift
- Migrate data with required fields to Redshift
- Update the Elasticsearch index with missing data
Create tables in Redshift
Create two tables named TABLE_ES and TABLE_DYNAMODB in Redshift, with the DynamoDB primary-key attributes as columns. Here I assume the DynamoDB partition key is “id” and the sort key is “date”.
create table TABLE_ES -- or TABLE_DYNAMODB
(
  pk bigint identity(1,1),
  id bigint,
  date timestamp
);
Migrate data to Redshift
- Export the primary key fields (“id” and “date”) of every item from DynamoDB to Amazon S3 as CSV using this article (a scripted alternative for both sides is sketched right after this list).
- Export the same fields from the Elasticsearch index to Amazon S3 as CSV using this article.
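If you would rather script these exports yourself, here is a minimal sketch of the DynamoDB side. The table, bucket, and file names are placeholders, and I assume the key names used throughout this article (“id” and “date”). It scans the table, keeps only the key attributes, and writes one CSV file (with a header row, since the COPY command below uses IGNOREHEADER 1) to S3.
import csv
import io
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('TABLE_NAME')   # placeholder: your DynamoDB table
s3 = boto3.client('s3')
bucket = 'BUCKET_NAME'                 # placeholder: your S3 bucket
key = 'demo/dynamodb_keys.csv'         # placeholder: target CSV path
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(['id', 'date'])        # header row, skipped later by IGNOREHEADER 1
# "date" is a DynamoDB reserved word, so alias the attributes in the projection
scan_kwargs = {
    'ProjectionExpression': '#i, #d',
    'ExpressionAttributeNames': {'#i': 'id', '#d': 'date'}
}
while True:
    response = table.scan(**scan_kwargs)
    for item in response['Items']:
        writer.writerow([item['id'], item['date']])
    if 'LastEvaluatedKey' not in response:
        break
    scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
s3.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue())
And a similar sketch for the Elasticsearch side, again with a placeholder endpoint, index, and file name, signed with the same AWS4Auth setup that the upload script later in this article uses. It pages through the index with the scroll API and writes the same two columns.
import csv
import io
import json
import boto3
import requests
from requests_aws4auth import AWS4Auth
session = boto3.session.Session()
credentials = session.get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   session.region_name, 'es', session_token=credentials.token)
host = 'https://YOUR_ES_ENDPOINT'      # placeholder: your Elasticsearch endpoint
index_name = 'INDEX_NAME'              # placeholder: your index
headers = {'Content-Type': 'application/json'}
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(['id', 'date'])
# open a scroll that returns only the two key fields
body = {'size': 1000, '_source': ['id', 'date'], 'query': {'match_all': {}}}
resp = requests.post(host + '/' + index_name + '/_search?scroll=2m',
                     auth=awsauth, headers=headers, data=json.dumps(body)).json()
while resp['hits']['hits']:
    for hit in resp['hits']['hits']:
        writer.writerow([hit['_source']['id'], hit['_source']['date']])
    # keep the scroll alive and fetch the next page
    resp = requests.post(host + '/_search/scroll', auth=awsauth, headers=headers,
                         data=json.dumps({'scroll': '2m', 'scroll_id': resp['_scroll_id']})).json()
boto3.client('s3').put_object(Bucket='BUCKET_NAME', Key='demo/es_keys.csv',
                              Body=buffer.getvalue())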
Once the data is in Amazon S3, things are pretty simple: use the COPY command to load it into Amazon Redshift. For example,
copy TABLE_ES from 'S3_PATH'
iam_role 'IAM_ROLE'
IGNOREHEADER 1 null as 'NULL'
timeformat as 'YYYY-MM-DD HH:MI:SS'
csv
Now that the data is loaded into Redshift, it's time to find the missing data with the following query.
create table mismatch_data as
select id,date from TABLE_DYNAMODB
minus select id,date from TABLE_ES;
The mismatched rows are now stored in the “mismatch_data” table. Note that MINUS only returns rows that exist in TABLE_DYNAMODB but not in TABLE_ES; swap the two SELECTs if you also want to find stale documents that exist only in Elasticsearch.
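If you want to sanity-check the size of the mismatch in both directions before exporting anything, a minimal sketch using the same psycopg2 connection as the script in the next section (connection parameters are placeholders) could look like this:
import psycopg2
conn = psycopg2.connect(dbname='', port='', user='', password='', host='')
cursor = conn.cursor()
# rows that exist in DynamoDB but are missing from the Elasticsearch index
cursor.execute("""
    select count(*) from (
        select id, date from TABLE_DYNAMODB
        minus
        select id, date from TABLE_ES
    ) as diff
""")
print('missing in Elasticsearch:', cursor.fetchone()[0])
# rows that exist in Elasticsearch but are missing from DynamoDB (stale documents)
cursor.execute("""
    select count(*) from (
        select id, date from TABLE_ES
        minus
        select id, date from TABLE_DYNAMODB
    ) as diff
""")
print('missing in DynamoDB:', cursor.fetchone()[0])
cursor.close()
conn.close()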
Update the Elasticsearch index with missing data
Write a Python script to store this data in a JSON file.
import psycopg2
import psycopg2.extras
import json
import boto3
redShiftDBConnection = psycopg2.connect(
dbname = '',
port = '',
user = '',
password = '',
host = ''
)
# s3 configurations
s3 = boto3.resource('s3')
bucketName = "BUCKET_NAME"
out_file = "demo/mismatch_data/data.json"
resultData = []
executeSP = """
select * from mismatch_data;
"""
# fetch every mismatched row from Redshift
cursor = redShiftDBConnection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute(executeSP)
rows = cursor.fetchall()
for row in rows:
    temp = {
        "id": row['id'],
        # timestamps come back as datetime objects, which json.dumps cannot
        # serialize, so convert them to strings
        "date": str(row['date'])
    }
    resultData.append(temp)
cursor.close()
final_data = {
    "data": resultData
}
# use this for storing in S3
# s3.Object(bucketName, out_file).put(Body = json.dumps(final_data))
redShiftDBConnection.close()
# storing in local machine
with open('data.json', 'w+') as file:
    file.write(json.dumps(final_data))
Now the “data.json” file has all of the mismatched data. If this is not a PROD region, and you have write access to the Elasticsearch index and read access to the DynamoDB table, you can start uploading the data to Elasticsearch directly.
In case the data is huge and you are using AWS Lambda, split the file into multiple smaller JSON files using this Python script.
import json
import boto3
s3Client = boto3.resource('s3')
bucketName = 'BUCKET_NAME'
s3Folder = 'demo/data_{}.json'
with open('data.json', 'r') as file:
    data = json.loads(file.read())
# yield successive n-sized chunks of list l
def divide_chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]
n = 2000
data_list = list(divide_chunks(data['data'], n))
fileCount = 1
for item in data_list:
    temp = {
        "data": item
    }
    s3Client.Object(bucketName, s3Folder.format(fileCount)).put(Body=json.dumps(temp, indent=4))
    # use the lines below to store locally instead
    # with open(f'data{fileCount}.json','w+') as temp_file:
    #     temp_file.write(json.dumps(temp))
    fileCount += 1
Now, we are at the final step. Upload the data to Elasticsearch!!
If you are going to run the following code on your local machine, make sure to install these dependencies first.
pip install boto3
pip install requests-aws4auth
import json
import boto3
from boto3.dynamodb.conditions import Key
from requests_aws4auth import AWS4Auth
import logging
import sys
import decimal
import os
# sys.path.append('/opt/python/requests')
import requests

s3Client = boto3.client('s3')

# Get logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# DynamoDB table, Elasticsearch index and key attribute names
tableName = "TABLE_NAME"
index_name = "INDEX_NAME"
Id = "id"
date = "date"
table = boto3.resource("dynamodb").Table(tableName)

# sign Elasticsearch requests with the current AWS credentials
my_session = boto3.session.Session()
region = my_session.region_name
service = "es"
credentials = my_session.get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
headers = { "Content-Type": "application/json" }
host = os.environ['HOST_NAME']
bulkUrl = host + "/_bulk"

# location of the split JSON files generated in the previous step
BUCKET = 'BUCKET_NAME'
KEY = 'PATH'
def lambda_handler(event, context):
    isDataCompleted = True
    nextContinuationToken = ""
    total_count = 0
    while isDataCompleted:
        # list the split JSON files, paging through S3 if needed
        if nextContinuationToken == "":
            listResponse = s3Client.list_objects_v2(Bucket=BUCKET, Prefix=KEY)
        else:
            listResponse = s3Client.list_objects_v2(Bucket=BUCKET, Prefix=KEY, ContinuationToken=nextContinuationToken)
        if 'Contents' in listResponse:
            for obj in listResponse['Contents']:
                print('inside key - ', obj['Key'])
                s3response = s3Client.get_object(
                    Bucket=BUCKET,
                    Key=obj['Key']
                )
                data = json.loads(s3response['Body'].read())
                document = ""
                # fetch the full item from DynamoDB for every mismatched key
                for item in data['data']:
                    key = int(item[Id])
                    sort_key = item[date]
                    queryResponse = table.query(KeyConditionExpression=Key(Id).eq(key) & Key(date).eq(sort_key))
                    if len(queryResponse["Items"]) > 0:
                        for item in queryResponse["Items"]:
                            header = {
                                "index": {
                                    "_index": index_name,
                                    "_id": str(item[Id]).replace('"', "").replace("'", "")
                                }
                            }
                            document += json.dumps(header) + "\n" + json.dumps(item, cls=DecimalEncoder) + "\n"
                total_count += len(data['data'])
                # push the batch to Elasticsearch through the bulk API
                if len(data['data']) > 0:
                    bulkResponse = requests.post(bulkUrl, auth=awsauth, data=document, headers=headers)
                    if json.loads(bulkResponse.text)['errors'] == True:
                        logger.error("Error in uploading document")
                        raise Exception(bulkResponse.text)
                    else:
                        logger.info("Successfully uploaded data")
        if listResponse['IsTruncated'] == True:
            nextContinuationToken = listResponse["NextContinuationToken"]
        else:
            isDataCompleted = False
            break
    print("total count -", total_count)
    return "Successfully synced"
# to handle decimal fields from dynamodb
class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 == 0:
                return int(o)
            else:
                return float(o)
        return super(DecimalEncoder, self).default(o)
In case you are using AWS Lambda, add this layer to the function so the requests library is available.
Happy Programming!!