方法:アップロード(高度)

はじめに

信頼性の高いアセットアップロードは、あらゆる C2C 統合の核となる機能です。このガイドでは、過酷な環境でも優れたパフォーマンスを発揮する、堅牢で耐障害性に優れた効率的なアップロードシステムを構築するための、高度なテクニックとベストプラクティスを紹介します。

前提条件

まだの場合は、C2C の実装:セットアップガイドを読んでから先へ進んでください。

認証および承認プロセス中に、access_token の取得が必要になります。

基本アップロードガイドに記載されている同じテスト用アセットを引き続き使用します。

高度なアセットパラメーター

Frame.io でアセットを作成する場合は、いくつかの高度なパラメーターを使用して、アップロードの動作をカスタマイズできます。offset パラメーターは、適切な統合のために特に重要です。

offset - 一時停止デバイスの処理

正確な offset 値の指定が重要です。このパラメーターは、メディアがいつ作成されたかを指定し、共有すべきでないコンテンツをデバイスがアップロードしないようにします。Frame.io でデバイスが一時停止されている場合、ユーザーは一時停止中に作成されたメディアをアップロードしないように指示しています。詳細については、一時停止機能に関するガイドを参照してください。

offset パラメーターのその他の利点

offset パラメーターは、Frame.io 内でメディアを整理するうえで別の重要な利点を提供します。以前の日付でキャプチャされたコンテンツをアップロードする場合(例えば、再生中にユーザーが前週に撮影された写真を選択した場合)、offset パラメーターにより、そのメディアは現在のアップロード日ではなく、元のキャプチャ日に対応するフォルダーに配置されます。

この時系列の整理により、Frame.io プロジェクト構造内で論理タイムラインが維持されます。この offset パラメーターがないと、過去のメディアが現在のコンテンツに誤ってグループ化され、編集担当者や他の共同作業者を混乱させかねません。

この問題については、インターフェイスを通じてユーザーに選択肢を提供することをお勧めします。メディアのキャプチャ日時によらず、すべてのアップロードを現在の日時で整理することをユーザーが希望する場合は、offset パラメーターを省略すると実現できます。これが指定されていないと、デフォルトで 0 に設定されるからです。

この API では、デバイスが一時停止ステータスを追跡する必要はありません。代わりに、ファイルをアップロードする際に、ファイルが何秒前に作成されたかを指定します。サーバーはこれを一時停止期間と比較し、一時停止中に作成されたアップロードを拒否します。

この機能のデモを行うには、「C2C 接続」タブの 3 点メニューからデバイスを一時停止します。

この状態で、アセットのアップロードを試行します。

${
>curl -X POST https://api.frame.io/v2/devices/assets \
> --header 'Authorization: Bearer [access_token]' \
> --header 'Content-Type: application/json' \
> --header 'x-client-version: 2.0.0' \
> --data-binary @- <<'__JSON__'
> {
> "name": "C2C_TEST_CLIP.mp4",
> "filetype": "video/mp4",
> "filesize": 21136250,
> "offset": 0
> }
>__JSON__
>} | python -m json.tool
API endpoint specification

Documentation for /v2/devices/assets can be found here.

次のエラーが返されます。

1{
2 "code": 409,
3 "errors": [
4 {
5 "code": 409,
6 "detail": "The channel you're uploading from is currently paused.",
7 "status": 409,
8 "title": "Channel Paused"
9 }
10 ],
11 "message": "Channel Paused"
12}

デバイスを一時停止解除し、同じリクエストを再試行すると、アセットが作成されます。

ただし、アセットが一時停止期間中に作成されたものである場合は、offset を設定して、実際の作成日時を反映する必要があります。

${
>curl -X POST https://api.frame.io/v2/devices/assets \
> --header 'Authorization: Bearer [access_token]' \
> --header 'Content-Type: application/json' \
> --header 'x-client-version: 2.0.0' \
> --data-binary @- <<'__JSON__'
> {
> "name": "C2C_TEST_CLIP.mp4",
> "filetype": "video/mp4",
> "filesize": 21136250,
> "offset": 60
> }
>__JSON__
>} | python -m json.tool

