|
@@ -1,15 +1,16 @@
|
|
|
'use strict';
|
|
'use strict';
|
|
|
|
|
+const _ = require('lodash');
|
|
|
|
|
+const { v4 } = require('uuid');
|
|
|
|
|
|
|
|
const logger = require('../logger');
|
|
const logger = require('../logger');
|
|
|
const nantum = require('../modules/nantum');
|
|
const nantum = require('../modules/nantum');
|
|
|
-const { v4 } = require('uuid');
|
|
|
|
|
|
|
|
|
|
const reportSubscriptionParameters = {
|
|
const reportSubscriptionParameters = {
|
|
|
dataGranularitySeconds: 60,
|
|
dataGranularitySeconds: 60,
|
|
|
reportBackSeconds: 60,
|
|
reportBackSeconds: 60,
|
|
|
subscriptionDurationSeconds: 60 * 60,
|
|
subscriptionDurationSeconds: 60 * 60,
|
|
|
resubscribeDurationSeconds: 59 * 60,
|
|
resubscribeDurationSeconds: 59 * 60,
|
|
|
- resubscribeAfterNoDataForSeconds: 90
|
|
|
|
|
|
|
+ resubscribeAfterNoDataForSeconds: 90,
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
function getSecondsSince(property, reportMetadata) {
|
|
function getSecondsSince(property, reportMetadata) {
|
|
@@ -22,6 +23,7 @@ function getSecondsSince(property, reportMetadata) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// this is called every time the VEN polls. Check to see if we have any stale subscriptions and resubscribe as needed.
|
|
|
async function pollForReports(
|
|
async function pollForReports(
|
|
|
oadrPoll,
|
|
oadrPoll,
|
|
|
clientCertificateCn,
|
|
clientCertificateCn,
|
|
@@ -34,30 +36,41 @@ async function pollForReports(
|
|
|
clientCertificateFingerprint,
|
|
clientCertificateFingerprint,
|
|
|
);
|
|
);
|
|
|
|
|
|
|
|
- const report = await nantum.fetchReport(clientCertificateFingerprint);
|
|
|
|
|
|
|
+ const ven = await nantum.getVen(clientCertificateFingerprint);
|
|
|
|
|
+ if (!ven) {
|
|
|
|
|
+ // not an error, we shouldn't fail polling because the VEN hasn't registered yet
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
const createRequests = [];
|
|
const createRequests = [];
|
|
|
|
|
|
|
|
- if (report.venReportMetadata) {
|
|
|
|
|
- for (const reportMetadata of report.venReportMetadata) {
|
|
|
|
|
|
|
+ if (ven.reports) {
|
|
|
|
|
+ for (const reportMetadata of ven.reports) {
|
|
|
let sendCreate = false;
|
|
let sendCreate = false;
|
|
|
|
|
|
|
|
- if (!reportMetadata.lastSentCreate) {
|
|
|
|
|
|
|
+ if (!reportMetadata.last_sent_create) {
|
|
|
// if we've never sent a subscription request, do it
|
|
// if we've never sent a subscription request, do it
|
|
|
- logger.info('sending create because we never have', reportMetadata.reportSpecifierId);
|
|
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ 'sending create because we never have',
|
|
|
|
|
+ reportMetadata.report_specifier_id,
|
|
|
|
|
+ );
|
|
|
sendCreate = true;
|
|
sendCreate = true;
|
|
|
} else {
|
|
} else {
|
|
|
// have sent a create > 5s ago, not received a created
|
|
// have sent a create > 5s ago, not received a created
|
|
|
if (
|
|
if (
|
|
|
- !reportMetadata.lastReceivedCreated &&
|
|
|
|
|
- getSecondsSince('lastSentCreate', reportMetadata) > 5
|
|
|
|
|
|
|
+ !reportMetadata.last_received_created &&
|
|
|
|
|
+ getSecondsSince('last_sent_create', reportMetadata) > 5
|
|
|
) {
|
|
) {
|
|
|
- logger.info('no reply to creation request, send another', reportMetadata.reportSpecifierId);
|
|
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ 'no reply to creation request, send another',
|
|
|
|
|
+ reportMetadata.report_specifier_id,
|
|
|
|
|
+ );
|
|
|
sendCreate = true;
|
|
sendCreate = true;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (
|
|
if (
|
|
|
- getSecondsSince('lastReceivedUpdate', reportMetadata) >
|
|
|
|
|
|
|
+ getSecondsSince('last_received_update', reportMetadata) >
|
|
|
reportSubscriptionParameters.resubscribeAfterNoDataForSeconds
|
|
reportSubscriptionParameters.resubscribeAfterNoDataForSeconds
|
|
|
) {
|
|
) {
|
|
|
// previously received data, silent now
|
|
// previously received data, silent now
|
|
@@ -65,44 +78,50 @@ async function pollForReports(
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (
|
|
if (
|
|
|
- !reportMetadata.lastReceivedUpdate &&
|
|
|
|
|
- getSecondsSince('lastReceivedCreated', reportMetadata) >
|
|
|
|
|
|
|
+ !reportMetadata.last_received_update &&
|
|
|
|
|
+ getSecondsSince('last_received_created', reportMetadata) >
|
|
|
reportSubscriptionParameters.reportBackSeconds + 5
|
|
reportSubscriptionParameters.reportBackSeconds + 5
|
|
|
) {
|
|
) {
|
|
|
// if we haven't received any data but we've waited long enough for one data interval + 5 seconds
|
|
// if we haven't received any data but we've waited long enough for one data interval + 5 seconds
|
|
|
- logger.info('sending create because have not received data', reportMetadata.reportSpecifierId);
|
|
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ 'sending create because have not received data',
|
|
|
|
|
+ reportMetadata.report_specifier_id,
|
|
|
|
|
+ );
|
|
|
sendCreate = true;
|
|
sendCreate = true;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (
|
|
if (
|
|
|
- getSecondsSince('lastReceivedCreated', reportMetadata) >
|
|
|
|
|
|
|
+ getSecondsSince('last_received_created', reportMetadata) >
|
|
|
reportSubscriptionParameters.resubscribeDurationSeconds
|
|
reportSubscriptionParameters.resubscribeDurationSeconds
|
|
|
) {
|
|
) {
|
|
|
// when we're close to the end of the subscription, trigger a resubscribe
|
|
// when we're close to the end of the subscription, trigger a resubscribe
|
|
|
- logger.info('sending create because close to end of subscription', reportMetadata.reportSpecifierId);
|
|
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ 'sending create because close to end of subscription',
|
|
|
|
|
+ reportMetadata.report_specifier_id,
|
|
|
|
|
+ );
|
|
|
sendCreate = true;
|
|
sendCreate = true;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (sendCreate) {
|
|
if (sendCreate) {
|
|
|
const newReportRequestId = v4();
|
|
const newReportRequestId = v4();
|
|
|
// track the last 10 registration ids
|
|
// track the last 10 registration ids
|
|
|
- reportMetadata.reportRequestIds = [
|
|
|
|
|
|
|
+ reportMetadata.report_request_ids = [
|
|
|
newReportRequestId,
|
|
newReportRequestId,
|
|
|
- ...reportMetadata.reportRequestIds,
|
|
|
|
|
|
|
+ ...reportMetadata.report_request_ids,
|
|
|
].slice(0, 10);
|
|
].slice(0, 10);
|
|
|
createRequests.push({
|
|
createRequests.push({
|
|
|
reportRequestId: newReportRequestId,
|
|
reportRequestId: newReportRequestId,
|
|
|
- reportSpecifierId: reportMetadata.reportSpecifierId,
|
|
|
|
|
|
|
+ reportSpecifierId: reportMetadata.report_specifier_id,
|
|
|
granularityDuration: `PT${reportSubscriptionParameters.dataGranularitySeconds}S`,
|
|
granularityDuration: `PT${reportSubscriptionParameters.dataGranularitySeconds}S`,
|
|
|
reportBackDuration: `PT${reportSubscriptionParameters.reportBackSeconds}S`,
|
|
reportBackDuration: `PT${reportSubscriptionParameters.reportBackSeconds}S`,
|
|
|
startDate: new Date().toISOString(),
|
|
startDate: new Date().toISOString(),
|
|
|
duration: `PT${reportSubscriptionParameters.subscriptionDurationSeconds}S`,
|
|
duration: `PT${reportSubscriptionParameters.subscriptionDurationSeconds}S`,
|
|
|
specifiers: reportMetadata.descriptions.map(description => ({
|
|
specifiers: reportMetadata.descriptions.map(description => ({
|
|
|
- reportId: description.reportId,
|
|
|
|
|
|
|
+ reportId: description.report_id,
|
|
|
readingType: 'x-notApplicable',
|
|
readingType: 'x-notApplicable',
|
|
|
})),
|
|
})),
|
|
|
});
|
|
});
|
|
|
- reportMetadata.lastSentCreate = new Date().toISOString();
|
|
|
|
|
|
|
+ reportMetadata.last_sent_create = new Date().toISOString();
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
if (createRequests.length > 0) {
|
|
if (createRequests.length > 0) {
|
|
@@ -111,7 +130,10 @@ async function pollForReports(
|
|
|
requestId: v4(),
|
|
requestId: v4(),
|
|
|
requests: createRequests,
|
|
requests: createRequests,
|
|
|
};
|
|
};
|
|
|
- await nantum.updateReport(clientCertificateFingerprint, report);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ await nantum.updateVen(ven._id, {
|
|
|
|
|
+ reports: ven.reports,
|
|
|
|
|
+ });
|
|
|
return createReport;
|
|
return createReport;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -131,23 +153,38 @@ async function registerReports(
|
|
|
|
|
|
|
|
const requestVenId = oadrRegisterReport.venId;
|
|
const requestVenId = oadrRegisterReport.venId;
|
|
|
validateVenId(requestVenId, clientCertificateFingerprint, false);
|
|
validateVenId(requestVenId, clientCertificateFingerprint, false);
|
|
|
|
|
+ const ven = await nantum.getVen(clientCertificateFingerprint);
|
|
|
|
|
+ if (!ven) {
|
|
|
|
|
+ throw new Error('VEN is not registered');
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
const venReportMetadata = (oadrRegisterReport.reports || []).map(report => {
|
|
const venReportMetadata = (oadrRegisterReport.reports || []).map(report => {
|
|
|
const { reportSpecifierId, descriptions } = report;
|
|
const { reportSpecifierId, descriptions } = report;
|
|
|
- const lastReceivedRegister = new Date().toISOString();
|
|
|
|
|
|
|
+
|
|
|
const reportRequestId = v4();
|
|
const reportRequestId = v4();
|
|
|
return {
|
|
return {
|
|
|
- reportRequestIds: [reportRequestId],
|
|
|
|
|
- reportSpecifierId,
|
|
|
|
|
- descriptions,
|
|
|
|
|
- lastReceivedRegister,
|
|
|
|
|
|
|
+ report_request_ids: [reportRequestId],
|
|
|
|
|
+ report_specifier_id: reportSpecifierId,
|
|
|
|
|
+ descriptions: descriptions.map(description => {
|
|
|
|
|
+ return {
|
|
|
|
|
+ report_id: description.reportId,
|
|
|
|
|
+ report_type: description.reportType,
|
|
|
|
|
+ reading_type: description.readingType,
|
|
|
|
|
+ sampling_rate: {
|
|
|
|
|
+ min_period: description.samplingRate.minPeriod,
|
|
|
|
|
+ max_period: description.samplingRate.maxPeriod,
|
|
|
|
|
+ on_change: description.samplingRate.onChange,
|
|
|
|
|
+ },
|
|
|
|
|
+ };
|
|
|
|
|
+ }),
|
|
|
|
|
+ last_received_register: new Date().toISOString(),
|
|
|
};
|
|
};
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
//TODO: whitelist based off Nantum API sensors
|
|
//TODO: whitelist based off Nantum API sensors
|
|
|
|
|
|
|
|
- await nantum.updateReport(clientCertificateFingerprint, {
|
|
|
|
|
- venReportMetadata,
|
|
|
|
|
|
|
+ await nantum.updateVen(ven._id, {
|
|
|
|
|
+ reports: venReportMetadata,
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
return {
|
|
return {
|
|
@@ -172,28 +209,30 @@ async function createdReports(
|
|
|
);
|
|
);
|
|
|
|
|
|
|
|
validateVenId(oadrCreatedReport.venId, clientCertificateFingerprint, false);
|
|
validateVenId(oadrCreatedReport.venId, clientCertificateFingerprint, false);
|
|
|
|
|
+ const ven = await nantum.getVen(clientCertificateFingerprint);
|
|
|
|
|
+ if (!ven) {
|
|
|
|
|
+ throw new Error('VEN is not registered');
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
if (oadrCreatedReport.pendingReports) {
|
|
if (oadrCreatedReport.pendingReports) {
|
|
|
// flag reports as having been created
|
|
// flag reports as having been created
|
|
|
- const report = await nantum.fetchReport(clientCertificateFingerprint);
|
|
|
|
|
- if (report.venReportMetadata) {
|
|
|
|
|
|
|
+ if (ven.reports) {
|
|
|
for (const pendingReport of oadrCreatedReport.pendingReports) {
|
|
for (const pendingReport of oadrCreatedReport.pendingReports) {
|
|
|
const reportRequestId = pendingReport['reportRequestId'];
|
|
const reportRequestId = pendingReport['reportRequestId'];
|
|
|
- const match = report.venReportMetadata.filter(x =>
|
|
|
|
|
- x.reportRequestIds.includes(reportRequestId),
|
|
|
|
|
|
|
+ const match = ven.reports.filter(x =>
|
|
|
|
|
+ x.report_request_ids.includes(reportRequestId),
|
|
|
)[0];
|
|
)[0];
|
|
|
if (match) {
|
|
if (match) {
|
|
|
- match.lastReceivedCreated = new Date().toISOString();
|
|
|
|
|
|
|
+ match.last_received_created = new Date().toISOString();
|
|
|
} else {
|
|
} else {
|
|
|
- logger.info(
|
|
|
|
|
- 'could not match',
|
|
|
|
|
- reportRequestId,
|
|
|
|
|
- report.venReportMetadata,
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ logger.info('could not match', reportRequestId, ven.reports);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- await nantum.updateReport(clientCertificateFingerprint, report);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ await nantum.updateVen(ven._id, {
|
|
|
|
|
+ reports: ven.reports,
|
|
|
|
|
+ });
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return {
|
|
return {
|
|
@@ -218,38 +257,44 @@ async function receiveReportData(
|
|
|
|
|
|
|
|
const requestVenId = oadrUpdateReport.venId;
|
|
const requestVenId = oadrUpdateReport.venId;
|
|
|
validateVenId(requestVenId, clientCertificateFingerprint, false);
|
|
validateVenId(requestVenId, clientCertificateFingerprint, false);
|
|
|
|
|
+ const ven = await nantum.getVen(clientCertificateFingerprint);
|
|
|
|
|
+ if (!ven) {
|
|
|
|
|
+ throw new Error('VEN is not registered');
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- const report = await nantum.fetchReport(clientCertificateFingerprint);
|
|
|
|
|
- if (report.venReportMetadata) {
|
|
|
|
|
|
|
+ if (ven.reports) {
|
|
|
|
|
+ const readings = [];
|
|
|
for (const updateReport of oadrUpdateReport.reports) {
|
|
for (const updateReport of oadrUpdateReport.reports) {
|
|
|
const reportRequestId = updateReport.reportRequestId;
|
|
const reportRequestId = updateReport.reportRequestId;
|
|
|
- const match = report.venReportMetadata.filter(x =>
|
|
|
|
|
- x.reportRequestIds.includes(reportRequestId),
|
|
|
|
|
|
|
+ const match = ven.reports.filter(x =>
|
|
|
|
|
+ x.report_request_ids.includes(reportRequestId),
|
|
|
)[0];
|
|
)[0];
|
|
|
if (!match) {
|
|
if (!match) {
|
|
|
- logger.info(
|
|
|
|
|
- 'could not match',
|
|
|
|
|
- reportRequestId,
|
|
|
|
|
- report.venReportMetadata,
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ logger.info('could not match', reportRequestId, ven.reports);
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
- match.lastReceivedUpdate = new Date().toISOString();
|
|
|
|
|
|
|
+ match.last_received_update = new Date().toISOString();
|
|
|
for (const interval of updateReport.intervals || []) {
|
|
for (const interval of updateReport.intervals || []) {
|
|
|
const reportId = interval.reportPayloads[0].reportId;
|
|
const reportId = interval.reportPayloads[0].reportId;
|
|
|
|
|
+ const reportDefinition = _.find(match.descriptions, {
|
|
|
|
|
+ report_id: reportId,
|
|
|
|
|
+ });
|
|
|
|
|
+ if (!reportDefinition) {
|
|
|
|
|
+ logger.info('Received data for unknown report ' + reportId);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
const date = interval.startDate;
|
|
const date = interval.startDate;
|
|
|
|
|
|
|
|
if (interval.reportPayloads[0].payloadFloat) {
|
|
if (interval.reportPayloads[0].payloadFloat) {
|
|
|
const value = interval.reportPayloads[0].payloadFloat;
|
|
const value = interval.reportPayloads[0].payloadFloat;
|
|
|
- logger.info('received report', [
|
|
|
|
|
|
|
+ readings.push({
|
|
|
date,
|
|
date,
|
|
|
- clientCertificateFingerprint,
|
|
|
|
|
- updateReport.reportSpecifierId,
|
|
|
|
|
- updateReport.reportName,
|
|
|
|
|
- reportRequestId,
|
|
|
|
|
- reportId,
|
|
|
|
|
|
|
+ report_specifier_id: updateReport.reportSpecifierId,
|
|
|
|
|
+ report_name: updateReport.reportName,
|
|
|
|
|
+ report_id: reportId,
|
|
|
|
|
+ report_type: reportDefinition.report_type,
|
|
|
value,
|
|
value,
|
|
|
- ]);
|
|
|
|
|
|
|
+ });
|
|
|
}
|
|
}
|
|
|
if (
|
|
if (
|
|
|
interval.reportPayloads[0].payloadStatus &&
|
|
interval.reportPayloads[0].payloadStatus &&
|
|
@@ -261,24 +306,28 @@ async function receiveReportData(
|
|
|
const typeObj = loadControlState[type];
|
|
const typeObj = loadControlState[type];
|
|
|
Object.keys(typeObj).forEach(subType => {
|
|
Object.keys(typeObj).forEach(subType => {
|
|
|
const value = typeObj[subType];
|
|
const value = typeObj[subType];
|
|
|
- logger.info('received report', [
|
|
|
|
|
|
|
+ readings.push({
|
|
|
date,
|
|
date,
|
|
|
- clientCertificateFingerprint,
|
|
|
|
|
- updateReport.reportSpecifierId,
|
|
|
|
|
- updateReport.reportName,
|
|
|
|
|
- reportRequestId,
|
|
|
|
|
- reportId,
|
|
|
|
|
|
|
+ report_specifier_id: updateReport.reportSpecifierId,
|
|
|
|
|
+ report_name: updateReport.reportName,
|
|
|
|
|
+ report_id: reportId,
|
|
|
|
|
+ report_type: reportDefinition.report_type,
|
|
|
type,
|
|
type,
|
|
|
- subType,
|
|
|
|
|
|
|
+ sub_type: subType,
|
|
|
value,
|
|
value,
|
|
|
- ]);
|
|
|
|
|
|
|
+ });
|
|
|
});
|
|
});
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+ if (readings.length > 0) {
|
|
|
|
|
+ await nantum.sendReportReadings(ven, readings);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- await nantum.updateReport(clientCertificateFingerprint, report);
|
|
|
|
|
|
|
+ await nantum.updateVen(ven._id, {
|
|
|
|
|
+ reports: ven.reports,
|
|
|
|
|
+ });
|
|
|
|
|
|
|
|
return {
|
|
return {
|
|
|
_type: 'oadrUpdatedReport',
|
|
_type: 'oadrUpdatedReport',
|