Ceph, AWS S3, and Multipart uploads using Python

Summary

In this article the following will be demonstrated:

  • Ceph Nano – as the backend storage and S3 interface
  • A Python script that uses the S3 API to multipart-upload a file to Ceph Nano, using Python multi-threading

Introduction

Ceph Nano is a Docker container providing basic Ceph services (mainly a Ceph Monitor, a Ceph MGR, a Ceph OSD for managing the container storage, and a RADOS Gateway to provide the S3 API interface). It also provides a web UI to view and manage buckets.

Multipart upload builds on the HTTP/1.1 protocol's support for transferring a range of bytes of a file rather than the whole file at once. For example, a 200 MB file can be downloaded in 2 rounds: the first round fetches the first 50% of the file (bytes 0 to 104857599), and the second round downloads the remaining 50% starting from byte 104857600.
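
To make this concrete, the sketch below fetches such a file in two ranged GET requests with boto3. The bucket and object names here are made up for illustration, and it assumes a boto3 client that is already configured against an S3-compatible endpoint:

import boto3

s3 = boto3.client('s3')   # assumes credentials and endpoint are already configured
bucket, key = 'example-bucket', 'bigfile.bin'   # hypothetical names

# First half: bytes 0 to 104857599 (HTTP Range header syntax)
first = s3.get_object(Bucket=bucket, Key=key, Range='bytes=0-104857599')['Body'].read()
# Second half: from byte 104857600 to the end of the object
second = s3.get_object(Bucket=bucket, Key=key, Range='bytes=104857600-')['Body'].read()

with open('bigfile.bin', 'wb') as f:
        f.write(first)
        f.write(second)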

The Details

First, Docker must be installed on the local system, then download the Ceph Nano CLI using:

$ curl -L https://github.com/ceph/cn/releases/download/v2.3.1/cn-v2.3.1-linux-amd64 -o cn && chmod +x cn

This downloads the cn binary (version 2.3.1) into the local folder and makes it executable.

To start the Ceph Nano cluster (container), run the following command:

$ ./cn cluster start ceph
2019/12/03 11:59:12 Starting cluster ceph...

Endpoint: http://166.87.163.10:8000
Dashboard: http://166.87.163.10:5000
Access key: 90WFLFQNZQ452XXI6851
Secret key: ISmL6Ru3I3MDiFwZITPCu8b1tL3BWyPDAmLoF0ZP
Working directory: /usr/share/ceph-nano

This will download the Ceph Nano image and run it as a Docker container. The web UI can be accessed at http://166.87.163.10:5000, and the S3 API endpoint is at http://166.87.163.10:8000.
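
As a quick sanity check from Python, boto3 can be pointed at this endpoint with the access and secret keys printed above (substitute the values from your own cluster). The small sketch below simply lists the buckets visible to that user:

import boto3

s3 = boto3.client('s3',
        endpoint_url='http://166.87.163.10:8000',
        aws_access_key_id='90WFLFQNZQ452XXI6851',
        aws_secret_access_key='ISmL6Ru3I3MDiFwZITPCu8b1tL3BWyPDAmLoF0ZP')
print([b['Name'] for b in s3.list_buckets()['Buckets']])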

We can verify that using:

$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                            NAMES
0ba17ec716d3        ceph/daemon         "/opt/ceph-contain..."   4 weeks ago         Up 26 seconds       0.0.0.0:5000->5000/tcp, 0.0.0.0:8000->8000/tcp   ceph-nano-ceph

Of course this is just for demonstration purposes; the container shown here was created 4 weeks ago.

The container can be accessed by its name, ceph-nano-ceph, using the command:

$ docker exec -it ceph-nano-ceph /bin/bash

This drops me into a Bash shell inside the Ceph Nano container.

To examine the running processes inside the container:

[root@ceph-nano-ceph-faa32aebf00b /]# ps -ef
UID        PID  PPID  C STIME TTY          TIME CMD
root         1     0  0 08:59 ?        00:00:00 /bin/bash /opt/ceph-container/bin/entrypoint.sh
ceph       113     1  0 08:59 ?        00:00:43 /usr/bin/ceph-mon --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -i ceph-nano-ceph-faa32aebf00b --mon-data /var/lib/ceph/mo
ceph       194     1  1 08:59 ?        00:02:08 ceph-mgr --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -i ceph-nano-ceph-faa32aebf00b
ceph       240     1  0 08:59 ?        00:00:29 ceph-osd --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -i 0
ceph       451     1  0 08:59 ?        00:00:17 radosgw --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -n client.rgw.ceph-nano-ceph-faa32aebf00b -k /var/lib/ceph/radosgw/c
root       457     1  0 08:59 ?        00:00:02 python app.py
root       461     1  0 08:59 ?        00:00:00 /usr/bin/python2.7 /usr/bin/ceph --cluster ceph -w
root      1093     0  0 11:02 ?        00:00:00 /bin/bash
root      1111  1093  0 11:03 ?        00:00:00 ps -ef

The first thing I need to do is create a bucket, so once inside the Ceph Nano container I use the following command:

# s3cmd mb s3://nano

This creates a bucket called nano.
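
If you prefer to stay on the host rather than enter the container, the same bucket can be created with boto3; a minimal sketch, reusing the endpoint and keys printed by cn earlier:

import boto3

s3 = boto3.client('s3',
        endpoint_url='http://166.87.163.10:8000',
        aws_access_key_id='90WFLFQNZQ452XXI6851',
        aws_secret_access_key='ISmL6Ru3I3MDiFwZITPCu8b1tL3BWyPDAmLoF0ZP')
s3.create_bucket(Bucket='nano')   # equivalent of: s3cmd mb s3://nano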

Next, I create a user on the Ceph Nano cluster to access the S3 buckets. Here I create a user called test, with both the access key and the secret key set to test.

$ radosgw-admin user create --uid=test --access-key=test --secret=test --display-name test
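
Before running the upload script, it is worth confirming from Python that the new credentials are accepted by the gateway; a quick check, using the same local endpoint the script below points at:

import boto3

s3 = boto3.client('s3', endpoint_url='http://127.0.0.1:8000',
        aws_access_key_id='test', aws_secret_access_key='test')
print(s3.list_buckets()['Owner'])   # should show the "test" user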

To create and upload a test file:

# dd if=/dev/zero of=./zeros bs=15M count=1
# s3cmd put ./zeros s3://nano

And list the file in the bucket:

# s3cmd ls s3://nano
2019-10-29 11:58  15728640   s3://nano/zeros
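
The same information can be read from Python with a HEAD request on the object; a small sketch, again using the endpoint and keys printed by cn earlier:

import boto3

s3 = boto3.client('s3',
        endpoint_url='http://166.87.163.10:8000',
        aws_access_key_id='90WFLFQNZQ452XXI6851',
        aws_secret_access_key='ISmL6Ru3I3MDiFwZITPCu8b1tL3BWyPDAmLoF0ZP')
print(s3.head_object(Bucket='nano', Key='zeros')['ContentLength'])   # expect 15728640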

The Python code

#!/usr/bin/env python
#
# Copyright (c) 2019 Tamer Embaby <tamer@redhat.com>
# All rights reserved.
#
# Main reference is: https://stackoverflow.com/questions/34303775/complete-a-multipart-upload-with-boto3
# Good code, but it will take too much time to complete especially for thread synchronization. (DONE)
#
# TODO: 
#       - Check return code of function calls everywhere.
#       - Use logging instead of print's everywhere.
#       - Address the XXX and FIXME's in the code
#

import boto3
import sys, os
import threading
import logging

b3_client = None
b3_s3 = None
mpu = None              # Multipart upload handle

#
# Thread (safe) function responsible for uploading one part of the file
#
def upload_part_r(partid, part_start, part_end, thr_args):
        filename = thr_args['FileName']
        bucket = thr_args['BucketName']
        upload_id = thr_args['UploadId']

        logging.info("%d: >> Uploading part %d", partid, partid)
        logging.info("%d: --> Upload starts at byte %d", partid, part_start)
        logging.info("%d: --> Upload ends at byte %d", partid, part_end)

        f = open(filename, "rb")
        logging.info("%d: DEBUG: Seeking offset: %d", partid, part_start)
        logging.info("%d: DEBUG: Reading size: %d", partid, part_end - part_start)
        f.seek(part_start, 0)
        # XXX: Would the next read fail if the portion is too large?
        data = f.read(part_end - part_start + 1)

        # DO WORK HERE
        # TODO:
        # - Variables like mpu, Bucket, Key should be passed from caller -- DONE
        # - We should collect part['ETag'] from this part into array/list, so we must synchronize access
        #   to that list, this list is then used to construct part_info array to call .complete_multipart_upload(...)
        # TODO.
        #
        # NOTES:
        # - Since part id is zero based (from handle_mp_file function), we add 1 to it here as HTTP parts should start
        #   from 1
        part = b3_client.upload_part(Bucket=bucket, Key=filename, PartNumber=partid+1, UploadId=upload_id, Body=data)

        # Thread critical variable which should hold all information about ETag for all parts, access to this variable
        # should be synchronized.
        lock = thr_args['Lock']
        if lock.acquire():
                thr_args['PartInfo']['Parts'].append({'PartNumber': partid+1, 'ETag': part['ETag']})
                lock.release()

        f.close()
        logging.info("%d: -><- Part ID %d is ending", partid, partid)
        return

#
# Part size calculations.
# Thread dispatcher
#
def handle_mp_file(bucket, filename, nrparts):

        print ">> Uploading file: " + filename + ", nr_parts = " + str(nrparts)

        fsize = os.path.getsize(filename)
        print "+ %s file size = %d " % (filename, fsize)

        # do the part size calculations
        part_size = fsize / nrparts
        print "+ standard part size = " + str(part_size) + " bytes"

        # Initiate multipart uploads for the file under the bucket
        mpu = b3_client.create_multipart_upload(Bucket=bucket, Key=filename)

        threads = list()
        thr_lock = threading.Lock()
        thr_args = { 'PartInfo': { 'Parts': [] } , 'UploadId': mpu['UploadId'], 'BucketName': bucket, 'FileName': filename,
                'Lock': thr_lock }

        for i in range(nrparts):
                print "++ Part ID: " + str(i)

                part_start = i * part_size
                part_end = (part_start + part_size) - 1

                if (i+1) == nrparts:
                        print "DEBUG: last chunk, part-end was/will %d/%d" % (part_end, fsize)
                        part_end = fsize

                print "DEBUG: part_start=%d/part_end=%d" % (part_start, part_end)

                thr = threading.Thread(target=upload_part_r, args=(i, part_start, part_end, thr_args, ) )
                threads.append(thr)
                thr.start()

        # Wait for all threads to complete
        for index, thr in enumerate(threads):
                thr.join()
                print "%d thread finished" % (index)

        part_info = thr_args['PartInfo']
        for p in part_info['Parts']:
                print "DEBUG: PartNumber=%d" % (p['PartNumber'])
                print "DEBUG: ETag=%s" % (p['ETag'])

        print "+ Finishing up multi-part uploads"
        b3_client.complete_multipart_upload(Bucket=bucket, Key=filename, UploadId=mpu['UploadId'], MultipartUpload=thr_args['PartInfo'])
        return True

### MAIN ###

if __name__ == "__main__":
        bucket = 'test'                 # XXX FIXME: Pass in arguments

        if len(sys.argv) != 3:
                print "usage: %s <filename to upload> <number of threads/parts>" % (sys.argv[0])
                sys.exit(1)

        # Filename: File to upload
        # NR Parts: Number of parts to divide the file to, which is the number of threads to use
        filename = sys.argv[1]
        nrparts = int(sys.argv[2])

        format = "%(asctime)s: %(message)s"
        logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

        # Initialize the connection with Ceph RADOS GW
        b3_client = boto3.client(service_name = 's3', endpoint_url = 'http://127.0.0.1:8000', aws_access_key_id = 'test', aws_secret_access_key = 'test')
        b3_s3 = boto3.resource(service_name = 's3', endpoint_url = 'http://127.0.0.1:8000', aws_access_key_id = 'test', aws_secret_access_key = 'test')

        handle_mp_file(bucket, filename, nrparts)

### END ###

This code uses Python multithreading to upload multiple parts of the file simultaneously, as any modern download manager does, using the byte-range feature of HTTP/1.1.

To use this Python script, save the above code to a file called boto3-upload-mp.py and run it as:

$ ./boto3-upload-mp.py mp_file_original.bin 6

Here 6 means the script will divide the file into 6 parts and create 6 threads to upload these parts simultaneously.
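
For example, for the 15 MiB test file created earlier (15728640 bytes), the script's part-size calculation works out as follows (the last part's end is set to the file size itself, mirroring the script's logic):

fsize, nrparts = 15728640, 6
part_size = fsize // nrparts            # 2621440 bytes per part
for i in range(nrparts):
        part_start = i * part_size
        part_end = fsize if (i + 1) == nrparts else part_start + part_size - 1
        print("part %d: bytes %d-%d" % (i + 1, part_start, part_end))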

The uploaded file can then be re-downloaded and checksummed against the original file to verify it was uploaded successfully.
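
A rough sketch of that verification in Python, assuming the file was uploaded by the example run above (bucket test, key mp_file_original.bin) and using SHA-256 as an arbitrary hash choice:

import hashlib
import boto3

def sha256_of(path):
        h = hashlib.sha256()
        with open(path, 'rb') as f:
                for chunk in iter(lambda: f.read(1024 * 1024), b''):
                        h.update(chunk)
        return h.hexdigest()

s3 = boto3.client('s3', endpoint_url='http://127.0.0.1:8000',
        aws_access_key_id='test', aws_secret_access_key='test')
s3.download_file('test', 'mp_file_original.bin', 'mp_file_downloaded.bin')

print(sha256_of('mp_file_original.bin') == sha256_of('mp_file_downloaded.bin'))   # expect True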