falcon-wsgi-example/app/images.py

102 lines
2.8 KiB
Python

import io
import os
import re
import uuid
import mimetypes
import falcon
import msgpack
ALLOWED_IMAGE_TYPES = (
'image/gif',
'image/jpeg',
'image/png',
)
def validate_image_type(req, resp, resource, params):
if req.content_type not in ALLOWED_IMAGE_TYPES:
msg = 'Image type not allowed. Must be PNG, JPEG, or GIF'
raise falcon.HTTPBadRequest(title='Bad request', description=msg)
class Collection:
def __init__(self, image_store):
self._image_store = image_store
def on_get(self, req, resp):
# TODO: Modify this to return a list of href's based on
# what images are actually available.
doc = {
'images': [
{
'href': '/images/1eaf6ef1-7f2d-4ecc-a8d5-6e8adba7cc0e.png',
}
]
}
resp.data = msgpack.packb(doc, use_bin_type=True)
resp.content_type = falcon.MEDIA_MSGPACK
resp.status = falcon.HTTP_200
@falcon.before(validate_image_type)
def on_post(self, req, resp):
name = self._image_store.save(req.stream, req.content_type)
resp.status = falcon.HTTP_201
resp.location = '/images/' + name
class Item:
def __init__(self, image_store):
self._image_store = image_store
def on_get(self, req, resp, name):
resp.content_type = mimetypes.guess_type(name)[0]
try:
resp.stream, resp.content_length = self._image_store.open(name)
except IOError:
# Normally you would also log the error.
raise falcon.HTTPNotFound()
class ImageStore:
_CHUNK_SIZE_BYTES = 4096
_IMAGE_NAME_PATTERN = re.compile(
'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\.[a-z]{2,4}$'
)
def __init__(self, storage_path, uuidgen=uuid.uuid4, fopen=io.open):
self._storage_path = storage_path
self._uuidgen = uuidgen
self._fopen = fopen
def save(self, image_stream, image_content_type):
ext = mimetypes.guess_extension(image_content_type)
name = '{uuid}{ext}'.format(uuid=self._uuidgen(), ext=ext)
image_path = os.path.join(self._storage_path, name)
with self._fopen(image_path, 'wb') as image_file:
while True:
chunk = image_stream.read(self._CHUNK_SIZE_BYTES)
if not chunk:
break
image_file.write(chunk)
return name
def open(self, name):
# Always validate untrusted input!
if not self._IMAGE_NAME_PATTERN.match(name):
raise IOError('File not found')
image_path = os.path.join(self._storage_path, name)
stream = self._fopen(image_path, 'rb')
content_length = os.path.getsize(image_path)
return stream, content_length