feat: LDES for all public DCAT data#104

Open
mirdono wants to merge 9 commits into development from feat/dcat-federation-combined-ldes
@mirdono mirdono commented May 21, 2026

This PR adds a single LDES to the app that publishes all DCAT data in the app's public graph. Additional partner-specific streams will be added in follow-up PRs.

Proposed approach

Add the ldes-delta-pusher and ldes-serve-feed services to the stack.

The initialization file configures all RDF resource types that should be added to the LDES. These are all DCAT resource types as well as related ones such as foaf:Agent and skos:Concept. Using graphFilters we explicitly limit the stream to data in the http://mu.semte.ch/graphs/public graph, as this is currently the only graph containing DCAT data. Furthermore, associated resources are only considered relevant if they are (indirectly) linked to a DCAT resource. For example, a foaf:Agent resource is only considered interesting if it is used as dcterms:publisher or dcat:contactPoint for a catalog or dataset resource.
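The "linked to a DCAT resource" rule for associated resources can be pictured as a SPARQL ASK query. The sketch below only illustrates that rule for the foaf:Agent example; it is not the format of the initialization file, and the helper name `buildAgentFilterQuery` is hypothetical. Only the graph and the two predicates come from the description above.

```javascript
// Hypothetical helper for illustration only. This is NOT the configuration
// format of the ldes-delta-pusher service.
const PUBLIC_GRAPH = "http://mu.semte.ch/graphs/public";

// Build an ASK query that is true when the agent is used as publisher or
// contact point of a catalog or dataset in the public graph.
// Note: the URI is interpolated without escaping, so only pass trusted URIs.
function buildAgentFilterQuery(agentUri) {
  return `
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX dcterms: <http://purl.org/dc/terms/>
PREFIX foaf: <http://xmlns.com/foaf/0.1/>

ASK {
  GRAPH <${PUBLIC_GRAPH}> {
    <${agentUri}> a foaf:Agent .
    ?resource dcterms:publisher|dcat:contactPoint <${agentUri}> ;
              a ?type .
    VALUES ?type { dcat:Catalog dcat:Dataset }
  }
}`;
}

console.log(buildAgentFilterQuery("http://mu.semte.ch/vocabularies/ext/TestAgent"));
```

An agent that exists in the public graph but is not referenced by any catalog or dataset would make this query return false, which matches the intent that such a resource stays off the feed.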

The dispatch file handles processing incoming delta messages. For this it relies on the configuration in the initialization file. For each received delta message it essentially does the following:

  1. Extract the set of unique subjects from the inserts in the changesets of the delta message.
  2. Keep only the subjects that are configured for the stream, i.e. filter out subjects that are not interesting.
  3. Retrieve all triples for each interesting subject from the triplestore.
  4. Pass the set of all retrieved triples on to the service's moveTriples function for further processing.
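The first two steps can be sketched as plain functions. This is a minimal sketch, not the code in the dispatch file: it assumes the usual delta message shape of an array of changesets with `inserts` and `deletes` arrays of triples, and both function names as well as the `isInteresting` callback are hypothetical stand-ins for the checks derived from the initialization file.

```javascript
// Step 1: collect the unique subject URIs of all inserted triples.
// Assumed delta shape: [{ inserts: [{ subject: { value } }, ...], deletes: [...] }]
function uniqueInsertSubjects(changesets) {
  const subjects = new Set();
  for (const changeset of changesets) {
    for (const triple of changeset.inserts ?? []) {
      subjects.add(triple.subject.value);
    }
  }
  return [...subjects];
}

// Step 2: keep only subjects that are configured for the stream.
// `isInteresting` is a hypothetical predicate supplied by the caller.
function filterInteresting(subjects, isInteresting) {
  return subjects.filter(isInteresting);
}

const delta = [
  {
    inserts: [
      { subject: { value: "http://example.org/a" } },
      { subject: { value: "http://example.org/a" } },
      { subject: { value: "http://example.org/b" } },
    ],
    deletes: [],
  },
];

const subjects = uniqueInsertSubjects(delta);
console.log(subjects); // [ 'http://example.org/a', 'http://example.org/b' ]
console.log(filterInteresting(subjects, (s) => s.endsWith("/b"))); // [ 'http://example.org/b' ]
```

Steps 3 and 4 depend on the triplestore and on the service's moveTriples function, so they are left out of the sketch.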

The healing file converts the contents of initialization to a format appropriate for the healing functionality of the service. I use dcterms:modified as the healing predicate. This means we need to make sure to update (or add) this triple whenever we change a published resource. This could be automated using the track-modified service, but I don't expect the published resources to change (often), so I opted to avoid this additional complexity. Note that dcterms:modified MAY be absent for DCAT resources, which means our healing will not work for DCAT resources where it is deliberately omitted.
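To make the consequence of using dcterms:modified as healing predicate concrete, here is a conceptual sketch of a timestamp comparison. It is not the healing implementation of the service; the function name and the two maps (subject URI to dcterms:modified value, for the triplestore and for the feed respectively) are assumptions made for illustration.

```javascript
// Conceptual sketch only. Both arguments are Map<subjectUri, ISO timestamp>.
// A subject needs healing when the triplestore has a newer timestamp than
// the feed, or when the feed has no version of the subject at all.
function subjectsNeedingHealing(storeModified, feedModified) {
  const outdated = [];
  for (const [subject, modified] of storeModified) {
    // Without dcterms:modified there is nothing to compare, so a change to
    // this resource goes unnoticed.
    if (!modified) continue;
    const published = feedModified.get(subject);
    if (!published || Date.parse(modified) > Date.parse(published)) {
      outdated.push(subject);
    }
  }
  return outdated;
}

const store = new Map([
  ["http://example.org/unchanged", "2026-05-21T14:00:00Z"],
  ["http://example.org/updated", "2026-05-22T09:00:00Z"],
  ["http://example.org/no-timestamp", undefined],
]);
const feed = new Map([
  ["http://example.org/unchanged", "2026-05-21T14:00:00Z"],
  ["http://example.org/updated", "2026-05-21T14:00:00Z"],
]);

console.log(subjectsNeedingHealing(store, feed)); // [ 'http://example.org/updated' ]
```

The resource without a timestamp is skipped even if its other triples changed, which is the limitation described above.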

How to test

Initialising the LDES

  1. Check out the branch of this PR
  2. Create an ldes-feed folder in data
  3. To simplify testing, add the following entries to your docker-compose.override.yml. WRITE_INITIAL_STATE tells the delta pusher to create its initial feed upon starting. Setting LDES_BASE and BASE_URL makes it easier to browse the feed locally, because links to other pages then take the form http://localhost/ldes/public/3.
  ldes-delta-pusher:
    environment:
      WRITE_INITIAL_STATE: "true"
      LDES_BASE: "http://localhost/ldes/"
  ldes-serve-feed:
    environment:
      BASE_URL: "http://localhost/ldes/"
  4. Restart the dispatcher and deltanotifier services so that they load their updated configurations.
  5. Start the ldes-delta-pusher and ldes-serve-feed services. The ldes-delta-pusher should automatically initialise itself; the logs will contain something like ldes-delta-pusher-1 | 2026-05-21T14:32:41.413526565Z done writing initial state when it has finished.
  6. You can inspect the contents of the feed either by browsing to http://localhost/ldes/public/ or by inspecting the TTL files in the data/ldes-feed/public/ folder. The initial data should correspond to the data inserted in this migration.

Adding new resources to the LDES

To test whether the ldes-delta-pusher properly reacts to new data, you need to insert appropriate data via the database so that delta messages are forwarded. This can be simplified using a fake service that inserts data. To this end, add something like the following to your docker-compose.override.yml:

  fake-data:
    image: semtech/mu-javascript-template:1.9.1
    environment:
      NODE_ENV: "development"
    volumes:
      - "./config/fake-data/:/app"

Create the config/fake-data folder and insert the following files:

package.json

{
  "dependencies": {
    "@lblod/mu-auth-sudo": "^1.1.0"
  }
}

app.js

import { app, errorHandler } from "mu";
import { updateSudo } from "@lblod/mu-auth-sudo";

// Insert DCAT data into the public graph, all this data should appear on the
// LDES.
app.get("/insert", async function (req, res) {
  try {
    const result = await updateSudo(`
      PREFIX cms: <http://mu.semte.ch/vocabulary/cms/>
      PREFIX dcat: <http://www.w3.org/ns/dcat#>
      PREFIX dcterms: <http://purl.org/dc/terms/>
      PREFIX ext: <http://mu.semte.ch/vocabularies/ext/>
      PREFIX foaf: <http://xmlns.com/foaf/0.1/>
      PREFIX mu: <http://mu.semte.ch/vocabularies/core/>
      PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
      PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
      PREFIX vcard: <http://www.w3.org/2006/vcard/ns#>
      PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

      INSERT DATA {
        GRAPH <http://mu.semte.ch/graphs/public> {
          # Catalog
          ext:TestCatalog a dcat:Catalog ;
                          mu:uuid """652740f4-3807-4726-9ad4-466adfa6edbb""" ;
                          dcterms:issued "2026-05-21"^^xsd:date ;
                          dcterms:modified "2026-05-21T14:00:00Z"^^xsd:dateTime ;
                          dcterms:language <http://publications.europa.eu/resource/authority/language/ENG> ;
                          foaf:homepage <https://www.vlaanderen.be/lokaal-bestuur/digitale-transformatie/slimme-lokale-databronnen/decide> ;
                          dcterms:publisher ext:TestAgent ;
                          dcat:themeTaxonomy ext:TestConceptScheme ;
                          dcat:dataset ext:TestDataset ;
                          dcat:record ext:TestCatalogRecord ;
                          dcterms:title """Test DCAT catalog""" ;
                          dcterms:description """A catalog resource used to test the DCAT LDES.""" .

          # Dataset
          ext:TestDataset a dcat:Dataset ;
                          mu:uuid """10e9ac45-3230-432f-99b5-f2a51506375c""" ;
                          dcterms:publisher ext:TestAgent ;
                          dcat:contactPoint ext:TestAgentContact ;
                          dcat:theme ext:TestConcept ;
                          dcterms:accessRights <http://publications.europa.eu/resource/authority/access-right/PUBLIC> ;
                          dcat:distribution ext:TestDistribution ;
                          dcterms:title """Test DCAT dataset""" ;
                          dcterms:description """A dataset resource used to test the DCAT LDES.""" .

          # Catalog record
          ext:TestCatalogRecord a dcat:CatalogRecord ;
                                mu:uuid """07841736-3183-4951-ac6a-84f8c0f0a387""" ;
                                dcterms:issued "2026-05-21"^^xsd:date ;
                                dcterms:modified "2026-05-21T14:00:00Z"^^xsd:dateTime ;
                                foaf:primaryTopic ext:TestDataset ;
                                dcterms:title """Test catalog record""" ;
                                dcterms:description """Test catalog record used to test the DCAT LDES. """ .


          # Distribution
          ext:TestDistribution a dcat:Distribution ;
                               mu:uuid """5be172f6-fda0-4264-ae9a-91d602378774""" ;
                               dcterms:format ext:TestFormat ;
                               dcat:accessURL <https://example.org> ;
                               dcat:accessService ext:TestDataService ;
                               dcterms:license <http://opendatacommons.org/licenses/pddl/1.0/> ;
                               dcterms:title """Test DCAT distributie"""@nl , """Test DCAT distribution"""@en ;
                               dcterms:description """Test distributie om de DCAT LDES te testen."""@nl , """Test distribution to test the DCAT LDES."""@en .


          # Data service
          ext:TestDataService a dcat:DataService ;
                              mu:uuid """adf78f11-0b01-47af-812c-23c6e7fae859""" ;
                              dcterms:conformsTo <https://www.w3.org/TR/sparql11-protocol/> ;
                              dcat:endpointURL <https://example.org/sparql> ;
                              dcterms:accessRights <http://publications.europa.eu/resource/authority/access-right/PUBLIC> ;
                              dcat:servesDataset ext:TestDataset ;
                              dcterms:title """Test data service"""@nl ;
                              dcat:endpointDescription """Test data service om DCAT LDES te testen"""@nl , """Test data service to test the DCAT LDES"""@en .

          # Agent
          ext:TestAgent a foaf:Agent ;
                        mu:uuid """84f1151a-1681-452c-9d3d-3beb92da2f37""" ;
                        vcard:hasEmail "test@example.org" ;
                        vcard:hasName """Test agent"""@nl .

          ext:TestAgentContact a foaf:Agent ;
                               mu:uuid """20260c41-7c10-4569-a780-4eb9a1b0fe17""" ;
                               vcard:hasEmail "contact@example.org" ;
                               vcard:hasName """Test contact agent"""@nl .

          # Format
          ext:TestFormat a dcterms:MediaTypeOrExtent ;
                         mu:uuid """4f3864ed-7727-4c56-9bab-72840bbaaaf6""" ;
                         cms:page ext:TestPage ;
                         rdfs:label """Test format""" ;
                         dcterms:description """Test format to test the DCAT LDES.""" .


          # Page
          ext:TestPage a cms:Page ;
                       mu:uuid """75d024f7-6941-4ccb-b698-479dfb83b86a""" ;
                       dcterms:title """Test page""" ;
                       cms:pageContent """Test page for testing DCAT LDES.""" .

          # Concept scheme
          ext:TestConceptScheme a skos:ConceptScheme ;
                                mu:uuid """b9c829ae-427d-414f-994f-e2a1697ffdea""" .

          # Concept
          ext:TestConcept a skos:Concept ;
                          mu:uuid """4645f09b-5e3f-4a47-ab5e-ba1b09402300""" ;
                          skos:inScheme ext:TestConceptScheme .
        }
      }`);
  } catch {
    // Return here so the success response below is not sent a second time.
    return res.status(500).send("error occurred");
  }

  res.send("DCAT data inserted");
});

// Insert a concept scheme that is not related to DCAT data, should NOT be added
// to the LDES
app.get("/concept-scheme", async function (req, res) {
  try {
    await updateSudo(`
  PREFIX ext: <http://mu.semte.ch/vocabularies/ext/>
  PREFIX mu: <http://mu.semte.ch/vocabularies/core/>
  PREFIX skos: <http://www.w3.org/2004/02/skos/core#>

  INSERT DATA {
    GRAPH <http://mu.semte.ch/graphs/public> {
      ext:SomeConceptScheme a skos:ConceptScheme ;
                            mu:uuid """bba983e4-9a1e-4303-85ab-a1c0d78ff4e4""" .
    }
  }
`);
  } catch {
    // Return here so the success response below is not sent a second time.
    return res.status(500).send("error occurred");
  }
  res.send("Concept scheme inserted");
});

// Inserts a resource type that is not relevant for the LDES, should NOT be
// added to the LDES
app.get("/person", async function (req, res) {
  try {
    const result = await updateSudo(`
     PREFIX foaf: <http://xmlns.com/foaf/0.1/>
     PREFIX ext: <http://mu.semte.ch/vocabularies/ext/>
     PREFIX dcat: <http://www.w3.org/ns/dcat#>
     PREFIX dcterms: <http://purl.org/dc/terms/>
     PREFIX mu: <http://mu.semte.ch/vocabularies/core/>

     INSERT DATA {
       GRAPH <http://mu.semte.ch/graphs/public> {
         ext:subject a foaf:Person ;
                     foaf:firstName "foo" ;
                     foaf:familyName "bar" .
       }
     }`);
  } catch {
    // Return here so the success response below is not sent a second time.
    return res.status(500).send("error occurred");
  }

  res.send("Person resource inserted");
});

// Insert data in another graph than the public one, data should NOT be added to
// the LDES
app.get("/alt-graph", async function (req, res) {
  try {
    const result = await updateSudo(`
     PREFIX ext: <http://mu.semte.ch/vocabularies/ext/>
     PREFIX dcat: <http://www.w3.org/ns/dcat#>
     PREFIX dcterms: <http://purl.org/dc/terms/>
     PREFIX mu: <http://mu.semte.ch/vocabularies/core/>

     INSERT DATA {
       GRAPH <http://mu.semte.ch/graphs/public/gent> {
         ext:dcatSubjectTwo a dcat:Dataset ;
                            dcterms:description """Dataset description""" ;
                            dcterms:title """Dataset title""" ;
                            mu:uuid """10e9ac45-3230-432f-99b5-f2a51506375c""" .
       }
     }`);
  } catch {
    // Return here so the success response below is not sent a second time.
    return res.status(500).send("error occurred");
  }

  res.send("Data inserted in non-public graph");
});

app.use(errorHandler);

Start the fake-data service. You can call the different endpoints using docker compose exec fake-data curl http://localhost/ENDPOINT. The comments for each endpoint in app.js briefly explain what behaviour is expected. For example, the insert endpoint inserts a set of DCAT data, all of which should appear in the LDES. Note that it might take a few moments before the data shows up; keep an eye on the ldes-delta-pusher logs for its progress.

Healing

To test the healing functionality you have to insert/update data directly in the triplestore, for example using a migration or by querying it directly. For instance, the following query updates the title of the Catalogue resource.

PREFIX dcterms: <http://purl.org/dc/terms/>

DELETE {
  GRAPH <http://mu.semte.ch/graphs/public> {
    ?s dcterms:modified ?modified ;
       dcterms:title ?title .
  }
}
INSERT {
  GRAPH <http://mu.semte.ch/graphs/public> {
    ?s dcterms:modified ?now ;
       dcterms:title """Jeej, updated the catalogue's title!!""" .

  }
} WHERE {
  VALUES ?s {
    <http://data.lblod.info/id/catalogs/80fb699a-0d95-4089-843d-79c89138cdb7>
  }
  GRAPH <http://mu.semte.ch/graphs/public> {
    ?s dcterms:title ?title .
    OPTIONAL {
      # Optional because not all resources already have a `dcterms:modified`
      # triple.
      ?s dcterms:modified ?modified .
    }
  }
  BIND (NOW() AS ?now)
}

After executing the query, manually trigger the healing: docker compose exec ldes-delta-pusher curl -X POST http://localhost/manual-healing. Afterwards you should be able to find a new version of the catalogue resource with the updated title.

TODO

  • The healing configuration is currently incomplete. Not all previously added DCAT data has a dcterms:modified triple. Furthermore, the associated resources, such as foaf:Agent and skos:Concept, have no suitable triples. While the former can be fixed in the data itself, we need to be careful not to change the meaning of the dcterms:modified triples. Possibly we need to add the track-modified service to support proper healing.

Notes

  • Support for multiple streams is planned for a follow-up ticket (LBRON-1348)

Related tickets

  • LBRON-892

@mirdono mirdono self-assigned this May 21, 2026
@mirdono mirdono added the enhancement New feature or request label May 21, 2026
@mirdono mirdono force-pushed the feat/dcat-federation-combined-ldes branch from af20bfb to 6214713 Compare May 28, 2026 06:19
mirdono added 8 commits May 28, 2026 14:39
This service will be used to add data to the LDES feeds.  This commit just adds
it without further configuration.  So it will use its default one, which does
nothing for this app.  Proper configuration that is relevant for the data
handled by this app will be added later.
Configuration for the `ldes-delta-pusher` service to initialise a public feed
with all DCAT-related resources.  This `initialization` configuration will also
be re-used by the dispatch and healing functionality.
Process incoming delta messages by
- extracting any inserts for the public graph
- extracting subjects for relevant resource types
- retrieving all triples for each subject
- inserting these triples into the stream
The `dispatch` function is also reused in the healing flow.  There changeset
inserts are faked with `http://mu.semte.ch/graphs/application` as graph[1].
Filtering inserts based on public graph therefore breaks the healing
functionality.

Alternatively, one could filter inserts for the `public` AND `application`
graphs.  But this might lead to similar subtle bugs when changing the
configuration in the future.  It seems better to rely fully on the filters in
`initialization` to ensure only correct data is put on the LDES.

[1]: https://github.com/redpencilio/ldes-delta-pusher-service/blob/f0b272e4cda9bbcef69547d7073ff6b8710e4701/self-healing/heal-ldes-data.ts#L19-L55
@mirdono mirdono force-pushed the feat/dcat-federation-combined-ldes branch from 6214713 to b67e510 Compare May 28, 2026 15:23
Add some basic healing functionality.  We rely on the `dcterms:modified`
predicate to determine whether a resource has changed.  This has two
disadvantages:
- When changing an existing resource we need to make sure the appropriate value
  is set for `dcterms:modified`.  This could be automated using the
  `track-modified-service` but considering DCAT data should not change often we
  opt to avoid that additional complexity.
- For DCAT the absence of `dcterms:modified` has a specific meaning.  If the
  upstream source for a DCAT resource chooses not to set a timestamp, e.g. to
  support continuous updates, our healing will fail to pick up any changes
  unless we add a timestamp ourselves.
@mirdono mirdono force-pushed the feat/dcat-federation-combined-ldes branch from b67e510 to 9f15b73 Compare May 28, 2026 15:59