Summary
In this article the following will be demonstrated:
- Ceph Nano – as the back-end storage and S3 interface
- A Python script that uses the S3 API to multipart-upload a file to Ceph Nano using Python multi-threading
Introduction
Ceph Nano is a Docker container providing basic Ceph services (mainly Ceph Monitor, Ceph MGR, and Ceph OSD for managing the container storage, plus a RADOS Gateway to provide the S3 API interface). It also provides a web UI to view and manage buckets.
Multipart upload is similar in spirit to the HTTP/1.1 byte-range feature that lets a client transfer a file as ranges of bytes instead of in one piece. For example, a 200 MB file can be downloaded in 2 rounds: the first round fetches the first 50% of the file (bytes 0 to 104857599) and the second round fetches the remaining 50% starting from byte 104857600.
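For illustration, such a ranged download could look like the following minimal Python sketch (the URL and output file name are hypothetical, and the requests library is assumed to be available):

import requests

url = "http://example.com/file.bin"    # hypothetical 200 MB file
half = 104857600                        # 100 MB

# Round 1: bytes 0..104857599 (first half of the file)
part1 = requests.get(url, headers={"Range": "bytes=0-%d" % (half - 1)}).content
# Round 2: bytes 104857600..end (second half of the file)
part2 = requests.get(url, headers={"Range": "bytes=%d-" % half}).content

with open("file.bin", "wb") as f:
    f.write(part1 + part2)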
The Details
First, Docker must be installed on the local system; then download the Ceph Nano CLI using:
$ curl -L https://github.com/ceph/cn/releases/download/v2.3.1/cn-v2.3.1-linux-amd64 -o cn && chmod +x cn
This will download the cn binary (version 2.3.1) into the local folder and make it executable.
To start the Ceph Nano cluster (container), run the following command:
$ ./cn cluster start ceph
2019/12/03 11:59:12 Starting cluster ceph...
Endpoint: http://166.87.163.10:8000
Dashboard: http://166.87.163.10:5000
Access key: 90WFLFQNZQ452XXI6851
Secret key: ISmL6Ru3I3MDiFwZITPCu8b1tL3BWyPDAmLoF0ZP
Working directory: /usr/share/ceph-nano
This will download the Ceph Nano image and run it as a Docker container. The web UI can be accessed at http://166.87.163.10:5000, and the API endpoint is at http://166.87.163.10:8000.
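To quickly check the S3 endpoint from Python, a minimal boto3 sketch like the following can be used (the endpoint and keys are the ones printed by cn above; substitute the values from your own output):

import boto3

s3 = boto3.client('s3',
                  endpoint_url='http://166.87.163.10:8000',
                  aws_access_key_id='90WFLFQNZQ452XXI6851',
                  aws_secret_access_key='ISmL6Ru3I3MDiFwZITPCu8b1tL3BWyPDAmLoF0ZP')

# A fresh cluster should return an empty list of buckets
print(s3.list_buckets()['Buckets'])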
We can verify that the container is running using:
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0ba17ec716d3 ceph/daemon "/opt/ceph-contain..." 4 weeks ago Up 26 seconds 0.0.0.0:5000->5000/tcp, 0.0.0.0:8000->8000/tcp ceph-nano-ceph
Of course this is for demonstration purposes; the container here was created 4 weeks ago.
It can be accessed using the container name ceph-nano-ceph with the command:
$ docker exec -it ceph-nano-ceph /bin/bash
Which will drop me into a Bash shell inside the Ceph Nano container.
To examine the running processes inside the container:
[root@ceph-nano-ceph-faa32aebf00b /]# ps -ef
UID PID PPID C STIME TTY TIME CMD
root 1 0 0 08:59 ? 00:00:00 /bin/bash /opt/ceph-container/bin/entrypoint.sh
ceph 113 1 0 08:59 ? 00:00:43 /usr/bin/ceph-mon --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -i ceph-nano-ceph-faa32aebf00b --mon-data /var/lib/ceph/mo
ceph 194 1 1 08:59 ? 00:02:08 ceph-mgr --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -i ceph-nano-ceph-faa32aebf00b
ceph 240 1 0 08:59 ? 00:00:29 ceph-osd --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -i 0
ceph 451 1 0 08:59 ? 00:00:17 radosgw --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -n client.rgw.ceph-nano-ceph-faa32aebf00b -k /var/lib/ceph/radosgw/c
root 457 1 0 08:59 ? 00:00:02 python app.py
root 461 1 0 08:59 ? 00:00:00 /usr/bin/python2.7 /usr/bin/ceph --cluster ceph -w
root 1093 0 0 11:02 ? 00:00:00 /bin/bash
root 1111 1093 0 11:03 ? 00:00:00 ps -ef
The first thing I need to do is create a bucket, so inside the Ceph Nano container I use the following command:
# s3cmd mb s3://nano
Which will create a bucket called nano.
Next, create a user on the Ceph Nano cluster to access the S3 buckets. Here I created a user called test, with both the access and secret keys set to test:
$ radosgw-admin user create --uid=test --access-key=test --secret=test --display-name test
To create and upload a test file:
# dd if=/dev/zero of=./zeros bs=15M count=1
# s3cmd put ./zeros s3://nano
And list the file in the bucket:
# s3cmd ls s3://nano
2019-10-29 11:58 15728640 s3://nano/zeros
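The same check can also be done from Python; a minimal boto3 sketch, assuming the endpoint and keys printed by cn earlier (since the nano bucket was created with the cluster's own credentials):

import boto3

s3 = boto3.client('s3',
                  endpoint_url='http://166.87.163.10:8000',
                  aws_access_key_id='90WFLFQNZQ452XXI6851',
                  aws_secret_access_key='ISmL6Ru3I3MDiFwZITPCu8b1tL3BWyPDAmLoF0ZP')

# Confirm the object exists and report its size (15728640 bytes for the 15 MB test file)
print(s3.head_object(Bucket='nano', Key='zeros')['ContentLength'])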
The Python code
#!/usr/bin/env python
#
# Copyright (c) 2019 Tamer Embaby <tamer@redhat.com>
# All rights reserved.
#
# Main reference is: https://stackoverflow.com/questions/34303775/complete-a-multipart-upload-with-boto3
# Good code, but it will take too much time to complete especially for thread synchronization. (DONE)
#
# TODO:
# - Check return code of function calls everywhere.
# - Use logging instead of print's everywhere.
# - Address the XXX and FIXME's in the code
#

import boto3
import sys, os
import threading
import logging

b3_client = None
b3_s3 = None
mpu = None      # Multipart upload handle

#
# Thread (safe) function responsible for uploading a part of the file
#
def upload_part_r(partid, part_start, part_end, thr_args):
    filename = thr_args['FileName']
    bucket = thr_args['BucketName']
    upload_id = thr_args['UploadId']

    logging.info("%d: >> Uploading part %d", partid, partid)
    logging.info("%d: --> Upload starts at byte %d", partid, part_start)
    logging.info("%d: --> Upload ends at byte %d", partid, part_end)

    f = open(filename, "rb")
    logging.info("%d: DEBUG: Seeking offset: %d", partid, part_start)
    logging.info("%d: DEBUG: Reading size: %d", partid, part_end - part_start)
    f.seek(part_start, 0)
    # XXX: Would the next read fail if the portion is too large?
    # Note: for the last part the caller sets part_end to the file size, so this may
    # ask for one byte past the end of the file; read() simply stops at EOF.
    data = f.read(part_end - part_start + 1)

    # DO WORK HERE
    # TODO:
    # - Variables like mpu, Bucket, Key should be passed from caller -- DONE
    # - We should collect part['ETag'] from this part into array/list, so we must synchronize access
    #   to that list; this list is then used to construct the part_info array to call .complete_multipart_upload(...)
    #
    # NOTES:
    # - Since the part id is zero based (from handle_mp_file function), we add 1 to it here as HTTP parts
    #   should start from 1
    part = b3_client.upload_part(Bucket=bucket, Key=filename, PartNumber=partid+1, UploadId=upload_id, Body=data)

    # Thread critical variable which should hold all information about ETag for all parts, access to this
    # variable should be synchronized.
    lock = thr_args['Lock']
    if lock.acquire():
        thr_args['PartInfo']['Parts'].append({'PartNumber': partid+1, 'ETag': part['ETag']})
        lock.release()

    f.close()
    logging.info("%d: -><- Part ID %d is ending", partid, partid)
    return

#
# Part size calculations.
# Thread dispatcher
#
def handle_mp_file(bucket, filename, nrparts):
    print(">> Uploading file: " + filename + ", nr_parts = " + str(nrparts))

    fsize = os.path.getsize(filename)
    print("+ %s file size = %d " % (filename, fsize))

    # Do the part size calculations (integer division; the last part absorbs the remainder)
    part_size = fsize // nrparts
    print("+ standard part size = " + str(part_size) + " bytes")

    # Initiate a multipart upload for the file under the bucket
    mpu = b3_client.create_multipart_upload(Bucket=bucket, Key=filename)

    threads = list()
    thr_lock = threading.Lock()
    thr_args = { 'PartInfo': { 'Parts': [] }, 'UploadId': mpu['UploadId'], 'BucketName': bucket,
                 'FileName': filename, 'Lock': thr_lock }

    for i in range(nrparts):
        print("++ Part ID: " + str(i))

        part_start = i * part_size
        part_end = (part_start + part_size) - 1
        if (i+1) == nrparts:
            print("DEBUG: last chunk, part-end was/will %d/%d" % (part_end, fsize))
            part_end = fsize

        print("DEBUG: part_start=%d/part_end=%d" % (part_start, part_end))

        thr = threading.Thread(target=upload_part_r, args=(i, part_start, part_end, thr_args,))
        threads.append(thr)
        thr.start()

    # Wait for all threads to complete
    for index, thr in enumerate(threads):
        thr.join()
        print("%d thread finished" % (index))

    part_info = thr_args['PartInfo']
    # Threads may finish in any order, but S3 requires the parts list to be sorted
    # by PartNumber when completing the upload.
    part_info['Parts'].sort(key=lambda p: p['PartNumber'])
    for p in part_info['Parts']:
        print("DEBUG: PartNumber=%d" % (p['PartNumber']))
        print("DEBUG: ETag=%s" % (p['ETag']))

    print("+ Finishing up multi-part uploads")
    b3_client.complete_multipart_upload(Bucket=bucket, Key=filename, UploadId=mpu['UploadId'], MultipartUpload=thr_args['PartInfo'])
    return True

### MAIN ###
if __name__ == "__main__":
    bucket = 'test'             # XXX FIXME: Pass in arguments

    if len(sys.argv) != 3:
        print("usage: %s <filename to upload> <number of threads/parts>" % (sys.argv[0]))
        sys.exit(1)

    # Filename: File to upload
    # NR Parts: Number of parts to divide the file into, which is also the number of threads to use
    filename = sys.argv[1]
    nrparts = int(sys.argv[2])

    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    # Initialize the connection with the Ceph RADOS GW
    b3_client = boto3.client(service_name='s3', endpoint_url='http://127.0.0.1:8000', aws_access_key_id='test', aws_secret_access_key='test')
    b3_s3 = boto3.resource(service_name='s3', endpoint_url='http://127.0.0.1:8000', aws_access_key_id='test', aws_secret_access_key='test')

    handle_mp_file(bucket, filename, nrparts)
### END ###
This code uses Python multi-threading to upload multiple parts of the file simultaneously, just as any modern download manager does with HTTP/1.1 byte ranges. To use this Python script, save the above code in a file called boto3-upload-mp.py and run it as:
$ ./boto3-upload-mp.py mp_file_original.bin 6
Here 6 means the script will divide the file into 6 parts and create 6 threads to upload these parts simultaneously.
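As a worked example of the part-size arithmetic in handle_mp_file (the file size here is hypothetical), and keeping in mind that S3 implementations typically require every part except the last one to be at least 5 MiB:

fsize = 104857600              # hypothetical 100 MiB file
nrparts = 6
part_size = fsize // nrparts   # 17476266 bytes
# Parts 1-5 each cover part_size bytes; the last part runs to the end of the
# file and absorbs the remainder (104857600 - 6*17476266 = 4 bytes here).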
The uploaded file can then be re-downloaded and checksummed against the original file to verify it was uploaded successfully.
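For example, a minimal verification sketch in Python using boto3 and hashlib (the bucket name test matches the one hard-coded in the script, the endpoint and keys match the ones the script uses, and the file names are simply the ones from the example run above):

import boto3, hashlib

s3 = boto3.client('s3', endpoint_url='http://127.0.0.1:8000',
                  aws_access_key_id='test', aws_secret_access_key='test')

# Download the object that was uploaded by the script
s3.download_file('test', 'mp_file_original.bin', 'mp_file_downloaded.bin')

def md5sum(path):
    h = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            h.update(chunk)
    return h.hexdigest()

# Prints True if the downloaded copy matches the original
print(md5sum('mp_file_original.bin') == md5sum('mp_file_downloaded.bin'))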