To import data from Metry and store it in another system, Streams is the preferred method. Consumption values and other data can be pushed through streams as soon as they are saved in Metry, so that other systems can consume them.
See the Streams API reference for more details.
If the meters in Metry need to be mapped to other objects in your system before it can save or process the data, see Mapping meters to objects in your system for how this can be done.
Importing consumption data
Any application can create a stream for an authenticated Metry account to start importing data continuously.
Creating a stream
The first step is to create a stream, which is likely going to happen just after the user in Metry has approved your application and your application has received a refresh token and an access token. This is done via the API with a request like the one below (including the access token as usual).
POST https://app.metry.io/api/v2/streams
{
  "resource": "consumption"
}
This will return a response like the one below.
{
  "code": 200,
  "message": "OK",
  "data": {
    "_id": "5c90afaf775578007d6da703",
    "created": "2019-03-19T09:00:31+0000",
    "holder_id": "5c90afa377557800970067f6",
    "client_id": "5c90afaa7755780097006802",
    "resource": "consumption",
    "granularity": null,
    "last_offset": null
  }
}
The “_id” property should preferably be saved in your application’s database so that the stream can be consumed later. However, it is also possible to list all streams that exist for an account to look up the stream’s id.
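For illustration, a minimal Python sketch of this step could look like the following. It mirrors the later example’s use of the requests library and a METRY_ACCESS_TOKEN environment variable; get_or_create_stream_id is a hypothetical helper name, and the check on the "resource" field assumes you only want to reuse consumption streams.

import os

import requests

BASE_URL = 'https://app.metry.io/api/v2/'
HEADERS = {'Authorization': 'Bearer ' + os.environ['METRY_ACCESS_TOKEN']}


def get_or_create_stream_id():
    # List the account's existing streams and reuse a consumption stream if present.
    streams = requests.get(BASE_URL + 'streams', headers=HEADERS).json()['data']
    for stream in streams:
        if stream['resource'] == 'consumption':
            return stream['_id']

    # No consumption stream yet: create one. Persist the returned "_id" in your
    # application's database so the stream can be consumed later.
    created = requests.post(BASE_URL + 'streams',
                            json={'resource': 'consumption'},
                            headers=HEADERS).json()['data']
    return created['_id']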
Consuming the stream
After creating the stream, all historical data for the account will be added to the stream for your application to consume.
Consuming a stream is done via the API using an offset that is returned in each response. If no offset is specified in the request, such as when first starting to consume the stream, the response starts at the beginning of the stream, or at the stream’s stored last offset, which the server advances whenever a request is made with a greater offset. A single response does not necessarily include all data in the stream. The client should therefore keep track of the last offset locally in memory and make additional requests, updating the local last offset each time, until no more records are returned from the stream.
The first request will look like the one below: a POST request with the stream’s id in the URL.
POST https://app.metry.io/api/v2/streams/5c90afaf775578007d6da703/consume
{
  "offset": null
}
The response contains a batch of data whose format depends on the resource, in this case consumption. The records property holds the consumption data in the batch, grouped by meter.
The offset property should be extracted and used in the next consume request to get new data.
{
  "code": 200,
  "message": "OK",
  "data": {
    "offset": 1552988044643026000,
    "resource": "consumption",
    "records": [
      {
        "meter": {
          "_id": "5c90bb1077557800b028b68a",
          "ean": "735999000000000000",
          "timezone": "Etc/GMT-1"
        },
        "values": [
          {
            "value": 0.5312,
            "granularity": "hour",
            "metric": "flow",
            "updated": false,
            "period": "2017100100",
            "pushed_at": "2018-05-24 11:15:25"
          },
          {
            "value": 1.3543,
            "granularity": "hour",
            "metric": "flow",
            "updated": false,
            "period": "2017100101",
            "pushed_at": "2018-05-24 11:17:43"
          }
        ]
      }
    ]
  }
}
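As an illustration, a batch like the one above could be unpacked with a sketch like the following. It makes two assumptions not stated in the response itself: that the period of hourly values is formatted as YYYYMMDDHH in the meter’s local time (inferred from the sample values), and that timezone is an IANA zone name that Python’s zoneinfo can resolve. handle_batch is a hypothetical helper name.

from datetime import datetime
from zoneinfo import ZoneInfo


def handle_batch(batch):
    """Extract the offset and flatten the records of one consume response."""
    next_offset = batch['offset']  # pass this in the next consume request
    for record in batch['records']:
        meter = record['meter']
        tz = ZoneInfo(meter['timezone'])  # e.g. "Etc/GMT-1"
        for value in record['values']:
            # Assumption: hourly periods appear to be formatted as YYYYMMDDHH,
            # local to the meter's timezone (inferred from the example above).
            ts = datetime.strptime(value['period'], '%Y%m%d%H').replace(tzinfo=tz)
            print(ts.isoformat(), value['metric'], value['value'], value['granularity'])
    return next_offset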
Example implementation (Python)
The following code demonstrates creating a stream and consuming it.
import requests
import json
import time
import os


class MetryClient:
    BASE_URL = 'https://app.metry.io/api/v2/'

    def __init__(self, access_token):
        self.headers = {
            'Authorization': 'Bearer ' + access_token
        }

    def get(self, path):
        response = requests.get(MetryClient.BASE_URL + path, headers=self.headers)
        return response.json()['data']

    def post(self, path, payload):
        response = requests.post(MetryClient.BASE_URL + path, json=payload, headers=self.headers)
        return response.json()['data']


def run():
    client = MetryClient(os.getenv('METRY_ACCESS_TOKEN'))

    # Reuse an existing stream if the account already has one,
    # otherwise create a new one.
    streams = client.get('streams')

    if len(streams) > 0:
        stream_id = streams[0]['_id']
        print("Using stream with ID " + stream_id)
    else:
        stream = client.post('streams', {'resource': 'consumption', 'granularity': 'month'})
        stream_id = stream['_id']
        print("Created a new stream with id " + stream_id)
        print("Waiting for initial data to be pushed...")

        # It is possible that it will take longer for data to become available.
        # In which case, just rerun the import later.
        time.sleep(10)

    last_offset = None

    # Consume the stream until it returns an empty batch, updating the
    # local offset after each request.
    while True:
        batch = client.post('streams/%s/consume' % stream_id, {
            'offset': last_offset
        })

        if len(batch['records']) == 0:
            print("Nothing more in the stream for now. Stopping.")
            break
        else:
            last_offset = batch['offset']

            for record in batch['records']:
                print(json.dumps(record))


if __name__ == '__main__':
    run()
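To try the example, set the METRY_ACCESS_TOKEN environment variable to a valid access token before running the script. Note that the script only keeps last_offset in memory; since the stream itself stores the last offset it has served, a later run that starts with a null offset resumes from there rather than from the beginning of the stream.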