diff --git a/faafo/faafo/api/service.py b/faafo/faafo/api/service.py index 8843999..74ef9c8 100644 --- a/faafo/faafo/api/service.py +++ b/faafo/faafo/api/service.py @@ -12,11 +12,14 @@ import base64 import copy +import hashlib import io import socket +import uuid from pkg_resources import resource_filename import flask +from flask import request from flask_restless import APIManager from flask_sqlalchemy import SQLAlchemy from flask_bootstrap import Bootstrap @@ -25,11 +28,13 @@ from kombu.pools import producers from oslo_config import cfg from oslo_log import log from PIL import Image -from sqlalchemy.dialects import mysql from faafo import queues from faafo import version +from libcloud.storage.types import Provider +from libcloud.storage.providers import get_driver + LOG = log.getLogger('faafo.api') CONF = cfg.CONF @@ -57,6 +62,22 @@ CONF(project='api', prog='faafo-api', log.setup(CONF, 'api', version=version.version_info.version_string()) +# Initialize Swift driver +Swift = get_driver(Provider.OPENSTACK_SWIFT) +driver = Swift( + user='CloudComp2', + key='demo', + tenant_name='CloudComp2', + auth_url='https://10.32.4.29:5000/v3', +) + +# Ensure container exists +try: + container = driver.get_container(container_name='fractals') +except: + # Create container if it doesn't exist + container = driver.create_container(container_name='fractals') + template_path = resource_filename(__name__, "templates") app = flask.Flask('faafo.api', template_folder=template_path) app.config['DEBUG'] = CONF.debug @@ -73,10 +94,10 @@ def list_opts(): return [(None, copy.deepcopy(api_opts))] -class Fractal(db.Model): +class Fractal(db.Model): uuid = db.Column(db.String(36), primary_key=True) checksum = db.Column(db.String(256), unique=True) - url = db.Column(db.String(256), nullable=True) + url = db.Column(db.String(256), nullable=True) # Stores Swift object name/path duration = db.Column(db.Float) size = db.Column(db.Integer, nullable=True) width = db.Column(db.Integer, nullable=False) @@ -86,13 +107,6 @@ class Fractal(db.Model): xb = db.Column(db.Float, nullable=False) ya = db.Column(db.Float, nullable=False) yb = db.Column(db.Float, nullable=False) - - if CONF.database_url.startswith('mysql'): - LOG.debug('Using MySQL database backend') - image = db.Column(mysql.MEDIUMBLOB, nullable=True) - else: - image = db.Column(db.LargeBinary, nullable=True) - generated_by = db.Column(db.String(256), nullable=True) def __repr__(self): @@ -106,6 +120,36 @@ manager = APIManager(app=app, session=db.session) connection = Connection(CONF.transport_url) +def upload_image_to_swift(image_bytes, object_name): + """Upload image bytes to Swift storage and return the object name.""" + try: + LOG.debug(f"Uploading image to Swift: {object_name}") + obj = driver.upload_object_via_stream( + iterator=io.BytesIO(image_bytes), + container=container, + object_name=object_name + ) + LOG.debug(f"Successfully uploaded {object_name} to Swift") + return object_name + except Exception as e: + LOG.error(f"Failed to upload image to Swift: {e}") + raise + + +def download_image_from_swift(object_name): + """Download image from Swift storage.""" + try: + LOG.debug(f"Downloading image from Swift: {object_name}") + obj = driver.get_object(container_name='fractals', object_name=object_name) + stream = driver.download_object_as_stream(obj) + image_data = b''.join(stream) + LOG.debug(f"Successfully downloaded {object_name} from Swift") + return image_data + except Exception as e: + LOG.error(f"Failed to download image from Swift: {e}") + raise + + @app.route('/', methods=['GET']) @app.route('/index', methods=['GET']) @app.route('/index/', methods=['GET']) @@ -120,20 +164,17 @@ def index(page=1): @app.route('/fractal/', methods=['GET']) def get_fractal(fractalid): fractal = Fractal.query.filter_by(uuid=fractalid).first() - if not fractal: - response = flask.jsonify({'code': 404, - 'message': 'Fracal not found'}) - response.status_code = 404 - else: - image_data = base64.b64decode(fractal.image) - image = Image.open(io.BytesIO(image_data)) - output = io.BytesIO() - image.save(output, "PNG") - image.seek(0) - response = flask.make_response(output.getvalue()) - response.content_type = "image/png" + if not fractal or not fractal.url: + return flask.jsonify({'code': 404, 'message': 'Fractal not found'}), 404 - return response + try: + image_data = download_image_from_swift(fractal.url) + response = flask.make_response(image_data) + response.content_type = "image/png" + return response + except Exception as e: + LOG.error(f"Error retrieving fractal {fractalid}: {e}") + return flask.jsonify({'code': 500, 'message': 'Error retrieving fractal'}), 500 def generate_fractal(**kwargs): @@ -147,15 +188,41 @@ def generate_fractal(**kwargs): def convert_image_to_binary(**kwargs): + """Process the image data from worker and upload to Swift.""" LOG.debug("Preprocessor call: " + str(kwargs)) + if 'image' in kwargs['data']['data']['attributes']: - LOG.debug("Converting image to binary...") - kwargs['data']['data']['attributes']['image'] = \ - str(kwargs['data']['data']['attributes']['image']).encode("ascii") + LOG.debug("Processing image for Swift upload...") + + # Get the base64 encoded image from worker + image_base64 = kwargs['data']['data']['attributes']['image'] + image_bytes = base64.b64decode(image_base64) + + # Generate object name using UUID + fractal_uuid = kwargs['data']['data']['attributes']['uuid'] + object_name = f"{fractal_uuid}.png" + + try: + # Upload to Swift + swift_object_name = upload_image_to_swift(image_bytes, object_name) + + # Update the fractal record with Swift object name instead of binary data + kwargs['data']['data']['attributes']['url'] = swift_object_name + + # Remove the binary image data since we're storing in Swift + del kwargs['data']['data']['attributes']['image'] + + LOG.debug(f"Image uploaded to Swift as {swift_object_name}") + + except Exception as e: + LOG.error(f"Failed to upload image to Swift: {e}") + # Keep the binary data as fallback if Swift upload fails + kwargs['data']['data']['attributes']['image'] = \ + str(kwargs['data']['data']['attributes']['image']).encode("ascii") def main(): - print("Starting API server - new...") + print("Starting API server with Swift storage...") with app.app_context(): manager.create_api(Fractal, methods=['GET', 'POST', 'DELETE', 'PATCH'], postprocessors={'POST_RESOURCE': [generate_fractal]}, @@ -163,4 +230,4 @@ def main(): exclude=['image'], url_prefix='/v1', allow_client_generated_ids=True) - app.run(host=CONF.listen_address, port=CONF.bind_port, debug=True) + app.run(host=CONF.listen_address, port=CONF.bind_port, debug=True) \ No newline at end of file diff --git a/faafo/faafo/worker/service.py b/faafo/faafo/worker/service.py index edd4784..18d1017 100644 --- a/faafo/faafo/worker/service.py +++ b/faafo/faafo/worker/service.py @@ -37,11 +37,11 @@ LOG = log.getLogger('faafo.worker') CONF = cfg.CONF -worker_opts = { +worker_opts = [ cfg.StrOpt('endpoint-url', default='http://localhost', help='API connection URL') -} +] CONF.register_opts(worker_opts) @@ -84,6 +84,13 @@ class JuliaSet(object): self.image.save(fp, "PNG") return fp.name + def get_image_bytes(self): + """Return image as bytes without saving to file.""" + with tempfile.NamedTemporaryFile() as fp: + self.image.save(fp, "PNG") + fp.seek(0) + return fp.read() + def _set_point(self): random.seed() while True: @@ -116,6 +123,8 @@ class Worker(ConsumerMixin): LOG.info("processing task %s" % task['uuid']) LOG.debug(task) start_time = time.time() + + # Generate the fractal juliaset = JuliaSet(task['width'], task['height'], task['xa'], @@ -127,16 +136,20 @@ class Worker(ConsumerMixin): LOG.info("task %s processed in %f seconds" % (task['uuid'], elapsed_time)) - filename = juliaset.get_file() - LOG.debug("saved result of task %s to temporary file %s" % - (task['uuid'], filename)) - with open(filename, "rb") as fp: - size = os.fstat(fp.fileno()).st_size - image = base64.b64encode(fp.read()) - checksum = hashlib.sha256(open(filename, 'rb').read()).hexdigest() - os.remove(filename) - LOG.debug("removed temporary file %s" % filename) + # Get image as bytes instead of saving to file + image_bytes = juliaset.get_image_bytes() + size = len(image_bytes) + + # Calculate checksum + checksum = hashlib.sha256(image_bytes).hexdigest() + + # Convert to base64 for JSON transmission + image_base64 = base64.b64encode(image_bytes).decode("ascii") + + LOG.debug("generated fractal %s, size: %d bytes, checksum: %s" % + (task['uuid'], size, checksum)) + # Prepare result for API result = { 'data': { 'type': 'fractal', @@ -144,7 +157,7 @@ class Worker(ConsumerMixin): 'attributes': { 'uuid': task['uuid'], 'duration': elapsed_time, - 'image': image.decode("ascii"), + 'image': image_base64, # This will be processed by API and uploaded to Swift 'checksum': checksum, 'size': size, 'generated_by': socket.gethostname() @@ -155,12 +168,22 @@ class Worker(ConsumerMixin): headers = {'Content-Type': 'application/vnd.api+json', 'Accept': 'application/vnd.api+json'} - resp = requests.patch("%s/v1/fractal/%s" % - (CONF.endpoint_url, str(task['uuid'])), - json.dumps(result), - headers=headers, - timeout=30) - LOG.debug("Result: %s" % resp.text) + try: + resp = requests.patch("%s/v1/fractal/%s" % + (CONF.endpoint_url, str(task['uuid'])), + json.dumps(result), + headers=headers, + timeout=30) + LOG.debug("API Response: %s" % resp.text) + + if resp.status_code not in [200, 201]: + LOG.error("API request failed with status %d: %s" % + (resp.status_code, resp.text)) + else: + LOG.info("Successfully uploaded fractal %s to Swift storage" % task['uuid']) + + except Exception as e: + LOG.error("Failed to send result to API: %s" % e) message.ack() - return result + return result \ No newline at end of file