Refactor ITelemetryPublisher to remove Start(), StartAsync(), Publish()

This commit is contained in:
Denis-Cosmin Nutiu 2019-11-23 20:53:04 +02:00
parent 2d4d3c347d
commit 70f00ca39d
11 changed files with 127 additions and 77 deletions

View file

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>netcoreapp3.0</TargetFramework> <TargetFramework>netcoreapp3.0</TargetFramework>

View file

@ -7,11 +7,8 @@ namespace NucuCar.Domain.Telemetry
{ {
public interface ITelemetryPublisher public interface ITelemetryPublisher
{ {
void Start();
Task StartAsync();
bool RegisterTelemeter(ITelemeter t); bool RegisterTelemeter(ITelemeter t);
bool UnRegisterTelemeter(ITelemeter t); bool UnRegisterTelemeter(ITelemeter t);
Task PublishAsync(CancellationToken cancellationToken); Task PublishAsync(CancellationToken cancellationToken);
bool Publish(int timeout);
} }
} }

View file

@ -6,21 +6,22 @@ using Microsoft.Extensions.Logging;
namespace NucuCar.Domain.Telemetry namespace NucuCar.Domain.Telemetry
{ {
public abstract class TelemetryPublisher : ITelemetryPublisher public abstract class TelemetryPublisher : ITelemetryPublisher, IDisposable
{ {
protected string ConnectionString { get; set; }
protected string TelemetrySource { get; set; }
protected readonly List<ITelemeter> RegisteredTelemeters; protected readonly List<ITelemeter> RegisteredTelemeters;
// ReSharper disable once UnassignedField.Global // ReSharper disable once UnassignedField.Global
public ILogger Logger; protected readonly ILogger Logger;
protected TelemetryPublisher() protected TelemetryPublisher(TelemetryPublisherBuilderOptions opts)
{ {
ConnectionString = opts.ConnectionString;
TelemetrySource = opts.TelemetrySource;
Logger = opts.Logger;
RegisteredTelemeters = new List<ITelemeter>(5); RegisteredTelemeters = new List<ITelemeter>(5);
} }
public abstract void Start();
public abstract Task StartAsync();
public abstract bool Publish(int timeout);
public abstract Task PublishAsync(CancellationToken cancellationToken); public abstract Task PublishAsync(CancellationToken cancellationToken);
public bool RegisterTelemeter(ITelemeter t) public bool RegisterTelemeter(ITelemeter t)
{ {
@ -36,5 +37,7 @@ namespace NucuCar.Domain.Telemetry
RegisteredTelemeters.Remove(t); RegisteredTelemeters.Remove(t);
return true; return true;
} }
public abstract void Dispose();
} }
} }

View file

