diff --git a/requirements.txt b/requirements.txt index a8fb115..19a049c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ apscheduler==3.6.0 certifi==2018.11.29 chardet==3.0.4 Click==7.0 +firebase-admin==5.4.0 Flask==1.0.2 Flask-SQLAlchemy==2.3.2 Flask-Migrate==2.1.1 diff --git a/timeline_sync/__init__.py b/timeline_sync/__init__.py index 27732ce..620fb96 100644 --- a/timeline_sync/__init__.py +++ b/timeline_sync/__init__.py @@ -1,6 +1,7 @@ from flask import Flask, request from werkzeug.middleware.proxy_fix import ProxyFix from rws_common import honeycomb +import firebase_admin from .settings import config from .api import init_api @@ -16,6 +17,8 @@ init_app(app) init_api(app) # Includes both private (timeline-sync) and public (timeline-api) APIs +default_app = firebase_admin.initialize_app() + @app.route('/heartbeat') @app.route('/timeline-sync/heartbeat') def heartbeat(): diff --git a/timeline_sync/api.py b/timeline_sync/api.py index 949f51e..e2feb95 100644 --- a/timeline_sync/api.py +++ b/timeline_sync/api.py @@ -3,7 +3,7 @@ import uuid import requests from .models import db, SandboxToken, TimelinePin, UserTimeline, TimelineTopic, TimelineTopicSubscription, AppGlance -from .utils import get_uid, api_error, pin_valid, glance_valid +from .utils import get_uid, api_error, pin_valid, glance_valid, send_fcm_message, send_fcm_message_to_topics, subscribe_to_fcm_topic, unsubscribe_from_fcm_topic from .settings import config import beeline @@ -60,25 +60,41 @@ def sync(): if last_timeline is not None: last_timeline_id = last_timeline.id - last_glance_id = request.args.get('glance') - - app_glances = db.session.query(AppGlance).filter_by(user_id=user_id) - if last_glance_id is not None: - app_glances = app_glances.filter(AppGlance.id > last_glance_id) - - last_glance = app_glances.order_by(AppGlance.id.desc()).first() - if last_glance is not None: - last_glance_id = last_glance.id - timeline_updates = [user_timeline_item.to_json() for user_timeline_item in user_timeline.order_by(UserTimeline.id.asc())] - glances_updates = [glance.to_json() for glance in app_glances.order_by(AppGlance.id.asc())] result = { - "updates": timeline_updates + glances_updates, - "syncURL": url_for('api.sync', timeline=last_timeline_id, glance=last_glance_id, _external=True) + "updates": timeline_updates, + "syncURL": url_for('api.sync', timeline=last_timeline_id, _external=True) } return jsonify(result) +@api.route('/user/fcm_token/') +def fcm_token(): + user_id = get_uid() + + if request.method == 'PUT': + fcm_token_json = request.json + + fcm_token = FcmToken.query.filter_by(user_id=user_id, token=token).one_or_none() + if fcm_token is None: + fcm_token = FcmToken.from_json(fcm_token_json, token, user_id) + if fcm_token is None: + return api_error(400) + + db.session.add(fcm_token) + db.session.commit() + + subscriptions = TimelineTopicSubscription.query.filter_by(user_id=user_id) + for subscription in subscriptions: + subscribe_to_fcm_topic(user_id, subscription.topic) + + elif request.method == 'DELETE': + fcm_token = FcmToken.query.filter_by(user_id=user_id, token=token).first_or_404() + fcm_token.delete() + + db.session.commit() + return 'OK' + @api.route('/user/pins/', methods=['PUT', 'DELETE']) def user_pin(pin_id): @@ -107,6 +123,8 @@ def user_pin(pin_id): db.session.add(pin) db.session.add(user_timeline) db.session.commit() + + send_fcm_message(user_id, { 'type': 'timeline.pin.create' }) else: # update pin try: pin.update_from_json(pin_json) @@ -122,6 +140,8 @@ def user_pin(pin_id): db.session.add(pin) db.session.add(user_timeline) db.session.commit() + + send_fcm_message(user_id, { 'type': 'timeline.pin.create' }) except (KeyError, ValueError): beeline.add_context_field('timeline.failure.cause', 'update_pin') return api_error(400) @@ -138,6 +158,8 @@ def user_pin(pin_id): pin=pin) db.session.add(user_timeline) db.session.commit() + + send_fcm_message(user_id, { 'type': 'timeline.pin.delete' }) return 'OK' @@ -152,7 +174,6 @@ def get_app_info(timeline_token): return app_info['app_uuid'], f"uuid:{app_info['app_uuid']}" - @api.route('/shared/pins/', methods=['PUT', 'DELETE']) def shared_pin(pin_id): try: @@ -169,7 +190,7 @@ def shared_pin(pin_id): topic = TimelineTopic.query.filter_by(app_uuid=app_uuid, name=topic_string).one_or_none() if topic is None: - topic = TimelineTopic(app_uuid=app_uuid, name=topic_string) + topic = TimelineTopic(app_uuid=app_uuid, name=topic_string, create_time=datetime.datetime.utcnow()) db.session.add(topic) topics.append(topic) @@ -198,6 +219,8 @@ def shared_pin(pin_id): db.session.add(user_timeline) db.session.commit() + + send_fcm_message_to_topics(topics, { 'type': 'timeline.pin.create' }) else: # update pin try: pin.update_from_json(pin_json) @@ -217,12 +240,15 @@ def shared_pin(pin_id): db.session.add(user_timeline) db.session.commit() + + send_fcm_message_to_topics(topics, { 'type': 'timeline.pin.create' }) except (KeyError, ValueError): beeline.add_context_field('timeline.failure.cause', 'update_pin') return api_error(400) elif request.method == 'DELETE': pin = TimelinePin.query.filter_by(app_uuid=app_uuid, user_id=None, id=pin_id).first_or_404() + topics = pin.topics # No need to post even old create events, since nobody will render # them, after all. @@ -236,6 +262,9 @@ def shared_pin(pin_id): db.session.add(user_timeline) db.session.commit() + + send_fcm_message_to_topics(topics, { 'type': 'timeline.pin.delete' }) + return 'OK' @@ -265,7 +294,7 @@ def user_subscriptions_manage(topic_string): topic = TimelineTopic.query.filter_by(app_uuid=app_uuid, name=topic_string).one_or_none() if topic is None: - topic = TimelineTopic(app_uuid=app_uuid, name=topic_string) + topic = TimelineTopic(app_uuid=app_uuid, name=topic_string, create_time=datetime.datetime.utcnow()) db.session.add(topic) if request.method == 'POST': @@ -274,13 +303,39 @@ def user_subscriptions_manage(topic_string): subscription = TimelineTopicSubscription(user_id=user_id, topic=topic) db.session.add(subscription) + # Add pins user now has a subscription for + for pin in topic.pins: + user_timeline = UserTimeline(user_id=subscription.user_id, + type='timeline.pin.create', + pin=pin) + db.session.add(user_timeline) + + user_timeline = UserTimeline(user_id=user_id, type='timeline.topic.subscription', topic=topic) + db.session.add(user_timeline) + db.session.commit() + subscribe_to_fcm_topic(user_id, topic) + send_fcm_message(user_id, { 'type': 'timeline.topic.subscription' }) + elif request.method == 'DELETE': TimelineTopicSubscription.query.filter_by(user_id=user_id, topic=topic).delete() + # Clean up pins user no longer has a subscription for + for pin in topic.pins: + user_timeline = UserTimeline(user_id=subscription.user_id, + type='timeline.pin.delete', + pin=pin) + db.session.add(user_timeline) + + user_timeline = UserTimeline(user_id=user_id, type='timeline.topic.unsubscription', topic=topic) + db.session.add(user_timeline) + db.session.commit() + unsubscribe_from_fcm_topic(user_id, topic) + send_fcm_message(user_id, { 'type': 'timeline.topic.unsubscription' }) + return 'OK' @@ -305,7 +360,14 @@ def user_app_glance(): return api_error(400) db.session.add(glance) + + user_timeline = UserTimeline(user_id=user_id, type='appglance.slice.create', app_glance=glance) + db.session.add(user_timeline) + db.session.commit() + + send_fcm_message(user_id, { 'type': 'appglance.slice.create' }) + return 'OK' diff --git a/timeline_sync/models.py b/timeline_sync/models.py index e89bfcc..26f1483 100644 --- a/timeline_sync/models.py +++ b/timeline_sync/models.py @@ -20,6 +20,28 @@ class SandboxToken(db.Model): db.Index('sandbox_token_uid_appuuid_index', SandboxToken.user_id, SandboxToken.app_uuid, unique=True) +class FcmToken(db.Model): + __tablename__ = 'fcm_tokens' + token = db.Column(db.String, primary_key=True) + user_id = db.Column(db.Integer) + device_id = db.Column(db.String) + platform = db.Column(db.String) + + @classmethod + def from_json(cls, fcm_token_json, token, user_id): + try: + fcm_token = cls( + token=token, + device_id=fcm_token_json['device_id'], + platform=fcm_token_json['platform'], + user_id=user_id, + ) + return fcm_token + except (KeyError, ValueError): + return None + + +db.Index('fcm_token_uid_token_index', FcmToken.user_id, FcmToken.token, unique=True) class TimelinePin(db.Model): __tablename__ = 'timeline_pins' @@ -109,25 +131,39 @@ class UserTimeline(db.Model): pin = db.relationship('TimelinePin', lazy=False, uselist=False, backref=db.backref('timelines', passive_deletes=True)) pin_id = db.Column(UUID(as_uuid=True), db.ForeignKey('timeline_pins.guid', ondelete='CASCADE')) + app_glance = db.relationship('TimelinePin', lazy=False, uselist=False, backref=db.backref('timelines', passive_deletes=True)) + app_glance_id = db.Column(db.Integer, db.ForeignKey('app_glances.id', ondelete='CASCADE')) + topic = db.relationship('TimelineTopic', lazy=False, uselist=False, backref=db.backref('timelines', passive_deletes=True)) + topic_id = db.Column(db.Integer, db.ForeignKey('timeline_topic.id', ondelete='CASCADE')) def to_json(self): if self.type == 'timeline.pin.create' or self.type == 'timeline.pin.delete': return {'type': self.type, 'data': self.pin.to_json()} + elif self.type == 'timeline.topic.subscription' or self.type == 'timeline.topic.unsubscription': + return {'type': self.type, 'data': self.topic.to_json()} + elif self.type == 'appglance.slice.create': + return {'type': self.type, 'data': self.app_glance.to_json()} else: return None -db.Index('user_timeline_userid_pinid', UserTimeline.user_id, UserTimeline.pin_id, unique = True) +db.Index('user_timeline_userid_pinid_appglanceid_topicid', UserTimeline.user_id, UserTimeline.pin_id, UserTimeline.app_glance_id, UserTimeline.topic_id, unique = True) class TimelineTopic(db.Model): __tablename__ = 'timeline_topics' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), nullable=False) app_uuid = db.Column(UUID(as_uuid=True), nullable=False) + create_time = db.Column(db.DateTime, nullable=False) subscriptions = db.relationship('TimelineTopicSubscription', backref='TimelineTopic') pins = db.relationship('TimelinePin', secondary='timeline_pin_topic', backref='TimelineTopic') + def to_json(self): + return {'createTime': time_to_str(self.create_time), + 'dataSource': f"uuid:{self.app_uuid}" + 'topic': self.name} + db.Index('timeline_topic_appuuid_name_index', TimelineTopic.app_uuid, TimelineTopic.name, unique=True) class TimelinePinTopic(db.Model): @@ -175,10 +211,9 @@ def from_json(cls, slices, app_uuid, user_id, data_source): return None def to_json(self): - return {'type': 'appglance.slice.create', - 'data': {'createTime': time_to_str(self.create_time), - 'dataSource': self.data_source, - 'slices': [glance_slice.to_json() for glance_slice in self.slices]}} + return {'createTime': time_to_str(self.create_time), + 'dataSource': self.data_source, + 'slices': [glance_slice.to_json() for glance_slice in self.slices]} db.Index('app_glance_userid_appuuid', AppGlance.user_id, AppGlance.app_uuid, unique = True) diff --git a/timeline_sync/utils.py b/timeline_sync/utils.py index d16291c..e813edd 100644 --- a/timeline_sync/utils.py +++ b/timeline_sync/utils.py @@ -2,6 +2,7 @@ from flask import request, abort, jsonify from .settings import config import datetime +import firebase_admin import beeline @@ -122,3 +123,68 @@ def glance_valid(glance_json): return False return True + +def send_fcm_message(user_id, data): + if user_id is None: + raise ValueError + + fcm_tokens = db.session.query(FcmToken).filter_by(user_id=user_id) + tokens = [fcm_token.token for fcm_token in fcm_tokens] + + message = firebase_admin.messaging.Message( + data=data, + tokens=tokens, + ) + + response = firebase_admin.messaging.send_each_for_multicast(message) + + if response.failure_count > 0: + responses = response.responses + for idx, resp in enumerate(responses): + if not resp.success: + FcmToken.query.filter_by(user_id=user_id, token=tokens[idx]).delete() + + +def send_fcm_message_to_topics(topics, data): + condition = ' || '.join([f"'{str(topic.id)}' in topics" for topic in topics]) + + message = firebase_admin.messaging.Message( + data=data, + condition=condition, + ) + + response = firebase_admin.messaging.send(message) + + if not response.success: + return api_error(400) + + +def subscribe_to_fcm_topic(user_id, topic): + if user_id is None: + raise ValueError + + fcm_tokens = db.session.query(FcmToken).filter_by(user_id=user_id) + tokens = [fcm_token.token for fcm_token in fcm_tokens] + + response = firebase_admin.messaging.subscribe_to_topic(tokens, str(topic.id)) + + if response.failure_count > 0: + responses = response.responses + for idx, resp in enumerate(responses): + if not resp.success: + FcmToken.query.filter_by(user_id=user_id, token=tokens[idx]).delete() + +def unsubscribe_from_fcm_topic(user_id, topic): + if user_id is None: + raise ValueError + + fcm_tokens = db.session.query(FcmToken).filter_by(user_id=user_id) + tokens = [fcm_token.token for fcm_token in fcm_tokens] + + response = firebase_admin.messaging.unsubscribe_from_topic(tokens, str(topic.id)) + + if response.failure_count > 0: + responses = response.responses + for idx, resp in enumerate(responses): + if not resp.success: + FcmToken.query.filter_by(user_id=user_id, token=tokens[idx]).delete()