libstreaming局域网构建Android相机实时流媒体流程分析

这是一个涉及东西比较多的第三方库,里面的一些代码细节有点让人云里雾里。如果真的是这样,那说明了解的东西还太少,真正的去了解这些详细的东西,起码得知道相应的概念,如RTSP、RTP、H264、H264打包。我觉得首要的事是将大体的逻辑打通,然后再慢慢深入代码的细节,了解相关的技术与知识。奈何,这部分在

这是一个涉及东西比较多的第三方库,里面的一些代码细节有点让人云里雾里。如果真的是这样,那说明了解的东西还太少,真正的去了解这些详细的东西,起码得知道相应的概念,如RTSP、RTP、H264、H264打包。我觉得首要的事是将大体的逻辑打通,然后再慢慢深入代码的细节,了解相关的技术与知识。奈何,这部分在我遇到的项目DEMO并不是很重要,所以花的时间并不多,基本上解决了这里面出现的问题后,就不再关注一些细节,但是回过头来看,我觉得这部分的内容还是比较值得学习的。

原项目地址:github.com/fyhertz/libstreaming

了解清楚大致的流程后,可以选择相应的模块,继续深入了解。

粗略流程

图中的流程并没有包括一些其他的细枝末节,还有一些其他的如:如何获取一些所需参数、如何构建RTSP服务器等都不在此流程图中。因为一些显示效果的调整,只需要明白这样的一个流程就可以做出修改。因此此篇文章也是围绕此流程图来展开对大致流程的分析。

这里写图片描述

从何处开始

了解这个库之前,我先接触的是使用这个库实现了利用Android相机的实时数据作为一个直播源的应用:Endoscope

剖开这个应用的一些表面操作,背后其实就是libstreaming这个库的一个使用。而这个库的调用,出现的地方大概是只有一处,就是通过SeesionBuilder构造除了一个Session。后来看了libstreaming库的readme,发现也是同样的一段代码。但是仔细去看这个代码的话,前面都是一些设置参数的操作,后面的build()也是将这个前面的参数,塞到一个Session中。

public Session build() {
	Session session;
	session = new Session();
	session.setOrigin(mOrigin);
	session.setDestination(mDestination);
	session.setTimeToLive(mTimeToLive);
	session.setCallback(mCallback);
	switch (mAudioEncoder) {
		case AUDIO_AAC:
			AACStream stream = new AACStream();
			session.addAudioTrack(stream);
			if (mContext!=null) 
				stream.setPreferences(PreferenceManager.getDefaultSharedPreferences(mContext));
			break;
		case AUDIO_AMRNB:
			session.addAudioTrack(new AMRNBStream());
			break;
	}
	switch (mVideoEncoder) {
		case VIDEO_H263:
			session.addVideoTrack(new H263Stream(mCamera));
			break;
		case VIDEO_H264:
			H264Stream stream = new H264Stream(mCamera);
			if (mContext!=null) 
				stream.setPreferences(PreferenceManager.getDefaultSharedPreferences(mContext));
			session.addVideoTrack(stream);
			break;
	}
	if (session.getVideoTrack()!=null) {
		VideoStream video = session.getVideoTrack();
		video.setFlashState(mFlash);
		video.setVideoQuality(mVideoQuality);
		video.setSurfaceView(mSurfaceView);
		video.setPreviewOrientation(mOrientation);
		video.setDestinationPorts(5006);
	}
	if (session.getAudioTrack()!=null) {
		AudioStream audio = session.getAudioTrack();
		audio.setAudioQuality(mAudioQuality);
		audio.setDestinationPorts(5004);
	}
	return session;
}

仔细看,在前面选的编码器是H264,这里也看见了一个H264Stream。这个类与其他相关类的简单继承关系如下:

这里写图片描述

我们可以看到,在其之前有两个抽象类,已实现一些基本的共同的操作。在创建H264Stream的过程中,其构造函数中代码如下:

public H264Stream(int cameraId) {
	super(cameraId);
	mMimeType = "video/avc";
	// 指定Camera中原始数据的类型
	mCameraImageFormat = ImageFormat.NV21;
	// 指定编码类型
	mVideoEncoder = MediaRecorder.VideoEncoder.H264;
	// 创建H264的打包器,使用RTP协议
	mPacketizer = new H264Packetizer();
	((H264Packetizer)mPacketizer).setListener(new H264Packetizer.OnFrameListener() {
		@Override
		public void onFrame() {
			if (Build.VERSION.SDK_INT >= 23) {
				if (System.currentTimeMillis()-timestamp>=1000) {
					timestamp=System.currentTimeMillis();
					Bundle params = new Bundle();
					params.putInt(MediaCodec.PARAMETER_KEY_REQUEST_SYNC_FRAME, 1);
					mMediaCodec.setParameters(params);
				}
			}
		}
	});
}

