2019-11-17 16:27:58 +00:00
|
|
|
using System;
|
|
|
|
using System.Collections.Generic;
|
|
|
|
using System.Text;
|
|
|
|
using System.Threading;
|
|
|
|
using System.Threading.Tasks;
|
|
|
|
using Microsoft.Azure.Devices.Client;
|
|
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
using Newtonsoft.Json;
|
|
|
|
|
|
|
|
namespace NucuCar.Domain.Telemetry
|
|
|
|
{
|
2019-11-23 18:53:04 +00:00
|
|
|
/// <summary>
/// A <see cref="TelemetryPublisher"/> that serializes the data of every registered telemeter
/// to JSON and sends it through a <see cref="Microsoft.Azure.Devices.Client.DeviceClient"/>
/// using the MQTT transport.
/// </summary>
public class TelemetryPublisherAzure : TelemetryPublisher
{
    /// <summary>Device client created from the configured connection string.</summary>
    protected readonly DeviceClient DeviceClient;

    // Set on the first Dispose() call so that repeated calls become no-ops.
    private bool _disposed;

    /// <summary>
    /// Creates the publisher and its device client from the connection string carried by
    /// <paramref name="opts"/>.
    /// </summary>
    /// <param name="opts">Builder options forwarded to the base publisher.</param>
    /// <exception cref="FormatException">
    /// Rethrown (after being logged as critical) when the device client rejects the
    /// connection string as malformed.
    /// </exception>
    public TelemetryPublisherAzure(TelemetryPublisherBuilderOptions opts) : base(opts)
    {
        try
        {
            DeviceClient = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
        }
        catch (FormatException ex)
        {
            // Attach the exception so the log entry keeps the parser's own explanation.
            Logger?.LogCritical(ex, "Can't start telemetry service! Malformed connection string!");
            throw;
        }

        Logger?.LogInformation("Started the AzureTelemetryPublisher!");
    }

    /// <summary>
    /// Creates a publisher for <paramref name="connectionString"/> with the telemetry source
    /// set to "TelemetryPublisherAzure" and no logger.
    /// </summary>
    /// <param name="connectionString">Device connection string; checked with
    /// <c>Guard.ArgumentNotNullOrWhiteSpace</c>.</param>
    /// <returns>A new <see cref="TelemetryPublisherAzure"/>.</returns>
    public static TelemetryPublisher CreateFromConnectionString(string connectionString)
    {
        Guard.ArgumentNotNullOrWhiteSpace(nameof(connectionString), connectionString);

        return new TelemetryPublisherAzure(new TelemetryPublisherBuilderOptions()
            {ConnectionString = connectionString, TelemetrySource = "TelemetryPublisherAzure"});
    }

    /// <summary>
    /// Creates a publisher for <paramref name="connectionString"/> that reports
    /// <paramref name="telemetrySource"/> as its source and has no logger.
    /// </summary>
    /// <param name="connectionString">Device connection string; checked with
    /// <c>Guard.ArgumentNotNullOrWhiteSpace</c>.</param>
    /// <param name="telemetrySource">Value written to the "source" field of each message;
    /// checked with <c>Guard.ArgumentNotNullOrWhiteSpace</c>.</param>
    /// <returns>A new <see cref="TelemetryPublisherAzure"/>.</returns>
    public static TelemetryPublisher CreateFromConnectionString(string connectionString,
        string telemetrySource)
    {
        Guard.ArgumentNotNullOrWhiteSpace(nameof(connectionString), connectionString);
        Guard.ArgumentNotNullOrWhiteSpace(nameof(telemetrySource), telemetrySource);

        return new TelemetryPublisherAzure(new TelemetryPublisherBuilderOptions()
            {ConnectionString = connectionString, TelemetrySource = telemetrySource});
    }

    /// <summary>
    /// Creates a publisher for <paramref name="connectionString"/> that reports
    /// <paramref name="telemetrySource"/> as its source and logs through
    /// <paramref name="logger"/>.
    /// </summary>
    /// <param name="connectionString">Device connection string; checked with
    /// <c>Guard.ArgumentNotNullOrWhiteSpace</c>.</param>
    /// <param name="telemetrySource">Value written to the "source" field of each message;
    /// checked with <c>Guard.ArgumentNotNullOrWhiteSpace</c>.</param>
    /// <param name="logger">Logger handed to the publisher; checked with
    /// <c>Guard.ArgumentNotNull</c>.</param>
    /// <returns>A new <see cref="TelemetryPublisherAzure"/>.</returns>
    public static TelemetryPublisher CreateFromConnectionString(string connectionString,
        string telemetrySource, ILogger logger)
    {
        Guard.ArgumentNotNullOrWhiteSpace(nameof(connectionString), connectionString);
        Guard.ArgumentNotNullOrWhiteSpace(nameof(telemetrySource), telemetrySource);
        Guard.ArgumentNotNull(nameof(logger), logger);

        return new TelemetryPublisherAzure(new TelemetryPublisherBuilderOptions()
            {ConnectionString = connectionString, TelemetrySource = telemetrySource, Logger = logger});
    }

    /// <summary>
    /// Collects the current data of every registered telemeter and sends one message per
    /// telemeter. Telemeters that return <c>null</c> data are skipped with a warning.
    /// </summary>
    /// <param name="cancellationToken">
    /// When cancellation has been requested the device client is closed and publishing stops.
    /// </param>
    public override async Task PublishAsync(CancellationToken cancellationToken)
    {
        foreach (var telemeter in RegisteredTelemeters)
        {
            var data = telemeter.GetTelemetryData();
            if (data == null)
            {
                Logger?.LogWarning($"Warning! Data for {telemeter.GetIdentifier()} is null!");
                continue;
            }

            var metadata = new Dictionary<string, object>
            {
                ["source"] = TelemetrySource ?? nameof(TelemetryPublisherAzure),
                ["id"] = telemeter.GetIdentifier(),
                // Local time of the publishing machine, kept as-is for existing consumers.
                ["timestamp"] = DateTime.Now,
                ["data"] = data,
            };

            var sent = await PublishViaMqtt(metadata, cancellationToken);
            if (!sent)
            {
                // The client was closed because of cancellation; the remaining telemeters
                // cannot be published and must not trigger further close attempts.
                return;
            }
        }
    }

    /// <summary>
    /// Serializes <paramref name="data"/> to JSON and sends it as a single device event,
    /// or closes the device client when cancellation has already been requested.
    /// </summary>
    /// <param name="data">Envelope to serialize.</param>
    /// <param name="cancellationToken">Token checked before sending and passed to the send call.</param>
    /// <returns>
    /// <c>true</c> when the message was handed to the client; <c>false</c> when cancellation
    /// was requested and the client was closed instead.
    /// </returns>
    private async Task<bool> PublishViaMqtt(Dictionary<string, object> data, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            Logger?.LogInformation("Stopping the AzureTelemetryPublisher, cancellation requested.");
            // Deliberately not passing the token: it is already cancelled at this point, and
            // the close operation should not itself be subject to that cancellation.
            await DeviceClient.CloseAsync();
            return false;
        }

        var messageString = JsonConvert.SerializeObject(data);

        // UTF-8 keeps non-ASCII characters intact; ASCII encoding would replace them with '?'.
        using (var message = new Message(Encoding.UTF8.GetBytes(messageString)))
        {
            // Log the serialized payload rather than the Message object itself.
            Logger?.LogDebug("Telemetry message: {TelemetryMessage}", messageString);
            await DeviceClient.SendEventAsync(message, cancellationToken);
        }

        return true;
    }

    /// <summary>
    /// Closes the connection and releases the device client. Safe to call more than once;
    /// only the first call does any work.
    /// </summary>
    public override void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (DeviceClient == null)
        {
            return;
        }

        try
        {
            // Dispose() is synchronous, so the asynchronous close has to be waited on here.
            DeviceClient.CloseAsync().GetAwaiter().GetResult();
        }
        finally
        {
            // Release the client even when the close fails.
            DeviceClient.Dispose();
        }
    }
}
|
|
|
|
}
|