これにより、アセットが 60 秒前(一時停止中)に作成されたことが Frame.io に通知され、Channel Paused エラーが適切にトリガーされます。

保護された知的財産、機密性の高い映像、その他の制限された素材など、機密性の高いコンテンツがユーザーの意図に反してアップロードされないようにするためには、正確な **offset 値が不可欠です。

Offset and retries

When retrying a failed asset creation call, remember to update the offset value. During extended retry periods, a static offset might drift out of the relevant pause window, potentially allowing uploads that should be blocked.

特定チャネルへのアップロード

デバイスに複数チャネルがある場合は、使用するチャネルを指定できます。

${
>curl -X POST https://api.frame.io/v2/devices/assets \
> --header 'Authorization: Bearer [access_token]' \
> --header 'Content-Type: application/json' \
> --header 'x-client-version: 2.0.0' \
> --data-binary @- <<'__JSON__'
> {
> "name": "C2C_TEST_CLIP.mp4",
> "filetype": "video/mp4",
> "filesize": 21136250,
> "offset": -10,
> "channel": 2
> }
>__JSON__
>} | python -m json.tool

指定しない場合、デフォルトのチャネルは 0 です。ほとんどの統合では、この値を変更する必要はありません。

カスタムチャンク数の要求

デフォルトでは、Frame.io のバックエンドはファイルを約 25 MB のチャンクに分割します。輻輳が多いネットワークでは、チャンクを小さくすることをお勧めします。parts パラメーターを使用して、特定数のチャンクを要求できます。

${
>curl -X POST https://api.frame.io/v2/devices/assets \
> --header 'Authorization: Bearer [access_token]' \
> --header 'Content-Type: application/json' \
> --header 'x-client-version: 2.0.0' \
> --data-binary @- <<'__JSON__'
> {
> "name": "C2C_TEST_CLIP.mp4",
> "filetype": "video/mp4",
> "filesize": 21136250,
> "offset": 0,
> "parts": 4
> }
>__JSON__
>} | python -m json.tool

応答には、次の 4 つのアップロード URL が含まれます。

1{
2 ...
3 "upload_urls": [
4 "https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[part-01-path]",
5 "https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[part-02-path]",
6 "https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[part-03-path]",
7 "https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[part-04-path]"
8 ],
9 ...
10}

チャンクサイズは次のようになります。

Python
1math.ceiling(float(21136250) / float(4))
2# 5284063 bytes

最後のチャンクは 5,284,061 バイト(21136250 - 5284063 * 3 として計算)になります。

カスタムチャンク数を要求する場合は、AWS S3 マルチパートアップロードの制限事項に留意してください。

  • 最後のパートを除き、各パートは少なくとも 5 MiB(5,242,880 バイト)である必要があります
  • 最大パート数は 10,000 です

要求がこれらの制約に違反している場合は、500: INTERNAL SERVER ERROR が表示されます。

{
"code": 500,
"errors": [
{
"code": 500,
"detail": "There was a problem with your request",
"status": 500,
"title": "Something went wrong"
}
],
"message": "Something went wrong"
}

カスタムパート数が S3 の要件に準拠していることを必ず確認してください。

効率的なアップロード

C2C デバイスは多くの場合、困難なネットワーク環境で動作するため、効率性が非常に重要です。スループットを最大化するための戦略をご紹介します。

TCP 接続の再利用/プーリング

暗号化された接続を確立するには、多大なネゴシエーションのオーバーヘッドが必要となります。効率的な運用のために、複数の要求を行う場合は TCP 接続を再利用します。ほとんどの HTTP ライブラリでは、永続的な接続を維持する Client または Session 抽象化が提供されます。

新しい HTTPS 接続のネゴシエーションプロセスには、暗号化ハンドシェイクと証明書の検証が含まれます。接続を再利用することで、このオーバーヘッドは各要求に対してではなく、オーバーヘッドを 1 回のみ実行します。