其中,有一个打包的类,与它相关的类的简单继承关系图如下:

这里写图片描述

相应的,这就是将H264的NALU数据,使用RTP协议进行封包,然后将其发送出去。**这个类对应的角色,就是前面流程图中右侧的部分,即不断地取数据,然后打包发送。**打开其中的类,我们粗略地过一下代码:

  1. 它继承了Runnable,其中的run()方法中有一个死循环:
这里写图片描述
  1. 其中的send()方法,内容比较多,可以看到其调用取数据的方法:
这里写图片描述
  1. 这个 is 就是从哪里来的? 它有一个设置方法,而且确实存在着设置is代码,所以我们先记着有这么一个输入流,等待后续的代码来说明。
这里写图片描述

至此,我们的初始化算是完成了。但是我们对于最前面的流程图,还有左边一部分是不清楚的。

所以,想想这个库的效果:当进入预览摄像头的界面时,开始是没有预览的,需要等到有对此播放流的请求后,才会开始预览(发送数据)。

因此,实现这样的操作的地方在哪里?这便是我们接下来需要思考的问题。

看使用libstreaming的应用Endoscope源代码里面,构建完Session后,还进行了RtspServer的初始化,即:

// StartStreamPresenter # startRtspServer()
private void startRtspServer() {
    rtspServer = new RtspServer();
    rtspServer.addCallbackListener(this);
    rtspServer.start();
}

所以,我们应该再去看看这个RtspServer长啥样。

public class RtspServer extends Service

继承自Service,然后再初始化完成后,还调用了start()方法,这个方法中,初始化了一个RequestListener

// RtspServer.java
public void start() {
	if (!mEnabled || mRestart) stop();
	if (mEnabled && mListenerThread == null) {
		try {
			// 这个将用来监听socket请求
			mListenerThread = new RequestListener();
		} catch (Exception e) {
			mListenerThread = null;
		}
	}
	mRestart = false;
}

这个RequestListener是一个内部类,继承自Thread。并且在构造函数中初始化了一个ServerSocket后,开启自己的线程,跑自己run()里面的逻辑。

class RequestListener extends Thread implements Runnable {
	private final ServerSocket mServer;
	public RequestListener() throws IOException {
		try {
			// 创建服务端socket
			mServer = new ServerSocket(mPort);
			// 开启自身线程
			start();
		} catch (BindException e) {
			Log.e(TAG,"Port already in use !");
			postError(e, ERROR_BIND_FAILED);
			throw e;
		}
	}
	public void run() {
		Log.i(TAG,"RTSP server listening on port "+mServer.getLocalPort());
		while (!Thread.interrupted()) {
			try {
				// 阻塞线程,监听socket,并将socket传给WorkThread
				new WorkerThread(mServer.accept()).start();
			} catch (SocketException e) {
				break;
			} catch (IOException e) {
				Log.e(TAG,e.getMessage());
				continue;
			}
		}
		Log.i(TAG,"RTSP server stopped !");
	}
	public void kill() {
		try {
			mServer.close();
		} catch (IOException e) {}
		try {
			this.join();
		} catch (InterruptedException ignore) {}
	}
}

那么这个WorkThread将进行什么样的操作呢?同样它也是一个内部类,继承自Thread。对于这个内部类,我们要想理清楚整体的逻辑,就必须先舍弃掉一些细节,不然就会卡在此处。

这个内部类所在的类,即RtspServer,顾名思义就是一个RTSP服务器,这个类前的注释中也有说明,说这是RTSP协议子集的一个实现(RFC 2326),所以我们可以将那些不能理解的都归于RTSP协议的实现。继续看逻辑:

这里写图片描述

那么拿着从socket那里获取到的输入与输出是要干什么呢?当然是进行通信。接着看其中的主要run()中的逻辑。

这里写图片描述

因此,主要的逻辑现在已经到达了对消息的处理中,即processRequest()中。

这个方法,挑重点看,我们可以看到其中对Session.syncConfigure()Session.syncStart()方法的调用。再看Session中的这个方法。我们不难发现,前者调用涉及到Stream.configure(),后者调用涉及到Stream.start()

因此,我们应该直接看H264相关的这两个方法。于是,又回到了之前的地方——H264Stream.java


经过这两个方法的对比,发现后者会调用前者。所以,我们直接看后者的逻辑即可。

这里写图片描述

