diff --git a/NucuCar.Domain/Telemetry/TelemetryPublisherAzure.cs b/NucuCar.Domain/Telemetry/TelemetryPublisherAzure.cs index 7316d1f..077d522 100644 --- a/NucuCar.Domain/Telemetry/TelemetryPublisherAzure.cs +++ b/NucuCar.Domain/Telemetry/TelemetryPublisherAzure.cs @@ -74,40 +74,67 @@ namespace NucuCar.Domain.Telemetry public override async Task PublishAsync(CancellationToken cancellationToken) { + var data = GetTelemetry(); + + var messageString = JsonConvert.SerializeObject(data); + Logger?.LogDebug($"Telemetry message: {messageString}"); + var message = new Message(Encoding.ASCII.GetBytes(messageString)); + + await PublishToCloudAsync(message, cancellationToken); + } + + private Dictionary GetTelemetry() + { + var data = new List>(); foreach (var telemeter in RegisteredTelemeters) { - var data = telemeter.GetTelemetryData(); - if (data == null) + var telemetryData = telemeter.GetTelemetryData(); + if (telemetryData == null) { Logger?.LogWarning($"Warning! Data for {telemeter.GetIdentifier()} is null!"); continue; } - var metadata = new Dictionary - { - ["source"] = TelemetrySource ?? nameof(TelemetryPublisherAzure), - ["id"] = telemeter.GetIdentifier(), - ["timestamp"] = DateTime.Now, - ["data"] = data, - }; - - await PublishViaMqtt(metadata, cancellationToken); + telemetryData["_id"] = telemeter.GetIdentifier(); + data.Add(telemetryData); } + + var metadata = new Dictionary + { + ["source"] = TelemetrySource ?? nameof(TelemetryPublisherAzure), + ["timestamp"] = DateTime.Now, + ["data"] = data.ToArray() + }; + return metadata; } - private async Task PublishViaMqtt(Dictionary data, CancellationToken cancellationToken) + private async Task PublishToCloudAsync(Message message, CancellationToken cancellationToken, int maxRetries = 3) { - if (cancellationToken.IsCancellationRequested) + var retry = 0; + while (retry < maxRetries) { - Logger?.LogInformation("Stopping the AzureTelemetryPublisher, cancellation requested."); - await DeviceClient.CloseAsync(cancellationToken); - return; - } + if (cancellationToken.IsCancellationRequested) + { + Logger?.LogInformation("Publishing telemetry cancelled!"); + break; + } - var messageString = JsonConvert.SerializeObject(data); - var message = new Message(Encoding.ASCII.GetBytes(messageString)); - Logger?.LogDebug($"Telemetry message: {messageString}"); - await DeviceClient.SendEventAsync(message, cancellationToken); + try + { + var cts = new CancellationTokenSource(); + cts.CancelAfter(5000); + cts.Token.ThrowIfCancellationRequested(); + + /* Should throw OperationCanceledException on timeout or cancel. */ + await DeviceClient.SendEventAsync(message, cts.Token); + break; + } + catch (OperationCanceledException e) + { + retry += 1; + Logger?.LogWarning($"Telemetry not sent! Retry {retry}."); + } + } } public override void Dispose()