TCP handshake reference

For technical details on TLS handshake processes, see Cloudflare’s explanation.

curl を使用した接続の再利用をデモンストレーションするには、まず基本アップロードガイドの説明に従って、Frame.io で新しいアセットを作成します。

次に、テスト用にファイルを個別のチャンクに分割します。

$head -c 10568125 ~/Downloads/C2C_TEST_CLIP.mp4 > "C2C_TEST_CLIP-Chunk01"
$tail -c 10568125 ~/Downloads/C2C_TEST_CLIP.mp4 > "C2C_TEST_CLIP-Chunk02"

次は、curl の --next パラメーターを使用して、単一の TCP 接続を介して両方のチャンクをアップロードします。

$curl -X PUT https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[part-1-path] \
> --include \
> --header 'content-type: video/mp4' \
> --header 'x-amz-acl: private' \
> --data-binary @C2C_TEST_CLIP-Chunk01 \
>--next -X PUT https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[part-2-path] \
> --include \
> --header 'content-type: video/mp4' \
> --header 'x-amz-acl: private' \
> --data-binary @C2C_TEST_CLIP-Chunk02

これを個別の接続と比較します。

$curl -X PUT https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[part-1-path] \
> --include \
> --header 'content-type: video/mp4' \
> --header 'x-amz-acl: private' \
> --data-binary @C2C_TEST_CLIP-Chunk01 \
>&& curl -X PUT https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[part-2-path] \
> --include \
> --header 'content-type: video/mp4' \
> --header 'x-amz-acl: private' \
> --data-binary @C2C_TEST_CLIP-Chunk02
Reusing chunk URLs

You can upload to the same chunk URL multiple times, so feel free to reuse URLs between examples.

テストでは通常、接続を再利用すると連続アップロードのパフォーマンスが 15~20% 向上します。

並列アップロード

スループットをさらに向上させるには、複数のチャンクを同時にアップロードします。

$curl -X PUT https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[part-1-path] \
> --include \
> --header 'content-type: video/mp4' \
> --header 'x-amz-acl: private' \
> --data-binary @C2C_TEST_CLIP-Chunk01 \
>& \
>curl -X PUT https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[part-2-path]\
> --include \
> --header 'content-type: video/mp4' \
> --header 'x-amz-acl: private' \
> --data-binary @C2C_TEST_CLIP-Chunk02 \
>&

十分な帯域幅があれば、並列アップロードは最も遅い個別アップロードとほぼ同じ時間で完了します。

CPU コアあたり 2 つの同時アップロードが、最適な並列処理を行うための大まかな目安です。この比率を超えると、リソースの競合やリターンの低下につながる可能性があります。

Parallel upload speeds

Network conditions significantly impact parallel upload performance. In some environments, sequential uploads may outperform parallel ones. Advanced implementations might monitor throughput and dynamically adjust concurrency. Always profile performance in your actual production environment rather than relying on example timing.

両方の手法を組み合わせる

効率を最大化するには、接続プーリングと並列アップロードを組み合わせます。複数のプロセスを作成し、それぞれで独自のアップロードシーケンスに接続プールを使用します。

$curl -X PUT https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[asset01-chunk01] \
> --include \
> --header 'content-type: video/mp4' \
> --header 'x-amz-acl: private' \
> --data-binary @C2C_TEST_CLIP-Chunk01 \
>--next -X PUT https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[asset01-chunk02] \
> --include \
> --header 'content-type: video/mp4' \
> --header 'x-amz-acl: private' \
> --data-binary @C2C_TEST_CLIP-Chunk02 \
>& \
>curl -X PUT https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[asset02-chunk01] \
> --include \
> --header 'content-type: video/mp4' \
> --header 'x-amz-acl: private' \
> --data-binary @C2C_TEST_CLIP-Chunk01 \
>--next -X PUT https://frameio-uploads-production.s3-accelerate.amazonaws.com/parts/[asset02-chunk02] \
> --include \
> --header 'content-type: video/mp4' \
> --header 'x-amz-acl: private' \
> --data-binary @C2C_TEST_CLIP-Chunk02 \
>&
HTTP library features