这里关于 mConfig 的获取方式,如果配置信息以及存在了,就不会去通过一定的方式去获取,如果不存在,就需要进行额外的步骤。代码就不贴了,有点长。

在这里曾经遇到过一个bug,在这个第三方库上面的issue里面也有讨论,是The decoder did not decode anything.。原因是while中所给的时间太小,在规定的时间内得不到相应的帧数。

所以把时间稍微改大一些,就能够获取到足够的帧数,就不会报错,正常运行。

但是性能堪忧!

这两个方法都调用了父类的相应的方法,其中实现如下:

这里写图片描述
这里写图片描述

这个encodeWithMediaCodec()的实现在VideoStream.java中,如下:这其中,使用的是方法encodeWithMediaCodecMethod1()

/**
 * Video encoding is done by a MediaCodec.
 */
protected void encodeWithMediaCodec() throws RuntimeException, IOException {
	if (mMode == MODE_MEDIACODEC_API_2) {
		// Uses the method MediaCodec.createInputSurface to feed the encoder
		encodeWithMediaCodecMethod2();
	} else {
		// Uses dequeueInputBuffer to feed the encoder
		encodeWithMediaCodecMethod1();
	}
}	

接下来的方法中,干货满满,基本上可以覆盖最前面流程图左边的全部内容。

/**
 * Video encoding is done by a MediaCodec.
 */
@SuppressLint("NewApi")
protected void encodeWithMediaCodecMethod1() throws RuntimeException, IOException {
	// Updates the parameters of the camera if needed
	createCamera();// 设置相机参数并开启相机
	updateCamera();// 更新参数

	// Estimates the frame rate of the camera
	measureFramerate();

	// Starts the preview if needed
	if (!mPreviewStarted) {
		try {
			mCamera.startPreview();
			mPreviewStarted = true;
		} catch (RuntimeException e) {
			destroyCamera();
			throw e;
		}
	}

	EncoderDebugger debugger = EncoderDebugger.debug(mSettings, mQuality.resX, mQuality.resY);
	// 将NV21(yuv420sp)转换成yuv420p(H264编码要求此颜色格式)
	final NV21Convertor convertor = debugger.getNV21Convertor();
	// H264编码器
	mMediaCodec = MediaCodec.createByCodecName(debugger.getEncoderName());
	MediaFormat mediaFormat;
	if (EncoderDebugger.ROTATE) {
		mediaFormat = MediaFormat.createVideoFormat("video/avc", mQuality.resY, mQuality.resX);
	} else {
		mediaFormat = MediaFormat.createVideoFormat("video/avc", mQuality.resX, mQuality.resY);
	}
	mediaFormat.setInteger(MediaFormat.KEY_BIT_RATE, mQuality.bitrate);
	mediaFormat.setInteger(MediaFormat.KEY_FRAME_RATE, mQuality.framerate);	
	mediaFormat.setInteger(MediaFormat.KEY_COLOR_FORMAT,debugger.getEncoderColorFormat());
	mediaFormat.setInteger(MediaFormat.KEY_I_FRAME_INTERVAL, 1);
	mMediaCodec.configure(mediaFormat, null, null, MediaCodec.CONFIGURE_FLAG_ENCODE);
	mMediaCodec.start();
	// Camera每一帧的回调
	Camera.PreviewCallback callback = new Camera.PreviewCallback() {
		long now = System.nanoTime()/1000, oldnow = now, i=0;
		ByteBuffer[] inputBuffers = mMediaCodec.getInputBuffers();
		@Override
		public void onPreviewFrame(final byte[] data, final Camera camera) {
			   // 做一些处理
               oldnow = now;
               now = System.nanoTime() / 1000;
               if (i++ > 3) {
                   i = 0;
               }
               try {
                   int bufferIndex = mMediaCodec.dequeueInputBuffer(500000);
                   if (bufferIndex >= 0) {
                       inputBuffers[bufferIndex].clear();
                       if (data == null)
                           Log.e(TAG, "Symptom of the \"Callback buffer was to small\" problem...");
                       else {

                           Camera.CameraInfo camInfo = new Camera.CameraInfo();
                           Camera.getCameraInfo(mCameraId, camInfo);
                           Camera.Size previewSize = camera.getParameters().getPreviewSize();
                           int cameraRotationOffset = camInfo.orientation, mHeight = previewSize.height, mWidth = previewSize.width;

                           Log.e("DEBUG", "orientation = " + cameraRotationOffset + ", width = " + previewSize.width + ", height = " + previewSize.height);
                           // Cancel mirror effect for blink camera.
                           byte tempData;
                           for (int i = 0; i < mHeight * 3 / 2; i++) {
                               for (int j = 0; j < mWidth / 2; j++) {
                                   tempData = data[i * mWidth + j];
                                   data[i * mWidth + j] = data[(i + 1) * mWidth - 1 - j];
                                   data[(i + 1) * mWidth - 1 - j] = tempData;
                               }
                           }
							// TODO: 2018/6/4 modify pic's attributes
                           // mirror
                           Util.yuvRotate(data, 1, previewSize.width, previewSize.height, 90);
                           convertor.convert(data, inputBuffers[bufferIndex]);
                       }
                       // 塞进一个地方,让它去编码
                       mMediaCodec.queueInputBuffer(bufferIndex, 0, inputBuffers[bufferIndex].position(), now, 0);
                   } else {
                       Log.e(TAG, "No buffer available ! ");
                   }
               } finally {
                   mCamera.addCallbackBuffer(data);
               }
           }
	};
	
	for (int i=0;i<10;i++) mCamera.addCallbackBuffer(new byte[convertor.getBufferSize()]);
	mCamera.setPreviewCallbackWithBuffer(callback);

	// The packetizer encapsulates the bit stream in an RTP stream and send it over the network
	// 这里便回答了上面的一个问题,就是关于打包类的is来源问题
	// 这里使用其中自己写的类进行了相应的封装,本质上还是MediaCodec的
	mPacketizer.setInputStream(new MediaCodecInputStream(mMediaCodec));
	// 开启打包线程
	mPacketizer.start();
	mStreaming = true;
}

