Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apscheduler==3.6.0
certifi==2018.11.29
chardet==3.0.4
Click==7.0
firebase-admin==5.4.0
Flask==1.0.2
Flask-SQLAlchemy==2.3.2
Flask-Migrate==2.1.1
Expand Down
3 changes: 3 additions & 0 deletions timeline_sync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from flask import Flask, request
from werkzeug.middleware.proxy_fix import ProxyFix
from rws_common import honeycomb
import firebase_admin

from .settings import config
from .api import init_api
Expand All @@ -16,6 +17,8 @@
init_app(app)
init_api(app) # Includes both private (timeline-sync) and public (timeline-api) APIs

default_app = firebase_admin.initialize_app()

@app.route('/heartbeat')
@app.route('/timeline-sync/heartbeat')
def heartbeat():
Expand Down
96 changes: 79 additions & 17 deletions timeline_sync/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import uuid
import requests
from .models import db, SandboxToken, TimelinePin, UserTimeline, TimelineTopic, TimelineTopicSubscription, AppGlance
from .utils import get_uid, api_error, pin_valid, glance_valid
from .utils import get_uid, api_error, pin_valid, glance_valid, send_fcm_message, send_fcm_message_to_topics, subscribe_to_fcm_topic, unsubscribe_from_fcm_topic
from .settings import config

import beeline
Expand Down Expand Up @@ -60,25 +60,41 @@ def sync():
if last_timeline is not None:
last_timeline_id = last_timeline.id

last_glance_id = request.args.get('glance')

app_glances = db.session.query(AppGlance).filter_by(user_id=user_id)
if last_glance_id is not None:
app_glances = app_glances.filter(AppGlance.id > last_glance_id)

last_glance = app_glances.order_by(AppGlance.id.desc()).first()
if last_glance is not None:
last_glance_id = last_glance.id

timeline_updates = [user_timeline_item.to_json() for user_timeline_item in user_timeline.order_by(UserTimeline.id.asc())]
glances_updates = [glance.to_json() for glance in app_glances.order_by(AppGlance.id.asc())]

result = {
"updates": timeline_updates + glances_updates,
"syncURL": url_for('api.sync', timeline=last_timeline_id, glance=last_glance_id, _external=True)
"updates": timeline_updates,
"syncURL": url_for('api.sync', timeline=last_timeline_id, _external=True)
}
return jsonify(result)

@api.route('/user/fcm_token/<token>')
def fcm_token():
user_id = get_uid()

if request.method == 'PUT':
fcm_token_json = request.json

fcm_token = FcmToken.query.filter_by(user_id=user_id, token=token).one_or_none()
if fcm_token is None:
fcm_token = FcmToken.from_json(fcm_token_json, token, user_id)
if fcm_token is None:
return api_error(400)

db.session.add(fcm_token)
db.session.commit()

subscriptions = TimelineTopicSubscription.query.filter_by(user_id=user_id)
for subscription in subscriptions:
subscribe_to_fcm_topic(user_id, subscription.topic)

elif request.method == 'DELETE':
fcm_token = FcmToken.query.filter_by(user_id=user_id, token=token).first_or_404()
fcm_token.delete()

db.session.commit()
return 'OK'


@api.route('/user/pins/<pin_id>', methods=['PUT', 'DELETE'])
def user_pin(pin_id):
Expand Down Expand Up @@ -107,6 +123,8 @@ def user_pin(pin_id):
db.session.add(pin)
db.session.add(user_timeline)
db.session.commit()

send_fcm_message(user_id, { 'type': 'timeline.pin.create' })
else: # update pin
try:
pin.update_from_json(pin_json)
Expand All @@ -122,6 +140,8 @@ def user_pin(pin_id):
db.session.add(pin)
db.session.add(user_timeline)
db.session.commit()

send_fcm_message(user_id, { 'type': 'timeline.pin.create' })
except (KeyError, ValueError):
beeline.add_context_field('timeline.failure.cause', 'update_pin')
return api_error(400)
Expand All @@ -138,6 +158,8 @@ def user_pin(pin_id):
pin=pin)
db.session.add(user_timeline)
db.session.commit()

send_fcm_message(user_id, { 'type': 'timeline.pin.delete' })
return 'OK'


Expand All @@ -152,7 +174,6 @@ def get_app_info(timeline_token):
return app_info['app_uuid'], f"uuid:{app_info['app_uuid']}"



@api.route('/shared/pins/<pin_id>', methods=['PUT', 'DELETE'])
def shared_pin(pin_id):
try:
Expand All @@ -169,7 +190,7 @@ def shared_pin(pin_id):
topic = TimelineTopic.query.filter_by(app_uuid=app_uuid, name=topic_string).one_or_none()

if topic is None:
topic = TimelineTopic(app_uuid=app_uuid, name=topic_string)
topic = TimelineTopic(app_uuid=app_uuid, name=topic_string, create_time=datetime.datetime.utcnow())
db.session.add(topic)

