RewanshChoudhary/SensorDetailsAndAlertSystems

Geo-Spatial Data Streaming

Real-time backend system for ingesting air-quality sensor data, storing geospatial records, computing streaming analytics, and pushing live updates to clients.

Key Engineering Highlights

  • Designed event-driven architecture with Kafka topics for raw, average, and alert streams.
  • Implemented geospatial persistence (geometry(Point,4326)) with PostgreSQL/PostGIS.
  • Added windowed stream aggregation (Kafka Streams tumbling windows).
  • Standardized a single JSON event contract across producer, stream processor, and listeners.
  • Added Flyway migrations and profile-based config (dev / prod) for reproducibility.
  • Added backend unit tests for core services and listener behavior.

Tech Stack

  • Java 25
  • Spring Boot 3.5
  • Spring Kafka + Kafka Streams
  • Spring Data JPA + Hibernate Spatial
  • PostgreSQL + PostGIS
  • Flyway
  • JUnit 5 + Mockito

System Components

  • DataSendingService: generates sample events and publishes them to the Kafka sample topic.
  • DataStoringService: stores each event into sensor_data.
  • PublishAverageToKafkaStreamService: computes windowed averages + emits alerts.
  • KafkaMessageListener: consumes Kafka outputs and pushes WebSocket updates.
  • DataRetentionScheduler: archives and prunes old records.

Data Flow (End-to-End)

flowchart LR
  A[SampleDataProvider] --> B[DataSendingService]
  B -->|JSON event| C[(Kafka: sample topic)]
  B --> D[(PostGIS: sensor_data)]
  C --> E[Kafka Streams Processor]
  E -->|avg per sensor| F[(Kafka: avg topic)]
  E -->|threshold breach| G[(Kafka: alert topic)]
  C --> H[KafkaMessageListener]
  F --> H
  G --> H
  H --> I[/topic/sensor-data]
  H --> J[/topic/sensor-data-avg]
  H --> K[/topic/sensor-data-alert]
  D --> L[DataRetentionScheduler]
  L --> M[(sensor_data_archive)]

Step-by-step

  1. SampleDataProvider selects a sensor payload from sample JSON.
  2. DataSendingService serializes it as SensorReadingEvent and publishes it to app.kafka.sample-topic.
  3. DataStoringService stores the same event in PostGIS for historical querying.
  4. PublishAverageToKafkaStreamService reads sample-topic messages, extracts the value field, and:
    • computes tumbling-window averages keyed by sensorId -> app.kafka.avg-topic
    • emits an alert event whenever a reading exceeds app.kafka.alert-threshold -> app.kafka.alert-topic
  5. KafkaMessageListener consumes sample/avg/alert topics and forwards live updates over STOMP WebSocket.
  6. DataRetentionScheduler periodically archives old rows and deletes expired operational/archive records.
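
The aggregation in step 4 can be illustrated without Kafka. The following is a minimal plain-Java sketch, not the project's Kafka Streams topology: it assigns each reading to a fixed, non-overlapping window aligned to the epoch (the same alignment Kafka Streams time windows use), averages per sensor and window, and flags readings strictly above a threshold. The `Reading` record, the `sensorId@windowStart` key format, and the method names are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of tumbling-window averages and threshold alerts (no Kafka dependency). */
final class TumblingWindowSketch {

    /** Hypothetical minimal reading: sensor id, epoch-millis timestamp, measured value. */
    record Reading(String sensorId, long timestampMs, double value) {}

    /** Start of the fixed, non-overlapping window that contains the timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Average per sensor and window, keyed as "sensorId@windowStart" (assumed key format). */
    static Map<String, Double> averages(List<Reading> readings, long windowSizeMs) {
        Map<String, double[]> sumAndCount = new TreeMap<>();
        for (Reading r : readings) {
            String key = r.sensorId() + "@" + windowStart(r.timestampMs(), windowSizeMs);
            double[] acc = sumAndCount.computeIfAbsent(key, k -> new double[2]);
            acc[0] += r.value(); // running sum
            acc[1] += 1;         // running count
        }
        Map<String, Double> result = new TreeMap<>();
        sumAndCount.forEach((key, acc) -> result.put(key, acc[0] / acc[1]));
        return result;
    }

    /** Readings whose value is strictly greater than the threshold. */
    static List<Reading> alerts(List<Reading> readings, double threshold) {
        List<Reading> out = new ArrayList<>();
        for (Reading r : readings) {
            if (r.value() > threshold) {
                out.add(r);
            }
        }
        return out;
    }
}
```

With a 60-second window, readings at 1 s and 59 s share one window while a reading at 61 s starts the next, so each reading contributes to exactly one average.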

Event Contract

{
  "sensorId": "air-co-001",
  "harmfulSubstance": "CO",
  "value": 0.75,
  "unit": "ppm",
  "eventTimestamp": "2025-08-23T23:00:00",
  "latitude": 12.9716,
  "longitude": 77.5946
}
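
One way to model this contract in Java is a record whose components mirror the JSON field names. This is a sketch under assumptions: the record name `SensorReadingEventSketch`, the field types (for example `LocalDateTime` for `eventTimestamp`, since the sample carries no zone offset), and the range checks are illustrative and may differ from the project's actual `SensorReadingEvent` class.

```java
import java.time.LocalDateTime;
import java.util.Objects;

/** Hypothetical Java shape for the JSON event contract; types are assumptions. */
record SensorReadingEventSketch(
        String sensorId,
        String harmfulSubstance,
        double value,
        String unit,
        LocalDateTime eventTimestamp,
        double latitude,
        double longitude) {

    /** Compact constructor: reject events that could not be stored as a valid WGS 84 point. */
    SensorReadingEventSketch {
        Objects.requireNonNull(sensorId, "sensorId");
        Objects.requireNonNull(eventTimestamp, "eventTimestamp");
        if (latitude < -90.0 || latitude > 90.0) {
            throw new IllegalArgumentException("latitude out of range: " + latitude);
        }
        if (longitude < -180.0 || longitude > 180.0) {
            throw new IllegalArgumentException("longitude out of range: " + longitude);
        }
    }

    /** Builds the sample event shown in the contract above. */
    static SensorReadingEventSketch sample() {
        return new SensorReadingEventSketch(
                "air-co-001", "CO", 0.75, "ppm",
                LocalDateTime.parse("2025-08-23T23:00:00"),
                12.9716, 77.5946);
    }
}
```

Validating coordinates at construction time keeps malformed events from reaching either Kafka or the database.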

Topics and WebSocket Endpoints

  • Kafka topics:
    • app.kafka.sample-topic (raw readings)
    • app.kafka.avg-topic (windowed averages)
    • app.kafka.alert-topic (threshold breaches)
  • WebSocket topics:
    • /topic/sensor-data
    • /topic/sensor-data-avg
    • /topic/sensor-data-alert
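
The pairing between Kafka topics and WebSocket destinations can be sketched as a small lookup. The README does not state which topic feeds which destination, so the pairing below (raw to `/topic/sensor-data`, averages to `/topic/sensor-data-avg`, alerts to `/topic/sensor-data-alert`) is an assumption based on the names; `StreamKind` and `DestinationRouter` are hypothetical and do not use Spring.

```java
import java.util.Map;
import java.util.Optional;

/** The three streams and the WebSocket destination each is assumed to feed. */
enum StreamKind {
    RAW("/topic/sensor-data"),
    AVERAGE("/topic/sensor-data-avg"),
    ALERT("/topic/sensor-data-alert");

    private final String destination;

    StreamKind(String destination) {
        this.destination = destination;
    }

    String destination() {
        return destination;
    }
}

/** Resolves a Kafka topic name (as configured at runtime) to a WebSocket destination. */
final class DestinationRouter {
    private final Map<String, StreamKind> byKafkaTopic;

    DestinationRouter(String sampleTopic, String avgTopic, String alertTopic) {
        // Map.of rejects duplicate keys, so misconfigured identical topic names fail fast.
        this.byKafkaTopic = Map.of(
                sampleTopic, StreamKind.RAW,
                avgTopic, StreamKind.AVERAGE,
                alertTopic, StreamKind.ALERT);
    }

    /** Empty when the topic is not one of the three configured topics. */
    Optional<String> destinationFor(String kafkaTopic) {
        return Optional.ofNullable(byKafkaTopic.get(kafkaTopic)).map(StreamKind::destination);
    }
}
```

Keeping the mapping in one place means a renamed topic only changes configuration, not listener code.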

Database

  • Managed via Flyway migration:
    • src/main/resources/db/migration/V1__init_schema.sql
  • Core tables:
    • sensor_data
    • sensor_data_archive
    • sensor_statistics
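
A detail that is easy to get wrong with geometry(Point,4326) columns is axis order: PostGIS points are written as X Y, which for SRID 4326 means longitude first, then latitude. The helper below is an illustrative sketch that renders the EWKT text form of a point; `PointEwkt` is a hypothetical name, and the project itself may build points through Hibernate Spatial rather than text.

```java
/** Renders a latitude/longitude pair as PostGIS EWKT for SRID 4326 (hypothetical helper). */
final class PointEwkt {

    private PointEwkt() {
    }

    static String of(double latitude, double longitude) {
        if (latitude < -90.0 || latitude > 90.0) {
            throw new IllegalArgumentException("latitude out of range: " + latitude);
        }
        if (longitude < -180.0 || longitude > 180.0) {
            throw new IllegalArgumentException("longitude out of range: " + longitude);
        }
        // X is longitude and Y is latitude, so longitude is written first.
        return "SRID=4326;POINT(" + longitude + " " + latitude + ")";
    }
}
```

For the sample event, `PointEwkt.of(12.9716, 77.5946)` yields `SRID=4326;POINT(77.5946 12.9716)`.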

Configuration

Use .env.example as the template for runtime variables.

Common variables:

  • APP_PROFILE, APP_PORT
  • KAFKA_BOOTSTRAP_SERVERS
  • KAFKA_SAMPLE_TOPIC, KAFKA_AVG_TOPIC, KAFKA_ALERT_TOPIC
  • DB_URL, DB_USERNAME, DB_PASSWORD
  • RETENTION_SENSOR_DAYS, RETENTION_ARCHIVE_DAYS
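
As an illustration of how the retention variables could drive pruning, the sketch below reads a day count from an environment-style map and turns it into a cutoff instant; rows older than the cutoff would be candidates for archiving or deletion. The class name, the fallback values, and the rule that the count must be positive are assumptions, not the project's actual behavior.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;

/** Hypothetical helper that turns retention settings into a cutoff timestamp. */
final class RetentionSettingsSketch {

    private RetentionSettingsSketch() {
    }

    /** Reads a positive day count from the given variables, or returns the fallback when unset. */
    static int days(Map<String, String> env, String name, int fallback) {
        String raw = env.get(name);
        if (raw == null || raw.isBlank()) {
            return fallback;
        }
        int parsed = Integer.parseInt(raw.trim());
        if (parsed <= 0) {
            throw new IllegalArgumentException(name + " must be positive, got " + parsed);
        }
        return parsed;
    }

    /** Records with a timestamp before this instant are older than the retention period. */
    static Instant cutoff(Instant now, int retentionDays) {
        return now.minus(Duration.ofDays(retentionDays));
    }
}
```

In a running process the map would typically be `System.getenv()`; passing it in as a parameter keeps the helper easy to test.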

Run Locally

  1. Start infra:
    • docker compose up -d
  2. Run backend:
    • ./mvnw spring-boot:run
  3. Run tests:
    • ./mvnw test
