'use strict'; const _ = require('lodash'); const { v4 } = require('uuid'); const logger = require('../logger'); const nantum = require('../modules/nantum'); const reportSubscriptionParameters = { dataGranularitySeconds: 60, reportBackSeconds: 60, subscriptionDurationSeconds: 60 * 60, resubscribeDurationSeconds: 59 * 60, resubscribeAfterNoDataForSeconds: 90, }; function getSecondsSince(property, reportMetadata) { const value = reportMetadata[property]; if (value != null) { const millisDiff = new Date().getTime() - new Date(value).getTime(); if (millisDiff > 0) { return Math.round(millisDiff / 1000); } } } // this is called every time the VEN polls. Check to see if we have any stale subscriptions and resubscribe as needed. async function pollForReports( oadrPoll, clientCertificateCn, clientCertificateFingerprint, ) { logger.info( 'pollForReports', oadrPoll, clientCertificateCn, clientCertificateFingerprint, ); const ven = await nantum.getVenRegistration(clientCertificateFingerprint); if (!ven) { // not an error, we shouldn't fail polling because the VEN hasn't registered yet return; } const venReports = await nantum.getVenReports(ven._id); if (!venReports) { // not an error if no reports have been defined return; } const createRequests = []; for (const reportMetadata of venReports.reports) { let sendCreate = false; if (!reportMetadata.last_sent_create) { // if we've never sent a subscription request, do it logger.info( 'sending create because we never have', reportMetadata.report_specifier_id, ); sendCreate = true; } else { // have sent a create > 5s ago, not received a created if ( !reportMetadata.last_received_created && getSecondsSince('last_sent_create', reportMetadata) > 5 ) { logger.info( 'no reply to creation request, send another', reportMetadata.report_specifier_id, ); sendCreate = true; } } if ( getSecondsSince('last_received_update', reportMetadata) > reportSubscriptionParameters.resubscribeAfterNoDataForSeconds ) { // previously received data, silent now sendCreate = true; } if ( !reportMetadata.last_received_update && getSecondsSince('last_received_created', reportMetadata) > reportSubscriptionParameters.reportBackSeconds + 5 ) { // if we haven't received any data but we've waited long enough for one data interval + 5 seconds logger.info( 'sending create because have not received data', reportMetadata.report_specifier_id, ); sendCreate = true; } if ( getSecondsSince('last_received_created', reportMetadata) > reportSubscriptionParameters.resubscribeDurationSeconds ) { // when we're close to the end of the subscription, trigger a resubscribe logger.info( 'sending create because close to end of subscription', reportMetadata.report_specifier_id, ); sendCreate = true; } if (sendCreate) { const newReportRequestId = v4(); // track the last 10 registration ids reportMetadata.report_request_ids = [ newReportRequestId, ...reportMetadata.report_request_ids, ].slice(0, 10); createRequests.push({ reportRequestId: newReportRequestId, reportSpecifierId: reportMetadata.report_specifier_id, granularityDuration: `PT${reportSubscriptionParameters.dataGranularitySeconds}S`, reportBackDuration: `PT${reportSubscriptionParameters.reportBackSeconds}S`, startDate: new Date().toISOString(), duration: `PT${reportSubscriptionParameters.subscriptionDurationSeconds}S`, specifiers: reportMetadata.descriptions.map(description => ({ reportId: description.report_id, readingType: 'x-notApplicable', })), }); reportMetadata.last_sent_create = new Date().toISOString(); } } if (createRequests.length > 0) { const createReport = { _type: 'oadrCreateReport', requestId: v4(), requests: createRequests, }; await nantum.updateVenReports(venReports._id, venReports.reports); return createReport; } } async function registerReports( oadrRegisterReport, clientCertificateCn, clientCertificateFingerprint, ) { logger.info( 'registerReports', oadrRegisterReport, clientCertificateCn, clientCertificateFingerprint, ); const requestVenId = oadrRegisterReport.venId; validateVenId(requestVenId, clientCertificateFingerprint, false); const ven = await nantum.getVenRegistration(clientCertificateFingerprint); if (!ven) { throw new Error('VEN is not registered'); } const venReports = await nantum.getVenReports(ven._id); const venReportMetadata = (oadrRegisterReport.reports || []).map(report => { const { reportSpecifierId, descriptions } = report; const reportRequestId = v4(); return { report_request_ids: [reportRequestId], report_specifier_id: reportSpecifierId, descriptions: descriptions.map(description => { return { report_id: description.reportId, report_type: description.reportType, reading_type: description.readingType, sampling_rate: { min_period: description.samplingRate.minPeriod, max_period: description.samplingRate.maxPeriod, on_change: description.samplingRate.onChange, }, }; }), last_received_register: new Date().toISOString(), }; }); if (venReports) { //TODO: can we do an upsert here? await nantum.updateVenReports(venReports._id, venReportMetadata); } else { await nantum.createVenReports({ oadr_ven_registration_id: ven._id, reports: venReportMetadata }); } return { _type: 'oadrRegisteredReport', responseCode: '200', responseRequestId: oadrRegisterReport.requestId, responseDescription: 'OK', requests: [], }; } async function createdReports( oadrCreatedReport, clientCertificateCn, clientCertificateFingerprint, ) { logger.info( 'createdReports', oadrCreatedReport, clientCertificateCn, clientCertificateFingerprint, ); validateVenId(oadrCreatedReport.venId, clientCertificateFingerprint, false); const ven = await nantum.getVenRegistration(clientCertificateFingerprint); if (!ven) { throw new Error('VEN is not registered'); } const venReports = await nantum.getVenReports(ven._id); if (!venReports) { throw new Error('VEN does not have registered reports'); } if (oadrCreatedReport.pendingReports) { // flag reports as having been created for (const pendingReport of oadrCreatedReport.pendingReports) { const reportRequestId = pendingReport['reportRequestId']; const match = venReports.reports.filter(x => x.report_request_ids.includes(reportRequestId), )[0]; if (match) { match.last_received_created = new Date().toISOString(); } else { logger.info('could not match', reportRequestId, venReports.reports); } } await nantum.updateVenReports(venReports._id, venReports.reports); } return { _type: 'oadrResponse', responseCode: '200', responseDescription: 'OK', venId: clientCertificateFingerprint, }; } async function receiveReportData( oadrUpdateReport, clientCertificateCn, clientCertificateFingerprint, ) { logger.info( 'receiveReportData', oadrUpdateReport, clientCertificateCn, clientCertificateFingerprint, ); const requestVenId = oadrUpdateReport.venId; validateVenId(requestVenId, clientCertificateFingerprint, false); const ven = await nantum.getVenRegistration(clientCertificateFingerprint); if (!ven) { throw new Error('VEN is not registered'); } const venReports = await nantum.getVenReports(ven._id); if (!venReports) { throw new Error('VEN does not have registered reports'); } const readings = []; for (const updateReport of oadrUpdateReport.reports) { const reportRequestId = updateReport.reportRequestId; const match = venReports.reports.filter(x => x.report_request_ids.includes(reportRequestId), )[0]; if (!match) { logger.info('could not match', reportRequestId, venReports.reports); continue; } match.last_received_update = new Date().toISOString(); for (const interval of updateReport.intervals || []) { const reportId = interval.reportPayloads[0].reportId; const reportDefinition = _.find(match.descriptions, { report_id: reportId, }); if (!reportDefinition) { logger.info('Received data for unknown report ' + reportId); return; } const date = interval.startDate; if (interval.reportPayloads[0].payloadFloat) { const value = interval.reportPayloads[0].payloadFloat; readings.push({ date, report_specifier_id: updateReport.reportSpecifierId, report_name: updateReport.reportName, report_id: reportId, report_type: reportDefinition.report_type, value, }); } if ( interval.reportPayloads[0].payloadStatus && interval.reportPayloads[0].payloadStatus.loadControlState ) { const loadControlState = interval.reportPayloads[0].payloadStatus.loadControlState; Object.keys(loadControlState).forEach(type => { const typeObj = loadControlState[type]; Object.keys(typeObj).forEach(subType => { const value = typeObj[subType]; readings.push({ date, report_specifier_id: updateReport.reportSpecifierId, report_name: updateReport.reportName, report_id: reportId, report_type: reportDefinition.report_type, type, sub_type: subType, value, }); }); }); } } } if (readings.length > 0) { await nantum.sendReportReadings(ven, readings); } await nantum.updateVenReports(venReports._id, venReports.reports); return { _type: 'oadrUpdatedReport', responseCode: '200', responseRequestId: oadrUpdateReport.requestId, responseDescription: 'OK', venId: clientCertificateFingerprint, }; } function validateVenId(requestVenId, clientCertificateFingerprint, required) { if (requestVenId === clientCertificateFingerprint) { return; } if (!required && requestVenId == null) { return; } if (required && requestVenId == null) { const error = new Error('VenID is missing'); error.responseCode = 452; throw error; } const error = new Error('VenID does not match certificate'); error.responseCode = 452; throw error; } module.exports = { registerReports, createdReports, pollForReports, receiveReportData, };