至此,粗略地分析大致结束。

但是如果仔细看一下源代码的话,你可能会对这个Session产生一些疑问。如果真的有,可以仔细去看一下源代码。我的疑问已经通过看一些详细的代码解决。

我的问题是:在RtspServer中,每有一个socket传过来,都会创建了一个Session,这个session与我们之前创建的那个session有什么区别呢?

实现的地方是在对请求的处理函数中,有几个操作会去给session重新赋值,那么来处当然应该也是之前创建的吧。

粗浅分析,欢迎批评指正!

Read more

Volcano 与 Kubernetes GPU 调度学习笔记

本笔记系统整理 Volcano 调度器、Kubernetes 调度框架、GPU Device Plugin、HAMi 等云原生 AI 调度领域的核心知识,适合用于学习、复习和工程实践参考。 目录 * 第一部分:Volcano 入门 * 1. Volcano 是什么 * 2. 安装与快速使用 * 3. 核心特性一览 * 第二部分:Volcano 整体架构 * 4. Volcano 解决的核心问题 * 5. 整体架构与数据流 * 6. 三层抽象模型 * 第三部分:Volcano 核心实现原理 * 7. Session 机制 * 8. Gang Scheduling 实现 * 9. Queue 与 DRF 公平调度

容器镜像(4):镜像的常用工具箱

容器镜像(4):镜像的常用工具箱

前几篇在讲多架构镜像时已经用过 skopeo 和 crane 做镜像复制,这篇系统整理这两个工具的完整能力,同时介绍几个日常操作镜像时同样好用的工具。 一、skopeo:不依赖 Daemon 的镜像瑞士军刀 skopeo 的核心价值是绕过 Docker daemon,直接与 Registry API 交互。上一篇用它做镜像复制和离线传输,但它的能力远不止于此。 1.1 安装 # Ubuntu / Debian sudo apt install -y skopeo skopeo --version # skopeo version 1.15.1 1.2 inspect:免拉取检查镜像元数据 docker inspect 需要先把镜像拉到本地,skopeo inspect 直接向 Registry

容器镜像(3):多架构镜像构建

容器镜像(3):多架构镜像构建

一、什么是多架构镜像 1.1 OCI Image Index 上一篇介绍了单平台镜像的结构:一个 Manifest 指向 Config 和若干 Layer blob。多架构镜像在此之上多了一层——OCI Image Index(也叫 Manifest List),是一个轻量的索引文件,把多个单平台 Manifest 组织在一起: $ docker manifest inspect golang:1.22-alpine { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.index.v1+json", "manifests&

容器镜像(2):containerd 视角下的镜像

容器镜像(2):containerd 视角下的镜像

一、为什么需要了解 containerd 如果你只用 docker run 跑容器,从来不关心底层,那可以不了解 containerd。但如果你在用 Kubernetes,或者想真正理解"容器运行时"是什么,containerd 是绕不开的。 事实上,当你执行 docker run 的时候,containerd 早就在后台悄悄工作了——Docker 从 1.11 版本开始,就把核心运行时剥离出来交给 containerd 负责。 1.1 Docker 的架构演变 早期的 Docker(1.10 及之前)是一个"大一统"的单体程序:一个 dockerd