Skip to content

Wikipedia Page Create

Full Recipe

Shared by: Landon Kuhn

Wikipedia page creation events are instantiated in the graph with relationships to a reified time model.

Wikipedia Page Create Recipe
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
version: 1
title: Ingest Wikipedia Page Create stream
contributor: https://github.com/landon9720
summary: Consume events about new Wikipedia pages to build a time series reified graph
description: |-
  Wikipedia page creation events are instantiated in the graph with relationships to a reified time model.
  Additionally, page creation event comments are echoed to standard output.

  Data source documentation: https://stream.wikimedia.org/?doc#/streams/get_v2_stream_page_create
ingestStreams:
  - type: ServerSentEventsIngest
    url: https://stream.wikimedia.org/v2/stream/page-create
    format:
      type: CypherJson
      query: |-
        MATCH (revNode), (dbNode), (userNode) 
        WHERE id(revNode) = idFrom("revision", $that.rev_id)
          AND id(dbNode) = idFrom("db", $that.database)
          AND id(userNode) = idFrom("id", $that.performer.user_id)

        // Set labels for nodes //
        CALL create.setLabels(revNode, ["rev:" + $that.page_title])
        CALL create.setLabels(dbNode, ["db:" + $that.database])
        CALL create.setLabels(userNode, ["user:" + $that.performer.user_text])

        // Create timeNode node to provide day/hour/minute bucketing and counting of revNodes //
        CALL reify.time(datetime($that.rev_timestamp), ["year", "month", "day", "hour", "minute", "second"]) YIELD node AS timeNode
        CALL incrementCounter(timeNode, "count", 1) YIELD count AS timeNodeCount

        // Set properties for nodes //
        SET revNode = $that,
            revNode.type = "rev"

        SET dbNode.database = $that.database,
            dbNode.type = "db"

        SET userNode = $that.performer,
            userNode.type = "user"

        // Create edges between nodes //
        CREATE (revNode)-[:DB]->(dbNode),
               (revNode)-[:BY]->(userNode),
               (revNode)-[:AT]->(timeNode)
standingQueries:
  - pattern:
      type: Cypher
      query: |-
        MATCH (n)
        WHERE n.comment IS NOT NULL
        RETURN DISTINCT id(n) AS id
    outputs:
      output-1:
        type: CypherQuery
        query: |-
          MATCH (n)
          WHERE id(n) = $that.data.id
          RETURN n.comment AS line
        andThen:
          type: PrintToStandardOut
nodeAppearances: []
quickQueries: []
sampleQueries:
  - name: Show time nodes
    query: >
      MATCH (n)
      WHERE n.period IS NOT NULL
      RETURN n
  - name: Show revision nodes
    query: >
      MATCH (n)
      WHERE n.type = "rev"
      RETURN n
  - name: Show database nodes
    query: >
      MATCH (n)
      WHERE n.type = "db"
      RETURN n
  - name: Show user nodes
    query: >
      MATCH (n)
      WHERE n.type = "user"
      RETURN n

Download Recipe

Scenario

In this scenario, Quine consumes Wikipedia first revision page create events from the Mediawiki EventStreams service.

Sample Data

Data source documentation: /streams/get_v2_stream_page_create

How it Works

The recipe receives Server Sent Events (SSE) using an ingest stream to manifest a graph in Quine.

INGEST-1 processes the SSE stream consisting of JSON records like:

{
  "$schema": "/mediawiki/revision/create/1.1.0",
  "meta": {
    "uri": "https://commons.wikimedia.org/wiki/User_talk:Florentin_Bart",
    "request_id": "c11b80bf-26ea-4e0f-9369-5f80ccaa276d",
    "id": "c34b93bc-14a8-4642-9aba-d4d07821ff33",
    "dt": "2023-02-07T21:03:26Z",
    "domain": "commons.wikimedia.org",
    "stream": "mediawiki.page-create",
    "topic": "eqiad.mediawiki.page-create",
    "partition": 0,
    "offset": 260591941
  },
  "database": "commonswiki",
  "page_id": 128505356,
  "page_title": "User_talk:Florentin_Bart",
  "page_namespace": 3,
  "rev_id": 730781127,
  "rev_timestamp": "2023-02-07T21:03:26Z",
  "rev_sha1": "en26ue963xtjl98402aitklflzn6srp",
  "rev_minor_edit": true,
  "rev_len": 238,
  "rev_content_model": "wikitext",
  "rev_content_format": "text/x-wiki",
  "performer": {
    "user_text": "Wikimedia Commons Welcome",
    "user_groups": [
      "autopatrolled",
      "*",
      "user",
      "autoconfirmed"
    ],
    "user_is_bot": false,
    "user_id": 302461,
    "user_registration_dt": "2008-05-28T14:23:02Z",
    "user_edit_count": 11295026
  },
  "page_is_redirect": false,
  "comment": "Adding [[Template:Welcome|welcome message]] to new user's talk page",
  "parsedcomment": "Adding <a href=\"/wiki/Template:Welcome\" title=\"Template:Welcome\">welcome message</a> to new user&#039;s talk page",
  "rev_slots": {
    "main": {
      "rev_slot_content_model": "wikitext",
      "rev_slot_sha1": "en26ue963xtjl98402aitklflzn6srp",
      "rev_slot_size": 238,
      "rev_slot_origin_rev_id": 730781127
    }
  }
}