@ -9,14 +9,11 @@ using Newtonsoft.Json;
namespace NucuCar.Domain.Telemetry namespace NucuCar.Domain.Telemetry
{ {
public class TelemetryPublisherAzure : TelemetryPublisher, IDisposable public class TelemetryPublisherAzure : TelemetryPublisher
{ {
// Needs to be configured via the Configure method or setup directly. protected readonly DeviceClient DeviceClient;
public string ConnectionString { get; set; }
public string TelemetrySource { private get; set; } public TelemetryPublisherAzure(TelemetryPublisherBuilderOptions opts) : base(opts)
protected DeviceClient DeviceClient;
public override void Start()
{ {
try try
{ {
@ -24,10 +21,31 @@ namespace NucuCar.Domain.Telemetry
} }
catch (FormatException) catch (FormatException)
{ {
Logger.LogCritical("Can't start telemetry service! Malformed connection string!"); Logger?.LogCritical("Can't start telemetry service! Malformed connection string!");
throw; throw;
} }
Logger.LogInformation("Started the AzureTelemetryPublisher!");
Logger?.LogInformation("Started the AzureTelemetryPublisher!");
}
public static TelemetryPublisher CreateFromConnectionString(string connectionString)
{
return new TelemetryPublisherAzure(new TelemetryPublisherBuilderOptions()
{ConnectionString = connectionString, TelemetrySource = "TelemetryPublisherAzure"});
}
public static TelemetryPublisher CreateFromConnectionString(string connectionString,
string telemetrySource)
{
return new TelemetryPublisherAzure(new TelemetryPublisherBuilderOptions()
{ConnectionString = connectionString, TelemetrySource = telemetrySource});
}
public static TelemetryPublisher CreateFromConnectionString(string connectionString,
string telemetrySource, ILogger logger)
{
return new TelemetryPublisherAzure(new TelemetryPublisherBuilderOptions()
{ConnectionString = connectionString, TelemetrySource = telemetrySource, Logger = logger});
} }
public override async Task PublishAsync(CancellationToken cancellationToken) public override async Task PublishAsync(CancellationToken cancellationToken)
@ -37,9 +55,10 @@ namespace NucuCar.Domain.Telemetry
var data = telemeter.GetTelemetryData(); var data = telemeter.GetTelemetryData();
if (data == null) if (data == null)
{ {
Logger.LogWarning($"Warning! Data for {telemeter.GetIdentifier()} is null!"); Logger?.LogWarning($"Warning! Data for {telemeter.GetIdentifier()} is null!");
continue; continue;
} }
var metadata = new Dictionary<string, object> var metadata = new Dictionary<string, object>
{ {
["source"] = TelemetrySource ?? nameof(TelemetryPublisherAzure), ["source"] = TelemetrySource ?? nameof(TelemetryPublisherAzure),
@ -47,7 +66,7 @@ namespace NucuCar.Domain.Telemetry
["timestamp"] = DateTime.Now, ["timestamp"] = DateTime.Now,
["data"] = data, ["data"] = data,
}; };
await PublishViaMqtt(metadata, cancellationToken); await PublishViaMqtt(metadata, cancellationToken);
} }
} }
@ -56,31 +75,20 @@ namespace NucuCar.Domain.Telemetry
{ {
if (cancellationToken.IsCancellationRequested) if (cancellationToken.IsCancellationRequested)
{ {
Logger.LogInformation("Stopping the AzureTelemetryPublisher, cancellation requested."); Logger?.LogInformation("Stopping the AzureTelemetryPublisher, cancellation requested.");
await DeviceClient.CloseAsync(cancellationToken); await DeviceClient.CloseAsync(cancellationToken);
return; return;
} }
var messageString = JsonConvert.SerializeObject(data); var messageString = JsonConvert.SerializeObject(data);
var message = new Message(Encoding.ASCII.GetBytes(messageString)); var message = new Message(Encoding.ASCII.GetBytes(messageString));
Logger.LogDebug($"Telemetry message: {message}"); Logger?.LogDebug($"Telemetry message: {message}");
await DeviceClient.SendEventAsync(message, cancellationToken); await DeviceClient.SendEventAsync(message, cancellationToken);
} }
public void Dispose() public override void Dispose()
{ {
DeviceClient.CloseAsync().GetAwaiter().GetResult(); DeviceClient?.CloseAsync().GetAwaiter().GetResult();
} }
public override bool Publish(int timeout)
{
throw new NotImplementedException();
}
#pragma warning disable 1998
public override async Task StartAsync()
{
throw new NotImplementedException();
}
#pragma warning restore 1998
} }
} }

View file

@ -0,0 +1,11 @@
using Microsoft.Extensions.Logging;
namespace NucuCar.Domain.Telemetry
{
public class TelemetryPublisherBuilderOptions
{
public string ConnectionString { get; set; }
public string TelemetrySource { get; set; }
public ILogger Logger { get; set; }
}
}

View file