topics.append(topic)
Expand Down Expand Up @@ -198,6 +219,8 @@ def shared_pin(pin_id):
db.session.add(user_timeline)

db.session.commit()

send_fcm_message_to_topics(topics, { 'type': 'timeline.pin.create' })
else: # update pin
try:
pin.update_from_json(pin_json)
Expand All @@ -217,12 +240,15 @@ def shared_pin(pin_id):
db.session.add(user_timeline)

db.session.commit()

send_fcm_message_to_topics(topics, { 'type': 'timeline.pin.create' })
except (KeyError, ValueError):
beeline.add_context_field('timeline.failure.cause', 'update_pin')
return api_error(400)

elif request.method == 'DELETE':
pin = TimelinePin.query.filter_by(app_uuid=app_uuid, user_id=None, id=pin_id).first_or_404()
topics = pin.topics

# No need to post even old create events, since nobody will render
# them, after all.
Expand All @@ -236,6 +262,9 @@ def shared_pin(pin_id):
db.session.add(user_timeline)

db.session.commit()

send_fcm_message_to_topics(topics, { 'type': 'timeline.pin.delete' })

return 'OK'


Expand Down Expand Up @@ -265,7 +294,7 @@ def user_subscriptions_manage(topic_string):

topic = TimelineTopic.query.filter_by(app_uuid=app_uuid, name=topic_string).one_or_none()
if topic is None:
topic = TimelineTopic(app_uuid=app_uuid, name=topic_string)
topic = TimelineTopic(app_uuid=app_uuid, name=topic_string, create_time=datetime.datetime.utcnow())
db.session.add(topic)

if request.method == 'POST':
Expand All @@ -274,13 +303,39 @@ def user_subscriptions_manage(topic_string):
subscription = TimelineTopicSubscription(user_id=user_id, topic=topic)
db.session.add(subscription)

# Add pins user now has a subscription for
for pin in topic.pins:
user_timeline = UserTimeline(user_id=subscription.user_id,
type='timeline.pin.create',
pin=pin)
db.session.add(user_timeline)

user_timeline = UserTimeline(user_id=user_id, type='timeline.topic.subscription', topic=topic)
db.session.add(user_timeline)

db.session.commit()

subscribe_to_fcm_topic(user_id, topic)
send_fcm_message(user_id, { 'type': 'timeline.topic.subscription' })

elif request.method == 'DELETE':
TimelineTopicSubscription.query.filter_by(user_id=user_id, topic=topic).delete()

# Clean up pins user no longer has a subscription for
for pin in topic.pins:
user_timeline = UserTimeline(user_id=subscription.user_id,
type='timeline.pin.delete',
pin=pin)
db.session.add(user_timeline)

user_timeline = UserTimeline(user_id=user_id, type='timeline.topic.unsubscription', topic=topic)
db.session.add(user_timeline)

db.session.commit()

unsubscribe_from_fcm_topic(user_id, topic)
send_fcm_message(user_id, { 'type': 'timeline.topic.unsubscription' })

return 'OK'


Expand All @@ -305,7 +360,14 @@ def user_app_glance():
return api_error(400)

db.session.add(glance)

user_timeline = UserTimeline(user_id=user_id, type='appglance.slice.create', app_glance=glance)
db.session.add(user_timeline)

db.session.commit()

send_fcm_message(user_id, { 'type': 'appglance.slice.create' })

return 'OK'


Expand Down
45 changes: 40 additions & 5 deletions timeline_sync/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@ class SandboxToken(db.Model):

db.Index('sandbox_token_uid_appuuid_index', SandboxToken.user_id, SandboxToken.app_uuid, unique=True)

class FcmToken(db.Model):
__tablename__ = 'fcm_tokens'
token = db.Column(db.String, primary_key=True)
user_id = db.Column(db.Integer)
device_id = db.Column(db.String)
platform = db.Column(db.String)

@classmethod
def from_json(cls, fcm_token_json, token, user_id):
try:
fcm_token = cls(
token=token,
device_id=fcm_token_json['device_id'],
platform=fcm_token_json['platform'],
user_id=user_id,
)
return fcm_token
except (KeyError, ValueError):
return None


db.Index('fcm_token_uid_token_index', FcmToken.user_id, FcmToken.token, unique=True)

class TimelinePin(db.Model):
__tablename__ = 'timeline_pins'
Expand Down Expand Up @@ -109,25 +131,39 @@ class UserTimeline(db.Model):