Most HTTP libraries provide abstractions for connection pooling and parallel requests. Experiment with your library’s options to determine the optimal configuration for your environment.

アップロードの進捗状況の追跡

統合では、ユーザーに基本的な進捗状況を示す必要があります。チャンクレベルの粒度は許容されます。3 つのチャンクのアップロードでは、各チャンクが完了するたびに進捗が 0% → 33% → 66% → 100% に増加する場合があります。

粒度が細かい進捗レポートは、HTTP ライブラリの機能によって異なります。より詳細な進捗追跡の実装に関するガイダンスが必要な場合は、当社チームまでお問い合わせください。

確実なアップロード

堅牢なエラー処理については、エラーガイドを参照してください。次のセクションでは、ここで説明するエラー処理戦略を実装したことを前提としています。

本番品質のアップローダーを作成するには、個々のリクエストエラーの処理以外にも考慮すべき点があります。

アップロードキューの作成

現実的なシナリオでは、デバイスによるメディア生成が速すぎてアップロードできなかったり、長時間の接続中断が発生したりする可能性があります。キューイングシステムを実装すると、メディアの作成とアップロードの管理が分離されます。

2 つのキューアーキテクチャを考えてみましょう。

  1. メディアキューで Frame.io にローカルファイルを登録
  2. チャンクキューで個々のファイルチャンクをアップロード

簡素化した実装を次に示します。

Python
1# Where we are going to queue new files.
2FILE_QUEUE = Queue()
3
4# Where we are going to queue new chunks.
5CHUNK_QUEUE = Queue()
6
7# The http session that will handle TCP
8# connection pooling for us.
9HTTP_SESSION = http.Session()
10
11def take_picture():
12 """Snaps a picture for the user."""
13
14 image = MY_DEVICE.capture()
15 file_path = MY_DEVICE.write_image(image)
16 FILE_QUEUE.add(file_path)
17
18def task_register_assets():
19 """
20 Pulls snapped pictures from the FILE_QUEUE, registers
21 with Frame.io, and adds the chunks to the CHUNK_QUEUE.
22 """
23 while True:
24 # Get the latest file added to the queue and register
25 # a C2C Asset for it.
26 new_file = FILE_QUEUE.get()
27 asset = c2c.crete_asset_for_file(HTTP_SESSION, new_file)
28
29 # Calculate the size for each chunk
30 chunk_size = c2c.calculate_chunk_size(asset, new_file)
31
32 # Create a message for each chunk with it's parameters
33 # and add it to the
34 # queue.
35 chunk_start = 0
36 for chunk_url in asset.upload_urls:
37 message = {
38 "file_path": new_file,
39 "chunk_url": chunk_url,
40 "chunk_start": chunk_start,
41 "chunk_size": chunk_size,
42 }
43
44 # Put the message in the queue and
45 CHUNK_QUEUE.put(message)
46 chunk_start += chunk_size
47
48def task_upload_chunk():
49 """Takes a chunks and uploads them."""
50
51 while True:
52 info = CHUNK_QUEUE.get()
53 c2c.upload_chunk(HTTP_SESSION, info)
54
55def launch_upload_tasks():
56 """Lauches our Frame.io upload tasks."""
57 # Create a list to hold all of our tasks.
58 tasks = list()
59
60 # Create one task for registering assets.
61 asset_task = run_task_in_thread(task_register_assets)
62 tasks.append(asset_task)
63
64 # Create 2 tasks per CPU core for uploading chunks.
65 for _ in range(0, GET_CPU_COUNT() * 2):
66 chunk_task = run_task_in_thread(task_upload_chunk)
67 tasks.append(chunk_task)
68
69 # Run these tasks until shutdown
70 run_forever(tasks)
Error handling