@ -19,10 +19,9 @@ namespace NucuCar.Sensors.EnvironmentSensor
public BackgroundWorker(ILogger<BackgroundWorker> logger, IConfiguration config) public BackgroundWorker(ILogger<BackgroundWorker> logger, IConfiguration config)
{ {
_logger = logger; _logger = logger;
var configSection = config.GetSection("EnvironmentSensor"); _serviceEnabled = config.GetValue<bool>("EnvironmentSensor:Enabled");
_serviceEnabled = configSection.GetValue<bool>("Enabled"); _telemetryEnabled = config.GetValue<bool>("EnvironmentSensor:Telemetry");
_telemetryEnabled = configSection.GetValue<bool>("Telemetry"); _measurementDelay = config.GetValue<int>("EnvironmentSensor:MeasurementInterval");
_measurementDelay = configSection.GetValue<int>("MeasurementInterval");
} }
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
@ -37,7 +36,7 @@ namespace NucuCar.Sensors.EnvironmentSensor
sensor.InitializeSensor(); sensor.InitializeSensor();
if (_telemetryEnabled) if (_telemetryEnabled)
{ {
TelemetryPublisher.Instance.RegisterTelemeter(sensor); SensorTelemetryPublisher.Instance.RegisterTelemeter(sensor);
} }
while (!stoppingToken.IsCancellationRequested) while (!stoppingToken.IsCancellationRequested)
@ -56,7 +55,7 @@ namespace NucuCar.Sensors.EnvironmentSensor
await Task.Delay(_measurementDelay, stoppingToken); await Task.Delay(_measurementDelay, stoppingToken);
} }
TelemetryPublisher.Instance.UnRegisterTelemeter(sensor); SensorTelemetryPublisher.Instance.UnRegisterTelemeter(sensor);
} }
} }
} }

View file

@ -1,4 +1,3 @@
using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
@ -9,7 +8,6 @@ namespace NucuCar.Sensors.Telemetry
{ {
public class BackgroundWorker : BackgroundService public class BackgroundWorker : BackgroundService
{ {
private readonly string _azureIotHubConnectionString;
private readonly bool _serviceEnabled; private readonly bool _serviceEnabled;
private readonly int _interval; private readonly int _interval;
private readonly ILogger _logger; private readonly ILogger _logger;
@ -19,7 +17,8 @@ namespace NucuCar.Sensors.Telemetry
_logger = logger; _logger = logger;
_serviceEnabled = configuration.GetValue<bool>("Telemetry:Enabled"); _serviceEnabled = configuration.GetValue<bool>("Telemetry:Enabled");
_interval = configuration.GetValue<int>("Telemetry:Interval"); _interval = configuration.GetValue<int>("Telemetry:Interval");
_azureIotHubConnectionString = configuration.GetValue<string>("Telemetry:AzureIotHubConnectionString"); var azureIotHubConnectionString = configuration.GetValue<string>("Telemetry:AzureIotHubConnectionString");
SensorTelemetryPublisher.CreateSingleton(azureIotHubConnectionString, "NucuCar.Sensors", logger);
} }
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
@ -31,13 +30,8 @@ namespace NucuCar.Sensors.Telemetry
await Task.Delay(_interval, stoppingToken); await Task.Delay(_interval, stoppingToken);
using var telemetryService = TelemetryPublisher.Instance; var telemetryService = SensorTelemetryPublisher.Instance;
telemetryService.Logger = _logger;
telemetryService.ConnectionString = _azureIotHubConnectionString;
telemetryService.TelemetrySource = "NucuCar.Sensors";
telemetryService.Start();
while (!stoppingToken.IsCancellationRequested) while (!stoppingToken.IsCancellationRequested)
{ {
_logger.LogInformation("Publishing telemetry data!"); _logger.LogInformation("Publishing telemetry data!");
@ -45,5 +39,11 @@ namespace NucuCar.Sensors.Telemetry
await Task.Delay(_interval, stoppingToken); await Task.Delay(_interval, stoppingToken);
} }
} }
public override void Dispose()
{
base.Dispose();
SensorTelemetryPublisher.Instance?.Dispose();
}
} }
} }

View file

