I have a Kinesis stream that's pushing data into Amazon Redshift via Lambda.
Currently my Lambda code looks something like this:
import boto3

client = boto3.client('redshift-data')

for tx in txs:
    query = ...  # prepare an INSERT query here
    resp = client.execute_statement(
        ClusterIdentifier=redshift_cluster_id,
        Database=redshift_db,
        DbUser=redshift_user,
        Sql=query
    )
The trouble is that as soon as I try to scale up Kinesis (more shards) or Lambda (concurrent processing from a single shard), I get this:
[ERROR] ActiveStatementsExceededException: An error occurred (ActiveStatementsExceededException) when calling the ExecuteStatement operation: Active statements exceeded the allowed quota (200).
Traceback (most recent call last):
File "/opt/python/lib/python3.8/site-packages/codeguru_profiler_agent/aws_lambda/profiler_decorator.py", line 52, in profiler_decorate
return function(event, context)
File "/opt/python/lib/python3.8/site-packages/codeguru_profiler_agent/aws_lambda/lambda_handler.py", line 91, in call_handler
return handler_function(event, context)
File "/var/task/lambda_function.py", line 71, in lambda_handler
resp = client.execute_statement(
File "/var/runtime/botocore/client.py", line 386, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/var/runtime/botocore/client.py", line 705, in _make_api_call
raise error_class(parsed_response, operation_name)
From the AWS docs I gather this means I'm trying to run too many execute_statement calls in parallel.
How do I get around this? Is the only way to work with Redshift by batching records and inserting them all together?
CodePudding user response:
The comment in your code gives me pause - "query = # prepare an INSERT query here". This seems to imply that you are reading the S3 data into Lambda and INSERTing this data into Redshift. If so, this is not a good pattern.
First off, Redshift expects data to be brought into the cluster through COPY (or Spectrum or ...), not through INSERT. Row-by-row INSERTs will create issues in Redshift with managing the transactions and create a tremendous waste of disk space / need for VACUUM. The INSERT approach for putting data into Redshift is an anti-pattern and shouldn't be used for even moderate sizes of data.
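To make the difference concrete, here is a minimal sketch of what the load step could look like with a single COPY through the same Redshift Data API instead of per-row INSERTs, assuming the records have already been written to S3 as JSON under a known prefix. All bucket, table, IAM role, and cluster names below are placeholders, not anything from your setup.

import boto3

client = boto3.client('redshift-data')

# Placeholder identifiers for illustration only
redshift_cluster_id = 'my-cluster'
redshift_db = 'my-db'
redshift_user = 'my-user'

# One COPY pulls every file under the S3 prefix into the table
copy_sql = """
    COPY my_schema.transactions
    FROM 's3://my-bucket/staging/batch-2023-01-15-10-05/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftCopyRole'
    FORMAT AS JSON 'auto'
"""

resp = client.execute_statement(
    ClusterIdentifier=redshift_cluster_id,
    Database=redshift_db,
    DbUser=redshift_user,
    Sql=copy_sql
)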
More generally the concern is the data movement impedance mismatch. Kinesis is lots of independent streams of data and code generating small files. Redshift is a massive database that works on large data segments. Mismatching these tools in a way that misses their designed targets will make either of them perform very poorly.

You need to match the data requirements by batching up S3 into Redshift. This means COPYing many S3 files into Redshift in a single COPY command. This can be done with manifests or by "directory" structure in S3 ("COPY everything from S3 path ..."). This COPY into Redshift can run on a regular interval (every 2, 5, or 10 minutes). So you want your Kinesis Lambdas to organize the data in S3 (or add to a manifest) so that a "batch" of S3 files can be collected up for a single COPY execution. This way a large number of S3 files is brought into Redshift at once (its preferred data size), and it will also greatly reduce your execute API calls.
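One way this batching could be implemented, sketched purely as an illustration: the Kinesis Lambdas only write files under a staging prefix, and a separate Lambda on a 2-10 minute schedule lists that prefix, writes a COPY manifest, and issues a single COPY for the whole batch. Every bucket, prefix, table, role, and cluster name here is hypothetical.

import json
import boto3

s3 = boto3.client('s3')
rs = boto3.client('redshift-data')

# Hypothetical names for illustration
BUCKET = 'my-bucket'
STAGING_PREFIX = 'staging/'
MANIFEST_KEY = 'manifests/batch.manifest'
IAM_ROLE = 'arn:aws:iam::123456789012:role/MyRedshiftCopyRole'

def load_batch(event, context):
    # Collect the staged files written by the Kinesis consumers
    keys = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=BUCKET, Prefix=STAGING_PREFIX):
        keys += [obj['Key'] for obj in page.get('Contents', [])]
    if not keys:
        return

    # Write a COPY manifest naming every staged file
    manifest = {'entries': [{'url': f's3://{BUCKET}/{k}', 'mandatory': True} for k in keys]}
    s3.put_object(Bucket=BUCKET, Key=MANIFEST_KEY, Body=json.dumps(manifest))

    # One COPY brings the whole batch into Redshift
    rs.execute_statement(
        ClusterIdentifier='my-cluster',
        Database='my-db',
        DbUser='my-user',
        Sql=(
            f"COPY my_schema.transactions "
            f"FROM 's3://{BUCKET}/{MANIFEST_KEY}' "
            f"IAM_ROLE '{IAM_ROLE}' MANIFEST FORMAT AS JSON 'auto'"
        )
    )

In a real pipeline you would also move or delete the staged files after a successful COPY so the next batch doesn't re-load them.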
Now if you have a very large Kinesis pipe set up and the data is very large, there is another data movement "preference" to take into account. This only matters when you are moving a lot of data per minute. This extra preference is for S3. S3 being an object store means that a significant amount of time is taken up by "looking up" a requested object key - about 0.5 seconds per object. So reading a thousand S3 objects will require (in total) about 500 seconds of key lookup time. Redshift makes its requests to S3 in parallel, one per slice in the cluster, so some of this time happens in parallel. If the files being read are 1KB in size, the data transfer itself, after the S3 lookups complete, will take only about 1.25 seconds in total. Again this time is in parallel, but you can see how much time is spent in lookup vs. transfer. To get the maximum bandwidth out of S3 when reading many files, these files need to be on the order of 1GB in size (100MB is OK in my experience). You can see that if you are ingesting millions of files per minute from Kinesis into Redshift, you will need a process to combine many small files into bigger files to avoid this S3 hazard. Since you are using Lambda as your Kinesis reader I expect that you aren't at this data rate yet, but it is good to keep your eyes on this issue if you expect to expand to a very large scale.
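If you do reach that scale, the compaction step could look something like the sketch below (again with placeholder bucket and key names), concatenating many small newline-delimited JSON objects into one larger object before it is COPYed. At real volume you would push this work into Firehose buffering, EMR, or similar rather than a single Lambda's memory, but the shape of the step is the same.

import boto3

s3 = boto3.client('s3')

# Hypothetical names for illustration
BUCKET = 'my-bucket'
SMALL_PREFIX = 'staging/'
COMPACTED_KEY = 'compacted/batch-0001.json'

def compact(event, context):
    # Concatenate many small newline-delimited JSON objects into one large object
    parts = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=BUCKET, Prefix=SMALL_PREFIX):
        for obj in page.get('Contents', []):
            body = s3.get_object(Bucket=BUCKET, Key=obj['Key'])['Body'].read()
            parts.append(body.rstrip(b'\n'))
    if parts:
        s3.put_object(Bucket=BUCKET, Key=COMPACTED_KEY, Body=b'\n'.join(parts) + b'\n')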
Just because tools have high bandwidth doesn't mean that they can be piped together. Bandwidth comes in many styles.