In the above example, we assume that the functions invoked for c2c calls are handling errors as discussed in the errors guide.

パワーサイクルをまたぐ永続的なキューイング

メモリ内キューのアプローチは、デバイスの電源がオンの間はうまく機能しますが、アップロードの完了前に電源が失われるとどうなるでしょうか?本当の意味で耐障害性を備えた統合を実現するには、再起動後のデバイスが、中断したところから再開できるようにする必要があります。

そのためには、パワーサイクル間でキューの状態を保持する必要があります。SQLite などの埋め込みデータベースは、この機能の優れた基盤を提供します。

永続キュー実装は、次の主要な操作をサポートする必要があります。

  • 新規作成されたファイルのアップロードキューへの追加
  • Frame.io でアセットが正常に作成されたときの追跡
  • エラーが原因でアセットの作成に失敗した場合の記録
  • アップロードタスクのファイルチャンク情報の保存
  • アップロードする次のチャンクの取得
  • チャンクが正常にアップロードされた際のマーキング
  • チャンクアップロードエラーのログ
  • ユーザー表示用のファイルステータス情報の提供

ここでは、前述の例を適応して、永続ストレージシステムを使用する方法を説明します。

Python
1# Our persistence layer for queuing uploads, potentially using SQLite
2# or another embedded database
3C2C_UPLOAD_STORE = NewC2CUploadStore()
4
5# HTTP session for connection pooling
6HTTP_SESSION = http.Session()
7
8def take_picture():
9 """Captures an image and adds it to the upload queue."""
10 image = MY_DEVICE.capture()
11 file_path = MY_DEVICE.write_image(image)
12
13 # Register the file with our persistent store
14 C2C_UPLOAD_STORE.add_file(file_path)
15
16def task_register_assets():
17 """
18 Processes files from persistent storage and registers
19 them with Frame.io for upload.
20 """
21 while True:
22 # Get the next available file from our store
23 file_record = C2C_UPLOAD_STORE.get_file()
24
25 try:
26 # Register the asset with Frame.io
27 asset = c2c.create_asset_for_file(HTTP_SESSION, file_record)
28 chunk_size = c2c.calculate_chunk_size(asset, file_record)
29
30 # Create entries for each chunk in our persistent store
31 chunk_start = 0
32 for chunk_url in asset.upload_urls:
33 message = {
34 "file_path": file_record,
35 "chunk_url": chunk_url,
36 "chunk_start": chunk_start,
37 "chunk_size": chunk_size,
38 }
39
40 C2C_UPLOAD_STORE.new_chunk(message)
41 chunk_start += chunk_size
42
43 except BaseException as error:
44 # Record the error in our persistent store
45 C2C_UPLOAD_STORE.file_asset_create_error(file_record, error)
46 else:
47 # Mark the asset as successfully created
48 C2C_UPLOAD_STORE.file_asset_created(file_record)
49
50def task_upload_chunk():
51 """Uploads individual file chunks from the persistent queue."""
52 while True:
53 # Get the next chunk, marking it as "in progress" to prevent
54 # other tasks from processing it simultaneously
55 chunk_record = C2C_UPLOAD_STORE.get_chunk()
56
57 try:
58 c2c.upload_chunk(HTTP_SESSION, chunk_record)
59 except BaseException as error:
60 # Record the error for potential retry
61 C2C_UPLOAD_STORE.chunk_error(chunk_record, error)
62 else:
63 # Mark successful completion
64 C2C_UPLOAD_STORE.chunk_success(chunk_record)
65
66def launch_upload_tasks():
67 """Launches Frame.io upload processing tasks."""
68 tasks = []
69
70 # Asset registration task
71 asset_task = run_task_in_thread(task_register_assets)
72 tasks.append(asset_task)
73
74 # Multiple parallel chunk upload tasks
75 worker_count = GET_CPU_COUNT() * 2
76 for _ in range(worker_count):
77 chunk_task = run_task_in_thread(task_upload_chunk)
78 tasks.append(chunk_task)
79
80 # Run indefinitely
81 run_forever(tasks)