@ -0,0 +1,44 @@
using System;
using Microsoft.Extensions.Logging;
using NucuCar.Domain.Telemetry;
namespace NucuCar.Sensors.Telemetry
{
public class SensorTelemetryPublisher : IDisposable
{
private static object _palock = new object();
public static TelemetryPublisher Instance { get; private set; }
public static TelemetryPublisher CreateSingleton(string connectionString, string telemetrySource,
ILogger logger)
{
if (Instance == null)
{
lock (_palock)
{
var telemetryPublisher =
TelemetryPublisherAzure.CreateFromConnectionString(connectionString, telemetrySource, logger);
Instance = telemetryPublisher;
}
}
return Instance;
}
private static void ReleaseUnmanagedResources()
{
Instance?.Dispose();
}
public void Dispose()
{
ReleaseUnmanagedResources();
GC.SuppressFinalize(this);
}
~SensorTelemetryPublisher()
{
ReleaseUnmanagedResources();
}
}
}

View file

@ -1,14 +0,0 @@
using NucuCar.Domain.Telemetry;
namespace NucuCar.Sensors.Telemetry
{
public class TelemetryPublisher : TelemetryPublisherAzure
{
/* Singleton Instance */
public static TelemetryPublisher Instance { get; } = new TelemetryPublisher();
static TelemetryPublisher()
{
}
}
}

View file

@ -17,7 +17,7 @@ namespace NucuCar.TestClient.Telemetry
{ {
[Option('c', "connectionString", Required = true, [Option('c', "connectionString", Required = true,
HelpText = "The publisher's connection string. Get it from the Device.")] HelpText = "The publisher's connection string. Get it from the Device.")]
public string PublisherConnectionString { get; set; } public string PublisherConnectionString { get; set; }
[Option('m', "message", Required = true, HelpText = "The message to publish")] [Option('m', "message", Required = true, HelpText = "The message to publish")]
public string PublisherJsonMessage { get; set; } public string PublisherJsonMessage { get; set; }
@ -45,18 +45,20 @@ namespace NucuCar.TestClient.Telemetry
public static async Task RunAzurePublisherTelemetryTest(AzureTelemetryPublishOptions opts) public static async Task RunAzurePublisherTelemetryTest(AzureTelemetryPublishOptions opts)
{ {
var telemetryPublisher = new TelemetryPublisherAzure {TelemetrySource = "TestClient"};
var anonymousTelemeter =
new DummyTelemeter(
JsonConvert.DeserializeObject<Dictionary<string, object>>(opts.PublisherJsonMessage));
var logger = LoggerFactory.Create(builder => { builder.AddConsole(); }) var logger = LoggerFactory.Create(builder => { builder.AddConsole(); })
.CreateLogger<AzureTelemetryPublishCmd>(); .CreateLogger<AzureTelemetryPublishCmd>();
var telemetryPublisher =
TelemetryPublisherAzure.CreateFromConnectionString(opts.PublisherConnectionString,
"NucuCar.TestClient", logger);
var anonymousTelemeter =
new DummyTelemeter(
JsonConvert.DeserializeObject<Dictionary<string, object>>(opts.PublisherJsonMessage));
logger.LogInformation($"Publishing message: {opts.PublisherJsonMessage}"); logger.LogInformation($"Publishing message: {opts.PublisherJsonMessage}");
telemetryPublisher.ConnectionString = opts.PublisherConnectionString;
telemetryPublisher.RegisterTelemeter(anonymousTelemeter); telemetryPublisher.RegisterTelemeter(anonymousTelemeter);
telemetryPublisher.Logger = logger;
telemetryPublisher.Start();
await telemetryPublisher.PublishAsync(CancellationToken.None); await telemetryPublisher.PublishAsync(CancellationToken.None);
} }
} }

View file

@ -30,7 +30,7 @@ namespace NucuCar.TestClient.Telemetry
{ {
_logger = LoggerFactory.Create(builder => { builder.AddConsole(); }) _logger = LoggerFactory.Create(builder => { builder.AddConsole(); })
.CreateLogger<AzureTelemetryReaderCmd>(); .CreateLogger<AzureTelemetryReaderCmd>();
_eventHubClient = EventHubClient.CreateFromConnectionString(opts.EventHubConnectionString); _eventHubClient = EventHubClient.CreateFromConnectionString(opts.EventHubConnectionString);
var runtimeInfo = await _eventHubClient.GetRuntimeInformationAsync(); var runtimeInfo = await _eventHubClient.GetRuntimeInformationAsync();