The ingest query identifies revNode, dbNode, userNode nodes, loads them into the graph, and populates them with properties. The query also converts timestamps into timeNode nodes using the reify.time procedure for event bucketing.

    - type: ServerSentEventsIngest
      url: https://stream.wikimedia.org/v2/stream/page-create
      format:
        type: CypherJson
        query: |-
          MATCH (revNode), (dbNode), (userNode) 
          WHERE id(revNode) = idFrom("revision", $that.rev_id)
            AND id(dbNode) = idFrom("db", $that.database)
            AND id(userNode) = idFrom("id", $that.performer.user_id)

          // Set labels for nodes //
          CALL create.setLabels(revNode, ["rev:" + $that.page_title])
          CALL create.setLabels(dbNode, ["db:" + $that.database])
          CALL create.setLabels(userNode, ["user:" + $that.performer.user_text])

          // Create timeNode node to provide day/hour/minute bucketing and counting of revNodes //
          CALL reify.time(datetime($that.rev_timestamp), ["year", "month", "day", "hour", "minute", "second"]) YIELD node AS timeNode
          CALL incrementCounter(timeNode, "count", 1) YIELD count AS timeNodeCount

          // Set properties for nodes //
          SET revNode = $that,
              revNode.type = "rev"

          SET dbNode.database = $that.database,
              dbNode.type = "db"

          SET userNode = $that.performer,
              userNode.type = "user"

          // Create edges between nodes //
          CREATE (revNode)-[:DB]->(dbNode),
                (revNode)-[:BY]->(userNode),
                (revNode)-[:AT]->(timeNode)
POST /api/v1/ingest/INGEST-1
{
  "type": "ServerSentEventsIngest",
  "url": "https://stream.wikimedia.org/v2/stream/page-create",
  "format": {
    "type": "CypherJson",
    "query": "MATCH (revNode), (dbNode), (userNode) \nWHERE id(revNode) = idFrom(\"revision\", $that.rev_id)\n  AND id(dbNode) = idFrom(\"db\", $that.database)\n  AND id(userNode) = idFrom(\"id\", $that.performer.user_id)\n\n// Set labels for nodes //\nCALL create.setLabels(revNode, [\"rev:\" + $that.page_title])\nCALL create.setLabels(dbNode, [\"db:\" + $that.database])\nCALL create.setLabels(userNode, [\"user:\" + $that.performer.user_text])\n\n// Create timeNode node to provide day/hour/minute bucketing and counting of revNodes //\nCALL reify.time(datetime($that.rev_timestamp), [\"year\", \"month\", \"day\", \"hour\", \"minute\", \"second\"]) YIELD node AS timeNode\nCALL incrementCounter(timeNode, \"count\", 1) YIELD count AS timeNodeCount\n\n// Set properties for nodes //\nSET revNode = $that,\n    revNode.type = \"rev\"\n\nSET dbNode.database = $that.database,\n    dbNode.type = \"db\"\n\nSET userNode = $that.performer,\n    userNode.type = \"user\"\n\n// Create edges between nodes //\nCREATE (revNode)-[:DB]->(dbNode),\n      (revNode)-[:BY]->(userNode),\n      (revNode)-[:AT]->(timeNode)"
  }
}

A standing query is configured to detect when new nodes are added to the graph and prints the event to standard out.

- pattern:
    type: Cypher
    query: |-
      MATCH (n)
      WHERE n.comment IS NOT NULL
      RETURN DISTINCT id(n) AS id
  outputs:
    output-1:
      type: CypherQuery
      query: |-
        MATCH (n)
        WHERE id(n) = $that.data.id
        RETURN n.comment AS line
      andThen:
        type: PrintToStandardOut
/api/v1/query/standing/STANDING-1
{
  "pattern": {
    "type": "Cypher",
    "query": "MATCH (n)\nWHERE n.comment IS NOT NULL\nRETURN DISTINCT id(n) AS id"
  },
  "outputs": {
    "output-1": {
      "type": "CypherQuery",
      "query": "MATCH (n)\nWHERE id(n) = $that.data.id\nRETURN n.comment AS line",
      "andThen": {
        "type": "PrintToStandardOut"
      }
    }
  }
}

The resulting event stream looks like this in the console.

2023-02-07 15:39:37,967 Standing query `output-1` match: {"meta":{"isPositiveMatch":true,"resultId":"b995e3a0-12d2-2139-d349-4757801ad666"},"data":{"line":"Adding [[Template:Welcome|welcome message]] to new user's talk page"}}

Running the Recipe

 java -jar quine-1.8.2.jar -r wikipedia.yaml
Graph is ready
Running Recipe: Ingest Wikipedia Page Create stream
Using 4 sample queries
Running Standing Query STANDING-1
Running Ingest Stream INGEST-1
Quine web server available at http://localhost:8080 

Summary

This recipe can serve as a boilerplate for other streaming recipes using the Wikipedia EventStreams source. We use variations of this recipe in our getting started guide and product demos.