Implement TelemetryPublisherDisk

This commit is contained in:
Denis-Cosmin Nutiu 2019-12-18 22:57:33 +02:00
parent 8bcdd0ea1f
commit d8a0ddd9bc
4 changed files with 138 additions and 41 deletions

View file

@ -0,0 +1,38 @@
using System;
using System.Collections.Generic;
using DotNetty.Common;
namespace NucuCar.Domain.Services
{
/// <summary>
/// ConnectionStringParser is an utility service to parse and validate connection strings.
/// </summary>
public static class ConnectionStringParser
{
/// <summary>
/// Parse parses and validates a provided connection string.
/// </summary>
/// <param name="connectionString">The connection string to parse</param>
/// <returns>A dictionary object containing the values of the connection string.</returns>
/// <exception cref="ArgumentException"></exception>
public static Dictionary<string, string> Parse(string connectionString)
{
// TODO: Write tests for this.
var items = connectionString.Split(";");
var parsedConnectionString = new Dictionary<string, string>();
foreach (var item in items)
{
var keyValue = item.Split("=");
if (keyValue.Length != 2)
{
throw new ArgumentException(
$"Invalid argument for connection string, expected KEY=VALUE got {item}");
}
parsedConnectionString[keyValue[0]] = parsedConnectionString[keyValue[1]];
}
return parsedConnectionString;
}
}
}

View file

@ -11,13 +11,6 @@ namespace NucuCar.Domain.Telemetry
/// </summary>
public abstract class TelemetryPublisher : IDisposable
{
/// <summary>
/// Parameter less constructor, mainly used for testing.
/// </summary>
public TelemetryPublisher()
{
}
/// <summary>
/// Raw connection string that is used to connect to the cloud service. Should be parsed if required.
/// </summary>
@ -40,15 +33,10 @@ namespace NucuCar.Domain.Telemetry
protected readonly ILogger Logger;
/// <summary>
/// Constructor for <see cref="TelemetryPublisher"/>.
/// Parameter less constructor, mainly used for testing.
/// </summary>
/// <param name="opts">TelemetryPublisher options, see: <see cref="TelemetryPublisherBuilderOptions"/></param>
protected TelemetryPublisher(TelemetryPublisherBuilderOptions opts)
public TelemetryPublisher()
{
ConnectionString = opts.ConnectionString;
TelemetrySource = opts.TelemetrySource;
Logger = opts.Logger;
RegisteredTelemeters = new List<ITelemeter>(5);
}
/// <summary>
@ -88,5 +76,47 @@ namespace NucuCar.Domain.Telemetry
RegisteredTelemeters.Remove(t);
return true;
}
/// <summary>
/// Constructor for <see cref="TelemetryPublisher"/>.
/// </summary>
/// <param name="opts">TelemetryPublisher options, see: <see cref="TelemetryPublisherBuilderOptions"/></param>
protected TelemetryPublisher(TelemetryPublisherBuilderOptions opts)
{
ConnectionString = opts.ConnectionString;
TelemetrySource = opts.TelemetrySource;
Logger = opts.Logger;
RegisteredTelemeters = new List<ITelemeter>(5);
}
/// <summary>
/// Iterates through the registered telemeters and returns the telemetry data as dictionary.
/// It also adds metadata information such as: source and timestamp.
/// </summary>
/// <returns>A dictionary containing all telemetry data.</returns>
protected Dictionary<string, object> GetTelemetry()
{
var data = new List<Dictionary<string, object>>();
foreach (var telemeter in RegisteredTelemeters)
{
var telemetryData = telemeter.GetTelemetryData();
if (telemetryData == null)
{
Logger?.LogWarning($"Warning! Data for {telemeter.GetIdentifier()} is null!");
continue;
}
telemetryData["_id"] = telemeter.GetIdentifier();
data.Add(telemetryData);
}
var metadata = new Dictionary<string, object>
{
["source"] = TelemetrySource ?? nameof(TelemetryPublisher),
["timestamp"] = DateTime.Now,
["data"] = data.ToArray()
};
return metadata;
}
}
}

View file

@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@ -25,7 +24,7 @@ namespace NucuCar.Domain.Telemetry
throw;
}
Logger?.LogDebug("Started the AzureTelemetryPublisher!");
Logger?.LogDebug("Initialized the AzureTelemetryPublisher!");
}
/// <summary>
@ -83,31 +82,6 @@ namespace NucuCar.Domain.Telemetry
await PublishToCloudAsync(message, cancellationToken);
}
private Dictionary<string, object> GetTelemetry()
{
var data = new List<Dictionary<string, object>>();
foreach (var telemeter in RegisteredTelemeters)
{
var telemetryData = telemeter.GetTelemetryData();
if (telemetryData == null)
{
Logger?.LogWarning($"Warning! Data for {telemeter.GetIdentifier()} is null!");
continue;
}
telemetryData["_id"] = telemeter.GetIdentifier();
data.Add(telemetryData);
}
var metadata = new Dictionary<string, object>
{
["source"] = TelemetrySource ?? nameof(TelemetryPublisherAzure),
["timestamp"] = DateTime.Now,
["data"] = data.ToArray()
};
return metadata;
}
private async Task PublishToCloudAsync(Message message, CancellationToken cancellationToken, int maxRetries = 3)
{
var retry = 0;

View file

@ -0,0 +1,55 @@
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using NucuCar.Domain.Services;
namespace NucuCar.Domain.Telemetry
{
/// <summary>
/// The TelemetryPublisherDisk is used to publish telemetry data to a file residing on the disk.
/// </summary>
public class TelemetryPublisherDisk : TelemetryPublisher
{
private readonly FileStream _fileStream;
/// <summary>
/// Constructs an instance of <see cref="TelemetryPublisherDisk"/>.
/// <remarks>
/// The connection string must contain the following options:
/// Filename (required) - The path of the filename in which to log telemetry data.
/// BufferSize (optional) - The buffer size for the async writer. Default: 4096
/// </remarks>
/// </summary>
/// <param name="opts"></param>
public TelemetryPublisherDisk(TelemetryPublisherBuilderOptions opts) : base(opts)
{
var connectionStringParams = ConnectionStringParser.Parse(opts.ConnectionString);
var fileName = connectionStringParams.GetValueOrDefault("Filename");
var bufferSize = connectionStringParams.GetValueOrDefault("BufferSize", "4096");
_fileStream = new FileStream(fileName, FileMode.Append, FileAccess.Write,
FileShare.None, int.Parse(bufferSize), true);
Logger?.LogDebug("Initialized the TelemetryPublisherDisk!");
}
public override async Task PublishAsync(CancellationToken cancellationToken)
{
var data = GetTelemetry();
var messageString = JsonConvert.SerializeObject(data);
Logger?.LogDebug($"Telemetry message: {messageString}");
var encodedText = Encoding.Unicode.GetBytes(messageString);
await _fileStream.WriteAsync(encodedText, (int) _fileStream.Length,
encodedText.Length, cancellationToken);
}
public override void Dispose()
{
_fileStream.Close();
}
}
}