I am currently (still) looking for a job in the data/software engineering field, and I am preparing for all kinds of technical interviews, ranging from coding, algorithms, system design, and SQL to computer science fundamentals. Data engineer is a role with a vague definition, and in some companies people with this title function as ETL (extract, transform, load) engineers. Thus, topics on data transformation could come up during the interview. In this blog, I try to hack the interview questions that focus on data transformation.

Prerequisites: boto3

Boto3 is the AWS SDK for Python, which provides an easy-to-use, object-oriented API as well as low-level access to AWS services. We could find the documentation here.

We could easily install the latest Boto 3 release via pip:

$ pip install boto3

Then configure the credentials file at ~/.aws/credentials:

[default]
aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_KEY

And set the default region at ~/.aws/config:

[default]
region=us-east-1

To use boto3, we import it and tell it which service to use. For example, to work with Amazon S3, we could create a service resource:

import boto3
s3 = boto3.resource('s3')
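
As a quick taste of the resource's object-oriented API, we could iterate over the objects in a bucket (the bucket name below is just a placeholder):

bucket = s3.Bucket('my-bucket')
for obj in bucket.objects.all():  # each obj is an ObjectSummary
    print(obj.key, obj.size)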

Create an S3 bucket at the client level:

s3_client = boto3.client('s3')
s3_client.create_bucket(Bucket='BUCKET_NAME')
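
Note that when the target region is not us-east-1, create_bucket also expects a location constraint; a minimal sketch, assuming us-west-2 as the region:

s3_client.create_bucket(
    Bucket='BUCKET_NAME',
    CreateBucketConfiguration={'LocationConstraint': 'us-west-2'}
)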

List existing buckets for the AWS account:

response = s3_client.list_buckets()
for bucket in response['Buckets']:
    print(f'  {bucket["Name"]}')
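
Similarly, we could list the objects inside a bucket; a sketch using a paginator so that results beyond the first 1,000 keys are still returned (the bucket name is a placeholder):

paginator = s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='BUCKET_NAME'):
    for obj in page.get('Contents', []):
        print(obj['Key'])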

Upload a file into an S3 bucket:

s3_client.upload_file(file_name, bucket, object_name)
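
upload_file also accepts an ExtraArgs dictionary for object metadata; a minimal sketch, assuming we want to set the content type of the uploaded object:

s3_client.upload_file(
    file_name, bucket, object_name,
    ExtraArgs={'ContentType': 'image/jpeg'}
)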

Or upload a file object in binary mode into an S3 bucket:

with open('test.jpg', 'rb') as f:
    s3_client.upload_fileobj(f, "BUCKET_NAME", "OBJECT_NAME")

Or put an object directly into an S3 bucket at the bucket level:

bucket = s3.Bucket('my-bucket')
with open('test.jpg', 'rb') as data:
    bucket.put_object(Key=object_name, Body=data)

To download a file from an S3 bucket, we could:

s3_client.download_file('BUCKET_NAME', 'OBJECT_NAME', 'FILE_NAME')

Or:

with open('FILE_NAME', 'wb') as f:
    s3_client.download_fileobj('BUCKET_NAME', 'OBJECT_NAME', f)
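
If the object is small enough to fit in memory, we could also read it straight from the response body instead of writing it to a file; a minimal sketch:

obj = s3_client.get_object(Bucket='BUCKET_NAME', Key='OBJECT_NAME')
data = obj['Body'].read()  # bytes of the whole object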

We could also pass a transfer configuration when uploading, downloading, or copying a file or S3 object:

from boto3.s3.transfer import TransferConfig

GB = 1024 ** 3
# options can be combined in a single TransferConfig, e.g. raise the multipart
# threshold to 5 GB and cap the concurrency at 5 threads
config = TransferConfig(multipart_threshold=5*GB, max_concurrency=5)
# config = TransferConfig(use_threads=False)  # alternative: disable threads entirely
s3_client.upload_file('FILE_NAME', 'BUCKET_NAME', 'OBJECT_NAME', Config=config)
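
The same Config can also be passed to a managed copy between buckets; a sketch with hypothetical source and destination bucket names:

copy_source = {'Bucket': 'SOURCE_BUCKET', 'Key': 'OBJECT_NAME'}
s3_client.copy(copy_source, 'DEST_BUCKET', 'OBJECT_NAME', Config=config)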

Prerequisites: psycopg2

Psycopg2 is a popular PostgreSQL database adapter for Python. We could find the documentation here.

We could easily install psycopg2 via pip:

$ pip install psycopg2-binary

To use psycopg2, we could import it, connect to an existing database, and open a cursor to perform database operations:

import psycopg2
conn = psycopg2.connect(dbname="DATABASE_NAME", user="USER", host="HOST", port="PORT", password="PASSWORD")
cur = conn.cursor()
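
Connections and cursors can also be used as context managers, so the transaction is committed on success (or rolled back on error) and the cursor is closed automatically; a minimal sketch:

with psycopg2.connect(dbname="DATABASE_NAME", user="USER", host="HOST",
                      port="PORT", password="PASSWORD") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1;")
        print(cur.fetchone())
# note: leaving the with block commits the transaction but does not close the connection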

Query the database and fetch the results one row, a few rows, or all rows at a time:

cur.execute(sql_query)
cur.fetchone()    # next single row, or None when no more rows are available
cur.fetchmany(2)  # next 2 rows as a list
cur.fetchall()    # all remaining rows as a list
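
The cursor itself is also iterable, which is handy for streaming over a large result set row by row; a minimal sketch:

cur.execute(sql_query)
for row in cur:  # each row is a tuple
    print(row)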

Pass parameters to SQL queries:

import datetime

cur.execute("""
    INSERT INTO some_table (an_int, a_date, another_date, a_string)
    VALUES (%(int)s, %(date)s, %(date)s, %(str)s);
    """,
    {'int': 10, 'str': "O'Reilly", 'date': datetime.date(2005, 11, 18)})
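
For inserting many rows with the same statement, we could use executemany with positional placeholders; a sketch against a hypothetical two-column table:

rows = [(1, 'a'), (2, 'b'), (3, 'c')]
cur.executemany(
    "INSERT INTO another_table (an_int, a_string) VALUES (%s, %s);",
    rows)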

Make the changes to the database persistent:

conn.commit()

Close communication with the database:

cur.close()
conn.close()

Upload data from S3 into Redshift

import psycopg2

def main():
    connection_parameters = {
        'dbname': dbname,
        'user': user,
        'host': host,
        'port': port,
        'password': password
    }
    try:
        conn = psycopg2.connect(**connection_parameters)
        print("Connected to Redshift.")
    except psycopg2.Error:
        print("Unable to connect to Redshift.")
        return

    cur = conn.cursor()

    # COPY takes schema.table, then the S3 path and IAM role as quoted strings
    upload_statement = """
            COPY {}.{}
            FROM '{}'
            IAM_ROLE '{}'
            CSV;
    """.format(schema, table_name, file_name, iam_role)

    try:
        cur.execute(upload_statement)
        conn.commit()
        print("Uploaded successfully.")
    except psycopg2.Error:
        raise RuntimeError("Failed to upload the file.")
    finally:
        cur.close()
        conn.close()

if __name__ == "__main__":
    main()

Load data from Redshift to pandas

# the redshift+psycopg2 dialect requires the sqlalchemy-redshift package
import sqlalchemy as sa
import pandas as pd

url = f'redshift+psycopg2://{user}:{password}@{host}:{port}/{dbname}'
engine = sa.create_engine(url, echo=False)
df = pd.read_sql_query('SELECT * FROM table_name;', engine)
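
Going the other way, the same engine could also write a DataFrame back into Redshift via pandas' to_sql; a sketch with placeholder table and schema names (best kept to small tables, since to_sql issues plain INSERTs rather than a COPY):

df.to_sql('table_name', engine, schema='public', index=False, if_exists='append')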