Publish telemetry in batch with retrying

This commit is contained in:
Denis-Cosmin Nutiu 2019-12-05 22:40:24 +02:00
parent 88e62fa486
commit 1c522d5a61

View file

@ -74,40 +74,67 @@ namespace NucuCar.Domain.Telemetry
public override async Task PublishAsync(CancellationToken cancellationToken) public override async Task PublishAsync(CancellationToken cancellationToken)
{ {
var data = GetTelemetry();
var messageString = JsonConvert.SerializeObject(data);
Logger?.LogDebug($"Telemetry message: {messageString}");
var message = new Message(Encoding.ASCII.GetBytes(messageString));
await PublishToCloudAsync(message, cancellationToken);
}
private Dictionary<string, object> GetTelemetry()
{
var data = new List<Dictionary<string, object>>();
foreach (var telemeter in RegisteredTelemeters) foreach (var telemeter in RegisteredTelemeters)
{ {
var data = telemeter.GetTelemetryData(); var telemetryData = telemeter.GetTelemetryData();
if (data == null) if (telemetryData == null)
{ {
Logger?.LogWarning($"Warning! Data for {telemeter.GetIdentifier()} is null!"); Logger?.LogWarning($"Warning! Data for {telemeter.GetIdentifier()} is null!");
continue; continue;
} }
var metadata = new Dictionary<string, object> telemetryData["_id"] = telemeter.GetIdentifier();
{ data.Add(telemetryData);
["source"] = TelemetrySource ?? nameof(TelemetryPublisherAzure),
["id"] = telemeter.GetIdentifier(),
["timestamp"] = DateTime.Now,
["data"] = data,
};
await PublishViaMqtt(metadata, cancellationToken);
} }
var metadata = new Dictionary<string, object>
{
["source"] = TelemetrySource ?? nameof(TelemetryPublisherAzure),
["timestamp"] = DateTime.Now,
["data"] = data.ToArray()
};
return metadata;
} }
private async Task PublishViaMqtt(Dictionary<string, object> data, CancellationToken cancellationToken) private async Task PublishToCloudAsync(Message message, CancellationToken cancellationToken, int maxRetries = 3)
{ {
if (cancellationToken.IsCancellationRequested) var retry = 0;
while (retry < maxRetries)
{ {
Logger?.LogInformation("Stopping the AzureTelemetryPublisher, cancellation requested."); if (cancellationToken.IsCancellationRequested)
await DeviceClient.CloseAsync(cancellationToken); {
return; Logger?.LogInformation("Publishing telemetry cancelled!");
} break;
}
var messageString = JsonConvert.SerializeObject(data); try
var message = new Message(Encoding.ASCII.GetBytes(messageString)); {
Logger?.LogDebug($"Telemetry message: {messageString}"); var cts = new CancellationTokenSource();
await DeviceClient.SendEventAsync(message, cancellationToken); cts.CancelAfter(5000);
cts.Token.ThrowIfCancellationRequested();
/* Should throw OperationCanceledException on timeout or cancel. */
await DeviceClient.SendEventAsync(message, cts.Token);
break;
}
catch (OperationCanceledException e)
{
retry += 1;
Logger?.LogWarning($"Telemetry not sent! Retry {retry}.");
}
}
} }
public override void Dispose() public override void Dispose()