pin = db.relationship('TimelinePin', lazy=False, uselist=False, backref=db.backref('timelines', passive_deletes=True))
pin_id = db.Column(UUID(as_uuid=True), db.ForeignKey('timeline_pins.guid', ondelete='CASCADE'))
app_glance = db.relationship('TimelinePin', lazy=False, uselist=False, backref=db.backref('timelines', passive_deletes=True))
app_glance_id = db.Column(db.Integer, db.ForeignKey('app_glances.id', ondelete='CASCADE'))
topic = db.relationship('TimelineTopic', lazy=False, uselist=False, backref=db.backref('timelines', passive_deletes=True))
topic_id = db.Column(db.Integer, db.ForeignKey('timeline_topic.id', ondelete='CASCADE'))

def to_json(self):
if self.type == 'timeline.pin.create' or self.type == 'timeline.pin.delete':
return {'type': self.type, 'data': self.pin.to_json()}
elif self.type == 'timeline.topic.subscription' or self.type == 'timeline.topic.unsubscription':
return {'type': self.type, 'data': self.topic.to_json()}
elif self.type == 'appglance.slice.create':
return {'type': self.type, 'data': self.app_glance.to_json()}
else:
return None

db.Index('user_timeline_userid_pinid', UserTimeline.user_id, UserTimeline.pin_id, unique = True)
db.Index('user_timeline_userid_pinid_appglanceid_topicid', UserTimeline.user_id, UserTimeline.pin_id, UserTimeline.app_glance_id, UserTimeline.topic_id, unique = True)

class TimelineTopic(db.Model):
__tablename__ = 'timeline_topics'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), nullable=False)
app_uuid = db.Column(UUID(as_uuid=True), nullable=False)
create_time = db.Column(db.DateTime, nullable=False)

subscriptions = db.relationship('TimelineTopicSubscription', backref='TimelineTopic')

pins = db.relationship('TimelinePin', secondary='timeline_pin_topic', backref='TimelineTopic')

def to_json(self):
return {'createTime': time_to_str(self.create_time),
'dataSource': f"uuid:{self.app_uuid}"
'topic': self.name}

db.Index('timeline_topic_appuuid_name_index', TimelineTopic.app_uuid, TimelineTopic.name, unique=True)

class TimelinePinTopic(db.Model):
Expand Down Expand Up @@ -175,10 +211,9 @@ def from_json(cls, slices, app_uuid, user_id, data_source):
return None

def to_json(self):
return {'type': 'appglance.slice.create',
'data': {'createTime': time_to_str(self.create_time),
'dataSource': self.data_source,
'slices': [glance_slice.to_json() for glance_slice in self.slices]}}
return {'createTime': time_to_str(self.create_time),
'dataSource': self.data_source,
'slices': [glance_slice.to_json() for glance_slice in self.slices]}

db.Index('app_glance_userid_appuuid', AppGlance.user_id, AppGlance.app_uuid, unique = True)

Expand Down
66 changes: 66 additions & 0 deletions timeline_sync/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from flask import request, abort, jsonify
from .settings import config
import datetime
import firebase_admin

import beeline

Expand Down Expand Up @@ -122,3 +123,68 @@ def glance_valid(glance_json):
return False
return True


def send_fcm_message(user_id, data):
if user_id is None:
raise ValueError

fcm_tokens = db.session.query(FcmToken).filter_by(user_id=user_id)
tokens = [fcm_token.token for fcm_token in fcm_tokens]

message = firebase_admin.messaging.Message(
data=data,
tokens=tokens,
)

response = firebase_admin.messaging.send_each_for_multicast(message)

if response.failure_count > 0:
responses = response.responses
for idx, resp in enumerate(responses):
if not resp.success:
FcmToken.query.filter_by(user_id=user_id, token=tokens[idx]).delete()


def send_fcm_message_to_topics(topics, data):
condition = ' || '.join([f"'{str(topic.id)}' in topics" for topic in topics])

message = firebase_admin.messaging.Message(
data=data,
condition=condition,
)

response = firebase_admin.messaging.send(message)

if not response.success:
return api_error(400)


def subscribe_to_fcm_topic(user_id, topic):
if user_id is None:
raise ValueError

fcm_tokens = db.session.query(FcmToken).filter_by(user_id=user_id)
tokens = [fcm_token.token for fcm_token in fcm_tokens]

response = firebase_admin.messaging.subscribe_to_topic(tokens, str(topic.id))

if response.failure_count > 0:
responses = response.responses
for idx, resp in enumerate(responses):
if not resp.success:
FcmToken.query.filter_by(user_id=user_id, token=tokens[idx]).delete()

def unsubscribe_from_fcm_topic(user_id, topic):
if user_id is None:
raise ValueError

fcm_tokens = db.session.query(FcmToken).filter_by(user_id=user_id)
tokens = [fcm_token.token for fcm_token in fcm_tokens]

response = firebase_admin.messaging.unsubscribe_from_topic(tokens, str(topic.id))

if response.failure_count > 0:
responses = response.responses
for idx, resp in enumerate(responses):
if not resp.success:
FcmToken.query.filter_by(user_id=user_id, token=tokens[idx]).delete()