TableLogger.cs
1 //
2 // Copyright (c) Microsoft. All rights reserved.
3 // Licensed under the MIT license.
4 //
5 // Microsoft Bot Framework: http://botframework.com
6 //
7 // Bot Builder SDK Github:
8 // https://github.com/Microsoft/BotBuilder
9 //
10 // Copyright (c) Microsoft Corporation
11 // All rights reserved.
12 //
13 // MIT License:
14 // Permission is hereby granted, free of charge, to any person obtaining
15 // a copy of this software and associated documentation files (the
16 // "Software"), to deal in the Software without restriction, including
17 // without limitation the rights to use, copy, modify, merge, publish,
18 // distribute, sublicense, and/or sell copies of the Software, and to
19 // permit persons to whom the Software is furnished to do so, subject to
20 // the following conditions:
21 //
22 // The above copyright notice and this permission notice shall be
23 // included in all copies or substantial portions of the Software.
24 //
25 // THE SOFTWARE IS PROVIDED ""AS IS"", WITHOUT WARRANTY OF ANY KIND,
26 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
29 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
30 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
31 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 //
33 using Autofac;
37 using Microsoft.Bot.Connector;
38 using Microsoft.WindowsAzure.Storage;
39 using Microsoft.WindowsAzure.Storage.Table;
40 using Microsoft.WindowsAzure.Storage.Table.Protocol;
41 using Newtonsoft.Json;
42 using System;
43 using System.Collections.Generic;
44 using System.IO;
45 using System.IO.Compression;
46 using System.Linq;
47 using System.Text;
48 using System.Threading;
49 using System.Threading.Tasks;
50 
51 namespace Microsoft.Bot.Builder.Azure
52 {
61  {
65  public class ActivityEntity : TableEntity
66  {
70  public ActivityEntity()
71  { }
72 
77  public ActivityEntity(IActivity activity)
78  {
79  PartitionKey = GeneratePartitionKey(activity.ChannelId, activity.Conversation.Id);
80  RowKey = GenerateRowKey(activity.Timestamp.Value);
81  From = activity.From.Id;
82  Recipient = activity.Recipient.Id;
83  Activity = activity;
84  Version = 3.0;
85  }
86 
90  public double Version { get; set; }
91 
95  public string From { get; set; }
96 
100  public string Recipient { get; set; }
101 
105  [IgnoreProperty]
106  public IActivity Activity { get; set; }
107 
108  private const int FieldLimit = 1 << 16;
109 
115  public override IDictionary<string, EntityProperty> WriteEntity(OperationContext operationContext)
116  {
117  var props = base.WriteEntity(operationContext);
118  var buffer = JsonConvert.SerializeObject(Activity).Compress();
119  var start = 0;
120  var blockid = 0;
121  while (start < buffer.Length)
122  {
123  var blockSize = Math.Min(buffer.Length - start, FieldLimit);
124  var block = new byte[blockSize];
125  Array.Copy(buffer, start, block, 0, blockSize);
126  props[$"Activity{blockid++}"] = new EntityProperty(block);
127  start += blockSize;
128  }
129  return props;
130  }
131 
137  public override void ReadEntity(IDictionary<string, EntityProperty> properties, OperationContext operationContext)
138  {
139  base.ReadEntity(properties, operationContext);
140  var blocks = 0;
141  var size = 0;
142  EntityProperty entityBlock;
143  while (properties.TryGetValue($"Activity{blocks}", out entityBlock))
144  {
145  ++blocks;
146  size += entityBlock.BinaryValue.Length;
147  }
148  var buffer = new byte[size];
149  for (var blockid = 0; blockid < blocks; ++blockid)
150  {
151  var block = properties[$"Activity{blockid}"].BinaryValue;
152  Array.Copy(block, 0, buffer, blockid * FieldLimit, block.Length);
153  }
154  Activity = JsonConvert.DeserializeObject<Activity>(buffer.Decompress());
155  }
156 
163  public static string GeneratePartitionKey(string channelId, string conversationId)
164  {
165  return $"{channelId}|{conversationId}";
166  }
167 
173  public static string GenerateRowKey(DateTime timestamp)
174  {
175  return $"{timestamp.Ticks:D19}";
176  }
177  }
178 
179  private CloudTable _table = null;
180 
185  public TableLogger(CloudTable table)
186  {
187  SetField.NotNull(out _table, nameof(_table), table);
188  }
189 
194  Task IActivityLogger.LogAsync(IActivity activity)
195  {
196  if (!activity.Timestamp.HasValue)
197  {
198  activity.Timestamp = DateTime.UtcNow;
199  }
200  return Write(_table, activity);
201  }
202 
203  IEnumerable<IActivity> IActivitySource.Activities(string channelId, string conversationId, DateTime oldest)
204  {
205  var query = BuildQuery(channelId, conversationId, oldest);
206  return _table.ExecuteQuery(query, (pkey, rkey, ts, properties, etag) =>
207  {
208  var entity = new ActivityEntity();
209  entity.ReadEntity(properties, null);
210  return entity.Activity;
211  });
212  }
213 
214  async Task IActivitySource.WalkActivitiesAsync(Func<IActivity, Task> function, string channelId, string conversationId, DateTime oldest, CancellationToken cancel)
215  {
216  var query = BuildQuery(channelId, conversationId, oldest);
217  TableContinuationToken continuationToken = null;
218  do
219  {
220  var results = await _table.ExecuteQuerySegmentedAsync(query,
221  (pKey, rowKey, timestamp, properties, etag) =>
222  {
223  var entity = new ActivityEntity();
224  entity.ReadEntity(properties, null);
225  return entity.Activity;
226  },
227  continuationToken, cancel);
228  foreach (var result in results)
229  {
230  await function(result);
231  }
232  continuationToken = results.ContinuationToken;
233  } while (continuationToken != null);
234  }
235 
236 #pragma warning disable CS1998,CS4014
237  async Task IActivityManager.DeleteConversationAsync(string channelId, string conversationId, CancellationToken cancel)
245  {
246  var pk = ActivityEntity.GeneratePartitionKey(channelId, conversationId);
247  var query = new TableQuery<ActivityEntity>()
248  .Where(TableQuery.GenerateFilterCondition(TableConstants.PartitionKey, QueryComparisons.Equal, pk))
249  .Select(new string[] { TableConstants.PartitionKey, TableConstants.RowKey });
250  await DeleteAsync(query, cancel);
251  }
252 
258  async Task IActivityManager.DeleteBeforeAsync(DateTime oldest, CancellationToken cancel)
259  {
260  var rowKey = ActivityEntity.GenerateRowKey(oldest);
261  var query = new TableQuery<ActivityEntity>()
262  .Where(TableQuery.GenerateFilterCondition(TableConstants.RowKey, QueryComparisons.LessThan, rowKey))
263  .Select(new string[] { TableConstants.PartitionKey, TableConstants.RowKey });
264  await DeleteAsync(query, cancel);
265  }
266 
267  async Task IActivityManager.DeleteUserActivitiesAsync(string userId, CancellationToken cancel)
268  {
269  var query = new TableQuery<ActivityEntity>().Select(new string[] { TableConstants.PartitionKey, TableConstants.RowKey, "From", "Recipient" });
270  await DeleteAsync(query, cancel, qs => (from r in qs where r.From == userId || r.Recipient == userId select r));
271  }
272 
273  private static TableQuery BuildQuery(string channelId, string conversationId, DateTime oldest)
274  {
275  var query = new TableQuery();
276  string filter = null;
277  if (channelId != null && conversationId != null)
278  {
279  var pkey = ActivityEntity.GeneratePartitionKey(channelId, conversationId);
280  filter = TableQuery.GenerateFilterCondition(TableConstants.PartitionKey, QueryComparisons.Equal, pkey);
281  }
282  else if (channelId != null)
283  {
284  var pkey = ActivityEntity.GeneratePartitionKey(channelId, "");
285  filter = TableQuery.GenerateFilterCondition(TableConstants.PartitionKey, QueryComparisons.GreaterThanOrEqual, pkey);
286  }
287  if (oldest != default(DateTime))
288  {
289  var rowKey = ActivityEntity.GenerateRowKey(oldest);
290  var rowFilter = TableQuery.GenerateFilterCondition(TableConstants.RowKey, QueryComparisons.GreaterThanOrEqual, rowKey);
291  if (filter == null)
292  {
293  filter = rowFilter;
294  }
295  else
296  {
297  filter = TableQuery.CombineFilters(filter, TableOperators.And, rowFilter);
298  }
299  }
300  if (filter != null)
301  {
302  query.Where(filter);
303  }
304  return query;
305  }
306 
307 #pragma warning restore CS1998,CS4014
308 
309  private async Task DeleteAsync(TableQuery<ActivityEntity> query, CancellationToken cancel, Func<IEnumerable<ActivityEntity>, IEnumerable<ActivityEntity>> filter = null)
310  {
311  TableContinuationToken continuationToken = null;
312  do
313  {
314  var results = await _table.ExecuteQuerySegmentedAsync(query, continuationToken, cancel);
315  var partitionKey = string.Empty;
316  var batch = new TableBatchOperation();
317  foreach (var result in filter == null ? results : filter(results))
318  {
319  if (result.PartitionKey == partitionKey)
320  {
321  if (batch.Count == 100)
322  {
323  await _table.ExecuteBatchAsync(batch, cancel);
324  batch = new TableBatchOperation();
325  }
326  }
327  else
328  {
329  if (batch.Count > 0)
330  {
331  await _table.ExecuteBatchAsync(batch, cancel);
332  batch = new TableBatchOperation();
333  }
334  partitionKey = result.PartitionKey;
335  }
336  batch.Add(TableOperation.Delete(result));
337  }
338  if (batch.Count > 0)
339  {
340  await _table.ExecuteBatchAsync(batch, cancel);
341  }
342  continuationToken = results.ContinuationToken;
343  } while (continuationToken != null);
344  }
345 
346  // Write out activity with auto-incrementing of timestamp for conflicts up to 5 times
347  private static Task Write(CloudTable table, IActivity activity, int retriesLeft = 5)
348  {
349  var insert = TableOperation.Insert(new ActivityEntity(activity));
350  return table.ExecuteAsync(insert).ContinueWith(t =>
351  {
352  if (--retriesLeft > 0 && t.IsFaulted)
353  {
354  var response = ((t.Exception.InnerException as StorageException)?.InnerException as System.Net.WebException)?.Response as System.Net.HttpWebResponse;
355  if (response != null && response.StatusCode == System.Net.HttpStatusCode.Conflict)
356  {
357  activity.Timestamp = activity.Timestamp.Value.AddTicks(1);
358  return TableLogger.Write(table, activity, retriesLeft);
359  }
360  }
361  t.Wait();
362  return t;
363  }).Unwrap();
364  }
365  }
366 
371  {
372  private CloudStorageAccount _account;
373  private string _tableName;
374 
380  public TableLoggerModule(CloudStorageAccount account, string tableName)
381  {
382  _account = account;
383  _tableName = tableName;
384  }
385 
390  protected override void Load(ContainerBuilder builder)
391  {
392  base.Load(builder);
393  builder.RegisterInstance(_account)
394  .AsSelf();
395  builder.Register(c => c.Resolve<CloudStorageAccount>().CreateCloudTableClient())
396  .AsSelf()
397  .SingleInstance();
398  builder.Register(c =>
399  {
400  var table = c.Resolve<CloudTableClient>().GetTableReference(_tableName);
401  table.CreateIfNotExists();
402  return table;
403  })
404  .AsSelf()
405  .SingleInstance();
406  builder.RegisterType<TableLogger>()
407  .AsImplementedInterfaces()
408  .SingleInstance();
409  }
410  }
411 
412  static partial class Extensions
413  {
419  public static byte[] Compress(this string str)
420  {
421  var bytes = Encoding.UTF8.GetBytes(str);
422 
423  using (var msi = new MemoryStream(bytes))
424  using (var mso = new MemoryStream())
425  {
426  using (var gs = new GZipStream(mso, CompressionMode.Compress))
427  {
428  msi.CopyTo(gs);
429  }
430  return mso.ToArray();
431  }
432  }
433 
439  public static string Decompress(this byte[] bytes)
440  {
441  using (var msi = new MemoryStream(bytes))
442  using (var mso = new MemoryStream())
443  {
444  using (var gs = new GZipStream(msi, CompressionMode.Decompress))
445  {
446  gs.CopyTo(mso);
447  }
448  return Encoding.UTF8.GetString(mso.ToArray());
449  }
450  }
451  }
452 }
override void Load(ContainerBuilder builder)
Update builder with registration for TableLogger.
Definition: TableLogger.cs:390
Root namespace for the Microsoft Bot Connector SDK.
Definition: ActionTypes.cs:7
Namespace for interfaces and classes for working with conversational history.
ConversationAccount Conversation
Address for the conversation that this activity is associated with
Definition: IActivity.cs:49
static string GeneratePartitionKey(string channelId, string conversationId)
Generate a partition key given channelId and conversationId .
Definition: TableLogger.cs:163
Log message activities between bots and users.
string ChannelId
Channel this activity is associated with
Definition: IActivity.cs:39
DateTime Timestamp
UTC Time when message was sent
Definition: IActivity.cs:29
Task DeleteConversationAsync(string channelId, string conversationId, CancellationToken cancel=default(CancellationToken))
Delete a specific conversation.
override void ReadEntity(IDictionary< string, EntityProperty > properties, OperationContext operationContext)
Read entity with distributed activity.
Definition: TableLogger.cs:137
Task DeleteBeforeAsync(DateTime oldest, CancellationToken cancel=default(CancellationToken))
Delete any conversation records older than oldest .
string Id
Channel id for the user or bot on this channel (Example: joe@smith.com, or or 123456) ...
ChannelAccount Recipient
Address that received the message
Definition: IActivity.cs:54
ActivityEntity(IActivity activity)
Construct from an IActivity.
Definition: TableLogger.cs:77
Interface for getting activities from some source.
Namespace for internal machinery that is not useful for most developers.
Module for registering a LoggerTable.
Definition: TableLogger.cs:370
IEnumerable< IActivity > Activities(string channelId, string conversationId, DateTime oldest=default(DateTime))
Produce an enumeration over conversation.
override IDictionary< string, EntityProperty > WriteEntity(OperationContext operationContext)
Write out entity with distributed activity.
Definition: TableLogger.cs:115
An Activity is the basic communication type for the Bot Framework 3.0 protocol
Definition: ActivityEx.cs:17
basic shared properties for all activities
Definition: IActivity.cs:9
ChannelAccount From
Sender address data
Definition: IActivity.cs:44
static string Decompress(this byte[] bytes)
Decompress a string from a byte array.
Definition: TableLogger.cs:439
Task DeleteUserActivitiesAsync(string userId, CancellationToken cancel=default(CancellationToken))
Delete all activities involving userId .
TableLogger(CloudTable table)
Create a table storage logger.
Definition: TableLogger.cs:185
Log conversation activities to Azure Table Storage.
Definition: TableLogger.cs:60
static string GenerateRowKey(DateTime timestamp)
Generate row key for ascending timestamp .
Definition: TableLogger.cs:173
string Id
Channel id for the user or bot on this channel (Example: joe@smith.com, or or 123456) ...
Namespace for the internal fibers machinery that is not useful for most developers.
Definition: Awaitable.cs:36
TableLoggerModule(CloudStorageAccount account, string tableName)
Create a TableLogger for a particular storage account and table name.
Definition: TableLogger.cs:380
static byte[] Compress(this string str)
Compress a string into a byte array.
Definition: TableLogger.cs:419
Core namespace for Dialogs and associated infrastructure.
Definition: Address.cs:40
Interface for managing activity history.
Root namespace for the Microsoft Bot Builder SDK.
Task WalkActivitiesAsync(Func< IActivity, Task > function, string channelId=null, string conversationId=null, DateTime oldest=default(DateTime), CancellationToken cancel=default(CancellationToken))
Walk over recorded activities and call a function on them.