この永続ストレージ手法では、電源の中断に対する統合の耐障害性が向上します。デバイスが再起動すると、最後に保存された状態から処理が続行されます。このアーキテクチャでは、エラー追跡や停止したアップロードの検出などの、より高度な機能を実装するための基盤も提供されます。

アップロードエラーの追跡

堅牢なアップロードシステムでは、エラーを慎重に追跡する必要があります。エラーガイドの戦略を使用して操作を再試行後、これらのエラーを永続ストアに記録します。これにより、システムで次の操作を実行できます。

  1. 問題のあるアップロードの優先順位を下げて、キュー全体をブロックしないようにする
  2. 正確なステータス情報をユーザーに提供する
  3. 永続的な問題に対する管理者の介入を可能にする

致命的なエラーが発生した場合は、不要な再試行を防ぐためにアイテムをマークします。

停止したアップロードの管理

アップロードが無期限に停止しないように保護します。チャンクアップロードタスクを終了して再開するまでの最大時間(例:30 分)を設定します。これにより、すべてのアップロードワーカーが応答しない操作によってブロックされるシナリオを防止できます。

サイレント障害からの復旧

システムのクラッシュ、電源損失、またはプロセスの終了により、通常のエラー報告が妨げられる場合があります。キューからアイテムを取得する際は、チェックアウト時間を記録します。アイテムが妥当なしきい値(例:30 分)を超えて「進行中」状態のままで、成功または失敗を報告しない場合は、別のワーカーが処理するために利用可能なプールに自動的に返されます。

汚染されたアップロードの軽減

「汚染された」キューアイテムは、データまたは環境に固有の問題により、常に失敗します。これらのアイテムが継続的に再キューされると、アップロードシステム全体が効果的にブロックされる可能性があります。このようなケースに対処するために、次の戦略の検討をお勧めします。

  • 複数の失敗が発生した後、新しいコンテンツが続行できるようにアイテムの優先順位を下げる
  • 明示的なエラーと処理試行回数の両方を追跡する
  • 接続と認可のベストプラクティスに従って、一時的な環境の問題と本質的なファイルの問題を区別する
  • 再試行制限数のエスカレーションを実装する(例:3 回のジョブ試行ごとに個々の操作を 10 回、合計 30 回の試行)
  • 環境問題の解決後、問題のあるアップロードを手動でリセットするためのユーザーインターフェイスを提供する

汚染されたアップロードの原因には、以下が含まれます。

  • 破損したファイルデータが原因で I/O エラーが発生している
  • エラーレポートを妨げる致命的なプロセス障害
  • 永続的な根本的状況によってトリガーされる通常再試行可能なエラー

システムの再起動後に再試行する

問題のあるアップロードを永続的に破棄する前に、次回のシステム再起動後に最後の再試行を 1 回行うようにフラグを設定します。これにより、メモリ、ドライバー、またはリソース割り当てに関する一時的なシステム状態の問題が原因でアップロードが失敗するケースに対処します。クリーン再起動後もアップロードが失敗し続ける場合は、永続的な問題としてマークできます。

キューのクリア

使用できないファイルは、キューから必ず削除してください。メディアが物理的に削除された場合、またはファイルが削除された場合は、不要なエラーを防ぐために、対応するエントリをアップロードキューからクリアします。

*重要なのは、新しいプロジェクトに接続する際に、アップロードキューをクリアすることです。*あるプロジェクトのキューに入れられたメディアが、別のプロジェクトに表示されることはありません。ユーザーがデバイスを別のプロジェクトとペアリングする場合は、プロジェクトが変更されているかどうかを確認し、変更されている場合は既存のキューを完全にクリアします。

次のステップ

不明点がありましたら、当社チームまでお問い合わせください。そのうえで、アップロード(高度)ガイドに進んでください。お客様の統合の進捗を支援できる機会を楽しみにしています。