Ceph, AWS S3, and Multipart uploads using Python

Summary

In this article the following will be demonstrated:

  • Ceph Nano – as the backend storage and S3 interface
  • A Python script that uses the S3 API to multipart-upload a file to Ceph Nano using Python multi-threading

Introduction

Ceph Nano is a Docker container providing basic Ceph services (mainly a Ceph Monitor, a Ceph MGR, a Ceph OSD for managing the container storage, and a RADOS Gateway to provide the S3 API interface). It also provides a web UI to view and manage buckets.

Multipart upload builds on the HTTP/1.1 ability to download/upload a range of bytes of a file rather than the whole file in one piece. For example, a 200 MB file can be downloaded in 2 rounds: the first round fetches the first 50% of the file (bytes 0 to 104857599), and the second round downloads the remaining 50% starting from byte 104857600.
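To make the range idea concrete, here is a minimal sketch of such a two-round ranged download in Python (an illustration only, not part of the upload flow below; it assumes the third-party requests package and a server at the placeholder URL that honors Range requests):

# Two-round ranged download sketch. Assumes the `requests` package is
# installed and that the server supports HTTP/1.1 Range requests.
import requests

url = "http://example.com/file.bin"      # hypothetical 200 MB file
ranges = ["bytes=0-104857599",           # first half of the file
          "bytes=104857600-"]            # second half, to end of file

with open("file.bin", "wb") as out:
    for byte_range in ranges:
        resp = requests.get(url, headers={"Range": byte_range})
        resp.raise_for_status()          # expect 206 Partial Content
        out.write(resp.content)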

The Details

First, Docker must be installed on the local system; then download the Ceph Nano CLI using:

$ curl -L https://github.com/ceph/cn/releases/download/v2.3.1/cn-v2.3.1-linux-amd64 -o cn && chmod +x cn

This will download the cn binary, version 2.3.1, into the local folder and make it executable.

To start the Ceph Nano cluster (container), run the following command:

$ ./cn cluster start ceph
2019/12/03 11:59:12 Starting cluster ceph...

Endpoint: http://166.87.163.10:8000
Dashboard: http://166.87.163.10:5000
Access key: 90WFLFQNZQ452XXI6851
Secret key: ISmL6Ru3I3MDiFwZITPCu8b1tL3BWyPDAmLoF0ZP
Working directory: /usr/share/ceph-nano

This will download the Ceph Nano image and run it as a Docker container. The web UI can be accessed at http://166.87.163.10:5000, and the S3 API endpoint is at http://166.87.163.10:8000.
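As a quick sanity check, the endpoint and keys printed above can also be used from Python with boto3 to list the buckets. This is just a sketch; substitute the endpoint and keys that your own cn cluster start run prints.

# Quick connectivity check against the Ceph Nano S3 endpoint, using the
# endpoint URL and keys printed by `cn cluster start` (replace with yours).
import boto3

client = boto3.client('s3',
        endpoint_url='http://166.87.163.10:8000',
        aws_access_key_id='90WFLFQNZQ452XXI6851',
        aws_secret_access_key='ISmL6Ru3I3MDiFwZITPCu8b1tL3BWyPDAmLoF0ZP')

for b in client.list_buckets().get('Buckets', []):
    print(b['Name'])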

We can verify that the container is running using:

$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                            NAMES
0ba17ec716d3        ceph/daemon         "/opt/ceph-contain..."   4 weeks ago         Up 26 seconds       0.0.0.0:5000->5000/tcp, 0.0.0.0:8000->8000/tcp   ceph-nano-ceph

Of course this is for demonstration purposes; the container shown here was created 4 weeks ago.

The container can be accessed by the name ceph-nano-ceph using the command:

$ docker exec -it ceph-nano-ceph /bin/bash

which drops me into a Bash shell inside the Ceph Nano container.

To examine the running processes inside the container:

[root@ceph-nano-ceph-faa32aebf00b /]# ps -ef
UID        PID  PPID  C STIME TTY          TIME CMD
root         1     0  0 08:59 ?        00:00:00 /bin/bash /opt/ceph-container/bin/entrypoint.sh
ceph       113     1  0 08:59 ?        00:00:43 /usr/bin/ceph-mon --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -i ceph-nano-ceph-faa32aebf00b --mon-data /var/lib/ceph/mo
ceph       194     1  1 08:59 ?        00:02:08 ceph-mgr --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -i ceph-nano-ceph-faa32aebf00b
ceph       240     1  0 08:59 ?        00:00:29 ceph-osd --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -i 0
ceph       451     1  0 08:59 ?        00:00:17 radosgw --cluster ceph --default-log-to-file=false --default-mon-cluster-log-to-file=false --setuser ceph --setgroup ceph -n client.rgw.ceph-nano-ceph-faa32aebf00b -k /var/lib/ceph/radosgw/c
root       457     1  0 08:59 ?        00:00:02 python app.py
root       461     1  0 08:59 ?        00:00:00 /usr/bin/python2.7 /usr/bin/ceph --cluster ceph -w
root      1093     0  0 11:02 ?        00:00:00 /bin/bash
root      1111  1093  0 11:03 ?        00:00:00 ps -ef

The first thing I need to do is create a bucket, so inside the Ceph Nano container I use the following command:

# s3cmd mb s3://nano

which creates a bucket called nano.

Next, I create a user on the Ceph Nano cluster to access the S3 buckets. Here I create a user called test, with both the access key and the secret key set to test.

$ radosgw-admin user create --uid=test --access-key=test --secret=test --display-name test

To upload a file for testing:

# dd if=/dev/zero of=./zeros bs=15M count=1
# s3cmd put ./zeros s3://nano

And list the file in the bucket:

# s3cmd ls s3://nano
2019-10-29 11:58  15728640   s3://nano/zeros
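The same check can be done from Python with boto3 using the test user created above, for example a head_object call to confirm the object size (a sketch; adjust the endpoint to wherever the RADOS Gateway is reachable from):

# Confirm the uploaded object exists and has the expected size (15 MB).
# Uses the test/test credentials created earlier; the endpoint may differ.
import boto3

client = boto3.client('s3',
        endpoint_url='http://127.0.0.1:8000',
        aws_access_key_id='test',
        aws_secret_access_key='test')

resp = client.head_object(Bucket='nano', Key='zeros')
print(resp['ContentLength'])    # expected: 15728640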

The Python code

#!/usr/bin/env python
#
# Copyright (c) 2019 Tamer Embaby <tamer@redhat.com>
# All rights reserved.
#
# Main reference is: https://stackoverflow.com/questions/34303775/complete-a-multipart-upload-with-boto3
# Good code, but it will take too much time to complete especially for thread synchronization. (DONE)
#
# TODO: 
#       - Check return code of function calls everywhere.
#       - Use logging instead of print's everywhere.
#       - Address the XXX and FIXME's in the code
#

import boto3
import sys, os
import threading
import logging

b3_client = None
b3_s3 = None
mpu = None              # Multipart upload handle

#
# Thread (safe) function responsible of uploading a part of the file
#
def upload_part_r(partid, part_start, part_end, thr_args):
        filename = thr_args['FileName']
        bucket = thr_args['BucketName']
        upload_id = thr_args['UploadId']

        logging.info("%d: >> Uploading part %d", partid, partid)
        logging.info("%d: --> Upload starts at byte %d", partid, part_start)
        logging.info("%d: --> Upload ends at byte %d", partid, part_end)

        f = open(filename, "rb")
        logging.info("%d: DEBUG: Seeking offset: %d", partid, part_start)
        logging.info("%d: DEBUG: Reading size: %d", partid, part_end - part_start)
        f.seek(part_start, 0)
        # XXX: Would the next read fail if the portion is too large?
        data = f.read(part_end - part_start + 1)

        # DO WORK HERE
        # TODO:
        # - Variables like mpu, Bucket, Key should be passed from caller -- DONE
        # - We should collect part['ETag'] from this part into array/list, so we must synchronize access
        #   to that list, this list is then used to construct part_info array to call .complete_multipart_upload(...)
        # TODO.
        #
        # NOTES:
        # - Since part id is zero based (from handle_mp_file function), we add 1 to it here as HTTP parts should start
        #   from 1
        part = b3_client.upload_part(Bucket=bucket, Key=filename, PartNumber=partid+1, UploadId=upload_id, Body=data)

        # Thread critical variable which should hold all information about ETag for all parts, access to this variable
        # should be synchronized.
        lock = thr_args['Lock']
        if lock.acquire():
                thr_args['PartInfo']['Parts'].append({'PartNumber': partid+1, 'ETag': part['ETag']})
                lock.release()

        f.close()
        logging.info("%d: -><- Part ID %d is ending", partid, partid)
        return

#
# Part size calculations.
# Thread dispatcher
#
def handle_mp_file(bucket, filename, nrparts):

        print ">> Uploading file: " + filename + ", nr_parts = " + str(nrparts)

        fsize = os.path.getsize(filename)
        print "+ %s file size = %d " % (filename, fsize)

        # do the part size calculations
        part_size = fsize / nrparts
        print "+ standard part size = " + str(part_size) + " bytes"

        # Initiate multipart uploads for the file under the bucket
        mpu = b3_client.create_multipart_upload(Bucket=bucket, Key=filename)

        threads = list()
        thr_lock = threading.Lock()
        thr_args = { 'PartInfo': { 'Parts': [] } , 'UploadId': mpu['UploadId'], 'BucketName': bucket, 'FileName': filename,
                'Lock': thr_lock }

        for i in range(nrparts):
                print "++ Part ID: " + str(i)

                part_start = i * part_size
                part_end = (part_start + part_size) - 1

                if (i+1) == nrparts:
                        print "DEBUG: last chunk, part-end was/will %d/%d" % (part_end, fsize)
                        part_end = fsize

                print "DEBUG: part_start=%d/part_end=%d" % (part_start, part_end)

                thr = threading.Thread(target=upload_part_r, args=(i, part_start, part_end, thr_args, ) )
                threads.append(thr)
                thr.start()

        # Wait for all threads to complete
        for index, thr in enumerate(threads):
                thr.join()
                print "%d thread finished" % (index)

        part_info = thr_args['PartInfo']
        for p in part_info['Parts']:
                print "DEBUG: PartNumber=%d" % (p['PartNumber'])
                print "DEBUG: ETag=%s" % (p['ETag'])

        print "+ Finishing up multi-part uploads"
        b3_client.complete_multipart_upload(Bucket=bucket, Key=filename, UploadId=mpu['UploadId'], MultipartUpload=thr_args['PartInfo'])
        return True

### MAIN ###

if __name__ == "__main__":
        bucket = 'test'                 # XXX FIXME: Pass in arguments

        if len(sys.argv) != 3:
                print "usage: %s <filename to upload> <number of threads/parts>" % (sys.argv[0])
                sys.exit(1)

        # Filename: File to upload
        # NR Parts: Number of parts to divide the file to, which is the number of threads to use
        filename = sys.argv[1]
        nrparts = int(sys.argv[2])

        format = "%(asctime)s: %(message)s"
        logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

        # Initialize the connection with Ceph RADOS GW
        b3_client = boto3.client(service_name = 's3', endpoint_url = 'http://127.0.0.1:8000', aws_access_key_id = 'test', aws_secret_access_key = 'test')
        b3_s3 = boto3.resource(service_name = 's3', endpoint_url = 'http://127.0.0.1:8000', aws_access_key_id = 'test', aws_secret_access_key = 'test')

        handle_mp_file(bucket, filename, nrparts)

### END ###

This code uses Python multithreading to upload multiple parts of the file simultaneously, just as any modern download manager does using the HTTP/1.1 range feature.

To use this Python script, save the above code to a file called boto3-upload-mp.py and run it as:

$ ./boto3-upload-mp.py mp_file_original.bin 6

Here 6 means the script will divide the file into 6 parts and create 6 threads to upload these parts simultaneously.

The uploaded file can then be downloaded again and checksummed against the original file to verify that it was uploaded successfully.
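For example, a minimal verification sketch with boto3 and hashlib (bucket name, key, and endpoint follow the values used earlier; adjust them to your setup):

# Download the uploaded object and compare its SHA-256 digest with the
# original local file. Bucket, key, and endpoint follow the values above.
import hashlib
import boto3

def sha256sum(path):
    h = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            h.update(chunk)
    return h.hexdigest()

client = boto3.client('s3',
        endpoint_url='http://127.0.0.1:8000',
        aws_access_key_id='test',
        aws_secret_access_key='test')

client.download_file('test', 'mp_file_original.bin', 'mp_file_downloaded.bin')
print(sha256sum('mp_file_original.bin') == sha256sum('mp_file_downloaded.bin'))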

Deploying Ceph – I: Initial environment

What is Ceph

Ceph is an open source distributed storage platform: it stores data in a single cluster running on distributed computers and provides interfaces for object, block, and file-level storage. Ceph is completely free, aims to have no single point of failure, and is designed to scale to the exabyte level.

Ceph is fault tolerant; it achieves this by replicating data across the cluster. By default, data stored in Ceph is replicated to 3 different storage devices, and the replication level can of course be changed.

At a high level, when one of the storage devices fails, Ceph replicates the missing blocks to a new device that replaces the failed one. An in-depth technical explanation of this process follows later in the document.

Basic Ceph components

Cluster monitors (ceph-mon) keep track of active and failed cluster nodes, monitor the Ceph cluster, and report on its health and status.

Metadata servers (ceph-mds) store the metadata of inodes and directories.

Object storage devices (ceph-osd) actually store the content of files. The daemon is responsible for storing data on a local file system and providing access to it over the network via different client software or access methods. The OSD daemon is also responsible for data replication.

Data placement

Ceph stores, replicates, and re-balances data objects across the cluster dynamically. To understand how Ceph places data in a cluster, the following definitions must be understood:

Pools: Ceph stores data within pools, which are logical groups for storing objects. Pools manage the number of placement groups, the number of replicas, and the rule-set for the pool.

Placement Groups: This is the most important thing to understand. Ceph maps objects to placement groups (PGs). Placement groups (PGs) are shards or fragments of a logical object pool that place objects as a group into OSDs. Placement groups reduce the amount of per-object metadata when Ceph stores the data in OSDs. A larger number of placement groups (e.g., 100 per OSD) leads to better balancing.

In a nutshell, if the replication level is 3 (the default), each placement group is assigned exactly that number of OSDs (3); with 4-way replication each PG would have 4 OSDs. When data is stored in the cluster, the CRUSH algorithm (next section) assigns it to a PG. The PG then replicates the data across all OSDs assigned to it, so the data is stored 3 times (3-way replication).

Each OSD daemon is responsible for one storage device, so there is a one to one mapping between the number of OSDs and storage devices.
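To make the 100-PGs-per-OSD guideline above concrete, here is a small sketch of the commonly cited rule of thumb for sizing a pool's pg_num, (number of OSDs x 100) / replicas, rounded up to the next power of two. Treat it as an illustration, not authoritative sizing advice.

# Rule-of-thumb PG count: (num_osds * 100) / replication, rounded up to the
# next power of two. Illustration only; real sizing also depends on how many
# pools share the OSDs and on expected growth.
def suggested_pg_count(num_osds, replication=3, pgs_per_osd=100):
    target = (num_osds * pgs_per_osd) / float(replication)
    pg_num = 1
    while pg_num < target:
        pg_num *= 2
    return pg_num

print(suggested_pg_count(4))     # 4 OSDs, 3-way replication -> 256
print(suggested_pg_count(16))    # 16 OSDs, 3-way replication -> 1024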

CRUSH Maps: CRUSH is a big part of what allows Ceph to scale without performance bottlenecks or a single point of failure. CRUSH maps provide the physical topology of the cluster to the CRUSH algorithm, which determines where the data for an object and its replicas should be stored, and how to place it across failure domains for added data safety, among other things.

Explanation Scenario

This is not a real-world example; it is simplified for explanation purposes only.

A Ceph cluster has 4 OSDs, 3-way replication, and 10 PGs:

PG1: OSD1, OSD2, OSD3
PG2: OSD2, OSD3, OSD4
PG3: OSD3, OSD4, OSD1
...

So when we store data in the Ceph cluster, the CRUSH algorithm selects the appropriate PG to place the data in; say it selects PG2. PG2 then replicates the data to its assigned OSDs (in this case OSD2, OSD3, and OSD4, i.e. 3-way replication).
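As a toy illustration of this object-to-PG-to-OSD mapping (a deliberate simplification; the real CRUSH algorithm is pseudo-random, weighted, and topology-aware), an object name could be hashed to pick a PG from the table above, and the PG's OSD list then tells us where the replicas go:

# Toy object -> PG -> OSDs mapping using the example table above.
# Real Ceph hashes the object name to a PG inside a pool and then uses
# CRUSH to map that PG to OSDs; this only mimics the idea.
import zlib

pg_map = {
    'PG1': ['OSD1', 'OSD2', 'OSD3'],
    'PG2': ['OSD2', 'OSD3', 'OSD4'],
    'PG3': ['OSD3', 'OSD4', 'OSD1'],
}

def place(object_name):
    pgs = sorted(pg_map)
    pg = pgs[zlib.crc32(object_name.encode()) % len(pgs)]
    return pg, pg_map[pg]

print(place('my-object'))    # e.g. ('PG2', ['OSD2', 'OSD3', 'OSD4'])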

In the ideal case, a PG is in the clean state when all its OSDs are up and in and all of its data is replicated correctly.

Failure scenario

In the above scenario, suppose the disk served by OSD1 fails: PG1 and PG3 will be in a degraded and unclean state until the drive is fixed or replaced. A new OSD can be introduced, the degraded PGs will be assigned to it in place of OSD1, and the process of re-balancing and replicating the data starts.

Deployment steps

The setup below is an initial one; it is of course not optimal for running production storage on Ceph. It uses 3-way replication with only two OSDs, so it will be degraded and unclean to begin with, until we add more OSDs.

First, disable the firewall.

Commands to be run as root are indicated by the # prompt; otherwise the ceph user is used.

In this scenario:

– 1 manager node running the Ceph monitor; it is also used as the administration node. It has the following specs:

  • 2 vCPUs
  • 6 GB RAM
  • 60 GB Storage
  • VM running on XenServer 6.2
  • CentOS 7.2 OS

– 1 node used as a storage node, with the following specs:

  • 2 x quad-core Intel Xeon processors with hyper-threading enabled (total of 16 vCPUs)
  • 128 GB RAM
  • 2 x 2T SAS disk drives
  • CentOS 7.2 OS

Each disk drive above will be assigned to an OSD. Of course, more OSDs and monitor nodes will be added later.

We will install Ceph Hammer, although Jewel had been released by the time of this deployment.

1- Add the user ceph to all nodes and give it passwordless sudo access:

# vi /etc/sudoers.d/ceph
Defaults:ceph !requiretty
ceph ALL = (root) NOPASSWD:ALL

Give it the appropriate permissions:

# chmod 440 /etc/sudoers.d/ceph

2- Install the packages needed to add the EPEL and Ceph yum repositories on all nodes:

# yum -y install centos-release-ceph-hammer epel-release yum-plugin-priorities

3- Give the Ceph repository priority over other repositories on all nodes.

Add “priority=1” after the “enabled=1” value in the /etc/yum.repos.d/CentOS-Ceph-Hammer.repo file.

4- Install the Ceph deployer package on the administration node

# yum -y install ceph-deploy

Now log in as the ceph user on the administration machine and continue with the following steps.

5- Generate and distribute SSH keys for the user ceph:

$ ssh-copy-id ceph@ceph-stor-01

Since the administration node is the first Ceph monitor, we need to set up the SSH key on the local machine as well:

$ cp .ssh/id_rsa.pub .ssh/authorized_keys
$ chmod 600 .ssh/authorized_keys

6- Prepare a local directory for the installation:

$ mkdir ceph
$ cd ceph/

7- Start deploying a new cluster, and write a .conf file and keyring for it:

$ ceph-deploy new --cluster-network 192.168.140.0/24 --public-network 10.0.140.0/16 ceph-mon-01

The cluster network is the local private network for cluster communication; the public network is where the Ceph nodes and storage will be accessible.
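For reference, ceph-deploy new writes an initial ceph.conf in the working directory; with the options above its [global] section should contain entries along these lines (a sketch only; the exact keys and generated fsid will differ per environment):

[global]
fsid = <generated cluster UUID>
mon_initial_members = ceph-mon-01
mon_host = 10.0.140.154
public_network = 10.0.140.0/16
cluster_network = 192.168.140.0/24
auth_cluster_required = cephx
auth_service_required = cephx
auth_client_required = cephx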

8- Start installing Ceph components on the monitor node:

Components to be installed: the Ceph monitor, Ceph MDS, and the Ceph RADOS (REST) gateway.

$ ceph-deploy install --mon --rgw --mds --release hammer ceph-mon-01

9- Deploy the monitors defined in `ceph.conf:mon_initial_members`, wait until they form a quorum, and then gather the keys, reporting the monitor status along the way. If the monitors don't form a quorum, the command will eventually time out.

$ ceph-deploy mon create-initial

10- Install the OSD daemon on the storage node containing the initial 2 OSDs:

$ ceph-deploy install --release hammer --osd ceph-stor-01

11- Copy the keyrings and configuration files to all nodes:

$ ceph-deploy admin ceph-mon-01 ceph-stor-01
$ sudo chmod 644 /etc/ceph/ceph.client.admin.keyring

12- Prepare the Object Storage Daemons on the initial OSD node:

The first node, ceph-stor-01, has 2 disk drives; we dedicate 1.6T on each drive to a separate OSD. The file system used is XFS.

$ ceph-deploy osd prepare ceph-stor-01:/ceph/storage01 ceph-stor-01:/ceph/storage02

13- Activate the Object Storage Daemons:

$ ceph-deploy osd activate ceph-stor-01:/ceph/storage01 ceph-stor-01:/ceph/storage02

14- Some commands to make sure we are on the right track.

Check available storage:

$ ceph df
GLOBAL:
    SIZE      AVAIL     RAW USED     %RAW USED
    3214G     3204G     10305M       0.31
POOLS:
    NAME     ID     USED     %USED     MAX AVAIL     OBJECTS
    rbd      0      0        0         1068G         0

Check cluster status:

$ ceph -w
cluster 1ad7d5ae-3414-4286-b42e-7082b3355fde
health HEALTH_WARN
64 pgs degraded
64 pgs stuck degraded
64 pgs stuck inactive
64 pgs stuck unclean
64 pgs stuck undersized
64 pgs undersized
monmap e1: 1 mons at {ceph-mon-01=10.0.140.154:6789/0}
election epoch 2, quorum 0 ceph-mon-01
osdmap e9: 2 osds: 2 up, 2 in
pgmap v15: 64 pgs, 1 pools, 0 bytes data, 0 objects
10305 MB used, 3204 GB / 3214 GB avail
64 undersized+degraded+peered
 
2016-04-28 23:41:34.137681 mon.0 [INF] pgmap v15: 64 pgs: 64 undersized+degraded+peered; 0 bytes data, 10305 MB used, 3204 GB / 3214 GB avail

It’s degraded and will remain so until we add more OSDs (again, because it’s 3-way replication with only 2 OSDs).