changed worker and api to use s3

This commit is contained in:
yash-2264
2025-07-01 21:22:27 +05:30
parent a28bc043f5
commit 2737535866
2 changed files with 137 additions and 47 deletions

View File

@ -12,11 +12,14 @@
import base64 import base64
import copy import copy
import hashlib
import io import io
import socket import socket
import uuid
from pkg_resources import resource_filename from pkg_resources import resource_filename
import flask import flask
from flask import request
from flask_restless import APIManager from flask_restless import APIManager
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from flask_bootstrap import Bootstrap from flask_bootstrap import Bootstrap
@ -25,11 +28,13 @@ from kombu.pools import producers
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from PIL import Image from PIL import Image
from sqlalchemy.dialects import mysql
from faafo import queues from faafo import queues
from faafo import version from faafo import version
from libcloud.storage.types import Provider
from libcloud.storage.providers import get_driver
LOG = log.getLogger('faafo.api') LOG = log.getLogger('faafo.api')
CONF = cfg.CONF CONF = cfg.CONF
@ -57,6 +62,22 @@ CONF(project='api', prog='faafo-api',
log.setup(CONF, 'api', log.setup(CONF, 'api',
version=version.version_info.version_string()) version=version.version_info.version_string())
# Initialize Swift driver
Swift = get_driver(Provider.OPENSTACK_SWIFT)
driver = Swift(
user='CloudComp2',
key='demo',
tenant_name='CloudComp2',
auth_url='https://10.32.4.29:5000/v3',
)
# Ensure container exists
try:
container = driver.get_container(container_name='fractals')
except:
# Create container if it doesn't exist
container = driver.create_container(container_name='fractals')
template_path = resource_filename(__name__, "templates") template_path = resource_filename(__name__, "templates")
app = flask.Flask('faafo.api', template_folder=template_path) app = flask.Flask('faafo.api', template_folder=template_path)
app.config['DEBUG'] = CONF.debug app.config['DEBUG'] = CONF.debug
@ -73,10 +94,10 @@ def list_opts():
return [(None, copy.deepcopy(api_opts))] return [(None, copy.deepcopy(api_opts))]
class Fractal(db.Model): class Fractal(db.Model):
uuid = db.Column(db.String(36), primary_key=True) uuid = db.Column(db.String(36), primary_key=True)
checksum = db.Column(db.String(256), unique=True) checksum = db.Column(db.String(256), unique=True)
url = db.Column(db.String(256), nullable=True) url = db.Column(db.String(256), nullable=True) # Stores Swift object name/path
duration = db.Column(db.Float) duration = db.Column(db.Float)
size = db.Column(db.Integer, nullable=True) size = db.Column(db.Integer, nullable=True)
width = db.Column(db.Integer, nullable=False) width = db.Column(db.Integer, nullable=False)
@ -86,13 +107,6 @@ class Fractal(db.Model):
xb = db.Column(db.Float, nullable=False) xb = db.Column(db.Float, nullable=False)
ya = db.Column(db.Float, nullable=False) ya = db.Column(db.Float, nullable=False)
yb = db.Column(db.Float, nullable=False) yb = db.Column(db.Float, nullable=False)
if CONF.database_url.startswith('mysql'):
LOG.debug('Using MySQL database backend')
image = db.Column(mysql.MEDIUMBLOB, nullable=True)
else:
image = db.Column(db.LargeBinary, nullable=True)
generated_by = db.Column(db.String(256), nullable=True) generated_by = db.Column(db.String(256), nullable=True)
def __repr__(self): def __repr__(self):
@ -106,6 +120,36 @@ manager = APIManager(app=app, session=db.session)
connection = Connection(CONF.transport_url) connection = Connection(CONF.transport_url)
def upload_image_to_swift(image_bytes, object_name):
"""Upload image bytes to Swift storage and return the object name."""
try:
LOG.debug(f"Uploading image to Swift: {object_name}")
obj = driver.upload_object_via_stream(
iterator=io.BytesIO(image_bytes),
container=container,
object_name=object_name
)
LOG.debug(f"Successfully uploaded {object_name} to Swift")
return object_name
except Exception as e:
LOG.error(f"Failed to upload image to Swift: {e}")
raise
def download_image_from_swift(object_name):
"""Download image from Swift storage."""
try:
LOG.debug(f"Downloading image from Swift: {object_name}")
obj = driver.get_object(container_name='fractals', object_name=object_name)
stream = driver.download_object_as_stream(obj)
image_data = b''.join(stream)
LOG.debug(f"Successfully downloaded {object_name} from Swift")
return image_data
except Exception as e:
LOG.error(f"Failed to download image from Swift: {e}")
raise
@app.route('/', methods=['GET']) @app.route('/', methods=['GET'])
@app.route('/index', methods=['GET']) @app.route('/index', methods=['GET'])
@app.route('/index/<int:page>', methods=['GET']) @app.route('/index/<int:page>', methods=['GET'])
@ -120,20 +164,17 @@ def index(page=1):
@app.route('/fractal/<string:fractalid>', methods=['GET']) @app.route('/fractal/<string:fractalid>', methods=['GET'])
def get_fractal(fractalid): def get_fractal(fractalid):
fractal = Fractal.query.filter_by(uuid=fractalid).first() fractal = Fractal.query.filter_by(uuid=fractalid).first()
if not fractal: if not fractal or not fractal.url:
response = flask.jsonify({'code': 404, return flask.jsonify({'code': 404, 'message': 'Fractal not found'}), 404
'message': 'Fracal not found'})
response.status_code = 404
else:
image_data = base64.b64decode(fractal.image)
image = Image.open(io.BytesIO(image_data))
output = io.BytesIO()
image.save(output, "PNG")
image.seek(0)
response = flask.make_response(output.getvalue())
response.content_type = "image/png"
return response try:
image_data = download_image_from_swift(fractal.url)
response = flask.make_response(image_data)
response.content_type = "image/png"
return response
except Exception as e:
LOG.error(f"Error retrieving fractal {fractalid}: {e}")
return flask.jsonify({'code': 500, 'message': 'Error retrieving fractal'}), 500
def generate_fractal(**kwargs): def generate_fractal(**kwargs):
@ -147,15 +188,41 @@ def generate_fractal(**kwargs):
def convert_image_to_binary(**kwargs): def convert_image_to_binary(**kwargs):
"""Process the image data from worker and upload to Swift."""
LOG.debug("Preprocessor call: " + str(kwargs)) LOG.debug("Preprocessor call: " + str(kwargs))
if 'image' in kwargs['data']['data']['attributes']: if 'image' in kwargs['data']['data']['attributes']:
LOG.debug("Converting image to binary...") LOG.debug("Processing image for Swift upload...")
kwargs['data']['data']['attributes']['image'] = \
str(kwargs['data']['data']['attributes']['image']).encode("ascii") # Get the base64 encoded image from worker
image_base64 = kwargs['data']['data']['attributes']['image']
image_bytes = base64.b64decode(image_base64)
# Generate object name using UUID
fractal_uuid = kwargs['data']['data']['attributes']['uuid']
object_name = f"{fractal_uuid}.png"
try:
# Upload to Swift
swift_object_name = upload_image_to_swift(image_bytes, object_name)
# Update the fractal record with Swift object name instead of binary data
kwargs['data']['data']['attributes']['url'] = swift_object_name
# Remove the binary image data since we're storing in Swift
del kwargs['data']['data']['attributes']['image']
LOG.debug(f"Image uploaded to Swift as {swift_object_name}")
except Exception as e:
LOG.error(f"Failed to upload image to Swift: {e}")
# Keep the binary data as fallback if Swift upload fails
kwargs['data']['data']['attributes']['image'] = \
str(kwargs['data']['data']['attributes']['image']).encode("ascii")
def main(): def main():
print("Starting API server - new...") print("Starting API server with Swift storage...")
with app.app_context(): with app.app_context():
manager.create_api(Fractal, methods=['GET', 'POST', 'DELETE', 'PATCH'], manager.create_api(Fractal, methods=['GET', 'POST', 'DELETE', 'PATCH'],
postprocessors={'POST_RESOURCE': [generate_fractal]}, postprocessors={'POST_RESOURCE': [generate_fractal]},
@ -163,4 +230,4 @@ def main():
exclude=['image'], exclude=['image'],
url_prefix='/v1', url_prefix='/v1',
allow_client_generated_ids=True) allow_client_generated_ids=True)
app.run(host=CONF.listen_address, port=CONF.bind_port, debug=True) app.run(host=CONF.listen_address, port=CONF.bind_port, debug=True)

View File

@ -37,11 +37,11 @@ LOG = log.getLogger('faafo.worker')
CONF = cfg.CONF CONF = cfg.CONF
worker_opts = { worker_opts = [
cfg.StrOpt('endpoint-url', cfg.StrOpt('endpoint-url',
default='http://localhost', default='http://localhost',
help='API connection URL') help='API connection URL')
} ]
CONF.register_opts(worker_opts) CONF.register_opts(worker_opts)
@ -84,6 +84,13 @@ class JuliaSet(object):
self.image.save(fp, "PNG") self.image.save(fp, "PNG")
return fp.name return fp.name
def get_image_bytes(self):
"""Return image as bytes without saving to file."""
with tempfile.NamedTemporaryFile() as fp:
self.image.save(fp, "PNG")
fp.seek(0)
return fp.read()
def _set_point(self): def _set_point(self):
random.seed() random.seed()
while True: while True:
@ -116,6 +123,8 @@ class Worker(ConsumerMixin):
LOG.info("processing task %s" % task['uuid']) LOG.info("processing task %s" % task['uuid'])
LOG.debug(task) LOG.debug(task)
start_time = time.time() start_time = time.time()
# Generate the fractal
juliaset = JuliaSet(task['width'], juliaset = JuliaSet(task['width'],
task['height'], task['height'],
task['xa'], task['xa'],
@ -127,16 +136,20 @@ class Worker(ConsumerMixin):
LOG.info("task %s processed in %f seconds" % LOG.info("task %s processed in %f seconds" %
(task['uuid'], elapsed_time)) (task['uuid'], elapsed_time))
filename = juliaset.get_file() # Get image as bytes instead of saving to file
LOG.debug("saved result of task %s to temporary file %s" % image_bytes = juliaset.get_image_bytes()
(task['uuid'], filename)) size = len(image_bytes)
with open(filename, "rb") as fp:
size = os.fstat(fp.fileno()).st_size # Calculate checksum
image = base64.b64encode(fp.read()) checksum = hashlib.sha256(image_bytes).hexdigest()
checksum = hashlib.sha256(open(filename, 'rb').read()).hexdigest()
os.remove(filename) # Convert to base64 for JSON transmission
LOG.debug("removed temporary file %s" % filename) image_base64 = base64.b64encode(image_bytes).decode("ascii")
LOG.debug("generated fractal %s, size: %d bytes, checksum: %s" %
(task['uuid'], size, checksum))
# Prepare result for API
result = { result = {
'data': { 'data': {
'type': 'fractal', 'type': 'fractal',
@ -144,7 +157,7 @@ class Worker(ConsumerMixin):
'attributes': { 'attributes': {
'uuid': task['uuid'], 'uuid': task['uuid'],
'duration': elapsed_time, 'duration': elapsed_time,
'image': image.decode("ascii"), 'image': image_base64, # This will be processed by API and uploaded to Swift
'checksum': checksum, 'checksum': checksum,
'size': size, 'size': size,
'generated_by': socket.gethostname() 'generated_by': socket.gethostname()
@ -155,12 +168,22 @@ class Worker(ConsumerMixin):
headers = {'Content-Type': 'application/vnd.api+json', headers = {'Content-Type': 'application/vnd.api+json',
'Accept': 'application/vnd.api+json'} 'Accept': 'application/vnd.api+json'}
resp = requests.patch("%s/v1/fractal/%s" % try:
(CONF.endpoint_url, str(task['uuid'])), resp = requests.patch("%s/v1/fractal/%s" %
json.dumps(result), (CONF.endpoint_url, str(task['uuid'])),
headers=headers, json.dumps(result),
timeout=30) headers=headers,
LOG.debug("Result: %s" % resp.text) timeout=30)
LOG.debug("API Response: %s" % resp.text)
if resp.status_code not in [200, 201]:
LOG.error("API request failed with status %d: %s" %
(resp.status_code, resp.text))
else:
LOG.info("Successfully uploaded fractal %s to Swift storage" % task['uuid'])
except Exception as e:
LOG.error("Failed to send result to API: %s" % e)
message.ack() message.ack()
return result return result