|
|
@@ -36,107 +36,110 @@ async function pollForReports(
|
|
|
clientCertificateFingerprint,
|
|
|
);
|
|
|
|
|
|
- const ven = await nantum.getVen(clientCertificateFingerprint);
|
|
|
+ const ven = await nantum.getVenRegistration(clientCertificateFingerprint);
|
|
|
if (!ven) {
|
|
|
// not an error, we shouldn't fail polling because the VEN hasn't registered yet
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- const createRequests = [];
|
|
|
-
|
|
|
- if (ven.reports) {
|
|
|
- for (const reportMetadata of ven.reports) {
|
|
|
- let sendCreate = false;
|
|
|
-
|
|
|
- if (!reportMetadata.last_sent_create) {
|
|
|
- // if we've never sent a subscription request, do it
|
|
|
- logger.info(
|
|
|
- 'sending create because we never have',
|
|
|
- reportMetadata.report_specifier_id,
|
|
|
- );
|
|
|
- sendCreate = true;
|
|
|
- } else {
|
|
|
- // have sent a create > 5s ago, not received a created
|
|
|
- if (
|
|
|
- !reportMetadata.last_received_created &&
|
|
|
- getSecondsSince('last_sent_create', reportMetadata) > 5
|
|
|
- ) {
|
|
|
- logger.info(
|
|
|
- 'no reply to creation request, send another',
|
|
|
- reportMetadata.report_specifier_id,
|
|
|
- );
|
|
|
- sendCreate = true;
|
|
|
- }
|
|
|
- }
|
|
|
+ const venReports = await nantum.getVenReports(ven._id);
|
|
|
+ if (!venReports) {
|
|
|
+ // not an error if no reports have been defined
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- if (
|
|
|
- getSecondsSince('last_received_update', reportMetadata) >
|
|
|
- reportSubscriptionParameters.resubscribeAfterNoDataForSeconds
|
|
|
- ) {
|
|
|
- // previously received data, silent now
|
|
|
- sendCreate = true;
|
|
|
- }
|
|
|
+ const createRequests = [];
|
|
|
|
|
|
+ for (const reportMetadata of venReports.reports) {
|
|
|
+ let sendCreate = false;
|
|
|
+
|
|
|
+ if (!reportMetadata.last_sent_create) {
|
|
|
+ // if we've never sent a subscription request, do it
|
|
|
+ logger.info(
|
|
|
+ 'sending create because we never have',
|
|
|
+ reportMetadata.report_specifier_id,
|
|
|
+ );
|
|
|
+ sendCreate = true;
|
|
|
+ } else {
|
|
|
+ // have sent a create > 5s ago, not received a created
|
|
|
if (
|
|
|
- !reportMetadata.last_received_update &&
|
|
|
- getSecondsSince('last_received_created', reportMetadata) >
|
|
|
- reportSubscriptionParameters.reportBackSeconds + 5
|
|
|
+ !reportMetadata.last_received_created &&
|
|
|
+ getSecondsSince('last_sent_create', reportMetadata) > 5
|
|
|
) {
|
|
|
- // if we haven't received any data but we've waited long enough for one data interval + 5 seconds
|
|
|
logger.info(
|
|
|
- 'sending create because have not received data',
|
|
|
+ 'no reply to creation request, send another',
|
|
|
reportMetadata.report_specifier_id,
|
|
|
);
|
|
|
sendCreate = true;
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if (
|
|
|
- getSecondsSince('last_received_created', reportMetadata) >
|
|
|
- reportSubscriptionParameters.resubscribeDurationSeconds
|
|
|
- ) {
|
|
|
- // when we're close to the end of the subscription, trigger a resubscribe
|
|
|
- logger.info(
|
|
|
- 'sending create because close to end of subscription',
|
|
|
- reportMetadata.report_specifier_id,
|
|
|
- );
|
|
|
- sendCreate = true;
|
|
|
- }
|
|
|
+ if (
|
|
|
+ getSecondsSince('last_received_update', reportMetadata) >
|
|
|
+ reportSubscriptionParameters.resubscribeAfterNoDataForSeconds
|
|
|
+ ) {
|
|
|
+ // previously received data, silent now
|
|
|
+ sendCreate = true;
|
|
|
+ }
|
|
|
|
|
|
- if (sendCreate) {
|
|
|
- const newReportRequestId = v4();
|
|
|
- // track the last 10 registration ids
|
|
|
- reportMetadata.report_request_ids = [
|
|
|
- newReportRequestId,
|
|
|
- ...reportMetadata.report_request_ids,
|
|
|
- ].slice(0, 10);
|
|
|
- createRequests.push({
|
|
|
- reportRequestId: newReportRequestId,
|
|
|
- reportSpecifierId: reportMetadata.report_specifier_id,
|
|
|
- granularityDuration: `PT${reportSubscriptionParameters.dataGranularitySeconds}S`,
|
|
|
- reportBackDuration: `PT${reportSubscriptionParameters.reportBackSeconds}S`,
|
|
|
- startDate: new Date().toISOString(),
|
|
|
- duration: `PT${reportSubscriptionParameters.subscriptionDurationSeconds}S`,
|
|
|
- specifiers: reportMetadata.descriptions.map(description => ({
|
|
|
- reportId: description.report_id,
|
|
|
- readingType: 'x-notApplicable',
|
|
|
- })),
|
|
|
- });
|
|
|
- reportMetadata.last_sent_create = new Date().toISOString();
|
|
|
- }
|
|
|
+ if (
|
|
|
+ !reportMetadata.last_received_update &&
|
|
|
+ getSecondsSince('last_received_created', reportMetadata) >
|
|
|
+ reportSubscriptionParameters.reportBackSeconds + 5
|
|
|
+ ) {
|
|
|
+ // if we haven't received any data but we've waited long enough for one data interval + 5 seconds
|
|
|
+ logger.info(
|
|
|
+ 'sending create because have not received data',
|
|
|
+ reportMetadata.report_specifier_id,
|
|
|
+ );
|
|
|
+ sendCreate = true;
|
|
|
}
|
|
|
- if (createRequests.length > 0) {
|
|
|
- const createReport = {
|
|
|
- _type: 'oadrCreateReport',
|
|
|
- requestId: v4(),
|
|
|
- requests: createRequests,
|
|
|
- };
|
|
|
-
|
|
|
- await nantum.updateVen(ven._id, {
|
|
|
- reports: ven.reports,
|
|
|
+
|
|
|
+ if (
|
|
|
+ getSecondsSince('last_received_created', reportMetadata) >
|
|
|
+ reportSubscriptionParameters.resubscribeDurationSeconds
|
|
|
+ ) {
|
|
|
+ // when we're close to the end of the subscription, trigger a resubscribe
|
|
|
+ logger.info(
|
|
|
+ 'sending create because close to end of subscription',
|
|
|
+ reportMetadata.report_specifier_id,
|
|
|
+ );
|
|
|
+ sendCreate = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (sendCreate) {
|
|
|
+ const newReportRequestId = v4();
|
|
|
+ // track the last 10 registration ids
|
|
|
+ reportMetadata.report_request_ids = [
|
|
|
+ newReportRequestId,
|
|
|
+ ...reportMetadata.report_request_ids,
|
|
|
+ ].slice(0, 10);
|
|
|
+ createRequests.push({
|
|
|
+ reportRequestId: newReportRequestId,
|
|
|
+ reportSpecifierId: reportMetadata.report_specifier_id,
|
|
|
+ granularityDuration: `PT${reportSubscriptionParameters.dataGranularitySeconds}S`,
|
|
|
+ reportBackDuration: `PT${reportSubscriptionParameters.reportBackSeconds}S`,
|
|
|
+ startDate: new Date().toISOString(),
|
|
|
+ duration: `PT${reportSubscriptionParameters.subscriptionDurationSeconds}S`,
|
|
|
+ specifiers: reportMetadata.descriptions.map(description => ({
|
|
|
+ reportId: description.report_id,
|
|
|
+ readingType: 'x-notApplicable',
|
|
|
+ })),
|
|
|
});
|
|
|
- return createReport;
|
|
|
+ reportMetadata.last_sent_create = new Date().toISOString();
|
|
|
}
|
|
|
}
|
|
|
+ if (createRequests.length > 0) {
|
|
|
+ const createReport = {
|
|
|
+ _type: 'oadrCreateReport',
|
|
|
+ requestId: v4(),
|
|
|
+ requests: createRequests,
|
|
|
+ };
|
|
|
+
|
|
|
+ await nantum.updateVenReports(venReports._id, venReports.reports);
|
|
|
+
|
|
|
+ return createReport;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
async function registerReports(
|
|
|
@@ -153,11 +156,13 @@ async function registerReports(
|
|
|
|
|
|
const requestVenId = oadrRegisterReport.venId;
|
|
|
validateVenId(requestVenId, clientCertificateFingerprint, false);
|
|
|
- const ven = await nantum.getVen(clientCertificateFingerprint);
|
|
|
+ const ven = await nantum.getVenRegistration(clientCertificateFingerprint);
|
|
|
if (!ven) {
|
|
|
throw new Error('VEN is not registered');
|
|
|
}
|
|
|
|
|
|
+ const venReports = await nantum.getVenReports(ven._id);
|
|
|
+
|
|
|
const venReportMetadata = (oadrRegisterReport.reports || []).map(report => {
|
|
|
const { reportSpecifierId, descriptions } = report;
|
|
|
|
|
|
@@ -181,11 +186,17 @@ async function registerReports(
|
|
|
};
|
|
|
});
|
|
|
|
|
|
- //TODO: whitelist based off Nantum API sensors
|
|
|
+ if (venReports) {
|
|
|
+ //TODO: can we do an upsert here?
|
|
|
+ await nantum.updateVenReports(venReports._id, venReportMetadata);
|
|
|
+ } else {
|
|
|
+ await nantum.createVenReports({
|
|
|
+ oadr_ven_registration_id: ven._id,
|
|
|
+ reports: venReportMetadata
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
- await nantum.updateVen(ven._id, {
|
|
|
- reports: venReportMetadata,
|
|
|
- });
|
|
|
|
|
|
return {
|
|
|
_type: 'oadrRegisteredReport',
|
|
|
@@ -209,30 +220,31 @@ async function createdReports(
|
|
|
);
|
|
|
|
|
|
validateVenId(oadrCreatedReport.venId, clientCertificateFingerprint, false);
|
|
|
- const ven = await nantum.getVen(clientCertificateFingerprint);
|
|
|
+ const ven = await nantum.getVenRegistration(clientCertificateFingerprint);
|
|
|
if (!ven) {
|
|
|
throw new Error('VEN is not registered');
|
|
|
}
|
|
|
|
|
|
+ const venReports = await nantum.getVenReports(ven._id);
|
|
|
+ if (!venReports) {
|
|
|
+ throw new Error('VEN does not have registered reports');
|
|
|
+ }
|
|
|
+
|
|
|
if (oadrCreatedReport.pendingReports) {
|
|
|
// flag reports as having been created
|
|
|
- if (ven.reports) {
|
|
|
- for (const pendingReport of oadrCreatedReport.pendingReports) {
|
|
|
- const reportRequestId = pendingReport['reportRequestId'];
|
|
|
- const match = ven.reports.filter(x =>
|
|
|
- x.report_request_ids.includes(reportRequestId),
|
|
|
- )[0];
|
|
|
- if (match) {
|
|
|
- match.last_received_created = new Date().toISOString();
|
|
|
- } else {
|
|
|
- logger.info('could not match', reportRequestId, ven.reports);
|
|
|
- }
|
|
|
+ for (const pendingReport of oadrCreatedReport.pendingReports) {
|
|
|
+ const reportRequestId = pendingReport['reportRequestId'];
|
|
|
+ const match = venReports.reports.filter(x =>
|
|
|
+ x.report_request_ids.includes(reportRequestId),
|
|
|
+ )[0];
|
|
|
+ if (match) {
|
|
|
+ match.last_received_created = new Date().toISOString();
|
|
|
+ } else {
|
|
|
+ logger.info('could not match', reportRequestId, venReports.reports);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- await nantum.updateVen(ven._id, {
|
|
|
- reports: ven.reports,
|
|
|
- });
|
|
|
+ await nantum.updateVenReports(venReports._id, venReports.reports);
|
|
|
}
|
|
|
|
|
|
return {
|
|
|
@@ -257,77 +269,79 @@ async function receiveReportData(
|
|
|
|
|
|
const requestVenId = oadrUpdateReport.venId;
|
|
|
validateVenId(requestVenId, clientCertificateFingerprint, false);
|
|
|
- const ven = await nantum.getVen(clientCertificateFingerprint);
|
|
|
+ const ven = await nantum.getVenRegistration(clientCertificateFingerprint);
|
|
|
if (!ven) {
|
|
|
throw new Error('VEN is not registered');
|
|
|
}
|
|
|
|
|
|
- if (ven.reports) {
|
|
|
- const readings = [];
|
|
|
- for (const updateReport of oadrUpdateReport.reports) {
|
|
|
- const reportRequestId = updateReport.reportRequestId;
|
|
|
- const match = ven.reports.filter(x =>
|
|
|
- x.report_request_ids.includes(reportRequestId),
|
|
|
- )[0];
|
|
|
- if (!match) {
|
|
|
- logger.info('could not match', reportRequestId, ven.reports);
|
|
|
- continue;
|
|
|
+ const venReports = await nantum.getVenReports(ven._id);
|
|
|
+ if (!venReports) {
|
|
|
+ throw new Error('VEN does not have registered reports');
|
|
|
+ }
|
|
|
+
|
|
|
+ const readings = [];
|
|
|
+ for (const updateReport of oadrUpdateReport.reports) {
|
|
|
+ const reportRequestId = updateReport.reportRequestId;
|
|
|
+ const match = venReports.reports.filter(x =>
|
|
|
+ x.report_request_ids.includes(reportRequestId),
|
|
|
+ )[0];
|
|
|
+ if (!match) {
|
|
|
+ logger.info('could not match', reportRequestId, venReports.reports);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ match.last_received_update = new Date().toISOString();
|
|
|
+ for (const interval of updateReport.intervals || []) {
|
|
|
+ const reportId = interval.reportPayloads[0].reportId;
|
|
|
+ const reportDefinition = _.find(match.descriptions, {
|
|
|
+ report_id: reportId,
|
|
|
+ });
|
|
|
+ if (!reportDefinition) {
|
|
|
+ logger.info('Received data for unknown report ' + reportId);
|
|
|
+ return;
|
|
|
}
|
|
|
- match.last_received_update = new Date().toISOString();
|
|
|
- for (const interval of updateReport.intervals || []) {
|
|
|
- const reportId = interval.reportPayloads[0].reportId;
|
|
|
- const reportDefinition = _.find(match.descriptions, {
|
|
|
+ const date = interval.startDate;
|
|
|
+
|
|
|
+ if (interval.reportPayloads[0].payloadFloat) {
|
|
|
+ const value = interval.reportPayloads[0].payloadFloat;
|
|
|
+ readings.push({
|
|
|
+ date,
|
|
|
+ report_specifier_id: updateReport.reportSpecifierId,
|
|
|
+ report_name: updateReport.reportName,
|
|
|
report_id: reportId,
|
|
|
+ report_type: reportDefinition.report_type,
|
|
|
+ value,
|
|
|
});
|
|
|
- if (!reportDefinition) {
|
|
|
- logger.info('Received data for unknown report ' + reportId);
|
|
|
- return;
|
|
|
- }
|
|
|
- const date = interval.startDate;
|
|
|
-
|
|
|
- if (interval.reportPayloads[0].payloadFloat) {
|
|
|
- const value = interval.reportPayloads[0].payloadFloat;
|
|
|
- readings.push({
|
|
|
- date,
|
|
|
- report_specifier_id: updateReport.reportSpecifierId,
|
|
|
- report_name: updateReport.reportName,
|
|
|
- report_id: reportId,
|
|
|
- report_type: reportDefinition.report_type,
|
|
|
- value,
|
|
|
- });
|
|
|
- }
|
|
|
- if (
|
|
|
- interval.reportPayloads[0].payloadStatus &&
|
|
|
- interval.reportPayloads[0].payloadStatus.loadControlState
|
|
|
- ) {
|
|
|
- const loadControlState =
|
|
|
- interval.reportPayloads[0].payloadStatus.loadControlState;
|
|
|
- Object.keys(loadControlState).forEach(type => {
|
|
|
- const typeObj = loadControlState[type];
|
|
|
- Object.keys(typeObj).forEach(subType => {
|
|
|
- const value = typeObj[subType];
|
|
|
- readings.push({
|
|
|
- date,
|
|
|
- report_specifier_id: updateReport.reportSpecifierId,
|
|
|
- report_name: updateReport.reportName,
|
|
|
- report_id: reportId,
|
|
|
- report_type: reportDefinition.report_type,
|
|
|
- type,
|
|
|
- sub_type: subType,
|
|
|
- value,
|
|
|
- });
|
|
|
+ }
|
|
|
+ if (
|
|
|
+ interval.reportPayloads[0].payloadStatus &&
|
|
|
+ interval.reportPayloads[0].payloadStatus.loadControlState
|
|
|
+ ) {
|
|
|
+ const loadControlState =
|
|
|
+ interval.reportPayloads[0].payloadStatus.loadControlState;
|
|
|
+ Object.keys(loadControlState).forEach(type => {
|
|
|
+ const typeObj = loadControlState[type];
|
|
|
+ Object.keys(typeObj).forEach(subType => {
|
|
|
+ const value = typeObj[subType];
|
|
|
+ readings.push({
|
|
|
+ date,
|
|
|
+ report_specifier_id: updateReport.reportSpecifierId,
|
|
|
+ report_name: updateReport.reportName,
|
|
|
+ report_id: reportId,
|
|
|
+ report_type: reportDefinition.report_type,
|
|
|
+ type,
|
|
|
+ sub_type: subType,
|
|
|
+ value,
|
|
|
});
|
|
|
});
|
|
|
- }
|
|
|
+ });
|
|
|
}
|
|
|
}
|
|
|
- if (readings.length > 0) {
|
|
|
- await nantum.sendReportReadings(ven, readings);
|
|
|
- }
|
|
|
}
|
|
|
- await nantum.updateVen(ven._id, {
|
|
|
- reports: ven.reports,
|
|
|
- });
|
|
|
+ if (readings.length > 0) {
|
|
|
+ await nantum.sendReportReadings(ven, readings);
|
|
|
+ }
|
|
|
+
|
|
|
+ await nantum.updateVenReports(venReports._id, venReports.reports);
|
|
|
|
|
|
return {
|
|
|
_type: 'oadrUpdatedReport',
|